Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection
Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications
7.1 Action Abort
KisFlow Action refers to controlling the flow's scheduling logic while executing a Function. KisFlow provides some Action options for developers to choose from. This section introduces the simplest Action, Abort, which terminates the current Flow.
The final usage of Abort is as follows:
func AbortFuncHandler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call AbortFuncHandler ----")
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
}
return flow.Next(kis.ActionAbort) // Terminate the Flow
}
AbortFuncHandler() is a business callback method of a Function, defined by the developer. Normally, after the current Function finishes, the Flow continues to the next Function. However, if the Function returns flow.Next(kis.ActionAbort), the next Function is not executed and the current Flow's scheduling is terminated immediately.
Let's implement the Abort Action mode of KisFlow below.
7.1.1 Abort Interface Definition
First, let's add the Action-related method, Next(), to the Flow interface.
kis-flow/kis/flow.go
type Flow interface {
// Run schedules the Flow, sequentially scheduling and executing Functions in the Flow
Run(ctx context.Context) error
// Link connects the Functions in the Flow according to the configuration file
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
// CommitRow submits Flow data to the Function layer about to be executed
CommitRow(row interface{}) error
// Input obtains the input source data of the currently executing Function in the Flow
Input() common.KisRowArr
// GetName retrieves the name of the Flow
GetName() string
// GetThisFunction obtains the currently executing Function
GetThisFunction() Function
// GetThisFuncConf obtains the configuration of the currently executing Function
GetThisFuncConf() *config.KisFuncConfig
// GetConnector obtains the Connector of the currently executing Function
GetConnector() (Connector, error)
// GetConnConf obtains the configuration of the Connector of the currently executing Function
GetConnConf() (*config.KisConnConfig, error)
// GetConfig obtains the configuration of the current Flow
GetConfig() *config.KisFlowConfig
// GetFuncConfigByName retrieves the configuration of the specified Function in the Flow
GetFuncConfigByName(funcName string) *config.KisFuncConfig
// --- KisFlow Action ---
// Next proceeds to the next Function in the Flow with the specified Action
Next(acts ...ActionFunc) error
}
Here, the Flow interface gains a method Next(acts ...ActionFunc) error, whose parameter is a variadic list of ActionFunc values. This is the Action-related method that we define for KisFlow. The Action module itself is defined as follows:
7.1.2 Action Module Definition
Action is a configuration module that lets a Function control special behaviors during Flow execution. The Abort behavior mentioned above is one such Action. Create a file action.go under kis-flow/kis/ and implement it as follows:
kis-flow/kis/action.go
package kis
// Action represents the Actions to be taken during the execution of KisFlow
type Action struct {
// Abort indicates whether to terminate the Flow execution
Abort bool
}
// ActionFunc is a type for KisFlow Functional Option
type ActionFunc func(ops *Action)
// LoadActions loads Actions and sequentially executes the ActionFunc functions
func LoadActions(acts []ActionFunc) Action {
action := Action{}
if acts == nil {
return action
}
for _, act := range acts {
act(&action)
}
return action
}
// ActionAbort sets the Action to terminate the Flow execution
func ActionAbort(action *Action) {
action.Abort = true
}
For now, Action has only one behavior, Abort, represented by a boolean. If it is true, the Flow needs to be terminated.
Next, type ActionFunc func(ops *Action) is a function type whose parameter is a pointer to an Action. The function func ActionAbort(action *Action) is one instance of this function type; it sets the Abort member of the Action struct to true.
Finally, let's look at func LoadActions(acts []ActionFunc) Action. Its parameter is a slice of ActionFunc functions. LoadActions() creates a new Action{}, executes the functions in the slice in order so that each can modify the members of the Action, and returns the resulting Action to the caller.
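The functional-option pattern described above can be tried in isolation. The following is a minimal sketch that copies the types from action.go into a standalone package main so it runs without KisFlow; the explicit nil check is left out because ranging over a nil slice in Go performs no iterations.

```go
package main

import "fmt"

// Action mirrors the struct from action.go above.
type Action struct {
	Abort bool
}

// ActionFunc is a functional option that mutates an Action.
type ActionFunc func(ops *Action)

// LoadActions applies every option to a fresh Action and returns the result.
func LoadActions(acts []ActionFunc) Action {
	action := Action{}
	for _, act := range acts {
		act(&action)
	}
	return action
}

// ActionAbort marks the Action as "terminate the Flow".
func ActionAbort(action *Action) {
	action.Abort = true
}

func main() {
	// No options: the zero-value Action, so Abort stays false.
	fmt.Println(LoadActions(nil).Abort)
	// One option: ActionAbort flips the flag.
	fmt.Println(LoadActions([]ActionFunc{ActionAbort}).Abort)
}
```

Because every option only touches the field it cares about, new behaviors can later be added as extra fields and extra ActionFunc functions without changing the signature of LoadActions().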
7.1.3 Implementation of the Next Method
Next, we need to implement this interface method in the KisFlow module. First, add an Action member to KisFlow that records the action to take after each Function executes.
kis-flow/flow/kis_flow.go
// KisFlow provides the context for the entire streaming computation
type KisFlow struct {
// Basic information
Id string // Distributed instance ID of the Flow (used internally in KisFlow to distinguish different instances)
Name string // Readable name of the Flow
Conf *config.KisFlowConfig // Flow configuration strategy
// List of Functions
Funcs map[string]kis.Function // All Function objects managed by the current flow, key: FunctionName
FlowHead kis.Function // Head of the Function list owned by the current Flow
FlowTail kis.Function // Tail of the Function list owned by the current Flow
flock sync.RWMutex // Lock for managing list insertion and reading/writing
ThisFunction kis.Function // The KisFunction object currently being executed by the Flow
ThisFunctionId string // ID of the currently executing Function
PrevFunctionId string // ID of the previous Function executed
// Function list parameters
funcParams map[string]config.FParam // Custom fixed configuration parameters of the current Function in the flow, key: Function instance KisID, value: FParam
fplock sync.RWMutex // Lock for managing reading/writing of funcParams
// Data
buffer common.KisRowArr // Internal buffer for temporarily storing input byte data, a single piece of data is an interface{}, multiple pieces of data are []interface{}, i.e., KisBatch
data common.KisDataMap // Data source for each layer of streaming computation
inPut common.KisRowArr // Input data for the currently executing Function
// +++++++++++++++++++++
// KisFlow Action
action kis.Action // Action to be taken by the current Flow
}
Then implement the Next() method for KisFlow as follows:
kis-flow/flow/kis_flow.go
// Next proceeds to the next Function in the Flow with the specified Action
func (flow *KisFlow) Next(acts ...kis.ActionFunc) error {
// Load Actions passed by the Function FaaS
flow.action = kis.LoadActions(acts)
return nil
}
Each time a developer's custom business callback runs in a Function, it ends by calling flow.Next() to pass the Action. Next(acts ...kis.ActionFunc) error loads the passed Action properties and saves them in flow.action.
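One consequence of this design is that every call to Next() replaces the stored Action rather than merging into it, because LoadActions() always starts from a fresh Action{}. The sketch below illustrates this with miniFlow, a hypothetical stand-in for KisFlow that keeps only the action field; it is not the real KisFlow type.

```go
package main

import "fmt"

type Action struct{ Abort bool }

type ActionFunc func(*Action)

func ActionAbort(a *Action) { a.Abort = true }

func LoadActions(acts []ActionFunc) Action {
	var action Action
	for _, act := range acts {
		act(&action)
	}
	return action
}

// miniFlow is a hypothetical stand-in for KisFlow (assumption: only the
// action field matters for this illustration).
type miniFlow struct {
	action Action
}

// Next stores the Action built from the options passed by the Function.
func (f *miniFlow) Next(acts ...ActionFunc) error {
	f.action = LoadActions(acts)
	return nil
}

func main() {
	f := &miniFlow{}

	_ = f.Next(ActionAbort)
	fmt.Println(f.action.Abort) // true: the abort option was applied

	_ = f.Next()
	fmt.Println(f.action.Abort) // false: a call without options resets the Action
}
```

A Function that simply returns flow.Next() therefore leaves no special behavior pending.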
7.1.4 Abort to Control Flow Execution
Now that we have an Abort action to control the Flow, we need to add a member to KisFlow to represent this state.
kis-flow/flow/kis_flow.go
// KisFlow provides the context for the entire streaming computation
type KisFlow struct {
// Basic information
Id string // Distributed instance ID of the Flow (used internally in KisFlow to distinguish different instances)
Name string // Readable name of the Flow
Conf *config.KisFlowConfig // Flow configuration strategy
// List of Functions
Funcs map[string]kis.Function // All Function objects managed by the current flow, key: FunctionName
FlowHead kis.Function // Head of the Function list owned by the current Flow
FlowTail kis.Function // Tail of the Function list owned by the current Flow
flock sync.RWMutex // Lock for managing list insertion and reading/writing
ThisFunction kis.Function // The KisFunction object currently being executed by the Flow
ThisFunctionId string // ID of the currently executing Function
PrevFunctionId string // ID of the previous Function executed
// Function list parameters
funcParams map[string]config.FParam // Custom fixed configuration parameters of the current Function in the flow, key: Function instance KisID, value: FParam
fplock sync.RWMutex // Lock for managing reading/writing of funcParams
// Data
buffer common.KisRowArr // Internal buffer for temporarily storing input byte data, a single piece of data is an interface{}, multiple pieces of data are []interface{}, i.e., KisBatch
data common.KisDataMap // Data source for each layer of streaming computation
inPut common.KisRowArr // Input data for the currently executing Function
action kis.Action // Action to be taken by the current Flow
// +++++++++
abort bool // Indicates whether to abort the Flow
}
Each time the flow.Run() method is executed, the abort variable needs to be reset. In addition, the scheduling loop needs to check the flow.abort status.
kis-flow/flow/kis_flow.go
// Run starts the streaming computation of KisFlow, executing the stream from the starting Function
func (flow *KisFlow) Run(ctx context.Context) error {
// +++++++++
// Reset abort
flow.abort = false // Reset the abort state each time scheduling starts
// ... ...
// ... ...
// Stream chain call
for fn != nil && !flow.abort { // ++++ Do not enter the next loop if abort is set
// ... ...
// ... ...
if err := fn.Call(ctx, flow); err != nil {
// Error
return err
} else {
// Success
// ... ...
fn = fn.Next()
}
}
return nil
}
When Call() runs the Function's custom method and that method returns flow.Next(kis.ActionAbort), the Action state of the Flow changes. The Abort state of the Action is then transferred to the abort state of KisFlow, which terminates the Flow's execution.
With the abort state in place, we can add another condition to the Flow execution: if the current Function does not submit any result data (that is, flow.buffer is empty), the Flow does not proceed to the next layer and exits the Run() call directly.
kis-flow/flow/kis_flow_data.go
// commitCurData submits the result data of the currently executing Function in the Flow
func (flow *KisFlow) commitCurData(ctx context.Context) error {
// Check if there is result data for the current computation; if not, exit the current Flow Run loop
if len(flow.buffer) == 0 {
// ++++++++++++
flow.abort = true
return nil
}
// ... ...
// ... ...
return nil
}
7.1.5 Capturing and Handling Actions
Next, we implement a method dedicated to handling Actions. It is defined in the kis-flow/flow/kis_flow_action.go file as follows:
kis-flow/flow/kis_flow_action.go
package flow
import (
"context"
"kis-flow/kis"
)
// dealAction handles Actions and decides the subsequent flow direction
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {
if err := flow.commitCurData(ctx); err != nil {
return nil, err
}
// Update the previous FunctionId cursor
flow.PrevFunctionId = flow.ThisFunctionId
fn = fn.Next()
// Abort Action forces termination
if flow.action.Abort {
flow.abort = true
}
// Clear Action
flow.action = kis.Action{}
return fn, nil
}
Next, we'll slightly modify the KisFlow Run() process to incorporate the dealAction() method.
kis-flow/flow/kis_flow.go
// Run starts the streaming computation of KisFlow, executing the stream from the starting Function
func (flow *KisFlow) Run(ctx context.Context) error {
var fn kis.Function
fn = flow.FlowHead
flow.abort = false
if flow.Conf.Status == int(common.FlowDisable) {
// Flow is configured to be disabled
return nil
}
// Since no Function has been executed at this point, PrevFunctionId is FirstVirtual as there is no previous Function
flow.PrevFunctionId = common.FunctionIdFirstVirtual
// Submit the original data stream
if err := flow.commitSrcData(ctx); err != nil {
return err
}
// Stream chain call
for fn != nil && !flow.abort {
// Flow records the current executing Function
fid := fn.GetId()
flow.ThisFunction = fn
flow.ThisFunctionId = fid
// Get the source data to be processed by the current Function
if inputData, err := flow.getCurData(); err != nil {
log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
return err
} else {
flow.inPut = inputData
}
if err := fn.Call(ctx, flow); err != nil {
// Error
return err
} else {
// Success
// +++++++++++++++++++++++++++++++
fn, err = flow.dealAction(ctx, fn)
if err != nil {
return err
}
// +++++++++++++++++++++++++++++++
}
}
return nil
}
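To see the control flow of Run() and dealAction() without the rest of the framework, here is a minimal sketch. The step and miniFlow types, the visited slice, and buildFlow() are hypothetical stand-ins introduced only for this illustration, and the data-commit step is deliberately omitted so that only the Abort handling remains.

```go
package main

import "fmt"

type Action struct{ Abort bool }

type ActionFunc func(*Action)

func ActionAbort(a *Action) { a.Abort = true }

// step is a hypothetical stand-in for a KisFlow Function in a linked list.
type step struct {
	name string
	call func(f *miniFlow) error
	next *step
}

// miniFlow is a hypothetical stand-in for KisFlow.
type miniFlow struct {
	head    *step
	action  Action
	abort   bool
	visited []string // names of the steps that actually ran
}

func (f *miniFlow) Next(acts ...ActionFunc) error {
	f.action = Action{}
	for _, act := range acts {
		act(&f.action)
	}
	return nil
}

// dealAction advances the cursor, turns a pending Abort action into flow
// state, and clears the Action for the next step.
func (f *miniFlow) dealAction(s *step) *step {
	s = s.next
	if f.action.Abort {
		f.abort = true
	}
	f.action = Action{}
	return s
}

func (f *miniFlow) Run() error {
	f.abort = false // reset on every run
	f.visited = nil
	for s := f.head; s != nil && !f.abort; {
		f.visited = append(f.visited, s.name)
		if err := s.call(f); err != nil {
			return err
		}
		s = f.dealAction(s)
	}
	return nil
}

// buildFlow wires funcName1 -> abortFunc -> funcName3.
func buildFlow() *miniFlow {
	pass := func(f *miniFlow) error { return f.Next() }
	stop := func(f *miniFlow) error { return f.Next(ActionAbort) }
	s3 := &step{name: "funcName3", call: pass}
	s2 := &step{name: "abortFunc", call: stop, next: s3}
	s1 := &step{name: "funcName1", call: pass, next: s2}
	return &miniFlow{head: s1}
}

func main() {
	f := buildFlow()
	if err := f.Run(); err != nil {
		fmt.Println("run failed:", err)
		return
	}
	fmt.Println(f.visited) // [funcName1 abortFunc]
}
```

The cursor still advances to funcName3 inside dealAction(), but the loop condition sees the abort flag before that step is called, which is the same ordering the real Run() relies on.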
7.1.6 Action Abort Unit Test
First, let's create a Function configuration file as follows:
kis-flow/test/load_conf/func/func-AbortFunc.yml
kistype: func
fname: abortFunc
fmode: Calculate
source:
name: User Order Error Rate
must:
- order_id
- user_id
The name of the current Function is abortFunc. Next, we implement its FaaS function as follows:
kis-flow/test/faas/faas_abort.go
package faas
import (
"context"
"fmt"
"kis-flow/kis"
)
// type FaaS func(context.Context, Flow) error
func AbortFuncHandler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call AbortFuncHandler ----")
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
}
return flow.Next(kis.ActionAbort)
}
This Function ends by calling flow.Next(kis.ActionAbort) to terminate the Flow. Next, we create a Flow that uses this Function as its middle Function, to test whether the Flow terminates before executing the subsequent Function. The new Flow configuration is as follows:
kis-flow/test/load_conf/flow/flow-FlowName2.yml
kistype: flow
status: 1
flow_name: flowName2
flows:
- fname: funcName1
- fname: abortFunc
- fname: funcName3
The name of the current Flow is flowName2, which contains three Functions: funcName1, abortFunc, and funcName3. If the abort functionality works correctly, funcName3 should not be executed.
Next, we implement the unit test case.
kis-flow/test/kis_action_test.go
package test
import (
"context"
"kis-flow/common"
"kis-flow/file"
"kis-flow/kis"
"kis-flow/test/caas"
"kis-flow/test/faas"
"testing"
)
func TestActionAbort(t *testing.T) {
ctx := context.Background()
// 0. Register Function callbacks
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // Add abortFunc handler
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. Register ConnectorInit and Connector callbacks
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. Load configuration files and build Flow
if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
panic(err)
}
// 2. Get Flow
flow1 := kis.Pool().GetFlow("flowName2")
// 3. Submit original data
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 4. Execute flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
The following code registers the initial callbacks. You can also write this code in another file to avoid repeating it each time:
// 0. Register Function callbacks
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // Add abortFunc handler
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. Register ConnectorInit and Connector callbacks
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
Change to the kis-flow/test/ directory and run the following command:
go test -test.v -test.paniconexit0 -test.run TestActionAbort
The result is as follows:
=== RUN TestActionAbort
Add KisPool FuncName=funcName1
Add KisPool FuncName=abortFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
context.Background
====> After CommitSrcData, flow_name = flowName2, flow_id = flow-b6b90eb4b7d7457fbf85b3299b625513
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionV, flow = &{Id:flow-b6b90eb4b7d7457fbf85b3299b625513 Name:flowName2 Conf:0xc000092cc0 Funcs:map[abortFunc:0xc000094d20 funcName1:0xc000094cc0 funcName3:0xc000094d80] FlowHead:0xc000094cc0 FlowTail:0xc000094d80 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000094cc0 ThisFunctionId:func-c435cf9f8e3346a1851f8c76375fce0f PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-7f5af1521fd64d08839d5bdd26de5254:map[] func-c435cf9f8e3346a1851f8c76375fce0f:map[] func-f0b80593fe2e4018a878f155b9c543b4:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] jumpFunc:NoJump abort:false nextOpt:<nil>}
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName2, flow_id = flow-b6b90eb4b7d7457fbf85b3299b625513
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c435cf9f8e3346a1851f8c76375fce0f:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-b6b90eb4b7d7457fbf85b3299b625513 Name:flowName2 Conf:0xc000092cc0 Funcs:map[abortFunc:0xc000094d20 funcName1:0xc000094cc0 funcName3:0xc000094d80] FlowHead:0xc000094cc0 FlowTail:0xc000094d80 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000094d20 ThisFunctionId:func-7f5af1521fd64d08839d5bdd26de5254 PrevFunctionId:func-c435cf9f8e3346a1851f8c76375fce0f funcParams:map[func-7f5af1521fd64d08839d5bdd26de5254:map[] func-c435cf9f8e3346a1851f8c76375fce0f:map[] func-f0b80593fe2e4018a878f155b9c543b4:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c435cf9f8e3346a1851f8c76375fce0f:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] jumpFunc:NoJump abort:false nextOpt:<nil>}
---> Call AbortFuncHandler ----
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 0
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 1
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 2
--- PASS: TestActionAbort (0.00s)
PASS
ok kis-flow/test 0.487s
From the result, we can see that after AbortFuncHandler executed, the Flow did not continue to funcName3 and exited the Run() method.
7.2 Action DataReuse (Reuse Upper-Level Data)
The Action DataReuse is designed to reuse data from the previous function, meaning the current function's submitted result will not be used. Instead, the result data from the previous function will be reused for the next function as its data source.
Let's implement the Action DataReuse functionality.
7.2.1 Adding DataReuse Action
Add a boolean DataReuse member to the Action.
kis-flow/kis/action.go
// Action KisFlow execution process Actions
type Action struct {
// +++++++++++++
// DataReuse indicates whether to reuse upper-level function data
DataReuse bool
// Abort terminates the execution of the Flow
Abort bool
}
// ActionDataReuse sets the DataReuse option to true
func ActionDataReuse(act *Action) {
act.DataReuse = true
}
Then provide an Action function named ActionDataReuse, which sets the DataReuse status to true.
7.2.2 Reusing Upper-Level Data to the Next Layer
Here, we need to implement a method for submitting reused data. The logic is as follows:
kis-flow/flow/kis_flow_data.go
// commitReuseData submits reused data from the previous function
func (flow *KisFlow) commitReuseData(ctx context.Context) error {
// Check if the previous layer has result data, if not, exit the current Flow Run loop
if len(flow.data[flow.PrevFunctionId]) == 0 {
flow.abort = true
return nil
}
// The current layer's result data equals the previous layer's result data (reuse upper-level result data)
flow.data[flow.ThisFunctionId] = flow.data[flow.PrevFunctionId]
// Clear the buffer (if ReuseData is selected, all submitted data will not be carried to the next layer)
flow.buffer = flow.buffer[0:0]
log.Logger().DebugFX(ctx, " ====> After commitReuseData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
return nil
}
The logic is simple. Unlike commitCurData(), which submits the flow.buffer data to flow.data[flow.ThisFunctionId], commitReuseData() submits the previous layer's result data to flow.data[flow.ThisFunctionId].
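The difference between the two commit paths can be sketched on a plain map. In the example below, miniFlow, the rows type, and newFlow() are hypothetical stand-ins, and the body of commitCurData() is an assumption, since the article elides the real one; only commitReuseData() follows the listing above.

```go
package main

import "fmt"

// rows is a hypothetical stand-in for common.KisRowArr.
type rows []interface{}

// miniFlow is a hypothetical stand-in holding only the data-related fields.
type miniFlow struct {
	data   map[string]rows
	buffer rows
	prevID string
	thisID string
	abort  bool
}

// commitCurData stores the rows committed by the current layer
// (assumed behavior; the real implementation is elided in the article).
func (f *miniFlow) commitCurData() {
	if len(f.buffer) == 0 {
		f.abort = true
		return
	}
	batch := make(rows, len(f.buffer))
	copy(batch, f.buffer)
	f.data[f.thisID] = batch
	f.buffer = f.buffer[0:0]
}

// commitReuseData publishes the previous layer's rows as this layer's result
// and discards whatever the current layer committed.
func (f *miniFlow) commitReuseData() {
	if len(f.data[f.prevID]) == 0 {
		f.abort = true
		return
	}
	f.data[f.thisID] = f.data[f.prevID]
	f.buffer = f.buffer[0:0]
}

func newFlow() *miniFlow {
	return &miniFlow{
		data:   map[string]rows{"prev": {"row-from-prev"}},
		buffer: rows{"row-from-this"},
		prevID: "prev",
		thisID: "this",
	}
}

func main() {
	cur := newFlow()
	cur.commitCurData()
	fmt.Println(cur.data["this"]) // [row-from-this]

	reuse := newFlow()
	reuse.commitReuseData()
	fmt.Println(reuse.data["this"]) // [row-from-prev]
}
```

Note that the assignment in commitReuseData() copies the slice header, not the rows themselves, so both map keys refer to the same backing array.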
7.2.3 Handling the DataReuse Action
Then, add handling for the DataReuse action in the dealAction() method:
kis-flow/flow/kis_flow_action.go
// dealAction processes actions and determines the next steps for the Flow
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {
// ++++++++++++++++
// DataReuse Action
if flow.action.DataReuse {
if err := flow.commitReuseData(ctx); err != nil {
return nil, err
}
} else {
if err := flow.commitCurData(ctx); err != nil {
return nil, err
}
}
// Update the previous function ID cursor
flow.PrevFunctionId = flow.ThisFunctionId
fn = fn.Next()
// Abort Action force termination
if flow.action.Abort {
flow.abort = true
}
// Clear the Action
flow.action = kis.Action{}
return fn, nil
}
This captures and processes the DataReuse action, deciding whether to reuse upper-level data or to commit current data based on the action settings.
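Since Next() is variadic, a Function can pass more than one option at once, and dealAction() evaluates DataReuse before Abort. The sketch below walks through that branch order; describe() is a hypothetical helper written for this illustration only, and it ignores the case where a commit method itself sets the abort flag because there is no data.

```go
package main

import "fmt"

// Action mirrors the two-field struct from this section.
type Action struct {
	DataReuse bool
	Abort     bool
}

type ActionFunc func(*Action)

func ActionDataReuse(a *Action) { a.DataReuse = true }
func ActionAbort(a *Action)     { a.Abort = true }

func LoadActions(acts []ActionFunc) Action {
	var action Action
	for _, act := range acts {
		act(&action)
	}
	return action
}

// describe is a hypothetical helper that reports which branches dealAction
// would take for a given set of options.
func describe(acts ...ActionFunc) string {
	a := LoadActions(acts)
	commit := "commitCurData"
	if a.DataReuse {
		commit = "commitReuseData"
	}
	if a.Abort {
		return commit + ", then stop"
	}
	return commit + ", then continue"
}

func main() {
	fmt.Println(describe())                             // commitCurData, then continue
	fmt.Println(describe(ActionDataReuse))              // commitReuseData, then continue
	fmt.Println(describe(ActionDataReuse, ActionAbort)) // commitReuseData, then stop
}
```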
7.2.4 Unit Testing
Let's proceed with unit testing for DataReuse. First, create a function named dataReuseFunc, and create its configuration file:
kis-flow/test/load_conf/func/func-dataReuseFunc.yml
kistype: func
fname: dataReuseFunc
fmode: Calculate
source:
name: User Order Error Rate
must:
- order_id
- user_id
Also, create a new Flow called flowName3 with the following configuration:
kis-flow/test/load_conf/flow/flow-FlowName3.yml
kistype: flow
status: 1
flow_name: flowName3
flows:
- fname: funcName1
- fname: dataReuseFunc
- fname: funcName3
The dataReuseFunc Function is implemented as follows:
kis-flow/test/faas/faas_data_reuse.go
package faas
import (
"context"
"fmt"
"kis-flow/kis"
)
func DataReuseFuncHandler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call DataReuseFuncHandler ----")
for index, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
// Calculate result data
resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
// Commit result data
_ = flow.CommitRow(resultStr)
}
return flow.Next(kis.ActionDataReuse)
}
Finally, implement the test case:
kis-flow/test/kis_action_test.go
func TestActionDataReuse(t *testing.T) {
ctx := context.Background()
// 0. Register Function callback business
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("dataReuseFunc", faas.DataReuseFuncHandler) // Adding dataReuseFunc business
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. Register ConnectorInit and Connector callback business
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. Load configuration files and build Flow
if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
panic(err)
}
// 2. Get Flow
flow1 := kis.Pool().GetFlow("flowName3")
// 3. Submit raw data
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 4. Execute flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
Navigate to kis-flow/test/ and execute:
go test -test.v -test.paniconexit0 -test.run TestActionDataReuse
The result is as follows:
=== RUN TestActionDataReuse
Add KisPool FuncName=funcName1
Add KisPool FuncName=dataReuseFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
context.Background
====> After CommitSrcData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionV, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000955c0 ThisFunctionId:func-7886178381634f05b302841141382e59 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000095620 ThisFunctionId:func-ef567879d0dd45b287ed709e549e9d32 PrevFunctionId:func-7886178381634f05b302841141382e59 funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call DataReuseFuncHandler ----
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 0
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 1
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 2
context.Background
====> After commitReuseData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-ef567879d0dd45b287ed709e549e9d32:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000095680 ThisFunctionId:func-cfe66e39aba54ff989d6764cc4edda20 PrevFunctionId:func-ef567879d0dd45b287ed709e549e9d32 funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-ef567879d0dd45b287ed709e549e9d32:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName3Handler ----
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 0
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 1
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 2
--- PASS: TestActionDataReuse (0.02s)
PASS
ok kis-flow/test 0.523s
The test output shows how the DataReuse feature works. The test registers the Functions and the Connector, loads the configuration, submits raw data to flowName3, and then runs the Flow.
During execution, the data passes through funcName1, then dataReuseFunc, and finally funcName3, and the log prints the input of each Function.
funcName1 processes the original data, and dataReuseFunc reuses the result of funcName1 as its own result, so funcName3 receives the rows produced by funcName1 (see its inPut in the log above).
This unit test confirms that DataReuse behaves as expected: the upper layer's data is passed on to the next Function unchanged.
7.3 Action ForceEntryNext (Forcing Entry to the Next Layer)
7.3.1 ForceEntryNext Action Attribute
In the current KisFlow implementation, if the current Function does not commit any data (results data for this layer), the Flow will not proceed to the next layer of Functions after the current one finishes. However, in some streaming computations, it might be necessary to continue executing downward even if there is no data available. Therefore, we can introduce a ForceEntryNext action to trigger this behavior.
First, we add a ForceEntryNext attribute to the Action:
kis-flow/kis/action.go
// Action KisFlow execution Actions
type Action struct {
// DataReuse indicates whether to reuse data from the upper layer Function
DataReuse bool
// By default, if the current Function calculates 0 rows of data, subsequent Functions will not execute
// ForceEntryNext overrides the above default rule and forces entry to the next layer of Functions even if there's no data
ForceEntryNext bool
// Abort terminates the execution of the Flow
Abort bool
}
// ActionForceEntryNext sets the ForceEntryNext attribute to true
func ActionForceEntryNext(act *Action) {
act.ForceEntryNext = true
}
We also provide a configuration function, ActionForceEntryNext(), to modify this attribute's status.
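To illustrate how such a configuration function is meant to be consumed, here is a minimal, self-contained sketch. It assumes that flow.Next() accepts a variadic list of option functions and applies them to a fresh Action; the type name ActionFunc appears later in this article, while the helper loadActions is hypothetical and only stands in for whatever flow.Next() does internally.

```go
package main

import "fmt"

// Action mirrors the struct shown above.
type Action struct {
	DataReuse      bool
	ForceEntryNext bool
	Abort          bool
}

// ActionFunc is assumed to be the option type accepted by flow.Next().
type ActionFunc func(act *Action)

// ActionForceEntryNext sets the ForceEntryNext attribute to true.
func ActionForceEntryNext(act *Action) {
	act.ForceEntryNext = true
}

// loadActions is a hypothetical helper: it starts from a zero-valued Action
// and applies every option in the order given.
func loadActions(acts ...ActionFunc) Action {
	action := Action{}
	for _, act := range acts {
		act(&action)
	}
	return action
}

func main() {
	// No options: every attribute keeps its default (false).
	fmt.Printf("%+v\n", loadActions())
	// With the option: only ForceEntryNext is switched on.
	fmt.Printf("%+v\n", loadActions(ActionForceEntryNext))
}
```

Because each option only touches its own attribute, several options could be combined in a single call without interfering with each other.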
7.3.2 Capturing the Action
In the dealAction() method, which captures the Action, we add a check for this status. If it is set, the flow.abort status needs to be changed to false, allowing the flow to continue to the next layer.
kis-flow/flow/kis_flow_action.go
// dealAction processes the Action and determines the next steps of the Flow
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {
// DataReuse Action
if flow.action.DataReuse {
if err := flow.commitReuseData(ctx); err != nil {
return nil, err
}
} else {
if err := flow.commitCurData(ctx); err != nil {
return nil, err
}
}
// ++++++++++++++++++++++++++++
// ForceEntryNext Action
if flow.action.ForceEntryNext {
if err := flow.commitVoidData(ctx); err != nil {
return nil, err
}
flow.abort = false
}
// Update the previous FunctionId cursor
flow.PrevFunctionId = flow.ThisFunctionId
fn = fn.Next()
// Abort Action
if flow.action.Abort {
flow.abort = true
}
// Clear the Action
flow.action = kis.Action{}
return fn, nil
}
One detail: we need to call a commitVoidData() method, which commits empty data. If the Function submitted nothing, flow.buffer is empty and the normal commit is skipped, so the key flow.data[flow.ThisFunctionId] is never created. When flow.getCurData() is executed for the next layer, the lookup then fails with a key-not-found error. Therefore, empty data needs to be committed to flow.data[flow.ThisFunctionId].
The specific implementation of commitVoidData() is as follows:
kis-flow/flow/kis_flow_data.go
func (flow *KisFlow) commitVoidData(ctx context.Context) error {
if len(flow.buffer) != 0 {
return nil
}
// Create empty data
batch := make(common.KisRowArr, 0)
// Commit the buffer data to the result data of this layer
flow.data[flow.ThisFunctionId] = batch
log.Logger().DebugFX(ctx, " ====> After commitVoidData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
return nil
}
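The effect of committing an empty batch can be reproduced in isolation. The sketch below is not the KisFlow implementation: miniFlow, rowArr, and getData are hypothetical stand-ins that keep only the data map and the buffer, and getData is assumed to report a missing key as an error.

```go
package main

import (
	"errors"
	"fmt"
)

// rowArr is a hypothetical stand-in for common.KisRowArr.
type rowArr []interface{}

// miniFlow is a hypothetical stand-in for KisFlow with only the fields this sketch needs.
type miniFlow struct {
	thisFunctionID string
	buffer         rowArr
	data           map[string]rowArr
}

// getData returns an error when no result was ever committed for id.
func (f *miniFlow) getData(id string) (rowArr, error) {
	rows, ok := f.data[id]
	if !ok {
		return nil, errors.New(id + " is not in flow.data")
	}
	return rows, nil
}

// commitVoidData stores an empty, non-nil batch so that the key exists.
// It does nothing if the buffer holds rows, since a normal commit handles that case.
func (f *miniFlow) commitVoidData() {
	if len(f.buffer) != 0 {
		return
	}
	f.data[f.thisFunctionID] = make(rowArr, 0)
}

func main() {
	f := &miniFlow{thisFunctionID: "func-b", data: map[string]rowArr{}}

	// Nothing committed yet: the lookup fails.
	_, err := f.getData("func-b")
	fmt.Println("before:", err)

	// After the void commit the key exists and maps to zero rows.
	f.commitVoidData()
	rows, err := f.getData("func-b")
	fmt.Println("after:", len(rows), err)
}
```

The key point is the difference between a missing key and a key that maps to an empty slice: only the latter lets the next layer start with zero input rows.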
7.3.3 Unit Test Without Setting ForceEntryNext
First, create a Function configuration for noResultFunc and implement the corresponding callback business function.
kis-flow/test/load_conf/func/func-NoResultFunc.yml
kistype: func
fname: noResultFunc
fmode: Calculate
source:
  name: user_order_error_rate
  must:
    - order_id
    - user_id
kis-flow/test/faas/faas_no_result.go
package faas
import (
"context"
"fmt"
"kis-flow/kis"
)
// type FaaS func(context.Context, Flow) error
func NoResultFuncHandler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call NoResultFuncHandler ----")
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
}
return flow.Next()
}
At the end of this Function, we only call flow.Next() without passing any Action.
Next, create a new Flow named flowName4 with the following configuration:
kis-flow/test/load_conf/flow/flow-FlowName4.yml
kistype: flow
status: 1
flow_name: flowName4
flows:
- fname: funcName1
- fname: noResultFunc
- fname: funcName3
Finally, we write a unit test case that runs this Flow, with noResultFunc in the middle.
kis-flow/test/kis_action_test.go
func TestActionForceEntry(t *testing.T) {
ctx := context.Background()
// 0. Register Function callback business
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("noResultFunc", faas.NoResultFuncHandler) // Add noResultFunc business
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. Register ConnectorInit and Connector callback business
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. Load configuration files and build the Flow
if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
panic(err)
}
// 2. Get the Flow
flow1 := kis.Pool().GetFlow("flowName4")
// 3. Submit original data
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 4. Execute flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
Navigate to kis-flow/test/ and execute:
go test -test.v -test.paniconexit0 -test.run TestActionForceEntry
The results are as follows:
=== RUN TestActionForceEntry
Add KisPool FuncName=funcName1
Add KisPool FuncName=noResultFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
context.Background
====> After CommitSrcData, flow_name = flowName4, flow_id = flow-a496d02c79204e9a803fb5e1307523c9
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionV, flow = &{Id:flow-a496d02c79204e9a803fb5e1307523c9 Name:flowName4 Conf:0xc000152e40 Funcs:map[funcName1:0xc00011d560 funcName3:0xc00011d620 noResultFunc:0xc00011d5c0] FlowHead:0xc00011d560 FlowTail:0xc00011d620 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00011d560 ThisFunctionId:func-4d113d6a8e744d30a906db310f2d7818 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-47cb6f9ae464484aa779c18284035705:map[] func-4d113d6a8e744d30a906db310f2d7818:map[] func-70011c7ccecf46be91c6993d143639bb:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName4, flow_id = flow-a496d02c79204e9a803fb5e1307523c9
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-4d113d6a8e744d30a906db310f2d7818:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-a496d02c79204e9a803fb5e1307523c9 Name:flowName4 Conf:0xc000152e40 Funcs:map[funcName1:0xc00011d560 funcName3:0xc00011d620 noResultFunc:0xc00011d5c0] FlowHead:0xc00011d560 FlowTail:0xc00011d620 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00011d5c0 ThisFunctionId:func-47cb6f9ae464484aa779c18284035705 PrevFunctionId:func-4d113d6a8e744d30a906db310f2d7818 funcParams:map[func-47cb6f9ae464484aa779c18284035705:map[] func-4d113d6a8e744d30a906db310f2d7818:map[] func-70011c7ccecf46be91c6993d143639bb:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-4d113d6a8e744d30a906db310f2d7818:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call NoResultFuncHandler ----
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 0
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 1
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 2
--- PASS: TestActionForceEntry (0.02s)
PASS
ok kis-flow/test 0.958s
Because noResultFunc does not generate any result data, the next Function will not be executed. The execution ends with:
---> Call NoResultFuncHandler ----
7.3.4 Unit Testing with ForceEntryNext
Next, we will add the ForceEntryNext action. In NoResultFuncHandler(), we return flow.Next(kis.ActionForceEntryNext) as shown below:
kis-flow/test/faas/faas_no_result.go
package faas
import (
"context"
"fmt"
"kis-flow/kis"
)
func NoResultFuncHandler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call NoResultFuncHandler ----")
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
}
return flow.Next(kis.ActionForceEntryNext)
}
Navigate to the kis-flow/test/ directory and execute:
go test -test.v -test.paniconexit0 -test.run TestActionForceEntry
The results are as follows:
=== RUN TestActionForceEntry
Add KisPool FuncName=funcName1
Add KisPool FuncName=noResultFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode=Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
context.Background
====> After CommitSrcData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionV, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000136e0 ThisFunctionId:func-ecddaee7d7d447a9852d07088732f509 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013740 ThisFunctionId:func-c9817c7993894919b8463dea1757544e PrevFunctionId:func-ecddaee7d7d447a9852d07088732f509 funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call NoResultFuncHandler ----
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 0
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 1
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 2
context.Background
====> After commitVoidData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c9817c7993894919b8463dea1757544e:[] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000137a0 ThisFunctionId:func-5729600ae6ea4d6f879eb5832c638e1a PrevFunctionId:func-c9817c7993894919b8463dea1757544e funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c9817c7993894919b8463dea1757544e:[] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName3Handler ----
--- PASS: TestActionForceEntry (0.01s)
PASS
ok kis-flow/test 0.348s
This time the third-layer Function, funcName3Handler, is executed, but it has no data: its inPut is empty.
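The difference between the two test runs can be reproduced in miniature. The following sketch is a simplified model, not the KisFlow scheduler: stage, runStages, and buildStages are hypothetical names, and each stage simply returns its output rows together with a flag that plays the role of ForceEntryNext.

```go
package main

import "fmt"

// stage is a hypothetical stand-in for a Function: it receives the previous
// layer's rows and returns its own rows plus a ForceEntryNext-style flag.
type stage struct {
	name string
	run  func(in []string) (out []string, forceEntryNext bool)
}

// runStages executes the stages in order and returns the names that actually ran.
// It stops after a stage that produced no rows, unless that stage asked to
// force entry into the next layer.
func runStages(input []string, stages []stage) []string {
	var executed []string
	rows := input
	for _, s := range stages {
		out, force := s.run(rows)
		executed = append(executed, s.name)
		if len(out) == 0 && !force {
			break
		}
		rows = out
	}
	return executed
}

// passThrough forwards its input unchanged and never forces entry.
func passThrough(in []string) ([]string, bool) { return in, false }

// buildStages returns funcName1 -> noResultFunc -> funcName3, where
// noResultFunc produces no rows and forces entry only if force is true.
func buildStages(force bool) []stage {
	noResult := func([]string) ([]string, bool) { return nil, force }
	return []stage{
		{"funcName1", passThrough},
		{"noResultFunc", noResult},
		{"funcName3", passThrough},
	}
}

func main() {
	input := []string{"row1", "row2", "row3"}
	fmt.Println(runStages(input, buildStages(false))) // stops after noResultFunc
	fmt.Println(runStages(input, buildStages(true)))  // funcName3 also runs, with empty input
}
```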
7.4 Action JumpFunc (Flow Jump)
Next, we will implement the JumpFunc Action. JumpFunc allows jumping to a specified FuncName within the current Flow and continuing execution (provided that the target FuncName exists in the current Flow).
Note: JumpFunc can lead to infinite loops, so use it cautiously in business logic.
7.4.1 Adding JumpFunc to Action
First, add a JumpFunc property to the Action. Note that JumpFunc is not a boolean state but a string representing the specific FunctionName to jump to.
kis-flow/kis/action.go
// Action KisFlow execution flow actions
type Action struct {
// DataReuse indicates whether to reuse data from the previous function
DataReuse bool
// By default, Next() will not execute the subsequent function if the current function's result set is empty.
// ForceEntryNext forces the next function to execute even if the current function's result set is empty.
ForceEntryNext bool
// ++++++++++
// JumpFunc specifies the function to jump to for further execution
JumpFunc string
// Abort terminates the flow execution
Abort bool
}
// ActionJumpFunc returns an ActionFunc function that sets the JumpFunc property in Action
// (Note: Can easily cause flow loops leading to infinite loops)
func ActionJumpFunc(funcName string) ActionFunc {
return func(act *Action) {
act.JumpFunc = funcName
}
}
We also provide ActionJumpFunc() to set the JumpFunc property. Unlike the previous configuration functions, it takes the target name as a parameter and returns an anonymous function (a closure) of type ActionFunc; the JumpFunc property in Action is modified only when that returned function is executed.
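The distinction between building the option and applying it is easy to see in a small standalone program. This sketch reduces Action to the single JumpFunc field for brevity; everything else follows the definitions above.

```go
package main

import "fmt"

// Action is reduced to the one field this sketch needs.
type Action struct {
	JumpFunc string
}

// ActionFunc is the option type, as used by ActionJumpFunc above.
type ActionFunc func(act *Action)

// ActionJumpFunc captures funcName in a closure and returns it as an option.
func ActionJumpFunc(funcName string) ActionFunc {
	return func(act *Action) {
		act.JumpFunc = funcName
	}
}

func main() {
	act := Action{}

	// Calling ActionJumpFunc only builds the option; the Action is untouched.
	opt := ActionJumpFunc("funcName1")
	fmt.Printf("after building: %q\n", act.JumpFunc)

	// Applying the option is what writes the captured name into the Action.
	opt(&act)
	fmt.Printf("after applying: %q\n", act.JumpFunc)
}
```

Returning a closure is what lets a parameterized option share the same ActionFunc type as the parameterless ones, so it can be passed to flow.Next() in the same way.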
7.4.2 Capturing the Action
Next, we capture the JumpFunc action by checking whether JumpFunc is a non-empty string.
kis-flow/flow/kis_flow_action.go
// dealAction processes the Action and determines the next step in the Flow
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {
// DataReuse Action
if flow.action.DataReuse {
if err := flow.commitReuseData(ctx); err != nil {
return nil, err
}
} else {
if err := flow.commitCurData(ctx); err != nil {
return nil, err
}
}
// ForceEntryNext Action
if flow.action.ForceEntryNext {
if err := flow.commitVoidData(ctx); err != nil {
return nil, err
}
flow.abort = false
}
// ++++++++++++++++++++++++++++++++
// JumpFunc Action
if flow.action.JumpFunc != "" {
if _, ok := flow.Funcs[flow.action.JumpFunc]; !ok {
// JumpFunc is not in the flow
return nil, fmt.Errorf("Flow Jump -> %s is not in Flow", flow.action.JumpFunc)
}
jumpFunction := flow.Funcs[flow.action.JumpFunc]
// Update the previous function
flow.PrevFunctionId = jumpFunction.GetPrevId()
fn = jumpFunction
// If a jump is set, force the jump
flow.abort = false
// ++++++++++++++++++++++++++++++++
} else {
// Update the previous function ID cursor
flow.PrevFunctionId = flow.ThisFunctionId
fn = fn.Next()
}
// Abort Action forcibly terminates the flow
if flow.action.Abort {
flow.abort = true
}
// Clear the Action
flow.action = kis.Action{}
return fn, nil
}
If JumpFunc is set, fn is pointed at the jump target and flow.PrevFunctionId is set to the target's previous Function ID, so the target reads the same input layer as in a normal run. Otherwise, fn advances to fn.Next() as usual.
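The cursor update can be sketched on a plain doubly linked list. This is a simplified model under stated assumptions: node, link, and resolveNext are hypothetical names, nodes are identified by name rather than by generated Function ID, and the first node's predecessor is represented by the FunctionIdFirstVirtual marker seen in the logs.

```go
package main

import "fmt"

// node is a hypothetical stand-in for a Function in the Flow's linked list.
type node struct {
	name string
	prev *node
	next *node
}

// prevName returns the preceding node's name, or the virtual head marker
// for the first node in the list.
func (n *node) prevName() string {
	if n.prev == nil {
		return "FunctionIdFirstVirtual"
	}
	return n.prev.name
}

// link chains the names into a doubly linked list and indexes the nodes by name.
func link(names ...string) map[string]*node {
	funcs := make(map[string]*node, len(names))
	var last *node
	for _, name := range names {
		n := &node{name: name, prev: last}
		if last != nil {
			last.next = n
		}
		funcs[name] = n
		last = n
	}
	return funcs
}

// resolveNext picks the node to run after cur and the layer whose data it
// should read. An empty jumpFunc means that no jump was requested.
func resolveNext(funcs map[string]*node, cur *node, jumpFunc string) (*node, string, error) {
	if jumpFunc == "" {
		return cur.next, cur.name, nil
	}
	target, ok := funcs[jumpFunc]
	if !ok {
		return nil, "", fmt.Errorf("flow jump -> %s is not in flow", jumpFunc)
	}
	return target, target.prevName(), nil
}

func main() {
	funcs := link("funcName1", "funcName2", "jumpFunc")
	cur := funcs["jumpFunc"]

	next, prev, err := resolveNext(funcs, cur, "funcName1")
	fmt.Println(next.name, prev, err)

	_, _, err = resolveNext(funcs, cur, "missing")
	fmt.Println(err)
}
```

Setting the previous cursor to the target's own predecessor, rather than to the Function that requested the jump, is what makes the target see the same input as it would in a normal run.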
7.4.3 Unit Testing
Next, let's define a function with a jump action configuration as follows:
kis-flow/test/load_conf/func/func-jumpFunc.yml
kistype: func
fname: jumpFunc
fmode: Calculate
source:
  name: User Order Error Rate
  must:
    - order_id
    - user_id
And implement the related function business logic as follows:
kis-flow/test/faas/faas_jump.go
package faas
import (
"context"
"fmt"
"kis-flow/kis"
)
// type FaaS func(context.Context, Flow) error
func JumpFuncHandler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call JumpFuncHandler ----")
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
}
return flow.Next(kis.ActionJumpFunc("funcName1"))
}
Here, flow.Next(kis.ActionJumpFunc("funcName1")) specifies the jump to the function named funcName1.
Create a new flow named flowName5 with the following configuration:
kis-flow/test/load_conf/flow/flow-FlowName5.yml
kistype: flow
status: 1
flow_name: flowName5
flows:
- fname: funcName1
- fname: funcName2
- fname: jumpFunc
Next, implement the unit test case code as follows:
kis-flow/test/kis_action_test.go
func TestActionJumpFunc(t *testing.T) {
ctx := context.Background()
// 0. Register Function callback business logic
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
kis.Pool().FaaS("jumpFunc", faas.JumpFuncHandler) // Add jumpFunc business logic
// 0. Register ConnectorInit and Connector callback business logic
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. Load configuration files and build the flow
if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
panic(err)
}
// 2. Get the flow
flow1 := kis.Pool().GetFlow("flowName5")
// 3. Commit raw data
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 4. Execute flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
Change the directory to kis-flow/test/ and execute:
go test -test.v -test.paniconexit0 -test.run TestActionJumpFunc
The result is as follows:
...
...
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName5, flow_id = flow-5da80af989dc49648a001762fa08b866
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionS, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013680 ThisFunctionId:func-5800567c4cd842b6b377c2b0c0fd81c2 PrevFunctionId:func-f6ca8010d66744429bf6069c9897a928 funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName2Handler ----
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
====> After commitCurData, flow_name = flowName5, flow_id = flow-5da80af989dc49648a001762fa08b866
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000136e0 ThisFunctionId:func-4faf8f019f4a4a48b84ef27abfad53d1 PrevFunctionId:func-5800567c4cd842b6b377c2b0c0fd81c2 funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call JumpFuncHandler ----
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 0
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 1
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 2
KisFunctionV, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013620 ThisFunctionId:func-f6ca8010d66744429bf6069c9897a928 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName1Handler ----
...
...
We observe that the Flow keeps looping, indicating that our JumpFunc Action has taken effect.
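Since an unconditional jump never terminates, business code usually needs some condition that eventually stops jumping. One possible approach, sketched here as a standalone simulation rather than with the KisFlow API, is to cap the number of jumps. The names boundedJump and runFlow are hypothetical; in a real handler the same decision would determine whether to return flow.Next(kis.ActionJumpFunc("funcName1")) or a plain flow.Next().

```go
package main

import "fmt"

// boundedJump is a hypothetical helper: it returns target for the first
// maxJumps calls and "" (meaning "no jump, continue normally") afterwards.
func boundedJump(target string, maxJumps int) func() string {
	jumps := 0
	return func() string {
		if jumps >= maxJumps {
			return ""
		}
		jumps++
		return target
	}
}

// runFlow walks the ordered function names and records each execution.
// After the function named jumpAt runs, it asks decide() for a jump target;
// an empty or unknown target means execution simply continues.
func runFlow(order []string, jumpAt string, decide func() string) []string {
	index := make(map[string]int, len(order))
	for i, name := range order {
		index[name] = i
	}
	var trace []string
	for i := 0; i < len(order); {
		name := order[i]
		trace = append(trace, name)
		if name == jumpAt {
			if j, ok := index[decide()]; ok {
				i = j
				continue
			}
		}
		i++
	}
	return trace
}

func main() {
	order := []string{"funcName1", "funcName2", "jumpFunc"}
	// Allow two jumps back to funcName1, then let the flow finish.
	trace := runFlow(order, "jumpFunc", boundedJump("funcName1", 2))
	fmt.Println(len(trace), trace)
}
```

With a cap of two jumps, the three functions run three times in total (nine executions) and the flow then ends normally. The counter in this sketch is not safe for concurrent use.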
7.5 [V0.6] Source Code
You can find the source code for version 0.6 of the project at:
https://github.com/aceld/kis-flow/releases/tag/v0.6
Author: Aceld
GitHub: https://github.com/aceld
KisFlow Open Source Project Address: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki