diff --git a/core/environment/environment.go b/core/environment/environment.go index 6c53da7a..dcfd85dd 100644 --- a/core/environment/environment.go +++ b/core/environment/environment.go @@ -172,6 +172,22 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, Message: "transition step starting", }) + // first, we execute hooks which should be executed before an event officially starts + errHooks := env.handleHooksWithNegativeWeights(env.Workflow(), trigger) + if errHooks != nil { + e.Cancel(errHooks) + the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{ + EnvironmentId: env.id.String(), + State: env.Sm.Current(), + RunNumber: env.GetCurrentRunNumber(), + Error: errHooks.Error(), + Transition: e.Event, + TransitionStep: trigger, + Message: "transition step finished", + }) + return + } + // If the event is START_ACTIVITY, we set up and update variables relevant to plugins early on. // This used to be done inside the transition_startactivity, but then the new RN isn't available to the // before_START_ACTIVITY hooks. By setting it up here, we ensure the run number is available especially @@ -284,7 +300,7 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, ) } - errHooks := env.handleHooks(env.Workflow(), trigger) + errHooks = env.handleHooksWithPositiveWeights(env.Workflow(), trigger) if errHooks != nil { e.Cancel(errHooks) } @@ -316,6 +332,8 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, Message: "transition step starting", }) + errHooks := env.handleHooksWithNegativeWeights(env.Workflow(), trigger) + // fixme: in principle we should not need it anymore, since both STOP_ACTIVITY and GO_ERROR set EOR // We might leave RUNNING not only through STOP_ACTIVITY. In such cases we also need a run stop time. if e.Src == "RUNNING" { endTime, ok := env.workflow.GetUserVars().Get("run_end_time_ms") @@ -327,7 +345,21 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, Debug("O2 End time already set before leave_RUNNING") } } - errHooks := env.handleHooks(env.Workflow(), trigger) + if errHooks != nil { + e.Cancel(errHooks) + the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{ + EnvironmentId: env.id.String(), + State: env.Sm.Current(), + RunNumber: env.GetCurrentRunNumber(), + Error: errHooks.Error(), + Transition: e.Event, + TransitionStep: trigger, + Message: "transition step finished", + }) + return + } + + errHooks = env.handleHooksWithPositiveWeights(env.Workflow(), trigger) if errHooks != nil { e.Cancel(errHooks) } @@ -375,7 +407,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, Transition: e.Event, TransitionStep: fmt.Sprintf("tasks_%s", e.Event), }) - }, "enter_state": func(_ context.Context, e *fsm.Event) { trigger := fmt.Sprintf("enter_%s", e.Dst) @@ -389,10 +420,12 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, Message: "transition step starting", }) + errHooks := env.handleHooksWithNegativeWeights(env.Workflow(), trigger) + enterStateTimeMs = strconv.FormatInt(time.Now().UnixMilli(), 10) env.workflow.SetRuntimeVar("enter_state_time_ms", enterStateTimeMs) - errHooks := env.handleHooks(env.Workflow(), trigger) + errHooks = errors.Join(errHooks, env.handleHooksWithPositiveWeights(env.Workflow(), trigger)) if errHooks != nil { // at enter_ it will not cancel the transition but only set the error e.Cancel(errHooks) @@ -442,7 +475,7 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, Message: "transition step starting", }) - errHooks := env.handleHooks(env.Workflow(), trigger) + errHooks := env.handleHooksWithNegativeWeights(env.Workflow(), trigger) if errHooks != nil { // at after_ it will not cancel the transition but only set the error e.Cancel(errHooks) @@ -550,6 +583,11 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, } } + errHooks = errors.Join(errHooks, env.handleHooksWithPositiveWeights(env.Workflow(), trigger)) + if errHooks != nil { + e.Cancel(errHooks) + } + errorMsg := "" if e.Err != nil { errorMsg = e.Err.Error() @@ -570,9 +608,7 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, return } -func (env *Environment) handleHooks(workflow workflow.Role, trigger string) (err error) { - log.WithField("partition", env.id).Debugf("begin handling hooks for trigger %s", trigger) - defer utils.TimeTrack(time.Now(), fmt.Sprintf("finished handling hooks for trigger %s", trigger), log.WithPrefix("env").WithField("partition", env.id)) +func (env *Environment) handleHooks(workflow workflow.Role, trigger string, weightPredicate func(callable.HookWeight) bool) (err error) { // Starting point: get all hooks to be started for the current trigger hooksMapForTrigger := workflow.GetHooksMapForTrigger(trigger) @@ -587,13 +623,20 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string) (err } allWeights := allWeightsSet.GetWeights() + filteredWeights := make([]callable.HookWeight, 0) + for _, weight := range allWeights { + if weightPredicate(weight) { + filteredWeights = append(filteredWeights, weight) + } + } + // Prepare structures to accumulate errors allErrors := make(map[callable.Hook]error) criticalFailures := make([]error, 0) // FOR EACH weight within the current state machine trigger moment // 4 phases: start calls, await calls, execute task hooks, error handling - for _, weight := range allWeights { + for _, weight := range filteredWeights { hooksForWeight, thereAreHooksToStartForTheCurrentTriggerAndWeight := hooksMapForTrigger[weight] // PHASE 1: start asynchronously any call hooks and add them to the pending await map @@ -715,6 +758,25 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string) (err return nil } +func (env *Environment) handleAllHooks(workflow workflow.Role, trigger string) (err error) { + log.WithField("partition", env.id).Debugf("begin handling hooks for trigger %s", trigger) + defer utils.TimeTrack(time.Now(), fmt.Sprintf("finished handling hooks for trigger %s", trigger), log.WithPrefix("env").WithField("partition", env.id)) + return env.handleHooks(workflow, trigger, func(w callable.HookWeight) bool { return true }) +} + +func (env *Environment) handleHooksWithNegativeWeights(workflow workflow.Role, trigger string) (err error) { + log.WithField("partition", env.id).Debugf("begin handling hooks with negative weights for trigger %s", trigger) + defer utils.TimeTrack(time.Now(), fmt.Sprintf("finished handling hooks with negative weights for trigger %s", trigger), log.WithPrefix("env").WithField("partition", env.id)) + return env.handleHooks(workflow, trigger, func(w callable.HookWeight) bool { return w < 0 }) +} + +// "positive" include 0 +func (env *Environment) handleHooksWithPositiveWeights(workflow workflow.Role, trigger string) (err error) { + log.WithField("partition", env.id).Debugf("begin handling hooks with positive weights for trigger %s", trigger) + defer utils.TimeTrack(time.Now(), fmt.Sprintf("finished handling hooks with positive weights for trigger %s", trigger), log.WithPrefix("env").WithField("partition", env.id)) + return env.handleHooks(workflow, trigger, func(w callable.HookWeight) bool { return w >= 0 }) +} + // runTasksAsHooks returns a map of failed hook tasks and their respective error values. // The returned map includes both critical and non-critical failures, and it's up to the caller // to further filter as needed. diff --git a/core/environment/hooks_test.go b/core/environment/hooks_test.go index 14077223..5fab31e8 100644 --- a/core/environment/hooks_test.go +++ b/core/environment/hooks_test.go @@ -218,10 +218,15 @@ var _ = Describe("calling hooks on FSM events", func() { }) Context("activity-related timestamps", func() { - It("should set run_start_time_ms before before_START_ACTIVITY hooks", func() { + It("should set run_start_time_ms just after before_START_ACTIVITY<0 hooks are executed", func() { env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ workflow.NewCallRole( - "call", + "call1", + task.Traits{Trigger: "before_START_ACTIVITY-1", Timeout: "5s", Critical: true, Await: "before_START_ACTIVITY-1"}, + "testplugin.TimestampObserver()", + ""), + workflow.NewCallRole( + "call2", task.Traits{Trigger: "before_START_ACTIVITY", Timeout: "5s", Critical: true, Await: "before_START_ACTIVITY"}, "testplugin.TimestampObserver()", "")}) @@ -231,44 +236,45 @@ var _ = Describe("calling hooks on FSM events", func() { err := env.Sm.Event(context.Background(), "START_ACTIVITY", NewDummyTransition("START_ACTIVITY", false)) Expect(err).NotTo(HaveOccurred()) - v, ok := env.workflow.GetUserVars().Get("seen_run_start_time_ms") + _, ok := env.workflow.GetUserVars().Get("root.call1_saw_run_start_time_ms") + Expect(ok).To(BeFalse()) + _, ok = env.workflow.GetUserVars().Get("root.call2_saw_run_start_time_ms") Expect(ok).To(BeTrue()) - Expect(v).To(Equal("true")) - _, ok = env.workflow.GetUserVars().Get("seen_run_start_completion_time_ms") + _, ok = env.workflow.GetUserVars().Get("root.call2_saw_run_start_completion_time_ms") Expect(ok).To(BeFalse()) }) - It("should set run_start_completion_time_ms after after_START_ACTIVITY hooks", func() { + It("should set run_start_completion_time_ms just after after_START_ACTIVITY<0 hooks are executed", func() { env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ workflow.NewCallRole( "call1", - task.Traits{Trigger: "after_START_ACTIVITY", Timeout: "5s", Critical: true, Await: "after_START_ACTIVITY"}, + task.Traits{Trigger: "after_START_ACTIVITY-1", Timeout: "5s", Critical: true, Await: "after_START_ACTIVITY-1"}, "testplugin.TimestampObserver()", ""), workflow.NewCallRole( "call2", - task.Traits{Trigger: "before_STOP_ACTIVITY", Timeout: "5s", Critical: true, Await: "before_STOP_ACTIVITY"}, + task.Traits{Trigger: "after_START_ACTIVITY", Timeout: "5s", Critical: true, Await: "after_START_ACTIVITY"}, "testplugin.TimestampObserver()", "")}) workflow.LinkChildrenToParents(env.workflow) env.Sm.SetState("CONFIGURED") err := env.Sm.Event(context.Background(), "START_ACTIVITY", NewDummyTransition("START_ACTIVITY", false)) - Expect(err).NotTo(HaveOccurred()) - _, ok := env.workflow.GetUserVars().Get("seen_run_start_completion_time_ms") - Expect(ok).To(BeFalse()) - - err = env.Sm.Event(context.Background(), "STOP_ACTIVITY", NewDummyTransition("STOP_ACTIVITY", false)) - Expect(err).NotTo(HaveOccurred()) - v, ok := env.workflow.GetUserVars().Get("seen_run_start_completion_time_ms") + _, ok := env.workflow.GetUserVars().Get("root.call1_saw_run_start_completion_time_ms") + Expect(ok).To(BeFalse()) + _, ok = env.workflow.GetUserVars().Get("root.call2_saw_run_start_completion_time_ms") Expect(ok).To(BeTrue()) - Expect(v).To(Equal("true")) }) - It("should set run_end_time_ms before before_STOP_ACTIVITY hooks", func() { + It("should set run_end_time_ms just after before_STOP_ACTIVITY<0 hooks are executed", func() { env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ workflow.NewCallRole( "call1", + task.Traits{Trigger: "before_STOP_ACTIVITY-1", Timeout: "5s", Critical: true, Await: "before_STOP_ACTIVITY-1"}, + "testplugin.TimestampObserver()", + ""), + workflow.NewCallRole( + "call2", task.Traits{Trigger: "before_STOP_ACTIVITY", Timeout: "5s", Critical: true, Await: "before_STOP_ACTIVITY"}, "testplugin.TimestampObserver()", "")}) @@ -280,22 +286,23 @@ var _ = Describe("calling hooks on FSM events", func() { err = env.Sm.Event(context.Background(), "STOP_ACTIVITY", NewDummyTransition("STOP_ACTIVITY", false)) Expect(err).NotTo(HaveOccurred()) - v, ok := env.workflow.GetUserVars().Get("seen_run_end_time_ms") + _, ok := env.workflow.GetUserVars().Get("root.call1_saw_run_end_time_ms") + Expect(ok).To(BeFalse()) + _, ok = env.workflow.GetUserVars().Get("root.call2_saw_run_end_time_ms") Expect(ok).To(BeTrue()) - Expect(v).To(Equal("true")) - _, ok = env.workflow.GetUserVars().Get("seen_run_end_completion_time_ms") + _, ok = env.workflow.GetUserVars().Get("root.call2_saw_run_end_completion_time_ms") Expect(ok).To(BeFalse()) }) - It("should set run_end_completion_time_ms after after_STOP_ACTIVITY hooks", func() { + It("should set run_end_completion_time_ms just before after_STOP_ACTIVITY<0 hooks are executed", func() { env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ workflow.NewCallRole( "call1", - task.Traits{Trigger: "after_STOP_ACTIVITY", Timeout: "5s", Critical: true, Await: "after_STOP_ACTIVITY"}, + task.Traits{Trigger: "after_STOP_ACTIVITY-1", Timeout: "5s", Critical: true, Await: "after_STOP_ACTIVITY-1"}, "testplugin.TimestampObserver()", ""), workflow.NewCallRole( "call2", - task.Traits{Trigger: "before_RESET", Timeout: "5s", Critical: true, Await: "before_RESET"}, + task.Traits{Trigger: "after_STOP_ACTIVITY", Timeout: "5s", Critical: true, Await: "after_STOP_ACTIVITY"}, "testplugin.TimestampObserver()", "")}) workflow.LinkChildrenToParents(env.workflow) @@ -306,18 +313,10 @@ var _ = Describe("calling hooks on FSM events", func() { err = env.Sm.Event(context.Background(), "STOP_ACTIVITY", NewDummyTransition("STOP_ACTIVITY", false)) Expect(err).NotTo(HaveOccurred()) - v, ok := env.workflow.GetUserVars().Get("seen_run_end_time_ms") - Expect(ok).To(BeTrue()) - Expect(v).To(Equal("true")) - _, ok = env.workflow.GetUserVars().Get("seen_run_end_completion_time_ms") + _, ok := env.workflow.GetUserVars().Get("root.call1_saw_run_end_completion_time_ms") Expect(ok).To(BeFalse()) - - err = env.Sm.Event(context.Background(), "RESET", NewDummyTransition("RESET", false)) - Expect(err).NotTo(HaveOccurred()) - - v, ok = env.workflow.GetUserVars().Get("seen_run_end_completion_time_ms") + _, ok = env.workflow.GetUserVars().Get("root.call2_saw_run_end_completion_time_ms") Expect(ok).To(BeTrue()) - Expect(v).To(Equal("true")) }) It("should clear timestamps from previous runs and set run_start_time_ms again before before_START_ACTIVITY hooks", func() { env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ @@ -334,21 +333,21 @@ var _ = Describe("calling hooks on FSM events", func() { err = env.Sm.Event(context.Background(), "STOP_ACTIVITY", NewDummyTransition("STOP_ACTIVITY", false)) Expect(err).NotTo(HaveOccurred()) - env.workflow.GetUserVars().Del("seen_run_start_time_ms") - env.workflow.GetUserVars().Del("seen_run_start_completion_time_ms") - env.workflow.GetUserVars().Del("seen_run_end_time_ms") - env.workflow.GetUserVars().Del("seen_run_end_completion_time_ms") + env.workflow.GetUserVars().Del("root.call_saw_run_start_time_ms") + env.workflow.GetUserVars().Del("root.call_saw_run_start_completion_time_ms") + env.workflow.GetUserVars().Del("root.call_saw_run_end_time_ms") + env.workflow.GetUserVars().Del("root.call_saw_run_end_completion_time_ms") err = env.Sm.Event(context.Background(), "START_ACTIVITY", NewDummyTransition("START_ACTIVITY", false)) Expect(err).NotTo(HaveOccurred()) - v, ok := env.workflow.GetUserVars().Get("seen_run_start_time_ms") + v, ok := env.workflow.GetUserVars().Get("root.call_saw_run_start_time_ms") Expect(ok).To(BeTrue()) Expect(v).To(Equal("true")) - _, ok = env.workflow.GetUserVars().Get("seen_run_start_completion_time_ms") + _, ok = env.workflow.GetUserVars().Get("root.call_saw_run_start_completion_time_ms") Expect(ok).To(BeFalse()) - _, ok = env.workflow.GetUserVars().Get("seen_run_end_time_ms") + _, ok = env.workflow.GetUserVars().Get("root.call_saw_run_end_time_ms") Expect(ok).To(BeFalse()) - _, ok = env.workflow.GetUserVars().Get("seen_run_end_completion_time_ms") + _, ok = env.workflow.GetUserVars().Get("root.call_saw_run_end_completion_time_ms") Expect(ok).To(BeFalse()) }) When("START_ACTIVITY transition fails", func() { @@ -409,7 +408,7 @@ var _ = Describe("calling hooks on FSM events", func() { It("should set both run end timestamps", func() { env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ workflow.NewCallRole( - "call1", + "call", task.Traits{Trigger: "leave_RUNNING", Timeout: "5s", Critical: true, Await: "leave_RUNNING"}, "testplugin.TimestampObserver()", "")}) @@ -421,10 +420,10 @@ var _ = Describe("calling hooks on FSM events", func() { err = env.Sm.Event(context.Background(), "GO_ERROR", NewDummyTransition("GO_ERROR", false)) Expect(err).NotTo(HaveOccurred()) - v, ok := env.workflow.GetUserVars().Get("seen_run_end_time_ms") + v, ok := env.workflow.GetUserVars().Get("root.call_saw_run_end_time_ms") Expect(ok).To(BeTrue()) Expect(v).To(Equal("true")) - _, ok = env.workflow.GetUserVars().Get("seen_run_end_completion_time_ms") + _, ok = env.workflow.GetUserVars().Get("root.call_saw_run_end_completion_time_ms") Expect(ok).To(BeFalse()) v, ok = env.workflow.GetUserVars().Get("run_end_completion_time_ms") Expect(ok).To(BeTrue()) diff --git a/core/environment/manager.go b/core/environment/manager.go index b149deb8..062e5ec0 100644 --- a/core/environment/manager.go +++ b/core/environment/manager.go @@ -595,7 +595,7 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error Message: "workflow teardown ongoing", }) - err = env.handleHooks(env.Workflow(), "leave_"+env.CurrentState()) + err = env.handleAllHooks(env.Workflow(), "leave_"+env.CurrentState()) if err != nil { log.WithFields(logrus.Fields{ "partition": environmentId.String(), diff --git a/core/integration/testplugin/plugin.go b/core/integration/testplugin/plugin.go index bad1b69b..19d96e81 100644 --- a/core/integration/testplugin/plugin.go +++ b/core/integration/testplugin/plugin.go @@ -215,7 +215,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { for _, key := range []string{"run_start_time_ms", "run_start_completion_time_ms", "run_end_time_ms", "run_end_completion_time_ms"} { value, ok := varStack[key] if ok && len(value) > 0 && value != "0" { - parentRole.SetGlobalRuntimeVar("seen_"+key, "true") + parentRole.SetGlobalRuntimeVar(rolePath+"_saw_"+key, "true") } } return diff --git a/docs/handbook/operation_order.md b/docs/handbook/operation_order.md index 77b2bc5d..296e702a 100644 --- a/docs/handbook/operation_order.md +++ b/docs/handbook/operation_order.md @@ -12,18 +12,19 @@ This is the order of actions happening at a healthy start of run. ### before_START_ACTIVITY +- `before_START_ACTIVITY` hooks with negative weights are executed: + - `trg.PrepareForRun()` at `-200` - `"run_number"` is set. - `"run_start_time_ms"` is set using the current time. It is considered as the SOR and SOSOR timestamps. -- `before_START_ACTIVITY` hooks are executed: - - `trg.PrepareForRun()` at `-200` - - `trg.RunLoad()`, `bookkeeping.StartOfRun()` at `-100` - - `bookkeeping.RetrieveFillInfo()` at `-99` - - `kafka.PublishStartActivityUpdate()` at `-50` - - `dcs.StartOfRun()`, `odc.Start()` (does not need to return now), `ccdb.RunStart()` at `0` +- `before_START_ACTIVITY` hooks with positive weights (incl. 0) are executed: + - `trg.RunLoad()`, `bookkeeping.StartOfRun()` at `10` + - `bookkeeping.RetrieveFillInfo()` at `11` + - `kafka.PublishStartActivityUpdate()` at `50` + - `dcs.StartOfRun()`, `odc.Start()` (does not need to return now), `ccdb.RunStart()` at `100` ### leave_CONFIGURED -- `leave_CONFIGURED` hooks are executed +- `leave_CONFIGURED` hooks are executed: - `kafka.PublishLeaveStateUpdate()` at `0` ### Transition START_ACTIVITY @@ -38,10 +39,12 @@ This is the order of actions happening at a healthy start of run. - `o2-roc-ctp-emulator` for all ROC CTP emulator endpoints, `kafka.PublishEnterStateUpdate()` at `0` ### after_START_ACTIVITY + +- `after_START_ACTIVITY` hooks with negative weights are executed + - `trg.RunStart()` at `-10` + - waiting until `odc.Start()` executed at `before_START_ACTIVITY+100` completes at `-10` - `"run_start_completion_time_ms"` is set using current time. It is considered as the EOSOR timestamp. -- `after_START_ACTIVITY` hooks are executed: - - `trg.RunStart()` at `0` - - waiting until `odc.Start()` executed at `before_START_ACTIVITY` completes at `0` +- `after_START_ACTIVITY` hooks with positive weights (incl. 0) are executed: - `bookkeeping.UpdateRunStart()`, `bookkeeping.UpdateEnv()` at `+100` ## STOP_ACTIVITY (End Of Run) @@ -50,8 +53,9 @@ This is the order of actions happening at a healthy end of run. ### before_STOP_ACTIVITY +- `before_STOP_ACTIVITY` hooks with negative weights are executed - `"run_end_time_ms"` is set using the current time. It is considered as the EOR and SOEOR timestamps. -- `before_STOP_ACTIVITY` hooks are executed: +- `before_STOP_ACTIVITY` hooks with positive weights (incl. 0) are executed: - `trg.RunStop()`, `odc.Stop()` (does not need to return now) at `0` ### leave_RUNNING @@ -70,9 +74,12 @@ This is the order of actions happening at a healthy end of run. - `kafka.PublishEnterStateUpdate()` at `0` ### after_STOP_ACTIVITY -- `"run_end_completion_time_ms"` is set using current time. It is considered as the EOEOR timestamp. -- `after_STOP_ACTIVITY` hooks are executed: + +- `after_STOP_ACTIVITY` hooks with negative weights are executed: - `trg.RunUnload()` at `-100` - - `ccdb.RunStop()`, `dcs.EndOfRun()` at `0` - - waiting until `odc.Stop()` executed at `before_STOP_ACTIVITY` completes at `0` + - `dcs.EndOfRun()` at `-50` + - waiting until `odc.Stop()` executed at `before_STOP_ACTIVITY` completes at `-50` +- `"run_end_completion_time_ms"` is set using current time. It is considered as the EOEOR timestamp. +- `after_STOP_ACTIVITY` hooks with positive weights (incl. 0) are executed: + - `ccdb.RunStop()` at `0` - `bookkeeping.UpdateRunStop()`, `bookkeeping.UpdateEnv()` at `+100` \ No newline at end of file