Skip to content

Commit

Permalink
Implemented PassNode and WaitNode in Serverledge
Browse files Browse the repository at this point in the history
Added test for PassNode (checks that passes output to the next simple node)
Added test for WaitNode (checks that waits 2 seconds)
Now NewFC can return an error
  • Loading branch information
redjack96 committed Sep 4, 2024
1 parent d30cac9 commit 311c28a
Show file tree
Hide file tree
Showing 13 changed files with 512 additions and 163 deletions.
3 changes: 1 addition & 2 deletions internal/fc/asl.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ func FromASL(name string, aslSrc []byte) (*FunctionComposition, error) {
functions = append(functions, funcObj)
}

comp := NewFC(stateMachine.Name, *dag, functions, true)
return &comp, nil
return NewFC(stateMachine.Name, *dag, functions, true)
}

/* ============== Build from ASL States =================== */
Expand Down
27 changes: 8 additions & 19 deletions internal/fc/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func VisitDag(dag *Dag, nodeId DagNodeId, nodes []DagNode, excludeEnd bool) []Da
}
}
return nodes
case *SimpleNode:
case *SimpleNode, *PassNode, *WaitNode, *SucceedNode, *FailNode:
toAdd := VisitDag(dag, n.GetNext()[0], nodes, excludeEnd)
for _, add := range toAdd {
if !isDagNodePresent(add, nodes) {
Expand Down Expand Up @@ -645,10 +645,14 @@ func (dag *Dag) Execute(r *CompositionRequest) (bool, error) {
shouldContinue, err = dag.executeStart(progress, node, r)
case *FanOutNode:
shouldContinue, err = dag.executeFanOut(progress, node, r)
case *PassNode:
shouldContinue, err = commonExec(dag, progress, node, r)
case *WaitNode:
shouldContinue, err = commonExec(dag, progress, node, r)
case *FailNode:
shouldContinue, err = dag.executeFailNode(progress, node, r)
shouldContinue, err = dag.executeFailNode(progress, node, r) // TODO: use commonExec
case *SucceedNode:
shouldContinue, err = dag.executeSucceedNode(progress, node, r)
shouldContinue, err = dag.executeSucceedNode(progress, node, r) // TODO: use commonExec
case *EndNode:
shouldContinue, err = dag.executeEnd(progress, node, r)
}
Expand Down Expand Up @@ -726,22 +730,7 @@ func (dag *Dag) MarshalJSON() ([]byte, error) {

// Marshal the interface and store it as concrete node value in the map
for nodeId, node := range dag.Nodes {
switch concreteNode := node.(type) {
case *StartNode:
nodes[nodeId] = concreteNode
case *EndNode:
nodes[nodeId] = concreteNode
case *SimpleNode:
nodes[nodeId] = concreteNode
case *ChoiceNode:
nodes[nodeId] = concreteNode
case *FanOutNode:
nodes[nodeId] = concreteNode
case *FanInNode:
nodes[nodeId] = concreteNode
default:
return nil, fmt.Errorf("unsupported Simpatica type")
}
nodes[nodeId] = node
}
data["Nodes"] = nodes

Expand Down
67 changes: 67 additions & 0 deletions internal/fc/dag_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,73 @@ func (b *DagBuilder) AddSucceedNodeAndBuild(message string) (*Dag, error) {
return b.Build()
}

func (b *DagBuilder) AddPassNode(result string) *DagBuilder {
nErrors := len(b.errors)
if nErrors > 0 {
fmt.Printf("AddSimpleNode skipped, because of %d error(s) in dagBuilder\n", nErrors)
return b
}

passNode := NewPassNode(result)
passNode.setBranchId(b.BranchNumber)

b.dag.addNode(passNode)
err := b.dag.chain(b.prevNode, passNode)
if err != nil {
b.appendError(err)
return b
}

b.prevNode = passNode
return b
}

func (b *DagBuilder) AddWaitNode(seconds int) *DagBuilder {
nErrors := len(b.errors)
if nErrors > 0 {
fmt.Printf("AddSimpleNode skipped, because of %d error(s) in dagBuilder\n", nErrors)
return b
}

passNode := NewWaitNode(seconds)
passNode.setBranchId(b.BranchNumber)

b.dag.addNode(passNode)
err := b.dag.chain(b.prevNode, passNode)
if err != nil {
b.appendError(err)
return b
}

b.prevNode = passNode
return b
}

func (b *DagBuilder) AddWaitNodeWithTimestamp(timestampRFC3339 string) *DagBuilder {
nErrors := len(b.errors)
if nErrors > 0 {
fmt.Printf("AddSimpleNode skipped, because of %d error(s) in dagBuilder\n", nErrors)
return b
}
timestamp, ok := parseRFC3339(timestampRFC3339)
if !ok {
b.appendError(fmt.Errorf("failed to parse timestamp RFC3339 from string %s", timestampRFC3339))
return b
}
passNode := NewWaitNodeFromTimestamp(timestamp)
passNode.setBranchId(b.BranchNumber)

b.dag.addNode(passNode)
err := b.dag.chain(b.prevNode, passNode)
if err != nil {
b.appendError(err)
return b
}

b.prevNode = passNode
return b
}

// Build ends the single branch with an EndNode. If there is more than one branch, it panics!
func (b *DagBuilder) Build() (*Dag, error) {
switch b.prevNode.(type) {
Expand Down
1 change: 1 addition & 0 deletions internal/fc/fanout_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"
)

// TODO: when a branch has a fail node, all other branches should terminate immediately and the FanOut, FanIn and all nodes in the branches should be considered failed
// FanOutNode is a DagNode that receives one input and sends multiple result, produced in parallel
type FanOutNode struct {
Id DagNodeId
Expand Down
17 changes: 13 additions & 4 deletions internal/fc/function_composition.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func CreateExecutionReportId(dagNode DagNode) ExecutionReportId {
type CompositionExecutionReport struct {
Result map[string]interface{}
Reports *hashmap.Map[ExecutionReportId, *function.ExecutionReport]
ResponseTime float64 // time waited by the user to get the output of the entire composition
ResponseTime float64 // time waited by the user to get the output of the entire composition (in seconds)
Progress *Progress `json:"-"` // skipped in Json marshaling
}

Expand Down Expand Up @@ -68,7 +68,7 @@ func (cer *CompositionExecutionReport) GetAllResults() string {
}

// NewFC instantiates a new FunctionComposition with a name and a corresponding dag. The functions parameter can contain duplicate functions (with the same name)
func NewFC(name string, dag Dag, functions []*function.Function, removeFnOnDeletion bool) FunctionComposition {
func NewFC(name string, dag Dag, functions []*function.Function, removeFnOnDeletion bool) (*FunctionComposition, error) {
functionMap := make(map[string]*function.Function)
if functions != nil {
for _, f := range functions {
Expand All @@ -77,12 +77,21 @@ func NewFC(name string, dag Dag, functions []*function.Function, removeFnOnDelet
}
}

return FunctionComposition{
// if not all unique functions are present inside the functions array, we return an error
definedFunctions := dag.GetUniqueDagFunctions()
for _, f := range definedFunctions {
_, ok2 := functionMap[f]
if !ok2 {
return nil, fmt.Errorf("the function %s is not included in the FunctionComposition functions parameter, but it must be registered to Serverledge", f)
}
}

return &FunctionComposition{
Name: name,
Functions: functionMap,
Workflow: dag,
RemoveFnOnDeletion: removeFnOnDeletion,
}
}, nil
}

func (fc *FunctionComposition) getEtcdKey() string {
Expand Down
161 changes: 122 additions & 39 deletions internal/fc/pass_node.go
Original file line number Diff line number Diff line change
@@ -1,84 +1,167 @@
package fc

import (
"fmt"
"github.com/grussorusso/serverledge/internal/function"
"github.com/grussorusso/serverledge/internal/types"
"github.com/lithammer/shortuuid"
"time"
)

type PassNode struct {
Id DagNodeId
NodeType DagNodeType
Id DagNodeId
NodeType DagNodeType
Result string
ResultPath string
OutputTo DagNodeId
BranchId int
}

func NewPassNode(message string) *PassNode {
func NewPassNode(result string) *PassNode {
passNode := PassNode{
Id: DagNodeId("pass_" + shortuuid.New()),
NodeType: Pass,
Result: result,
}
return &passNode
}

func (p *PassNode) Equals(cmp types.Comparable) bool {
//TODO implement me
panic("implement me")
func (p *PassNode) Exec(compRequest *CompositionRequest, params ...map[string]interface{}) (map[string]interface{}, error) {
t0 := time.Now()
var err error = nil
if len(params) != 1 {
return nil, fmt.Errorf("failed to get one input for pass node: received %d inputs", len(params))
}
output := params[0]
respAndDuration := time.Now().Sub(t0).Seconds()
execReport := &function.ExecutionReport{
Result: fmt.Sprintf("%v", output),
ResponseTime: respAndDuration,
IsWarmStart: true, // not in a container
InitTime: 0,
OffloadLatency: 0,
Duration: respAndDuration,
SchedAction: "",
}
compRequest.ExecReport.Reports.Set(CreateExecutionReportId(p), execReport)
return output, err
}

func (p *PassNode) String() string {
//TODO implement me
panic("implement me")
func (p *PassNode) Equals(cmp types.Comparable) bool {
p2, ok := cmp.(*PassNode)
if !ok {
return false
}
return p.Id == p2.Id &&
p.NodeType == p2.NodeType &&
p.Result == p2.Result &&
p.ResultPath == p2.ResultPath &&
p.OutputTo == p2.OutputTo &&
p.BranchId == p2.BranchId
}

func (p *PassNode) GetId() DagNodeId {
//TODO implement me
panic("implement me")
func (p *PassNode) CheckInput(input map[string]interface{}) error {
return nil
}

func (p *PassNode) Name() string {
//TODO implement me
panic("implement me")
// AddOutput for a PassNode connects it to another DagNode, except StartNode
func (p *PassNode) AddOutput(dag *Dag, dagNode DagNodeId) error {
_, ok := dag.Nodes[dagNode].(*StartNode)
if ok {
return fmt.Errorf("the PassNode cannot be chained to a startNode")
}
p.OutputTo = dagNode
return nil
}

func (p *PassNode) Exec(compRequest *CompositionRequest, params ...map[string]interface{}) (map[string]interface{}, error) {
//TODO implement me
panic("implement me")
}
func (p *PassNode) PrepareOutput(dag *Dag, output map[string]interface{}) error {
if p.ResultPath != "" {
return fmt.Errorf("ResultPath not currently implemented") // TODO: implement it
}

func (p *PassNode) AddOutput(dag *Dag, dagNode DagNodeId) error {
//TODO implement me
panic("implement me")
}
if len(p.GetNext()) == 0 {
return fmt.Errorf("failed to map output: there are no next node after PassNode")
}
// Get the next node.
nextNodeId := p.GetNext()[0]

func (p *PassNode) CheckInput(input map[string]interface{}) error {
//TODO implement me
panic("implement me")
nextNode, ok := dag.Find(nextNodeId)
if !ok {
return fmt.Errorf("failed to find next node")
}

// If it is a SimpleNode
nextSimpleNode, ok := nextNode.(*SimpleNode)
if !ok {
return nil
}
return p.MapOutput(nextSimpleNode, output)
}

func (p *PassNode) PrepareOutput(dag *Dag, output map[string]interface{}) error {
//TODO implement me
panic("implement me")
// MapOutput changes the names of the output parameters to match the name of the input parameters of the next SimpleNode
func (p *PassNode) MapOutput(nextNode *SimpleNode, output map[string]interface{}) error {
funct, exists := function.GetFunction(nextNode.Func)
if !exists {
return fmt.Errorf("function %s doesn't exist", nextNode.Func)
}
sign := funct.Signature
// if there are no inputs, we do nothing
for _, def := range sign.GetInputs() {
// if output has same name as input, we do not need to change name
_, present := output[def.Name]
if present {
continue
}
// find an entry in the output map that successfully checks the type of the InputDefinition
key, ok := def.FindEntryThatTypeChecks(output)
if ok {
// we get the output value
val := output[key]
// we remove the output entry ...
delete(output, key)
// and replace with the input entry
output[def.Name] = val
// save the output map in the input of the node
//s.inputMutex.Lock()
//s.input = output
//s.inputMutex.Unlock()
} else {
// otherwise if no one of the entry typechecks we are doomed
return fmt.Errorf("no output entry input-checks with the next function")
}
}
// if the outputs are more than the needed input, we do nothing
return nil
}

func (p *PassNode) GetNext() []DagNodeId {
//TODO implement me
panic("implement me")
return []DagNodeId{p.OutputTo}
}

func (p *PassNode) Width() int {
//TODO implement me
panic("implement me")
return 1
}

func (p *PassNode) Name() string {
return "Pass"
}

func (p *PassNode) String() string {
return "[ Pass ]"
}

func (p *PassNode) setBranchId(number int) {
//TODO implement me
panic("implement me")
p.BranchId = number
}

func (p *PassNode) GetBranchId() int {
//TODO implement me
panic("implement me")
return p.BranchId
}

func (p *PassNode) GetId() DagNodeId {
return p.Id
}

func (p *PassNode) GetNodeType() DagNodeType {
//TODO implement me
panic("implement me")
return p.NodeType
}
Loading

0 comments on commit 311c28a

Please sign in to comment.