Skip to content
Swarvanu Sengupta edited this page Jul 16, 2019 · 9 revisions

Overview

faas-flow allows you to realize OpenFaaS function composition with ease. By defining a simple pipeline, you can orchestrate multiple functions without having to worry about internals.

import faasflow "github.com/s8sg/faas-flow"

func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
  flow.Apply("Func1").Apply("Func2")
}

After building and deploying, it will give you a function that orchestrates calling Func2 with the output of Func1

Pipeline Definition

By supplying a number of pipeline operators, complex composition can be achieved with little work: alt overview

The above pipeline can be achieved with little, but powerfull code:

Sync Chain

func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
     flow.SyncNode().Apply("func1").Apply("func2").
	  Modify(func(data []byte) ([]byte, error) {
	  	// Do something
		return data, nil
	  })
     return nil
}

Async Chain

func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {

     dag := flow.Dag()
     dag.Node("n1").Apply("func1")
     dag.Node("n2").Apply("func2").
        Modify(func(data []byte) ([]byte, error) {
	        // Do something
               	return data
        }).
     dag.Node("n3").callback("storage.io/bucket?id=3345612358265349126&file=result.dat")
     dag.Edge("n1", "n2")
     dag.Edge("n2", "n3")
     flow.OnFailure(func(err error) {
              // failure handler
        }).
        Finally(func(state string) {
              // cleanup code
        })
     
     return nil
}

Parallel Branching

func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {

     dag := flow.Dag()
     dag.Node("n1").Modify(func(data []byte) ([]byte, error) {
     		// do something
		return data, nil
     })
     dag.Node("n2").Apply("func1")
     dag.Node("n3").Apply("func2").Modify(func(data []byte) ([]byte, error) {
     		// do something
		return data, nil
     })
     dag.Node("n4").Callback("storage.io/bucket?id=3345612358265349126&file=result")
     dag.Edge("n1", "n2")
     dag.Edge("n1", "n3")
     dag.Edge("n2", "n4")
     dag.Edge("n3", "n4")

     return nil
}

Dynamic Branching

func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
     dag := flow.Dag()
     dag.Node("n1").Modify(func(data []byte) ([]byte, error) {
                return data, nil
     })
     conditionalDags := dag.ConditionalBranch("C",
                []string{"c1", "c2"},
                func(response []byte) []string {
                        // for each returned condition a seperate branch will execute
                        // this function executes in the runtime of condition C
                        return []string{"c1", "c2"}
                },
     )
     conditionalDags["c2"].Node("n1").Apply("func").Modify(func(data []byte) ([]byte, error) {
                return data, nil
     })
     foreachDag := conditionalDags["c1"].ForEachBranch("F",
                func(data []byte) map[string][]byte {
                        // for each returned key in the hashmap a new branch will be executed
                        // this function executes in the runtime of foreach F
                        return map[string][]byte{ "f1": data, "f2": data }
                },
     )
     foreachDag.Node("n1").Modify(func(data []byte) ([]byte, error) {
                return data, nil
     })
     dag.Node("n2").Callback("storage.io/bucket?id=3345612358265349126&file=result")
     dag.Edge("n1", "C")
     dag.Edge("C", "n2")
}

func DefineStateStore() (faasflow.StateStore, error) {
        consulss, err := consulStateStore.GetConsulStateStore()
        return consulss, err
}

func DefineDataStore() (faasflow.DataStore, error) {
        miniods, err := minioDataStore.InitFromEnv()
        return miniods, err
}
Clone this wiki locally