diff --git a/action.go b/action.go index 554682a..37900e9 100644 --- a/action.go +++ b/action.go @@ -181,7 +181,7 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha var flowURI string var preserveInstanceId, originalInstanceId string var initStepId int - var rerun bool + var rerun, detachExecution bool runOptions, exists := inputs["_run_options"] var execOptions *instance.ExecOptions @@ -198,6 +198,7 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha initStepId = ro.InitStepId rerun = ro.Rerun originalInstanceId = ro.OriginalInstanceId + detachExecution = ro.DetachExecution } } @@ -298,6 +299,7 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha logger.Debugf("Applying Exec Options to instance: %s", inst.ID()) instance.ApplyExecOptions(inst, execOptions) } + //Update flow starting time inst.UpdateStartTime() if stateRecorder != nil { @@ -306,12 +308,16 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha stateRecorder.RecordStart(flowState) } + eventID := trigger.GetHandlerEventIdFromContext(ctx) + if eventID != "" { + // Add eventId to the instance + _ = inst.SetValue(instance.EventIdAttr, eventID) + } if trace.Enabled() { tc, err := trace.GetTracer().StartTrace(inst.SpanConfig(), trace.ExtractTracingContext(ctx)) if err != nil { return err } - eventID := trigger.GetHandlerEventIdFromContext(ctx) if eventID != "" { tc.SetTag("flogo_event_id", eventID) } @@ -348,8 +354,12 @@ func (fa *FlowAction) Run(ctx context.Context, inputs map[string]interface{}, ha } go func() { - - defer handler.Done() + if detachExecution { + // In detached mode, no reply expected. So, notifying handler. + handler.Done() + } else { + defer handler.Done() + } if retID { diff --git a/activity/subflow/activity.go b/activity/subflow/activity.go index 9bf334c..5c16b2c 100755 --- a/activity/subflow/activity.go +++ b/activity/subflow/activity.go @@ -14,7 +14,8 @@ func init() { } type Settings struct { - FlowURI string `md:"flowURI,required"` + FlowURI string `md:"flowURI,required"` + DetachedInvocation bool `md:"detached"` } var activityMd = activity.ToMetadata(&Settings{}) @@ -35,7 +36,7 @@ func New(ctx activity.InitContext) (activity.Activity, error) { //} activityMd := activity.ToMetadata(&Settings{}) - act := &SubFlowActivity{flowURI: s.FlowURI, activityMd: activityMd} + act := &SubFlowActivity{flowURI: s.FlowURI, activityMd: activityMd, detachedInvocation: s.DetachedInvocation} ctx.Logger().Debugf("flowURI: %+v", s.FlowURI) @@ -48,8 +49,9 @@ func New(ctx activity.InitContext) (activity.Activity, error) { // input : {sub-flow's input} // output: {sub-flow's output} type SubFlowActivity struct { - activityMd *activity.Metadata - flowURI string + activityMd *activity.Metadata + flowURI string + detachedInvocation bool mutex sync.Mutex mdUpdated uint32 @@ -96,7 +98,12 @@ func (a *SubFlowActivity) Eval(ctx activity.Context) (done bool, err error) { } } - err = instance.StartSubFlow(ctx, a.flowURI, input) + if a.detachedInvocation { + ctx.Logger().Infof("Starting SubFlow '%s' in detached mode", a.flowURI) + err = instance.StartDetachedSubFlow(ctx, a.flowURI, input) + } else { + err = instance.StartSubFlow(ctx, a.flowURI, input) + } - return false, nil + return a.detachedInvocation, err } diff --git a/activity/subflow/descriptor.json b/activity/subflow/descriptor.json index 6cd5519..be7ff8e 100755 --- a/activity/subflow/descriptor.json +++ b/activity/subflow/descriptor.json @@ -2,7 +2,7 @@ "name": "flogo-subflow", "type": "flogo:activity", "ref": "github.com/project-flogo/flow/activity/subflow", - "version": "0.9.0", + "version": "0.9.1", "title": "Start a SubFlow", "description": "Activity to start a sub-flow in an existing flow", "homepage": "https://github.com/project-flogo/flow/tree/master/activity/subflow", @@ -11,6 +11,12 @@ "name": "flowURI", "type": "string", "required": true + }, + { + "name": "detached", + "type": "boolean", + "required": false, + "value": false } ] } diff --git a/instance/exec.go b/instance/exec.go index c9733d0..2263878 100644 --- a/instance/exec.go +++ b/instance/exec.go @@ -21,6 +21,7 @@ type RunOptions struct { ExecOptions *ExecOptions Rerun bool OriginalInstanceId string + DetachExecution bool } // ExecOptions are optional Patch & Interceptor to be used during instance execution diff --git a/instance/util.go b/instance/util.go index ded7cf7..2c39268 100644 --- a/instance/util.go +++ b/instance/util.go @@ -1,18 +1,25 @@ package instance import ( + "context" "errors" "fmt" + "strconv" + + "github.com/project-flogo/core/action" "github.com/project-flogo/core/activity" "github.com/project-flogo/core/data" "github.com/project-flogo/core/data/coerce" "github.com/project-flogo/core/data/expression" "github.com/project-flogo/core/data/metadata" + "github.com/project-flogo/core/engine/runner" + "github.com/project-flogo/core/trigger" "github.com/project-flogo/flow/definition" "github.com/project-flogo/flow/support" - "strconv" ) +const EventIdAttr = "event.id" + func applySettingsMapper(taskInst *TaskInst) error { // get the input mapper @@ -344,3 +351,31 @@ func StartSubFlow(ctx activity.Context, flowURI string, inputs map[string]interf return nil } + +func StartDetachedSubFlow(ctx activity.Context, flowURI string, inputs map[string]interface{}) error { + taskInst, ok := ctx.(*TaskInst) + + if !ok { + return errors.New("unable to create subFlow using this context") + } + f := action.GetFactory("github.com/project-flogo/flow") + flowAction, err := f.New(&action.Config{Settings: map[string]interface{}{"flowURI": flowURI}}) + if err != nil { + return err + } + + ro := &RunOptions{} + ro.Op = OpStart + ro.DetachExecution = true + inputs["_run_options"] = ro + eventId, _ := taskInst.flowInst.GetValue(EventIdAttr) + gCtx := context.Background() + if eventId != "" { + gCtx = trigger.NewContextWithEventId(gCtx, eventId.(string)) + } + _, err = runner.NewDirect().RunAction(gCtx, flowAction, inputs) + if err != nil { + return err + } + return nil +}