DEV Community

Aceld
Aceld

Posted on • Updated on

(Part 8)Golang Framework Hands-on - Cache/Params Data Caching and Data Parameters

#go

Github: https://github.com/aceld/kis-flow
Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection


Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications


8.1 Flow Cache - Data Stream Caching

KisFlow also provides shared caching in stream computing, using a simple local cache for developers to use as needed. For third-party local cache technology dependencies, refer to: https://github.com/patrickmn/go-cache.

8.1.1 go-cache

(1) Installation

go get github.com/patrickmn/go-cache
Enter fullscreen mode Exit fullscreen mode

(2) Usage

import (
    "fmt"
    "github.com/patrickmn/go-cache"
    "time"
)

func main() {
    // Create a cache with a default expiration time of 5 minutes, and which
    // purges expired items every 10 minutes
    c := cache.New(5*time.Minute, 10*time.Minute)

    // Set the value of the key "foo" to "bar", with the default expiration time
    c.Set("foo", "bar", cache.DefaultExpiration)

    // Set the value of the key "baz" to 42, with no expiration time
    // (the item won't be removed until it is re-set, or removed using
    // c.Delete("baz")
    c.Set("baz", 42, cache.NoExpiration)

    // Get the string associated with the key "foo" from the cache
    foo, found := c.Get("foo")
    if found {
        fmt.Println(foo)
    }

    // Since Go is statically typed, and cache values can be anything, type
    // assertion is needed when values are being passed to functions that don't
    // take arbitrary types, (i.e. interface{}). The simplest way to do this for
    // values which will only be used once--e.g. for passing to another
    // function--is:
    foo, found := c.Get("foo")
    if found {
        MyFunction(foo.(string))
    }

    // This gets tedious if the value is used several times in the same function.
    // You might do either of the following instead:
    if x, found := c.Get("foo"); found {
        foo := x.(string)
        // ...
    }
    // or
    var foo string
    if x, found := c.Get("foo"); found {
        foo = x.(string)
    }
    // ...
    // foo can then be passed around freely as a string

    // Want performance? Store pointers!
    c.Set("foo", &MyStruct, cache.DefaultExpiration)
    if x, found := c.Get("foo"); found {
        foo := x.(*MyStruct)
            // ...
    }
}
Enter fullscreen mode Exit fullscreen mode

For detailed reference: https://github.com/patrickmn/go-cache

8.1.2 KisFlow Integration with go-cache

(1) Flow Provides Abstract Interface

Flow provides interfaces for cache operations as follows:

kis-flow/kis/flow.go

type Flow interface {
    // Run schedules the Flow, sequentially scheduling and executing Functions within the Flow
    Run(ctx context.Context) error
    // Link connects Functions within the Flow according to the configuration file
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow submits Flow data to the Function layer about to be executed
    CommitRow(row interface{}) error
    // Input gets the input source data for the currently executing Function in the Flow
    Input() common.KisRowArr
    // GetName gets the name of the Flow
    GetName() string
    // GetThisFunction gets the currently executing Function
    GetThisFunction() Function
    // GetThisFuncConf gets the configuration of the currently executing Function
    GetThisFuncConf() *config.KisFuncConfig
    // GetConnector gets the Connector of the currently executing Function
    GetConnector() (Connector, error)
    // GetConnConf gets the configuration of the Connector for the currently executing Function
    GetConnConf() (*config.KisConnConfig, error)
    // GetConfig gets the configuration of the current Flow
    GetConfig() *config.KisFlowConfig
    // GetFuncConfigByName gets the configuration of the Function by its name
    GetFuncConfigByName(funcName string) *config.KisFuncConfig
    // Next advances the currently executing Function to the next Function with specified Action
    Next(acts ...ActionFunc) error

    // ++++++++++++++++++++++++++++++++++++++++
    // GetCacheData gets the cache data of the current Flow
    GetCacheData(key string) interface{}
    // SetCacheData sets the cache data of the current Flow
    SetCacheData(key string, value interface{}, Exp time.Duration)
}
Enter fullscreen mode Exit fullscreen mode

SetCacheData() sets the local cache, with Exp as the expiration time. If Exp is 0, it is permanent.
GetCacheData() reads the local cache.

(2) Providing Constants

Provide some constants related to cache expiration time.

kis-flow/common/const.go

