diff --git a/README.md b/README.md index b1a12ea..a018aec 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ A Golang based high performance, scalable and distributed workflow framework It allows to programmatically author distributed workflow as Directed Acyclic Graph (DAG) of tasks. -Goflow executes your tasks on an array of goflow workers by uniformly distributing the loads +GoFlow executes your tasks on an array of Flow workers by uniformly distributing the loads ![Build](https://github.com/faasflow/goflow/workflows/GO-Flow-Build/badge.svg) [![GoDoc](https://godoc.org/github.com/faasflow/goflow?status.svg)](https://godoc.org/github.com/faasflow/goflow) @@ -15,7 +15,7 @@ go get github.com/faasflow/goflow ``` ## Write First Flow -> Library to Build Flow `github.com/faasflow/lib/goflow` +> Library to Build Flow `github.com/faasflow/goflow/flow` [![GoDoc](https://godoc.org/github.com/faasflow/lib/goflow?status.svg)](https://godoc.org/github.com/faasflow/lib/goflow) @@ -26,7 +26,7 @@ package main import ( "fmt" "github.com/faasflow/goflow" - flow "github.com/faasflow/lib/goflow" + flow "github.com/faasflow/goflow/flow" ) // Workload function @@ -73,7 +73,7 @@ curl -d hallo localhost:8080 GoFlow scale horizontally, you can distribute the load by just adding more instances. #### Worker Mode -Alternatively you can start your goflow in worker mode. As a worker goflow only handles the workload, +Alternatively you can start your GoFlow in worker mode. As a worker GoFlow only handles the workload, and if required you can only scale the workers ```go fs := &goflow.FlowService{ @@ -85,7 +85,7 @@ fs.StartWorker("myflow", DefineWorkflow) ``` #### Server Mode -Similarly you can start your goflow as a server. It only handles the incoming http requests you will +Similarly you can start your GoFlow as a server. It only handles the incoming http requests you will need to add workers to distribute the workload ```go fs := &goflow.FlowService{ diff --git a/flow/operation.go b/flow/operation.go new file mode 100644 index 0000000..a5d5ab5 --- /dev/null +++ b/flow/operation.go @@ -0,0 +1,139 @@ +package flow + +import ( + "fmt" +) + +var ( + BLANK_MODIFIER = func(data []byte) ([]byte, error) { return data, nil } +) + +// FuncErrorHandler the error handler for OnFailure() options +type FuncErrorHandler func(error) error + +// Modifier definition for Modify() call +type Modifier func([]byte, map[string][]string) ([]byte, error) + +type ServiceOperation struct { + Id string // ID + Mod Modifier // Modifier + Options map[string][]string // The option as a input to workload + + FailureHandler FuncErrorHandler // The Failure handler of the operation +} + +// createWorkload Create a function with execution name +func createWorkload(id string, mod Modifier) *ServiceOperation { + operation := &ServiceOperation{} + operation.Mod = mod + operation.Id = id + operation.Options = make(map[string][]string) + return operation +} + +func (operation *ServiceOperation) addOptions(key string, value string) { + array, ok := operation.Options[key] + if !ok { + operation.Options[key] = make([]string, 1) + operation.Options[key][0] = value + } else { + operation.Options[key] = append(array, value) + } +} + +func (operation *ServiceOperation) addFailureHandler(handler FuncErrorHandler) { + operation.FailureHandler = handler +} + +func (operation *ServiceOperation) GetOptions() map[string][]string { + return operation.Options +} + +func (operation *ServiceOperation) GetId() string { + return operation.Id +} + +func (operation *ServiceOperation) Encode() []byte { + return []byte("") +} + +// executeWorkload executes a function call +func executeWorkload(operation *ServiceOperation, data []byte) ([]byte, error) { + var err error + var result []byte + + options := operation.GetOptions() + result, err = operation.Mod(data, options) + + return result, err +} + +func (operation *ServiceOperation) Execute(data []byte, option map[string]interface{}) ([]byte, error) { + var result []byte + var err error + + if operation.Mod != nil { + result, err = executeWorkload(operation, data) + if err != nil { + err = fmt.Errorf("function(%s), error: function execution failed, %v", + operation.Id, err) + if operation.FailureHandler != nil { + err = operation.FailureHandler(err) + } + if err != nil { + return nil, err + } + } + } + + return result, nil +} + +func (operation *ServiceOperation) GetProperties() map[string][]string { + + result := make(map[string][]string) + + isMod := "false" + isFunction := "false" + isHttpRequest := "false" + hasFailureHandler := "false" + + if operation.Mod != nil { + isFunction = "true" + } + if operation.FailureHandler != nil { + hasFailureHandler = "true" + } + + result["isMod"] = []string{isMod} + result["isFunction"] = []string{isFunction} + result["isHttpRequest"] = []string{isHttpRequest} + result["hasFailureHandler"] = []string{hasFailureHandler} + + return result +} + +// Apply adds a new workload to the given vertex +func (node *Node) Apply(id string, workload Modifier, opts ...Option) *Node { + + newWorkload := createWorkload(id, workload) + + o := &Options{} + for _, opt := range opts { + o.reset() + opt(o) + if len(o.option) != 0 { + for key, array := range o.option { + for _, value := range array { + newWorkload.addOptions(key, value) + } + } + } + if o.failureHandler != nil { + newWorkload.addFailureHandler(o.failureHandler) + } + } + + node.unode.AddOperation(newWorkload) + return node +} diff --git a/flow/workflow.go b/flow/workflow.go new file mode 100644 index 0000000..2a3df93 --- /dev/null +++ b/flow/workflow.go @@ -0,0 +1,287 @@ +package flow + +import ( + "fmt" + sdk "github.com/faasflow/sdk" +) + +type Context sdk.Context +type StateStore sdk.StateStore +type DataStore sdk.DataStore + +// Options options for operation execution +type Options struct { + option map[string][]string + failureHandler FuncErrorHandler +} + +// BranchOptions options for branching in DAG +type BranchOptions struct { + aggregator sdk.Aggregator + forwarder sdk.Forwarder + noForwarder bool +} + +type Workflow struct { + pipeline *sdk.Pipeline // underline pipeline definition object +} + +type Dag struct { + udag *sdk.Dag +} + +type Node struct { + unode *sdk.Node +} + +type Option func(*Options) +type BranchOption func(*BranchOptions) + +var ( + // Execution specify a edge doesn't forwards a data + // but rather mention a execution direction + Execution = InvokeEdge() +) + +// reset reset the Options +func (o *Options) reset() { + o.option = map[string][]string{} + o.failureHandler = nil +} + +// reset reset the BranchOptions +func (o *BranchOptions) reset() { + o.aggregator = nil + o.noForwarder = false + o.forwarder = nil +} + +// Aggregator aggregates all outputs into one +func Aggregator(aggregator sdk.Aggregator) BranchOption { + return func(o *BranchOptions) { + o.aggregator = aggregator + } +} + +// InvokeEdge denotes a edge doesn't forwards a data, +// but rather provides only an execution flow +func InvokeEdge() BranchOption { + return func(o *BranchOptions) { + o.noForwarder = true + } +} + +// Forwarder encodes request based on need for children vertex +// by default the data gets forwarded as it is +func Forwarder(forwarder sdk.Forwarder) BranchOption { + return func(o *BranchOptions) { + o.forwarder = forwarder + } +} + +// WorkloadOption Specify a option parameter in a workload +func WorkloadOption(key string, value ...string) Option { + return func(o *Options) { + array := []string{} + for _, val := range value { + array = append(array, val) + } + o.option[key] = array + } +} + +// OnFailure Specify a function failure handler +func OnFailure(handler FuncErrorHandler) Option { + return func(o *Options) { + o.failureHandler = handler + } +} + +// GetWorkflow initiates a flow with a pipeline +func GetWorkflow(pipeline *sdk.Pipeline) *Workflow { + workflow := &Workflow{} + workflow.pipeline = pipeline + return workflow +} + +// OnFailure set a failure handler routine for the pipeline +func (flow *Workflow) OnFailure(handler sdk.PipelineErrorHandler) { + flow.pipeline.FailureHandler = handler +} + +// Finally sets an execution finish handler routine +// it will be called once the execution has finished with state either Success/Failure +func (flow *Workflow) Finally(handler sdk.PipelineHandler) { + flow.pipeline.Finally = handler +} + +// GetPipeline expose the underlying pipeline object +func (flow *Workflow) GetPipeline() *sdk.Pipeline { + return flow.pipeline +} + +// Dag provides the workflow dag object +func (flow *Workflow) Dag() *Dag { + dag := &Dag{} + dag.udag = flow.pipeline.Dag + return dag +} + +// SetDag apply a predefined dag, and override the default dag +func (flow *Workflow) SetDag(dag *Dag) { + pipeline := flow.pipeline + pipeline.SetDag(dag.udag) +} + +// NewDag creates a new dag separately from pipeline +func NewDag() *Dag { + dag := &Dag{} + dag.udag = sdk.NewDag() + return dag +} + +// Append generalizes a seperate dag by appending its properties into current dag. +// Provided dag should be mutually exclusive +func (currentDag *Dag) Append(dag *Dag) { + err := currentDag.udag.Append(dag.udag) + if err != nil { + panic(fmt.Sprintf("Error at AppendDag, %v", err)) + } +} + +// Node adds a new vertex by id +func (currentDag *Dag) Node(vertex string, options ...BranchOption) *Node { + node := currentDag.udag.GetNode(vertex) + if node == nil { + node = currentDag.udag.AddVertex(vertex, []sdk.Operation{}) + } + o := &BranchOptions{} + for _, opt := range options { + o.reset() + opt(o) + if o.aggregator != nil { + node.AddAggregator(o.aggregator) + } + } + return &Node{unode: node} +} + +// Edge adds a directed edge between two vertex as -> +func (currentDag *Dag) Edge(from, to string, opts ...BranchOption) { + err := currentDag.udag.AddEdge(from, to) + if err != nil { + panic(fmt.Sprintf("Error at AddEdge for %s-%s, %v", from, to, err)) + } + o := &BranchOptions{} + for _, opt := range opts { + o.reset() + opt(o) + if o.noForwarder == true { + fromNode := currentDag.udag.GetNode(from) + // Add a nil forwarder overriding the default forwarder + fromNode.AddForwarder(to, nil) + } + + // in case there is a override + if o.forwarder != nil { + fromNode := currentDag.udag.GetNode(from) + fromNode.AddForwarder(to, o.forwarder) + } + } +} + +// SubDag composites a seperate dag as a node. +func (currentDag *Dag) SubDag(vertex string, dag *Dag) { + node := currentDag.udag.AddVertex(vertex, []sdk.Operation{}) + err := node.AddSubDag(dag.udag) + if err != nil { + panic(fmt.Sprintf("Error at AddSubDag for %s, %v", vertex, err)) + } + return +} + +// ForEachBranch composites a sub-dag which executes for each value +// It returns the sub-dag that will be executed for each value +func (currentDag *Dag) ForEachBranch(vertex string, foreach sdk.ForEach, options ...BranchOption) (dag *Dag) { + node := currentDag.udag.AddVertex(vertex, []sdk.Operation{}) + if foreach == nil { + panic(fmt.Sprintf("Error at AddForEachBranch for %s, foreach function not specified", vertex)) + } + node.AddForEach(foreach) + + for _, option := range options { + o := &BranchOptions{} + o.reset() + option(o) + if o.aggregator != nil { + node.AddSubAggregator(o.aggregator) + } + if o.noForwarder == true { + node.AddForwarder("dynamic", nil) + } + } + + dag = NewDag() + err := node.AddForEachDag(dag.udag) + if err != nil { + panic(fmt.Sprintf("Error at AddForEachBranch for %s, %v", vertex, err)) + } + return +} + +// ConditionalBranch composites multiple dags as a sub-dag which executes for a conditions matched +// and returns the set of dags based on the condition passed +func (currentDag *Dag) ConditionalBranch(vertex string, conditions []string, condition sdk.Condition, + options ...BranchOption) (conditiondags map[string]*Dag) { + + node := currentDag.udag.AddVertex(vertex, []sdk.Operation{}) + if condition == nil { + panic(fmt.Sprintf("Error at AddConditionalBranch for %s, condition function not specified", vertex)) + } + node.AddCondition(condition) + + for _, option := range options { + o := &BranchOptions{} + o.reset() + option(o) + if o.aggregator != nil { + node.AddSubAggregator(o.aggregator) + } + if o.noForwarder == true { + node.AddForwarder("dynamic", nil) + } + } + conditiondags = make(map[string]*Dag) + for _, conditionKey := range conditions { + dag := NewDag() + node.AddConditionalDag(conditionKey, dag.udag) + conditiondags[conditionKey] = dag + } + return +} + +// AddOperation adds an Operation to the given vertex +func (node *Node) AddOperation(operation sdk.Operation) *Node { + node.unode.AddOperation(operation) + return node +} + +// SyncNode adds a new vertex named Sync +func (flow *Workflow) SyncNode(options ...BranchOption) *Node { + + dag := flow.pipeline.Dag + + node := dag.GetNode("sync") + if node == nil { + node = dag.AddVertex("sync", []sdk.Operation{}) + } + o := &BranchOptions{} + for _, opt := range options { + o.reset() + opt(o) + if o.aggregator != nil { + node.AddAggregator(o.aggregator) + } + } + return &Node{unode: node} +} diff --git a/go.mod b/go.mod index a3a5865..3dc2b66 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect github.com/faasflow/faas-flow-redis-datastore v1.0.1-0.20200718081732-431d3cc7894a github.com/faasflow/faas-flow-redis-statestore v1.0.1-0.20200718082116-d90985fdbde1 - github.com/faasflow/lib v1.1.1-0.20200719042107-174c40f5070b github.com/faasflow/runtime v0.2.2 github.com/faasflow/sdk v1.0.0 github.com/garyburd/redigo v1.6.0 // indirect diff --git a/go.sum b/go.sum index 5be8502..5ab988a 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,6 @@ github.com/faasflow/faas-flow-redis-datastore v1.0.1-0.20200718081732-431d3cc789 github.com/faasflow/faas-flow-redis-datastore v1.0.1-0.20200718081732-431d3cc7894a/go.mod h1:vV/vLuH5WGs4T040uqIbhdbQmdImGY7iYVBkqKGeUrc= github.com/faasflow/faas-flow-redis-statestore v1.0.1-0.20200718082116-d90985fdbde1 h1:k10t8Eq5jley1oVlMK1suXq0mmF9IaqYJm0VnrAHQYQ= github.com/faasflow/faas-flow-redis-statestore v1.0.1-0.20200718082116-d90985fdbde1/go.mod h1:F2ZiHritsST6NSjqrQVgoU75rWtoimLV7qKXkCmNVgM= -github.com/faasflow/lib v1.1.1-0.20200719042107-174c40f5070b h1:7XAaPgB8on80ZJ0dKuhE6D9cUSpjKsAT2hajnW4XbFg= -github.com/faasflow/lib v1.1.1-0.20200719042107-174c40f5070b/go.mod h1:8WerNZmVn5ad2Idf6hufkILrB/xXOJOnraIfykz+2Qc= github.com/faasflow/runtime v0.2.2 h1:XaKJU9X9DuLuVZhc5Von7R98aw1GgtYjfQNvWQWHma0= github.com/faasflow/runtime v0.2.2/go.mod h1:fd+6ZuXgYquHpKeaWSwbTWUrJuirfqIrt/Lrm3Rr/kY= github.com/faasflow/sdk v0.0.0-20200705012738-72f2bcdb62d1/go.mod h1:cpcCvb40uzDNzTT0qxiA6QGuOu8a71LMV2w/ikAW5LU= diff --git a/runtime/flow_executor.go b/runtime/flow_executor.go index 2d972f8..fe2f7a6 100644 --- a/runtime/flow_executor.go +++ b/runtime/flow_executor.go @@ -3,7 +3,7 @@ package runtime import ( "bytes" "fmt" - "github.com/faasflow/lib/goflow" + "github.com/faasflow/goflow/flow" "github.com/faasflow/runtime" "io/ioutil" "log" @@ -34,7 +34,7 @@ type FlowExecutor struct { Runtime *FlowRuntime } -type FlowDefinitionHandler func(flow *goflow.Workflow, context *goflow.Context) error +type FlowDefinitionHandler func(flow *flow.Workflow, context *flow.Context) error func (fe *FlowExecutor) HandleNextNode(partial *executor.PartialState) error { var err error @@ -100,8 +100,8 @@ func (fe *FlowExecutor) GetFlowName() string { } func (fe *FlowExecutor) GetFlowDefinition(pipeline *sdk.Pipeline, context *sdk.Context) error { - workflow := goflow.GetWorkflow(pipeline) - faasflowContext := (*goflow.Context)(context) + workflow := flow.GetWorkflow(pipeline) + faasflowContext := (*flow.Context)(context) return fe.Handler(workflow, faasflowContext) }