Skip to content

Commit

Permalink
Supporting detached mode for subflow invocation (#193)
Browse files Browse the repository at this point in the history
  • Loading branch information
vnalawad-tibco authored Jan 16, 2024
1 parent e6476e0 commit 5a9e85f
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 12 deletions.
18 changes: 14 additions & 4 deletions action.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha
var flowURI string
var preserveInstanceId, originalInstanceId string
var initStepId int
var rerun bool
var rerun, detachExecution bool
runOptions, exists := inputs["_run_options"]

var execOptions *instance.ExecOptions
Expand All @@ -198,6 +198,7 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha
initStepId = ro.InitStepId
rerun = ro.Rerun
originalInstanceId = ro.OriginalInstanceId
detachExecution = ro.DetachExecution
}
}

Expand Down Expand Up @@ -298,6 +299,7 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha
logger.Debugf("Applying Exec Options to instance: %s", inst.ID())
instance.ApplyExecOptions(inst, execOptions)
}

//Update flow starting time
inst.UpdateStartTime()
if stateRecorder != nil {
Expand All @@ -306,12 +308,16 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha
stateRecorder.RecordStart(flowState)
}

eventID := trigger.GetHandlerEventIdFromContext(ctx)
if eventID != "" {
// Add eventId to the instance
_ = inst.SetValue(instance.EventIdAttr, eventID)
}
if trace.Enabled() {
tc, err := trace.GetTracer().StartTrace(inst.SpanConfig(), trace.ExtractTracingContext(ctx))
if err != nil {
return err
}
eventID := trigger.GetHandlerEventIdFromContext(ctx)
if eventID != "" {
tc.SetTag("flogo_event_id", eventID)
}
Expand Down Expand Up @@ -348,8 +354,12 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha
}

go func() {

defer handler.Done()
if detachExecution {
// In detached mode, no reply expected. So, notifying handler.
handler.Done()
} else {
defer handler.Done()
}

if retID {

Expand Down
19 changes: 13 additions & 6 deletions activity/subflow/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ func init() {
}

type Settings struct {
FlowURI string `md:"flowURI,required"`
FlowURI string `md:"flowURI,required"`
DetachedInvocation bool `md:"detached"`
}

var activityMd = activity.ToMetadata(&Settings{})
Expand All @@ -35,7 +36,7 @@ func New(ctx activity.InitContext) (activity.Activity, error) {
//}

activityMd := activity.ToMetadata(&Settings{})
act := &SubFlowActivity{flowURI: s.FlowURI, activityMd: activityMd}
act := &SubFlowActivity{flowURI: s.FlowURI, activityMd: activityMd, detachedInvocation: s.DetachedInvocation}

ctx.Logger().Debugf("flowURI: %+v", s.FlowURI)

Expand All @@ -48,8 +49,9 @@ func New(ctx activity.InitContext) (activity.Activity, error) {
// input : {sub-flow's input}
// output: {sub-flow's output}
type SubFlowActivity struct {
activityMd *activity.Metadata
flowURI string
activityMd *activity.Metadata
flowURI string
detachedInvocation bool

mutex sync.Mutex
mdUpdated uint32
Expand Down Expand Up @@ -96,7 +98,12 @@ func (a *SubFlowActivity) Eval(ctx activity.Context) (done bool, err error) {
}
}

err = instance.StartSubFlow(ctx, a.flowURI, input)
if a.detachedInvocation {
ctx.Logger().Infof("Starting SubFlow '%s' in detached mode", a.flowURI)
err = instance.StartDetachedSubFlow(ctx, a.flowURI, input)
} else {
err = instance.StartSubFlow(ctx, a.flowURI, input)
}

return false, nil
return a.detachedInvocation, err
}
8 changes: 7 additions & 1 deletion activity/subflow/descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "flogo-subflow",
"type": "flogo:activity",
"ref": "github.com/project-flogo/flow/activity/subflow",
"version": "0.9.0",
"version": "0.9.1",
"title": "Start a SubFlow",
"description": "Activity to start a sub-flow in an existing flow",
"homepage": "https://github.com/project-flogo/flow/tree/master/activity/subflow",
Expand All @@ -11,6 +11,12 @@
"name": "flowURI",
"type": "string",
"required": true
},
{
"name": "detached",
"type": "boolean",
"required": false,
"value": false
}
]
}
1 change: 1 addition & 0 deletions instance/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type RunOptions struct {
ExecOptions *ExecOptions
Rerun bool
OriginalInstanceId string
DetachExecution bool
}

// ExecOptions are optional Patch & Interceptor to be used during instance execution
Expand Down
37 changes: 36 additions & 1 deletion instance/util.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
package instance

import (
"context"
"errors"
"fmt"
"strconv"

"github.com/project-flogo/core/action"
"github.com/project-flogo/core/activity"
"github.com/project-flogo/core/data"
"github.com/project-flogo/core/data/coerce"
"github.com/project-flogo/core/data/expression"
"github.com/project-flogo/core/data/metadata"
"github.com/project-flogo/core/engine/runner"
"github.com/project-flogo/core/trigger"
"github.com/project-flogo/flow/definition"
"github.com/project-flogo/flow/support"
"strconv"
)

const EventIdAttr = "event.id"

func applySettingsMapper(taskInst *TaskInst) error {

// get the input mapper
Expand Down Expand Up @@ -344,3 +351,31 @@ func StartSubFlow(ctx activity.Context, flowURI string, inputs map[string]interf

return nil
}

func StartDetachedSubFlow(ctx activity.Context, flowURI string, inputs map[string]interface{}) error {
taskInst, ok := ctx.(*TaskInst)

if !ok {
return errors.New("unable to create subFlow using this context")
}
f := action.GetFactory("github.com/project-flogo/flow")
flowAction, err := f.New(&action.Config{Settings: map[string]interface{}{"flowURI": flowURI}})
if err != nil {
return err
}

ro := &RunOptions{}
ro.Op = OpStart
ro.DetachExecution = true
inputs["_run_options"] = ro
eventId, _ := taskInst.flowInst.GetValue(EventIdAttr)
gCtx := context.Background()
if eventId != "" {
gCtx = trigger.NewContextWithEventId(gCtx, eventId.(string))
}
_, err = runner.NewDirect().RunAction(gCtx, flowAction, inputs)
if err != nil {
return err
}
return nil
}

0 comments on commit 5a9e85f

Please sign in to comment.