// cache
const (
    // DeFaultFlowCacheCleanUp is the default cache cleanup interval for Flow objects in KisFlow, in minutes
    DeFaultFlowCacheCleanUp = 5 // in minutes
    // DefaultExpiration is the default GoCache time, permanently saved
    DefaultExpiration time.Duration = 0
)
Enter fullscreen mode Exit fullscreen mode

(3) Adding and Initializing Members in KisFlow

kis-flow/flow/kis_flow.go

// KisFlow represents the context environment for stream computing
type KisFlow struct {

    // ... ...
    // ... ...

    // Local cache for the flow
    cache *cache.Cache // Temporary cache context environment for Flow
}

// NewKisFlow creates a new KisFlow
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
    flow := new(KisFlow)

    // ... ...
    // ... ...

    // Initialize local cache
    flow.cache = cache.New(cache.NoExpiration, common.DeFaultFlowCacheCleanUp*time.Minute)

    return flow
}
Enter fullscreen mode Exit fullscreen mode

(4) Implementing the Interface

Finally, implement the two interfaces for cache read and write operations as follows:

kis-flow/flow/kis_flow_data.go

func (flow *KisFlow) GetCacheData(key string) interface{} {
    if data, found := flow.cache.Get(key); found {
        return data
    }
    return nil
}

func (flow *KisFlow) SetCacheData(key string, value interface{}, Exp time.Duration) {
    if Exp == common.DefaultExpiration {
        flow.cache.Set(key, value, cache.DefaultExpiration)
    } else {
        flow.cache.Set(key, value, Exp)
    }
}
Enter fullscreen mode Exit fullscreen mode

8.2 MetaData Temporary Cache Parameters

MetaData is defined as a map[string]interface{} structure available at each level of Flow, Function, and Connector to store temporary data. The lifespan of this data is consistent with the lifespan of each instance.

8.2.1 Adding MetaData to Flow

First, add the metaData map[string]interface{} member and corresponding read-write lock to KisFlow.

kis-flow/flow/kis_flow.go

// KisFlow represents the context environment throughout the entire stream computing
type KisFlow struct {
    // ... ...

    // ... ...

    // +++++++++++++++++++++++++++++++++++++++++++
    // metaData for the flow
    metaData map[string]interface{} // Custom temporary data for Flow
    mLock    sync.RWMutex           // Read-write lock to manage metaData
}
Enter fullscreen mode Exit fullscreen mode

Also, initialize the metaData member in the KisFlow constructor as follows:

kis-flow/flow/kis_flow.go

// NewKisFlow creates a KisFlow
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
    flow := new(KisFlow)

    // ... ...
    // ... ...

    // ++++++++++++++++++++++++++++++++++++++
    // Initialize temporary data
    flow.metaData = make(map[string]interface{})

    return flow
}
Enter fullscreen mode Exit fullscreen mode

Next, add the read and write interfaces for MetaData to the Flow as follows:

kis-flow/kis/flow.go

type Flow interface {
    // Run schedules the Flow, sequentially scheduling and executing Functions within the Flow
    Run(ctx context.Context) error
    // Link connects Functions within the Flow according to the configuration file
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow submits Flow data to the Function layer about to be executed
    CommitRow(row interface{}) error
    // Input gets the input source data for the currently executing Function in the Flow
    Input() common.KisRowArr
    // GetName gets the name of the Flow
    GetName() string
    // GetThisFunction gets the currently executing Function
    GetThisFunction() Function
    // GetThisFuncConf gets the configuration of the currently executing Function
    GetThisFuncConf() *config.KisFuncConfig
    // GetConnector gets the Connector of the currently executing Function
    GetConnector() (Connector, error)
    // GetConnConf gets the configuration of the Connector for the currently executing Function
    GetConnConf() (*config.KisConnConfig, error)
    // GetConfig gets the configuration of the current Flow
    GetConfig() *config.KisFlowConfig
    // GetFuncConfigByName gets the configuration of the Function by its name
    GetFuncConfigByName(funcName string) *config.KisFuncConfig
    // Next advances the currently executing Function to the next Function with specified Action
    Next(acts ...ActionFunc) error
    // GetCacheData gets the cache data of the current Flow
    GetCacheData(key string) interface{}
    // SetCacheData sets the cache data of the current Flow
    SetCacheData(key string, value interface{}, Exp time.Duration)

    // ++++++++++++++++++++++++++++
    // GetMetaData gets the temporary data of the current Flow
    GetMetaData(key string) interface{}
    // SetMetaData sets the temporary data of the current Flow
    SetMetaData(key string, value interface{})
}
Enter fullscreen mode Exit fullscreen mode

