Aceld

Case (I) - KisFlow-Golang Stream Real-Time Computing - Quick Start Guide

#go

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection


Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications


Download KisFlow Source

$ go get github.com/aceld/kis-flow

KisFlow Developer Documentation

1. KisFlow Quick Start (Using Configuration Files)

Source Code Example: kis-flow-usage/2-quick_start_with_config at main · aceld/kis-flow-usage

First, let's create a project with the following file structure:

Project Directory

├── Makefile
├── conf
│   ├── flow-CalStuAvgScore.yml
│   ├── func-AvgStuScore.yml
│   └── func-PrintStuAvgScore.yml
├── faas_stu_score_avg.go
├── faas_stu_score_avg_print.go
└── main.go

Flow

This example defines a single Flow named "CalStuAvgScore", a data flow that calculates each student's average score.

The Flow contains two Functions. Function1 (AvgStuScore) runs in Calculate mode and computes the average score; Function2 (PrintStuAvgScore) runs in Expand mode and prints the final results.
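
To make the two-stage layout concrete, here is a minimal, standard-library-only sketch of the idea: an ordered list of stages, where the rows committed by one stage become the input of the next. It is a toy model for illustration only and does not reflect how KisFlow is implemented; the names `stage`, `runFlow`, `calculate`, and `expand` are hypothetical.

```go
package main

import "fmt"

// stage is a hypothetical stand-in for a Flow Function: it receives the
// current batch of rows and returns the rows it "commits" for the next stage.
type stage func(rows []any) []any

// scores is the toy input row: one student's three scores.
type scores struct {
	StuID      int
	S1, S2, S3 int
}

// average is the toy output row of the calculate stage.
type average struct {
	StuID int
	Avg   float64
}

// calculate plays the role of Function1: it turns scores rows into averages.
func calculate(rows []any) []any {
	var out []any
	for _, r := range rows {
		s := r.(scores)
		out = append(out, average{StuID: s.StuID, Avg: float64(s.S1+s.S2+s.S3) / 3})
	}
	return out
}

// expand plays the role of Function2: it prints each row and commits nothing.
func expand(rows []any) []any {
	for _, r := range rows {
		a := r.(average)
		fmt.Printf("stuid: [%d], avg score: [%v]\n", a.StuID, a.Avg)
	}
	return nil
}

// runFlow feeds the rows through the stages in order.
func runFlow(rows []any, stages ...stage) []any {
	for _, st := range stages {
		rows = st(rows)
	}
	return rows
}

func main() {
	input := []any{scores{101, 100, 90, 80}, scores{102, 100, 70, 60}}
	runFlow(input, calculate, expand)
}
```

In the real project the stage order comes from the Flow configuration and the stage bodies are the FaaS functions shown later in this guide.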

Config

The configuration files for the Flow and Functions are as follows:

(1) Flow Config

conf/flow-CalStuAvgScore.yml

kistype: flow
status: 1
flow_name: CalStuAvgScore
flows:
 - fname: AvgStuScore
 - fname: PrintStuAvgScore

(2) Function1 Config

conf/func-AvgStuScore.yml

kistype: func
fname: AvgStuScore
fmode: Calculate
source:
 name: Student Scores
 must:
 - stu_id

(3) Function2 Config

conf/func-PrintStuAvgScore.yml

kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
 name: Student Scores
 must:
 - stu_id

Main

Next is the main logic, which is divided into three steps:

  • Load the configuration files and get the Flow instance.
  • Submit data.
  • Run the Flow.

main.go

package main

import (
    "context"
    "fmt"

    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
)

func main() {
    ctx := context.Background()

    // Load configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    // Get the flow
    flow1 := kis.Pool().GetFlow("CalStuAvgScore")
    if flow1 == nil {
        panic("flow1 is nil")
    }

    // Submit a string
    _ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
    // Submit a string
    _ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)

    // Run the flow
    if err := flow1.Run(ctx); err != nil {
        fmt.Println("err: ", err)
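    }
}

func init() {
    // Register the FaaS functions under the names used in the config files
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}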

Function1

Function1 implements the calculation step. AvgStuScoreIn is the input type, which carries the student ID and three scores; AvgStuScoreOut is the output type, which carries the student ID and the average score.

faas_stu_score_avg.go

package main

import (
    "context"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type AvgStuScoreIn struct {
    serialize.DefaultSerialize
    StuId  int `json:"stu_id"`
    Score1 int `json:"score_1"`
    Score2 int `json:"score_2"`
    Score3 int `json:"score_3"`
}

type AvgStuScoreOut struct {
    serialize.DefaultSerialize
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

// AvgStuScore(FaaS) calculates students' average scores
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
    for _, row := range rows {
        out := AvgStuScoreOut{
            StuId:    row.StuId,
            AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
        }
        // Submit result data
        _ = flow.CommitRow(out)
    }
    return nil
}

Function2

Function2 simply prints each row it receives:

faas_stu_score_avg_print.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type PrintStuAvgScoreIn struct {
    serialize.DefaultSerialize
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

type PrintStuAvgScoreOut struct {
    serialize.DefaultSerialize
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
    for _, row := range rows {
        fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
    }
    return nil
}
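
With `%+v`, a `float64` is printed using the shortest digits that identify the value, so a whole average prints without decimals while others print with many. If fixed precision is preferred, a verb such as `%.2f` is one option. The helper `formatAvg` below is a hypothetical name and is not part of the example project.

```go
package main

import "fmt"

// formatAvg renders one result line with the average fixed to two decimals.
func formatAvg(stuID int, avg float64) string {
	return fmt.Sprintf("stuid: [%d], avg score: [%.2f]", stuID, avg)
}

func main() {
	fmt.Println(formatAvg(101, 90))                    // stuid: [101], avg score: [90.00]
	fmt.Println(formatAvg(102, float64(100+70+60)/3)) // stuid: [102], avg score: [76.67]
}
```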

Output

Finally, run the program. It produces the following output:

Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
Add FlowRouter FlowName=CalStuAvgScore
stuid: [101], avg score: [90]
stuid: [102], avg score: [76.66666666666667]

2. KisFlow Quick Start (Using Native Interface, Dynamic Configuration)

Source Code Example: kis-flow-usage/1-quick_start at main · aceld/kis-flow-usage

Project Directory

├── faas_stu_score_avg.go
├── faas_stu_score_avg_print.go
└── main.go

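Flow

This example builds the same CalStuAvgScore Flow as in section 1, but creates the Flow and Function configurations in code instead of loading them from YAML files.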

Main

main.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/common"
    "github.com/aceld/kis-flow/config"
    "github.com/aceld/kis-flow/flow"
    "github.com/aceld/kis-flow/kis"
)

func main() {
    ctx := context.Background()

    // Create a new flow configuration
    myFlowConfig1 := config.NewFlowConfig("CalStuAvgScore", common.FlowEnable)

    // Create new function configuration
    avgStuScoreConfig := config.NewFuncConfig("AvgStuScore", common.C, nil, nil)
    printStuScoreConfig := config.NewFuncConfig("PrintStuAvgScore", common.E, nil, nil)

    // Create a new flow
    flow1 := flow.NewKisFlow(myFlowConfig1)

    // Link functions to the flow
    _ = flow1.Link(avgStuScoreConfig, nil)
    _ = flow1.Link(printStuScoreConfig, nil)

    // Submit a string
    _ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
    // Submit a string
    _ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)

    // Run the flow
    if err := flow1.Run(ctx); err != nil {
        fmt.Println("err: ", err)
    }

    return
}

func init() {
    // Register functions
    kis.Pool().FaaS("AvgStuScore", AvgStuScore)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}

Function1

faas_stu_score_avg.go

package main

import (
    "context"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type AvgStuScoreIn struct {
    serialize.DefaultSerialize
    StuId  int `json:"stu_id"`
    Score1 int `json:"score_1"`
    Score2 int `json:"score_2"`
    Score3 int `json:"score_3"`
}

type AvgStuScoreOut struct {
    serialize.DefaultSerialize
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

// AvgStuScore(FaaS) calculates students' average scores
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
    for _, row := range rows {

        out := AvgStuScoreOut{
            StuId:    row.StuId,
            AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
        }

        // Submit result data
        _ = flow.CommitRow(out)
    }

    return nil
}

Function2

faas_stu_score_avg_print.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type PrintStuAvgScoreIn struct {
    serialize.DefaultSerialize
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

type PrintStuAvgScoreOut struct {
    serialize.DefaultSerialize
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
    for _, row := range rows {
        fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
    }

    return nil
}

Output

Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
funcName NewConfig source is nil, funcName = AvgStuScore, use default unNamed Source.
funcName NewConfig source is nil, funcName = PrintStuAvgScore, use default unNamed Source.
stuid: [101], avg score: [90]
stuid: [102], avg score: [76.66666666666667]

Author: Aceld
GitHub: https://github.com/aceld
