Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection
Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications
Download KisFlow Source
$go get github.com/aceld/kis-flow
KisFlow Developer Documentation
- KisFlow Quick Start (Using Configuration Files) Source Code Example: kis-flow-usage/2-quick_start_with_config at main · aceld/kis-flow-usage
First, let's create a project with the following file structure:
Project Directory
├── Makefile
├── conf
│ ├── flow-CalStuAvgScore.yml
│ ├── func-AvgStuScore.yml
│ └── func-PrintStuAvgScore.yml
├── faas_stu_score_avg.go
├── faas_stu_score_avg_print.go
└── main.go
Flow
Define the current Flow. The current Flow is named "CalStuAvgScore", which is a data flow for calculating students' average scores.
Define two Functions. Function1 is Calculate
, which is the logic for calculating students' average scores, and Function2
is Expand
, which is for printing the final results.
Config
The configuration files for the Flow and Functions are as follows:
(1) Flow Config
conf/flow-CalStuAvgScore.yml
kistype: flow
status: 1
flow_name: CalStuAvgScore
flows:
- fname: AvgStuScore
- fname: PrintStuAvgScore
(2) Function1 Config
conf/func-AvgStuScore.yml
kistype: func
fname: AvgStuScore
fmode: Calculate
source:
name: Student Scores
must:
- stu_id
(3) Function2 Config
conf/func-PrintStuAvgScore.yml
kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
name: Student Scores
must:
- stu_id
Main
Next is the main logic, which is divided into three steps:
- Load configuration files and get Flow instances.
- Submit data.
- Run the Flow.
main.go
package main
import (
"context"
"fmt"
"github.com/aceld/kis-flow/file"
"github.com/aceld/kis-flow/kis"
)
func main() {
ctx := context.Background()
// Load configuration from file
if err := file.ConfigImportYaml("conf/"); err != nil {
panic(err)
}
// Get the flow
flow1 := kis.Pool().GetFlow("CalStuAvgScore")
if flow1 == nil {
panic("flow1 is nil")
}
// Submit a string
_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
// Submit a string
_ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)
// Run the flow
if err := flow1.Run(ctx); err != nil {
fmt.Println("err: ", err)
}
return
}
Function1
The implementation logic of the first calculation process is as follows. AvgStuScoreIn
is the input data type, currently containing three scores, and AvgStuScoreOut
is the output data type, which is the average score.
faas_stu_score_avg.go
package main
import (
"context"
"github.com/aceld/kis-flow/kis"
"github.com/aceld/kis-flow/serialize"
)
type AvgStuScoreIn struct {
serialize.DefaultSerialize
StuId int `json:"stu_id"`
Score1 int `json:"score_1"`
Score2 int `json:"score_2"`
Score3 int `json:"score_3"`
}
type AvgStuScoreOut struct {
serialize.DefaultSerialize
StuId int `json:"stu_id"`
AvgScore float64 `json:"avg_score"`
}
// AvgStuScore(FaaS) calculates students' average scores
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
for _, row := range rows {
out := AvgStuScoreOut{
StuId: row.StuId,
AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
}
// Submit result data
_ = flow.CommitRow(out)
}
return nil
}
Function2
The logic for printing is to directly print the data as follows.
faas_stu_score_avg_print.go
package main
import (
"context"
"fmt"
"github.com/aceld/kis-flow/kis"
"github.com/aceld/kis-flow/serialize"
)
type PrintStuAvgScoreIn struct {
serialize.DefaultSerialize
StuId int `json:"stu_id"`
AvgScore float64 `json:"avg_score"`
}
type PrintStuAvgScoreOut struct {
serialize.DefaultSerialize
}
func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
for _, row := range rows {
fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
}
return nil
}
Output
Finally, run the program and get the following results:
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
Add FlowRouter FlowName=CalStuAvgScore
stuid: [101], avg score: [90]
stuid: [102], avg score: [76.66666666666667]
2. KisFlow Quick Start (Using Native Interface, Dynamic Configuration)
Source Code Example: kis-flow-usage/1-quick_start at main · aceld/kis-flow-usage
Project Directory
├── faas_stu_score_avg.go
├── faas_stu_score_avg_print.go
└── main.go
Flow
Main
main.go
package main
import (
"context"
"fmt"
"github.com/aceld/kis-flow/common"
"github.com/aceld/kis-flow/config"
"github.com/aceld/kis-flow/flow"
"github.com/aceld/kis-flow/kis"
)
func main() {
ctx := context.Background()
// Create a new flow configuration
myFlowConfig1 := config.NewFlowConfig("CalStuAvgScore", common.FlowEnable)
// Create new function configuration
avgStuScoreConfig := config.NewFuncConfig("AvgStuScore", common.C, nil, nil)
printStuScoreConfig := config.NewFuncConfig("PrintStuAvgScore", common.E, nil, nil)
// Create a new flow
flow1 := flow.NewKisFlow(myFlowConfig1)
// Link functions to the flow
_ = flow1.Link(avgStuScoreConfig, nil)
_ = flow1.Link(printStuScoreConfig, nil)
// Submit a string
_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`)
// Submit a string
_ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)
// Run the flow
if err := flow1.Run(ctx); err != nil {
fmt.Println("err: ", err)
}
return
}
func init() {
// Register functions
kis.Pool().FaaS("AvgStuScore", AvgStuScore)
kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
}
Function1
faas_stu_score_avg.go
package main
import (
"context"
"github.com/aceld/kis-flow/kis"
"github.com/aceld/kis-flow/serialize"
)
type AvgStuScoreIn struct {
serialize.DefaultSerialize
StuId int `json:"stu_id"`
Score1 int `json:"score_1"`
Score2 int `json:"score_2"`
Score3 int `json:"score_3"`
}
type AvgStuScoreOut struct {
serialize.DefaultSerialize
StuId int `json:"stu_id"`
AvgScore float64 `json:"avg_score"`
}
// AvgStuScore(FaaS) calculates students' average scores
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
for _, row := range rows {
out := AvgStuScoreOut{
StuId: row.StuId,
AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
}
// Submit result data
_ = flow.CommitRow(out)
}
return nil
}
Function2
faas_stu_score_avg_print.go
package main
import (
"context"
"fmt"
"github.com/aceld/kis-flow/kis"
"github.com/aceld/kis-flow/serialize"
)
type PrintStuAvgScoreIn struct {
serialize.DefaultSerialize
StuId int `json:"stu_id"`
AvgScore float64 `json:"avg_score"`
}
type PrintStuAvgScoreOut struct {
serialize.DefaultSerialize
}
func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
for _, row := range rows {
fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
}
return nil
}
Output
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
funcName NewConfig source is nil, funcName = AvgStuScore, use default unNamed Source.
funcName NewConfig source is nil, funcName = PrintStuAvgScore, use default unNamed Source.
stuid: [101], avg score: [90]
stuid: [102], avg score: [76.66666666666667]
Author: Aceld
GitHub: https://github.com/aceld
KisFlow Open Source Project Address: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection
Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications
Top comments (0)