Define the GetMetaData() and SetMetaData() interfaces for reading and writing respectively. Finally, implement these interfaces as follows:

kis-flow/flow/kis_flow_data.go

// GetMetaData retrieves the temporary data of the current Flow object
func (flow *KisFlow) GetMetaData(key string) interface{} {
    flow.mLock.RLock()
    defer flow.mLock.RUnlock()

    data, ok := flow.metaData[key]
    if !ok {
        return nil
    }

    return data
}

// SetMetaData sets the temporary data of the current Flow object
func (flow *KisFlow) SetMetaData(key string, value interface{}) {
    flow.mLock.Lock()
    defer flow.mLock.Unlock()

    flow.metaData[key] = value
}
Enter fullscreen mode Exit fullscreen mode

8.2.2 Adding MetaData to Function

First, add the metaData member to BaseFunction as follows:

kis-flow/function/kis_base_function.go

type BaseFunction struct {
    // Id, KisFunction instance ID, used to distinguish different instance objects within KisFlow
    Id     string
    Config *config.KisFuncConfig

    // flow
    flow kis.Flow // Context environment KisFlow

    // connector
    connector kis.Connector

    // ++++++++++++++++++++++++
    // Custom temporary data for Function
    metaData map[string]interface{}
    // Read-write lock to manage metaData
    mLock sync.RWMutex

    // link
    N kis.Function // Next stream computing Function
    P kis.Function // Previous stream computing Function
}
Enter fullscreen mode Exit fullscreen mode

In the Function constructor, each specific Function needs a constructor to initialize the metaData member. The changes are as follows:

kis-flow/function/kis_base_function.go

func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {
    var f kis.Function

    // Factory produces generalized objects
    // ++++++++++++++
    switch common.KisMode(config.FMode) {
    case common.V:
        f = NewKisFunctionV() // +++
    case common.S:
        f = NewKisFunctionS() // +++
    case common.L:
        f = NewKisFunctionL() // +++
    case common.C:
        f = NewKisFunctionC() // +++
    case common.E:
        f = NewKisFunctionE() // +++
    default:
        // LOG ERROR
        return nil
    }

    // Generate random unique instance ID
    f.CreateId()

    // Set basic information properties
    if err := f.SetConfig(config); err != nil {
        panic(err)
    }

    // Set Flow
    if err := f.SetFlow(flow); err != nil {
        panic(err)
    }

    return f
}
Enter fullscreen mode Exit fullscreen mode

Each constructor is as follows:

kis-flow/function/kis_function_c.go

func NewKisFunctionC() kis.Function {
    f := new(KisFunctionC)

    // Initialize metaData
    f.metaData = make(map[string]interface{})

    return f
}
Enter fullscreen mode Exit fullscreen mode

kis-flow/function/kis_function_v.go

func NewKisFunctionV() kis.Function {
    f := new(KisFunctionV)

    // Initialize metaData
    f.metaData = make(map[string]interface{})

    return f
}
Enter fullscreen mode Exit fullscreen mode

kis-flow/function/kis_function_e.go

func NewKisFunctionE() kis.Function {
    f := new(KisFunctionE)

    // Initialize metaData
    f.metaData = make(map[string]interface{})

    return f
}
Enter fullscreen mode Exit fullscreen mode

kis-flow/function/kis_function_s.go

func NewKisFunctionS() kis.Function {
    f := new(KisFunctionS)

    // Initialize metaData
    f.metaData = make(map[string]interface{})

    return f
}
Enter fullscreen mode Exit fullscreen mode

kis-flow/function/kis_function_l.go

func NewKisFunctionL() kis.Function {
    f := new(KisFunctionL)

    // Initialize metaData
    f.metaData = make(map[string]interface{})

    return f
}
Enter fullscreen mode Exit fullscreen mode

Next, add interfaces to access the metaData member in the Function abstraction layer as follows:

