Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki
Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection
Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications
6.1 Configuration Import
Until now, building a Flow and its Functions has required a series of manual calls each time, which is cumbersome. In this part, we build KisFlow structures by batch-reading configuration files, and we export existing KisFlow structures back to local files. For now, configuration is persisted as files; developers can later choose to persist it in a database or a remote configuration service instead.
6.1.1 Creating Configuration Files
First, we create the configuration files that KisFlow needs under kis-flow/test/load_conf/. Within kis-flow/test/load_conf/, we create three folders, conn/, flow/, and func/, to store the configuration information for Connector, Flow, and Function respectively.
├── conn
│   └── conn-ConnName1.yml
├── flow
│   └── flow-FlowName1.yml
└── func
    ├── func-FuncName1.yml
    ├── func-FuncName2.yml
    └── func-FuncName3.yml
Create some yml files within these folders. The specific content is as follows:
A. Function
kis-flow/test/load_conf/func/func-FuncName1.yml
kistype: func
fname: funcName1
fmode: Verify
source:
  name: Public account Douyin mall user order data
  must:
    - order_id
    - user_id
kis-flow/test/load_conf/func/func-FuncName2.yml
kistype: func
fname: funcName2
fmode: Save
source:
  name: User order error rate
  must:
    - order_id
    - user_id
option:
  cname: ConnName1
kis-flow/test/load_conf/func/func-FuncName3.yml
kistype: func
fname: funcName3
fmode: Calculate
source:
  name: User order error rate
  must:
    - order_id
    - user_id
B. Connector
kis-flow/test/load_conf/conn/conn-ConnName1.yml
kistype: conn
cname: ConnName1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: redis-key
params:
  args1: value1
  args2: value2
load: null
save:
  - funcName2
C. Flow
kis-flow/test/load_conf/flow/flow-FlowName1.yml
kistype: flow
status: 1
flow_name: flowName1
flows:
  - fname: funcName1
  - fname: funcName2
  - fname: funcName3
6.1.2 Configuration Parsing
Create the kis-flow/file/ directory and the kis-flow/file/config_import.go file within it.
First, define a struct allConfig that holds all configurations:
kis-flow/file/config_import.go
type allConfig struct {
	Flows map[string]*config.KisFlowConfig
	Funcs map[string]*config.KisFuncConfig
	Conns map[string]*config.KisConnConfig
}
The key of each map is the name of the corresponding module: FlowName, FName, or CName.
Next, define methods for parsing Flow, Function, and Connector configurations. For YAML parsing, we will use the "gopkg.in/yaml.v3" library.
A. Flow Configuration Parsing
kis-flow/file/config_import.go
// kisTypeFlowConfigure parses Flow configuration files in YAML format
func kisTypeFlowConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
	flow := new(config.KisFlowConfig)
	if err := yaml.Unmarshal(confData, flow); err != nil {
		return fmt.Errorf("%s has wrong format kisType = %v", fileName, kisType)
	}
	// Do not load Flow configuration if the status is disabled
	if common.KisOnOff(flow.Status) == common.FlowDisable {
		return nil
	}
	// Reject a second Flow that uses an already registered name
	if _, ok := all.Flows[flow.FlowName]; ok {
		return fmt.Errorf("%s set repeat flow_id:%s", fileName, flow.FlowName)
	}
	// Add the configuration to the collection
	all.Flows[flow.FlowName] = flow
	return nil
}
- confData: the raw contents of the file
- fileName: the file path
- kisType: the configuration file type
kisTypeFlowConfigure parses the configuration information into the Flows member of allConfig.
Similarly, the methods for Function and Connector parsing are as follows.
B. Function Configuration Parsing
kis-flow/file/config_import.go
// kisTypeFuncConfigure parses Function configuration files in YAML format
func kisTypeFuncConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
	function := new(config.KisFuncConfig)
	if err := yaml.Unmarshal(confData, function); err != nil {
		return fmt.Errorf("%s has wrong format kisType = %v", fileName, kisType)
	}
	// Reject a second Function that uses an already registered name
	if _, ok := all.Funcs[function.FName]; ok {
		return fmt.Errorf("%s set repeat function_id:%s", fileName, function.FName)
	}
	// Add the configuration to the collection
	all.Funcs[function.FName] = function
	return nil
}
C. Connector Configuration Parsing
kis-flow/file/config_import.go
// kisTypeConnConfigure parses Connector configuration files in YAML format
func kisTypeConnConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
	conn := new(config.KisConnConfig)
	if err := yaml.Unmarshal(confData, conn); err != nil {
		return fmt.Errorf("%s has wrong format kisType = %v", fileName, kisType)
	}
	// Reject a second Connector that uses an already registered name
	if _, ok := all.Conns[conn.CName]; ok {
		return fmt.Errorf("%s set repeat conn_id:%s", fileName, conn.CName)
	}
	// Add the configuration to the collection
	all.Conns[conn.CName] = conn
	return nil
}
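The three parsers share one shape: decode the file, reject a name that is already registered, then store the result. The sketch below isolates the second and third steps. It is only an illustration: the registry type and its add method are hypothetical names that are not part of KisFlow, and it relies on Go generics (Go 1.18 or later).

```go
package main

import "fmt"

// registry is a hypothetical stand-in for one of the maps in allConfig.
type registry[T any] map[string]T

// add stores item under name and rejects a second registration of the same name.
func (r registry[T]) add(name string, item T, fileName string) error {
	if _, exists := r[name]; exists {
		return fmt.Errorf("%s: duplicate name %q", fileName, name)
	}
	r[name] = item
	return nil
}

func main() {
	funcs := registry[string]{}

	// The first registration succeeds, the second one is rejected.
	fmt.Println(funcs.add("funcName1", "Verify", "func-FuncName1.yml"))
	fmt.Println(funcs.add("funcName1", "Save", "func-copy.yml"))
}
```

Rejecting duplicates at load time means that a copied configuration file with a forgotten rename fails loudly instead of silently overwriting an earlier entry.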
6.1.3 Traversing Files
Below is an implementation that traverses all yml and yaml files under a given path loadPath and parses the configuration information into allConfig according to the kistype category.
kis-flow/file/config_import.go
// parseConfigWalkYaml parses all configuration files in YAML format and stores the configuration information in allConfig
func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
	all := new(allConfig)
	all.Flows = make(map[string]*config.KisFlowConfig)
	all.Funcs = make(map[string]*config.KisFuncConfig)
	all.Conns = make(map[string]*config.KisConnConfig)
	err := filepath.Walk(loadPath, func(filePath string, info os.FileInfo, err error) error {
		// Propagate errors reported by the walk itself (e.g. an unreadable directory)
		if err != nil {
			return err
		}
		// Skip directories
		if info.IsDir() {
			return nil
		}
		// Validate file extension
		if suffix := filepath.Ext(filePath); suffix != ".yml" && suffix != ".yaml" {
			return nil
		}
		// Read file content (os.ReadFile replaces the deprecated ioutil.ReadFile since Go 1.16)
		confData, err := os.ReadFile(filePath)
		if err != nil {
			return err
		}
		confMap := make(map[string]interface{})
		// Validate YAML format
		if err := yaml.Unmarshal(confData, confMap); err != nil {
			return err
		}
		// Check if kistype exists
		kisType, ok := confMap["kistype"]
		if !ok {
			return fmt.Errorf("yaml file %s has no field [kistype]!", filePath)
		}
		// Dispatch to the parser that matches the kistype
		switch kisType {
		case common.KisIdTypeFlow:
			return kisTypeFlowConfigure(all, confData, filePath, kisType)
		case common.KisIdTypeFunction:
			return kisTypeFuncConfigure(all, confData, filePath, kisType)
		case common.KisIdTypeConnnector:
			return kisTypeConnConfigure(all, confData, filePath, kisType)
		default:
			return fmt.Errorf("%s set wrong kistype %v", filePath, kisType)
		}
	})
	if err != nil {
		return nil, err
	}
	return all, nil
}
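To see the traversal step in isolation, the following sketch collects YAML files from a directory tree using only the standard library. It is not the KisFlow implementation: collectYAML and makeSampleTree are hypothetical helpers, and the sketch uses filepath.WalkDir (available since Go 1.16) in place of filepath.Walk.

```go
package main

import (
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// collectYAML returns every .yml or .yaml file found under root.
func collectYAML(root string) ([]string, error) {
	var files []string
	err := filepath.WalkDir(root, func(p string, d fs.DirEntry, err error) error {
		if err != nil {
			return err // error raised by the walk itself
		}
		if d.IsDir() {
			return nil // descend into directories, but never parse them
		}
		switch filepath.Ext(p) {
		case ".yml", ".yaml":
			files = append(files, p)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return files, nil
}

// makeSampleTree builds a throwaway directory that mimics load_conf/.
func makeSampleTree() (string, error) {
	root, err := os.MkdirTemp("", "load_conf")
	if err != nil {
		return "", err
	}
	samples := []string{"func/func-FuncName1.yml", "flow/flow-FlowName1.yaml", "README.md"}
	for _, rel := range samples {
		full := filepath.Join(root, rel)
		if err := os.MkdirAll(filepath.Dir(full), 0o755); err != nil {
			return "", err
		}
		if err := os.WriteFile(full, []byte("kistype: func\n"), 0o644); err != nil {
			return "", err
		}
	}
	return root, nil
}

func main() {
	root, err := makeSampleTree()
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(root)

	files, err := collectYAML(root)
	if err != nil {
		panic(err)
	}
	for _, f := range files {
		fmt.Println(filepath.Base(f)) // README.md is not listed
	}
}
```

Returning the walk error first matters: when the walk cannot read an entry, the callback is invoked with a non-nil err, and continuing would operate on incomplete information.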
6.1.4 Import Method
Below is a public method ConfigImportYaml that imports files from a given root path.
kis-flow/file/config_import.go
// ConfigImportYaml parses all configuration files in YAML format
func ConfigImportYaml(loadPath string) error {
	all, err := parseConfigWalkYaml(loadPath)
	if err != nil {
		return err
	}
	for flowName, flowConfig := range all.Flows {
		// Build a Flow
		newFlow := flow.NewKisFlow(flowConfig)
		for _, fp := range flowConfig.Flows {
			if err := buildFlow(all, fp, newFlow, flowName); err != nil {
				return err
			}
		}
		// Add the flow to the FlowPool
		kis.Pool().AddFlow(flowName, newFlow)
	}
	return nil
}
First, it calls parseConfigWalkYaml() to load all configuration information into memory.
Then it iterates over the Flows and builds each one by linking its Functions. Finally, each Flow is added to the Pool. The construction of a single Function link works as follows:
kis-flow/file/config_import.go
func buildFlow(all *allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow, flowName string) error {
	// Load the Function that the current Flow depends on
	funcConfig, ok := all.Funcs[fp.FuncName]
	if !ok {
		return fmt.Errorf("FlowName [%s] need FuncName [%s], But has No This FuncName Config", flowName, fp.FuncName)
	}
	// Load the Connector that the current Function depends on, if any
	if funcConfig.Option.CName != "" {
		connConf, ok := all.Conns[funcConfig.Option.CName]
		if !ok {
			return fmt.Errorf("FuncName [%s] need ConnName [%s], But has No This ConnName Config", fp.FuncName, funcConfig.Option.CName)
		}
		// Associate the Function configuration with the Connector configuration
		_ = funcConfig.AddConnConfig(connConf)
	}
	// Link Flow to Function
	if err := newFlow.Link(funcConfig, fp.Params); err != nil {
		return err
	}
	return nil
}
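The dependency chain resolved here is Flow → Function → Connector: a Flow names its Functions, and a Function may name one Connector. A minimal sketch of that check, using simplified hypothetical types (funcConf, confSet, resolve) rather than the real KisFlow configuration structs, might look like this:

```go
package main

import "fmt"

// funcConf is a simplified, hypothetical stand-in for a Function configuration.
type funcConf struct {
	Name  string
	CName string // empty when the Function uses no Connector
}

// confSet is a hypothetical stand-in for allConfig.
type confSet struct {
	Funcs map[string]funcConf
	Conns map[string]bool
}

// resolve checks that every Function named by a Flow has a configuration,
// and that every Connector named by those Functions has one as well.
func resolve(all confSet, flowName string, funcNames []string) error {
	for _, fn := range funcNames {
		fc, ok := all.Funcs[fn]
		if !ok {
			return fmt.Errorf("flow %q needs function %q, but it has no config", flowName, fn)
		}
		if fc.CName == "" {
			continue // no Connector to resolve
		}
		if !all.Conns[fc.CName] {
			return fmt.Errorf("function %q needs connector %q, but it has no config", fn, fc.CName)
		}
	}
	return nil
}

func main() {
	all := confSet{
		Funcs: map[string]funcConf{
			"funcName1": {Name: "funcName1"},
			"funcName2": {Name: "funcName2", CName: "ConnName1"},
		},
		Conns: map[string]bool{"ConnName1": true},
	}

	// Both Functions and the Connector are present.
	fmt.Println(resolve(all, "flowName1", []string{"funcName1", "funcName2"}))

	// funcName3 has no configuration, so resolution fails.
	fmt.Println(resolve(all, "flowName1", []string{"funcName3"}))
}
```

Because all files are loaded into memory before any Flow is built, the order in which the files are read does not matter; only the names have to match.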
6.2 Configuration Import Unit Testing
Create a unit test file kis-flow/test/kis_config_import_test.go.
package test
import (
	"context"
	"kis-flow/common"
	"kis-flow/file"
	"kis-flow/kis"
	"kis-flow/test/caas"
	"kis-flow/test/faas"
	"testing"
)
func TestConfigImportYmal(t *testing.T) {
	ctx := context.Background()
	// 0. Register Function callbacks
	kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
	kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
	kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
	// 0. Register ConnectorInit and Connector callbacks
	kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
	kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
	// 1. Load configuration file and build Flow
	if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
		panic(err)
	}
	// 2. Get Flow
	flow1 := kis.Pool().GetFlow("flowName1")
	// 3. Commit raw data
	_ = flow1.CommitRow("This is Data1 from Test")
	_ = flow1.CommitRow("This is Data2 from Test")
	_ = flow1.CommitRow("This is Data3 from Test")
	// 4. Execute flow1
	if err := flow1.Run(ctx); err != nil {
		panic(err)
	}
}
First, register the business callbacks. Then load the configuration via ConfigImportYaml, get the flow instance from the pool, submit data, and run it.
Note that ConfigImportYaml is given an absolute path here; adjust it to match the location of kis-flow on your machine.
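A hardcoded absolute path only works on one machine. Since go test runs the test binary from within the package's source directory, a relative directory name can be resolved at run time instead. The sketch below shows one way to do that; confDir is a hypothetical helper, not part of KisFlow.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// confDir resolves rel against the current working directory and
// verifies that the result is an existing directory.
func confDir(rel string) (string, error) {
	abs, err := filepath.Abs(rel)
	if err != nil {
		return "", err
	}
	info, err := os.Stat(abs)
	if err != nil {
		return "", err
	}
	if !info.IsDir() {
		return "", fmt.Errorf("%s is not a directory", abs)
	}
	return abs, nil
}

func main() {
	// "." always exists, so this call demonstrates the success path.
	dir, err := confDir(".")
	if err != nil {
		panic(err)
	}
	fmt.Println("resolved:", dir)

	// A missing directory is reported before any file is parsed.
	if _, err := confDir("no-such-load-conf"); err != nil {
		fmt.Println("error:", err)
	}
}
```

Under that assumption, a test located in kis-flow/test/ could pass the result of confDir("load_conf") to file.ConfigImportYaml.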
Navigate to the kis-flow/test/ directory and execute the command:
go test -test.v -test.paniconexit0 -test.run TestConfigImportYmal
The expected result is as follows:
=== RUN TestConfigImportYmal
Add KisPool FuncName=funcName1
Add KisPool FuncName=funcName2
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionV, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000114960 ThisFunctionId:func-37c7070f45144529891d433ae9c4ebfc PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionS, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0001149c0 ThisFunctionId:func-5315301ffbbb4ae4be021729ddff1569 PrevFunctionId:func-37c7070f45144529891d433ae9c4ebfc funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]}
---> Call funcName2Handler ----
In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
====> After commitCurData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-5315301ffbbb4ae4be021729ddff1569:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]]
KisFunctionC, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000114a20 ThisFunctionId:func-89a6a662729b4a0895e849c40bf29892 PrevFunctionId:func-5315301ffbbb4ae4be021729ddff1569 funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-5315301ffbbb4ae4be021729ddff1569:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]}
---> Call funcName3Handler ----
In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 0
In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 1
In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 2
--- PASS: TestConfigImportYmal (0.01s)
PASS
ok kis-flow/test 0.517s
The output matches our expectations: funcName1, funcName2, and funcName3 run in the configured order, and funcName2 calls the Connector in Save mode. We can now load and build KisFlow through configuration files.
6.3 Configuration Export
6.3.1 Implementation of Export
kis-flow/file/config_export.go
package file
import (
	"fmt"
	"gopkg.in/yaml.v3"
	"kis-flow/common"
	"kis-flow/kis"
	"os"
)
// ConfigExportYaml exports flow configurations and stores them locally.
// savePath must be an existing directory path that ends with a path separator,
// because the file names are appended to it directly.
func ConfigExportYaml(flow kis.Flow, savePath string) error {
	// Flow
	data, err := yaml.Marshal(flow.GetConfig())
	if err != nil {
		return err
	}
	// os.WriteFile replaces the deprecated ioutil.WriteFile since Go 1.16
	if err := os.WriteFile(savePath+common.KisIdTypeFlow+"-"+flow.GetName()+".yaml", data, 0644); err != nil {
		return err
	}
	for _, fp := range flow.GetConfig().Flows {
		// Function
		fConf := flow.GetFuncConfigByName(fp.FuncName)
		if fConf == nil {
			return fmt.Errorf("function name = %s config is nil", fp.FuncName)
		}
		fdata, err := yaml.Marshal(fConf)
		if err != nil {
			return err
		}
		if err := os.WriteFile(savePath+common.KisIdTypeFunction+"-"+fp.FuncName+".yaml", fdata, 0644); err != nil {
			return err
		}
		// Connector
		if fConf.Option.CName != "" {
			cConf, err := fConf.GetConnConfig()
			if err != nil {
				return err
			}
			cdata, err := yaml.Marshal(cConf)
			if err != nil {
				return err
			}
			if err := os.WriteFile(savePath+common.KisIdTypeConnnector+"-"+cConf.CName+".yaml", cdata, 0644); err != nil {
				return err
			}
		}
	}
	return nil
}
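The exporter builds each target path by appending the file name to savePath with plain string concatenation, so it expects savePath to exist already and to end with a path separator. The sketch below shows a more forgiving variant of just the file-writing step. The helpers exportName, writeExport, and demo are hypothetical and not part of KisFlow; only the "<kind>-<name>.yaml" naming scheme is taken from the code above.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// exportName builds "<kind>-<name>.yaml", the naming scheme used by the exporter.
func exportName(kind, name string) string {
	return kind + "-" + name + ".yaml"
}

// writeExport creates dir if necessary and writes data to dir/<kind>-<name>.yaml.
func writeExport(dir, kind, name string, data []byte) (string, error) {
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return "", err
	}
	target := filepath.Join(dir, exportName(kind, name))
	if err := os.WriteFile(target, data, 0o644); err != nil {
		return "", err
	}
	return target, nil
}

// demo writes one file into a temporary directory and reads it back.
func demo() (string, error) {
	tmp, err := os.MkdirTemp("", "export_conf")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(tmp)

	// The nested directory does not exist yet and has no trailing separator.
	dir := filepath.Join(tmp, "nested")
	target, err := writeExport(dir, "flow", "flowName1", []byte("kistype: flow\n"))
	if err != nil {
		return "", err
	}
	content, err := os.ReadFile(target)
	return string(content), err
}

func main() {
	content, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Print(content)
}
```

filepath.Join inserts the separator where needed, and os.MkdirAll is a no-op when the directory already exists, so the caller no longer has to prepare the export directory by hand.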
6.3.2 New Interfaces in Flow
kis-flow/kis/flow.go
package kis
import (
	"context"
	"kis-flow/common"
	"kis-flow/config"
)
type Flow interface {
	// Run schedules the Flow, sequentially dispatches and executes Functions in the Flow
	Run(ctx context.Context) error
	// Link connects the Functions in the Flow according to the configuration file
	Link(fConf *config.KisFuncConfig, fParams config.FParam) error
	// CommitRow submits Flow data to the upcoming Function layer
	CommitRow(row interface{}) error
	// Input gets the input source data for the currently executing Function of the flow
	Input() common.KisRowArr
	// GetName gets the name of the Flow
	GetName() string
	// GetThisFunction gets the currently executing Function of the Flow
	GetThisFunction() Function
	// GetThisFuncConf gets the configuration of the currently executing Function of the Flow
	GetThisFuncConf() *config.KisFuncConfig
	// GetConnector gets the Connector of the currently executing Function of the Flow
	GetConnector() (Connector, error)
	// GetConnConf gets the configuration of the Connector of the currently executing Function of the Flow
	GetConnConf() (*config.KisConnConfig, error)
	// +++++++++++++++++++++++++++++++
	// GetConfig gets the configuration of the current Flow
	GetConfig() *config.KisFlowConfig
	// GetFuncConfigByName gets the configuration of a Function in the current Flow by Function name
	GetFuncConfigByName(funcName string) *config.KisFuncConfig
	// +++++++++++++++++++++++++++++++
}
Flow's new interfaces are implemented as follows:
kis-flow/flow/kis_flow.go
// GetConfig gets the configuration of the current Flow
func (flow *KisFlow) GetConfig() *config.KisFlowConfig {
	return flow.Conf
}
// GetFuncConfigByName gets the configuration of a Function in the current Flow by Function name
func (flow *KisFlow) GetFuncConfigByName(funcName string) *config.KisFuncConfig {
	if f, ok := flow.Funcs[funcName]; ok {
		return f.GetConfig()
	} else {
		log.Logger().ErrorF("GetFuncConfigByName(): Function %s not found", funcName)
		return nil
	}
}
6.3.3 Fix in KisFlow's Funcs
This section corrects a mistake from an earlier part: the meaning of the key in the Funcs map.
kis-flow/flow/kis_flow.go
// KisFlow is used to pass the context environment for the entire stream computing
type KisFlow struct {
	// Basic information
	Id   string                // Flow's distributed instance ID (used by KisFlow internally to distinguish different instances)
	Name string                // Flow's readable name
	Conf *config.KisFlowConfig // Flow configuration strategy
	// Function list
	Funcs          map[string]kis.Function // All managed Function objects of the current flow, key: FunctionName
	FlowHead       kis.Function            // Function list head owned by the current Flow
	FlowTail       kis.Function            // Function list tail owned by the current Flow
	flock          sync.RWMutex            // Lock for managing linked list insertion and reading and writing
	ThisFunction   kis.Function            // Currently executing KisFunction object of the Flow
	ThisFunctionId string                  // ID of the currently executing Function
	PrevFunctionId string                  // ID of the Function above the currently executing Function
	// Function list parameters
	funcParams map[string]config.FParam // Custom fixed configuration parameters of the flow in the current Function, Key: KisID of the function instance, value: FParam
	fplock     sync.RWMutex             // Lock for managing funcParams reading and writing
	// Data
	buffer common.KisRowArr  // Internal buffer used to temporarily store input byte data, one data is interface{}, multiple data is []interface{}, i.e., KisBatch
	data   common.KisDataMap // Data sources at various levels of stream computing
	inPut  common.KisRowArr  // Input data for the current Function calculation
}
The key of the Funcs member was previously defined as the KisID of the Function; it is now corrected to be the FunctionName, which is what GetFuncConfigByName looks up.
The code that assigns to the Funcs member is corrected accordingly:
// appendFunc adds Function to Flow, linked list operation
func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) error {
	// ... ...
	// Adds the detailed hash relationship of Function Name to the flow object
	flow.Funcs[function.GetConfig().FName] = function
	// ... ...
}
6.3.4 Addition in KisPool
kis-flow/kis/pool.go
// GetFlows gets all the Flows
func (pool *kisPool) GetFlows() []Flow {
	pool.flowLock.RLock() // Read lock
	defer pool.flowLock.RUnlock()
	var flows []Flow
	for _, flow := range pool.flowRouter {
		flows = append(flows, flow)
	}
	return flows
}
KisPool adds a method to get all Flows to support the export module.
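The pattern used by GetFlows is a snapshot under a read lock: the values of the map are copied into a new slice while the lock is held, so callers can iterate over the result without holding the lock. A reduced sketch of that pattern follows; the pool type here is a hypothetical stand-in for kisPool that stores strings instead of Flows.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical, reduced stand-in for kisPool.
type pool struct {
	mu    sync.RWMutex
	flows map[string]string // flow name -> flow (a string here for brevity)
}

// add registers a flow under the write lock.
func (p *pool) add(name, flow string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.flows[name] = flow
}

// snapshot copies the current values into a new slice under the read lock.
func (p *pool) snapshot() []string {
	p.mu.RLock()
	defer p.mu.RUnlock()
	out := make([]string, 0, len(p.flows))
	for _, f := range p.flows {
		out = append(out, f)
	}
	return out
}

func main() {
	p := &pool{flows: make(map[string]string)}
	p.add("flowName1", "flow-1")
	p.add("flowName2", "flow-2")

	snap := p.snapshot()
	p.add("flowName3", "flow-3") // does not affect the snapshot taken earlier

	fmt.Println(len(snap), len(p.snapshot()))
}
```

Note that map iteration order in Go is not specified, so the order of the returned slice can differ between calls.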
6.4 Configuration Export Unit Testing
Create the kis_config_export_test.go file in kis-flow/test/:
package test
import (
	"kis-flow/common"
	"kis-flow/file"
	"kis-flow/kis"
	"kis-flow/test/caas"
	"kis-flow/test/faas"
	"testing"
)
func TestConfigExportYmal(t *testing.T) {
	// 0. Register Function callbacks
	kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
	kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
	kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
	// 0. Register ConnectorInit and Connector callbacks
	kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
	kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
	// 1. Load configuration file and build Flow
	if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
		panic(err)
	}
	// 2. Export the configured in-memory KisFlow structure to files
	flows := kis.Pool().GetFlows()
	for _, flow := range flows {
		if err := file.ConfigExportYaml(flow, "/Users/gopath/src/kis-flow/test/export_conf/"); err != nil {
			panic(err)
		}
	}
}
Navigate to kis-flow/test/ and execute:
go test -test.v -test.paniconexit0 -test.run TestConfigExportYmal
This will generate exported configurations under kis-flow/test/export_conf/:
├── export_conf
│ ├── conn-ConnName1.yaml
│ ├── flow-flowName1.yaml
│ ├── func-funcName1.yaml
│ ├── func-funcName2.yaml
│ └── func-funcName3.yaml
6.5 [V0.5] Source Code
https://github.com/aceld/kis-flow/releases/tag/v0.5
Author: Aceld
GitHub: https://github.com/aceld
KisFlow Open Source Project Address: https://github.com/aceld/kis-flow