-
Notifications
You must be signed in to change notification settings - Fork 39
Home
Swarvanu Sengupta edited this page Jul 16, 2019
·
9 revisions
faas-flow allows you to realize OpenFaaS function composition with ease. By defining a simple pipeline, you can orchestrate multiple functions without having to worry about internals.
import faasflow "github.com/s8sg/faas-flow"
// Define builds the workflow pipeline: Func1 is applied first and Func2 is
// then applied to its output.
//
// It always returns nil; the error result exists to satisfy the Define
// signature.
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
	flow.Apply("Func1").Apply("Func2")
	return nil
}
After building and deploying, it will give you a function that orchestrates calling Func2
with the output of Func1
By supplying a number of pipeline operators, complex composition can be achieved with little work:
The above pipeline can be achieved with little, but powerful, code:
Sync Chain
// Define declares a sync chain on the flow's sync node: func1 is applied,
// then func2, followed by an inline Modify step.
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
	// Inline step that runs after func2; it hands the data back unchanged.
	passThrough := func(data []byte) ([]byte, error) {
		// Do something
		return data, nil
	}

	chain := flow.SyncNode()
	chain.Apply("func1").Apply("func2").Modify(passThrough)
	return nil
}
Async Chain
// Define declares an async chain as a DAG with three nodes connected
// n1 -> n2 -> n3:
//
//   - n1 applies func1.
//   - n2 applies func2 and then runs an inline Modify step.
//   - n3 registers a callback URL.
//
// A failure handler and a Finally cleanup handler are also registered on the
// flow. Define always returns nil.
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
	dag := flow.Dag()
	dag.Node("n1").Apply("func1")
	dag.Node("n2").Apply("func2").
		Modify(func(data []byte) ([]byte, error) {
			// Do something
			return data, nil
		})
	dag.Node("n3").Callback("storage.io/bucket?id=3345612358265349126&file=result.dat")

	// Wire the nodes into a linear chain.
	dag.Edge("n1", "n2")
	dag.Edge("n2", "n3")

	flow.OnFailure(func(err error) {
		// failure handler
	}).
		Finally(func(state string) {
			// cleanup code
		})
	return nil
}
Parallel Branching
// Define declares a branching DAG: n1 fans out to n2 and n3, and both of
// those feed into n4.
//
//   - n1 runs an inline Modify step.
//   - n2 applies func1.
//   - n3 applies func2 followed by an inline Modify step.
//   - n4 registers a callback URL.
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
	graph := flow.Dag()

	graph.Node("n1").Modify(func(in []byte) ([]byte, error) {
		// do something
		return in, nil
	})
	graph.Node("n2").Apply("func1")

	third := graph.Node("n3").Apply("func2")
	third.Modify(func(in []byte) ([]byte, error) {
		// do something
		return in, nil
	})

	graph.Node("n4").Callback("storage.io/bucket?id=3345612358265349126&file=result")

	// Edges are added in the same order as a hand-written list would be:
	// the two branches out of n1 first, then the two joins into n4.
	links := [][2]string{
		{"n1", "n2"},
		{"n1", "n3"},
		{"n2", "n4"},
		{"n3", "n4"},
	}
	for _, link := range links {
		graph.Edge(link[0], link[1])
	}
	return nil
}
Dynamic Branching
// Define declares a DAG that uses dynamic branching:
//
//   - n1 runs an inline Modify step.
//   - C is a conditional branch with the possible conditions "c1" and "c2";
//     its callback selects which conditions to execute.
//   - The "c2" branch contains a node n1 that applies func and then runs an
//     inline Modify step.
//   - The "c1" branch contains a foreach branch F whose callback returns a
//     map; the foreach sub-DAG contains a node n1 with an inline Modify step.
//   - n2 registers a callback URL.
//
// The top-level nodes are connected n1 -> C -> n2. Define always returns nil.
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
	dag := flow.Dag()
	dag.Node("n1").Modify(func(data []byte) ([]byte, error) {
		return data, nil
	})

	conditionalDags := dag.ConditionalBranch("C",
		[]string{"c1", "c2"},
		func(response []byte) []string {
			// for each returned condition a separate branch will execute
			// this function executes in the runtime of condition C
			return []string{"c1", "c2"}
		},
	)

	conditionalDags["c2"].Node("n1").Apply("func").Modify(func(data []byte) ([]byte, error) {
		return data, nil
	})

	foreachDag := conditionalDags["c1"].ForEachBranch("F",
		func(data []byte) map[string][]byte {
			// for each returned key in the hashmap a new branch will be executed
			// this function executes in the runtime of foreach F
			return map[string][]byte{"f1": data, "f2": data}
		},
	)
	foreachDag.Node("n1").Modify(func(data []byte) ([]byte, error) {
		return data, nil
	})

	dag.Node("n2").Callback("storage.io/bucket?id=3345612358265349126&file=result")
	dag.Edge("n1", "C")
	dag.Edge("C", "n2")
	return nil
}
// DefineStateStore returns the state store for the flow, obtained from
// consulStateStore.GetConsulStateStore. Both the store and the error are
// passed through to the caller unchanged.
func DefineStateStore() (faasflow.StateStore, error) {
	return consulStateStore.GetConsulStateStore()
}
// DefineDataStore returns the data store for the flow, obtained from
// minioDataStore.InitFromEnv. Both the store and the error are passed
// through to the caller unchanged.
func DefineDataStore() (faasflow.DataStore, error) {
	return minioDataStore.InitFromEnv()
}