type Function interface {
    // Call executes the stream computing logic
    Call(ctx context.Context, flow Flow) error

    // SetConfig configures the strategy for the current Function instance
    SetConfig(s *config.KisFuncConfig) error
    // GetConfig retrieves the configuration strategy of the current Function instance
    GetConfig() *config.KisFuncConfig

    // SetFlow sets the Flow instance that the current Function instance depends on
    SetFlow(f Flow) error
    // GetFlow retrieves the Flow instance that the current Function instance depends on
    GetFlow() Flow

    // AddConnector adds a Connector to the current Function instance
    AddConnector(conn Connector) error
    // GetConnector retrieves the Connector associated with the current Function instance
    GetConnector() Connector

    // CreateId generates a random instance KisID for the current Function instance
    CreateId()
    // GetId retrieves the FID of the current Function
    GetId() string
    // GetPrevId retrieves the FID of the previous Function node of the current Function
    GetPrevId() string
    // GetNextId retrieves the FID of the next Function node of the current Function
    GetNextId() string

    // Next returns the next layer computing stream Function, or nil if it is the last layer
    Next() Function
    // Prev returns the previous layer computing stream Function, or nil if it is the last layer
    Prev() Function
    // SetN sets the next Function instance
    SetN(f Function)
    // SetP sets the previous Function instance
    SetP(f Function)

    // ++++++++++++++++++++++++++++++++++
    // GetMetaData retrieves the temporary data of the current Function
    GetMetaData(key string) interface{}
    // SetMetaData sets the temporary data of the current Function
    SetMetaData(key string, value interface{})
}
Enter fullscreen mode Exit fullscreen mode

Implement the above two interfaces in the BaseFunction.

kis-flow/function/kis_base_function.go

// GetMetaData retrieves the temporary data of the current Function
func (base *BaseFunction) GetMetaData(key string) interface{} {
    base.mLock.RLock()
    defer base.mLock.RUnlock()

    data, ok := base.metaData[key]
    if !ok {
        return nil
    }

    return data
}

// SetMetaData sets the temporary data of the current Function
func (base *BaseFunction) SetMetaData(key string, value interface{}) {
    base.mLock.Lock()
    defer base.mLock.Unlock()

    base.metaData[key] = value
}
Enter fullscreen mode Exit fullscreen mode

8.2.3 Adding MetaData to Connector

First, add the metaData member to KisConnector as follows:

kis-flow/conn/kis_connector.go

type KisConnector struct {
    // Connector ID
    CId string
    // Connector Name
    CName string
    // Connector Config
    Conf *config.KisConnConfig
    // Connector Init
    onceInit sync.Once

    // ++++++++++++++
    // Custom temporary data for KisConnector
    metaData map[string]interface{}
    // Read-write lock to manage metaData
    mLock sync.RWMutex
}

// NewKisConnector creates a KisConnector based on the configuration strategy
func NewKisConnector(config *config.KisConnConfig) *KisConnector {
    conn := new(KisConnector)
    conn.CId = id.KisID(common.KisIdTypeConnector)
    conn.CName = config.CName
    conn.Conf = config

    // +++++++++++++++++++++++++++++++++++
    conn.metaData = make(map[string]interface{})

    return conn
}
Enter fullscreen mode Exit fullscreen mode

Initialize metaData in the constructor.

Next, add interfaces to access and set MetaData in the Connector abstraction layer as follows:

kis-flow/kis/connector.go

type Connector interface {
    // Init initializes the links of the storage engine associated with the Connector
    Init() error
    // Call invokes the read and write operations of the external storage logic of the Connector
    Call(ctx context.Context, flow Flow, args interface{}) error
    // GetId retrieves the ID of the Connector
    GetId() string
    // GetName retrieves the name of the Connector
    GetName() string
    // GetConfig retrieves the configuration information of the Connector
    GetConfig() *config.KisConnConfig
    // GetMetaData retrieves the temporary data of the current Connector

    // +++++++++++++++++++++++++++++++
    GetMetaData(key string) interface{}
    // SetMetaData sets the temporary data of the current Connector
    SetMetaData(key string, value interface{})
}
Enter fullscreen mode Exit fullscreen mode

Finally, implement the above two interfaces in KisConnector as follows:

kis-flow/conn/kis_connector.go

// GetMetaData retrieves the temporary data of the current Connector
func (conn *KisConnector) GetMetaData(key string) interface{} {
    conn.mLock.RLock()
    defer conn.mLock.RUnlock()

    data, ok := conn.metaData[key]
    if !ok {
        return nil
    }

    return data
}

// SetMetaData sets the temporary data of the current Connector
func (conn *KisConnector) SetMetaData(key string, value interface{}) {
    conn.mLock.Lock()
    defer conn.mLock.Unlock()

    conn.metaData[key] = value
}
Enter fullscreen mode Exit fullscreen mode

8.3 Configuration File Parameters

KisFlow allows developers to define default parameters (Params) for configuring Flow, Function, Connector, etc., in the configuration file. Here are some examples:

Function:

kistype: func
fname: funcName1
fmode: Verify
source:
  name: Official Account Douyin Mall Order Data
  must:
    - order_id
    - user_id
option:
  default_params:
    default1: funcName1_param1
    default2: funcName1_param2
Enter fullscreen mode Exit fullscreen mode

Flow:

kistype: flow
status: 1
flow_name: flowName1
flows:
  - fname: funcName1
    params:
      myKey1: flowValue1-1
      myKey2: flowValue1-2
  - fname: funcName2
    params:
      myKey1: flowValue2-1
      myKey2: flowValue2-2
  - fname: funcName3
    params:
      myKey1: flowValue3-1
      myKey2: flowValue3-2
Enter fullscreen mode Exit fullscreen mode

Connector:

kistype: conn
cname: ConnName1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: redis-key
params:
  args1: value1
  args2: value2
load: null
save:
  - funcName2
Enter fullscreen mode Exit fullscreen mode

Developers can provide Params for each defined module. Params provided in Flow will also be added to the Functions.

In the previous steps, we already read these parameters into each module's memory, but we did not expose an interface for developers.

8.3.1 Adding Param Retrieval Interface to Flow

First, we provide an interface for Flow to query Params:

kis-flow/kis/flow.go

type Flow interface {
    // ... ...
    // ... ...

    // GetFuncParam retrieves a key-value pair of the default parameters for the currently executing Function in the Flow
    GetFuncParam(key string) string
    // GetFuncParamAll retrieves all key-value pairs of the default parameters for the currently executing Function in the Flow
    GetFuncParamAll() config.FParam
}
Enter fullscreen mode Exit fullscreen mode

Implementation:

kis-flow/flow/kis_flow_data.go

// GetFuncParam retrieves a key-value pair of the default parameters for the currently executing Function in the Flow
func (flow *KisFlow) GetFuncParam(key string) string {
    flow.fplock.RLock()
    defer flow.fplock.RUnlock()

    if param, ok := flow.funcParams[flow.ThisFunctionId]; ok {
        if value, vok := param[key]; vok {
            return value
        }
    }

    return ""
}

// GetFuncParamAll retrieves all key-value pairs of the default parameters for the currently executing Function in the Flow
func (flow *KisFlow) GetFuncParamAll() config.FParam {
    flow.fplock.RLock()
    defer flow.fplock.RUnlock()

    param, ok := flow.funcParams[flow.ThisFunctionId]
    if !ok {
        return nil
    }

    return param
}
Enter fullscreen mode Exit fullscreen mode

GetFuncParam() and GetFuncParamAll() retrieve a single key or all parameters respectively, but both fetch the Params for the currently executing Function.

8.3.2 Unit Testing

We add some parameters to each Function in flowName1.

kis-flow/test/load_conf/flow-FlowName1.yml

kistype: flow
status: 1
flow_name: flowName1
flows:
  - fname: funcName1
    params:
      myKey1: flowValue1-1
      myKey2: flowValue1-2
  - fname: funcName2
    params:
      myKey1: flowValue2-1
      myKey2: flowValue2-2
  - fname: funcName3
    params:
      myKey1: flowValue3-1
      myKey2: flowValue3-2
Enter fullscreen mode Exit fullscreen mode

Then configure some default custom parameters for each associated Function:

kis-flow/test/load_conf/func/func-FuncName1.yml

kistype: func
fname: funcName1
fmode: Verify
source:
  name: Official Account Douyin Mall Order Data
  must:
    - order_id
    - user_id
option:
  default_params:
    default1: funcName1_param1
    default2: funcName1_param2
Enter fullscreen mode Exit fullscreen mode

kis-flow/test/load_conf/func/func-FuncName2.yml

kistype: func
fname: funcName2
fmode: Save
source:
  name: User Order Error Rate
  must:
    - order_id
    - user_id
option:
  cname: ConnName1
  default_params:
    default1: funcName2_param1
    default2: funcName2_param2
Enter fullscreen mode Exit fullscreen mode

kis-flow/test/load_conf/func/func-FuncName3.yml

kistype: func
fname: funcName3
fmode: Calculate
source:
  name: User Order Error Rate
  must:
    - order_id
    - user_id
option:
  default_params:
    default1: funcName3_param1
    default2: funcName3_param2
Enter fullscreen mode Exit fullscreen mode

We also configure some Param parameters for the Connector associated with FuncName2:

kis-flow/test/load_conf/conn/conn-ConnName1.yml

kistype: conn
cname: ConnName1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: redis-key
params:
  args1: value1
  args2: value2
load: null
save:
  - funcName2
Enter fullscreen mode Exit fullscreen mode

To verify that our configuration parameters can be correctly retrieved during the execution of Functions, we modified each Function and Connector business function to print their Params:

kis-flow/test/faas/faas_demo1.go

func FuncDemo1Handler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call funcName1Handler ----")

    // ++++++++++++++++
    fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())

    for index, row := range flow.Input() {
        // Print data
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)

        // Calculate result data
        resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

        // Commit result data
        _ = flow.CommitRow(resultStr)
    }

    return nil
}
Enter fullscreen mode Exit fullscreen mode

kis-flow/test/faas/faas_demo2.go

func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call funcName2Handler ----")

    // ++++++++++++++++
    fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())

    for index, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)

        conn, err := flow.GetConnector()
        if err != nil {
            log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): GetConnector err = %s\n", err.Error())
            return err
        }

        if conn.Call(ctx, flow, row) != nil {
            log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): Call err = %s\n", err.Error())
            return err
        }

        // Calculate result data
        resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

        // Commit result data
        _ = flow.CommitRow(resultStr)
    }

    return nil
}
Enter fullscreen mode Exit fullscreen mode

kis-flow/test/faas/faas_demo3.go

func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call funcName3Handler ----")

    // ++++++++++++++++
    fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return nil
}
Enter fullscreen mode Exit fullscreen mode

kis-flow/test/caas/caas_demo1.go

func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) error {
    fmt.Printf("===> In CaasDemoHanler1: flowName: %s, cName:%s, fnName:%s, mode:%s\n",
        flow.GetName(), conn.GetName(), fn.GetConfig().FName, fn.GetConfig().FMode)

    // +++++++++++ 
    fmt.Printf("Params = %+v\n", conn.GetConfig().Params)

    fmt.Printf("===> Call Connector CaasDemoHanler1, args from funciton: %s\n", args)

    return nil
}
Enter fullscreen mode Exit fullscreen mode

Finally, we write the unit test cases:

kis-flow/test/kis_params_test.go

package test

import (
    "context"
    "kis-flow/common"
    "kis-flow/file"
    "kis-flow/kis"
    "kis-flow/test/caas"
    "kis-flow/test/faas"
    "testing"
)

func TestParams(t *testing.T) {
    ctx := context.Background()

    // 0. Register Function callback businesses
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. Register ConnectorInit and Connector callback businesses
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. Load configuration files and build Flow
    if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. Get Flow
    flow1 := kis.Pool().GetFlow("flowName1")

    // 3. Submit original data
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. Execute flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}
Enter fullscreen mode Exit fullscreen mode

Navigate to the kis-flow/test/ directory and execute:

go test -test.v -test.paniconexit0 -test.run  TestParams 
Enter fullscreen mode Exit fullscreen mode
=== RUN   TestParams
....
....

---> Call funcName1Handler ----
Params = map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2]
...
...

---> Call funcName2Handler ----
Params = map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]
...
...
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
...
...
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
...
...

===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]

...
...

---> Call funcName3Handler ----
Params = map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2]

...
...
--- PASS: TestParams (0.01s)
PASS
ok      kis-flow/test   0.433s
Enter fullscreen mode Exit fullscreen mode

As we can see, we can now correctly retrieve the Params configuration parameters at each level.

8.4 [V0.7] Source Code

https://github.com/aceld/kis-flow/releases/tag/v0.7


Author: Aceld
GitHub: https://github.com/aceld

KisFlow Open Source Project Address: https://github.com/aceld/kis-flow

Document: https://github.com/aceld/kis-flow/wiki


Part1-OverView
Part2.1-Project Construction / Basic Modules
Part2.2-Project Construction / Basic Modules
Part3-Data Stream
Part4-Function Scheduling
Part5-Connector
Part6-Configuration Import and Export
Part7-KisFlow Action
Part8-Cache/Params Data Caching and Data Parameters
Part9-Multiple Copies of Flow
Part10-Prometheus Metrics Statistics
Part11-Adaptive Registration of FaaS Parameter Types Based on Reflection


Case1-Quick Start
Case2-Flow Parallel Operation
Case3-Application of KisFlow in Multi-Goroutines
Case4-KisFlow in Message Queue (MQ) Applications


Top comments (0)