Skip to content

Commit

Permalink
OCTRL-902 [core] Set run timestamps before executing triggers with we…
Browse files Browse the repository at this point in the history
…ight 0
  • Loading branch information
knopers8 committed Jul 9, 2024
1 parent 0d25040 commit 29cef50
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 70 deletions.
80 changes: 71 additions & 9 deletions core/environment/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,22 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
Message: "transition step starting",
})

// first, we execute hooks which should be executed before an event officially starts
errHooks := env.handleHooksWithNegativeWeights(env.Workflow(), trigger)
if errHooks != nil {
e.Cancel(errHooks)
the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.GetCurrentRunNumber(),
Error: errHooks.Error(),
Transition: e.Event,
TransitionStep: trigger,
Message: "transition step finished",
})
return
}

// If the event is START_ACTIVITY, we set up and update variables relevant to plugins early on.
// This used to be done inside the transition_startactivity, but then the new RN isn't available to the
// before_START_ACTIVITY hooks. By setting it up here, we ensure the run number is available especially
Expand Down Expand Up @@ -284,7 +300,7 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
)
}

errHooks := env.handleHooks(env.Workflow(), trigger)
errHooks = env.handleHooksWithPositiveWeights(env.Workflow(), trigger)
if errHooks != nil {
e.Cancel(errHooks)
}
Expand Down Expand Up @@ -316,6 +332,8 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
Message: "transition step starting",
})

errHooks := env.handleHooksWithNegativeWeights(env.Workflow(), trigger)
// fixme: in principle we should not need it anymore, since both STOP_ACTIVITY and GO_ERROR set EOR
// We might leave RUNNING not only through STOP_ACTIVITY. In such cases we also need a run stop time.
if e.Src == "RUNNING" {
endTime, ok := env.workflow.GetUserVars().Get("run_end_time_ms")
Expand All @@ -327,7 +345,21 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
Debug("O2 End time already set before leave_RUNNING")
}
}
errHooks := env.handleHooks(env.Workflow(), trigger)
if errHooks != nil {
e.Cancel(errHooks)
the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{
EnvironmentId: env.id.String(),
State: env.Sm.Current(),
RunNumber: env.GetCurrentRunNumber(),
Error: errHooks.Error(),
Transition: e.Event,
TransitionStep: trigger,
Message: "transition step finished",
})
return
}

errHooks = env.handleHooksWithPositiveWeights(env.Workflow(), trigger)
if errHooks != nil {
e.Cancel(errHooks)
}
Expand Down Expand Up @@ -375,7 +407,6 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
Transition: e.Event,
TransitionStep: fmt.Sprintf("tasks_%s", e.Event),
})

},
"enter_state": func(_ context.Context, e *fsm.Event) {
trigger := fmt.Sprintf("enter_%s", e.Dst)
Expand All @@ -389,10 +420,12 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
Message: "transition step starting",
})

errHooks := env.handleHooksWithNegativeWeights(env.Workflow(), trigger)

enterStateTimeMs = strconv.FormatInt(time.Now().UnixMilli(), 10)
env.workflow.SetRuntimeVar("enter_state_time_ms", enterStateTimeMs)

errHooks := env.handleHooks(env.Workflow(), trigger)
errHooks = errors.Join(errHooks, env.handleHooksWithPositiveWeights(env.Workflow(), trigger))
if errHooks != nil {
// at enter_<state> it will not cancel the transition but only set the error
e.Cancel(errHooks)
Expand Down Expand Up @@ -442,7 +475,7 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
Message: "transition step starting",
})

errHooks := env.handleHooks(env.Workflow(), trigger)
errHooks := env.handleHooksWithNegativeWeights(env.Workflow(), trigger)
if errHooks != nil {
// at after_<event> it will not cancel the transition but only set the error
e.Cancel(errHooks)
Expand Down Expand Up @@ -550,6 +583,11 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
}
}

errHooks = errors.Join(errHooks, env.handleHooksWithPositiveWeights(env.Workflow(), trigger))
if errHooks != nil {
e.Cancel(errHooks)
}

errorMsg := ""
if e.Err != nil {
errorMsg = e.Err.Error()
Expand All @@ -570,9 +608,7 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment,
return
}

func (env *Environment) handleHooks(workflow workflow.Role, trigger string) (err error) {
log.WithField("partition", env.id).Debugf("begin handling hooks for trigger %s", trigger)
defer utils.TimeTrack(time.Now(), fmt.Sprintf("finished handling hooks for trigger %s", trigger), log.WithPrefix("env").WithField("partition", env.id))
func (env *Environment) handleHooks(workflow workflow.Role, trigger string, weightPredicate func(callable.HookWeight) bool) (err error) {

// Starting point: get all hooks to be started for the current trigger
hooksMapForTrigger := workflow.GetHooksMapForTrigger(trigger)
Expand All @@ -587,13 +623,20 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string) (err
}
allWeights := allWeightsSet.GetWeights()

filteredWeights := make([]callable.HookWeight, 0)
for _, weight := range allWeights {
if weightPredicate(weight) {
filteredWeights = append(filteredWeights, weight)
}
}

// Prepare structures to accumulate errors
allErrors := make(map[callable.Hook]error)
criticalFailures := make([]error, 0)

// FOR EACH weight within the current state machine trigger moment
// 4 phases: start calls, await calls, execute task hooks, error handling
for _, weight := range allWeights {
for _, weight := range filteredWeights {
hooksForWeight, thereAreHooksToStartForTheCurrentTriggerAndWeight := hooksMapForTrigger[weight]

// PHASE 1: start asynchronously any call hooks and add them to the pending await map
Expand Down Expand Up @@ -715,6 +758,25 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string) (err
return nil
}

func (env *Environment) handleAllHooks(workflow workflow.Role, trigger string) (err error) {
log.WithField("partition", env.id).Debugf("begin handling hooks for trigger %s", trigger)
defer utils.TimeTrack(time.Now(), fmt.Sprintf("finished handling hooks for trigger %s", trigger), log.WithPrefix("env").WithField("partition", env.id))
return env.handleHooks(workflow, trigger, func(w callable.HookWeight) bool { return true })
}

func (env *Environment) handleHooksWithNegativeWeights(workflow workflow.Role, trigger string) (err error) {
log.WithField("partition", env.id).Debugf("begin handling hooks with negative weights for trigger %s", trigger)
defer utils.TimeTrack(time.Now(), fmt.Sprintf("finished handling hooks with negative weights for trigger %s", trigger), log.WithPrefix("env").WithField("partition", env.id))
return env.handleHooks(workflow, trigger, func(w callable.HookWeight) bool { return w < 0 })
}

// "positive" include 0
func (env *Environment) handleHooksWithPositiveWeights(workflow workflow.Role, trigger string) (err error) {
log.WithField("partition", env.id).Debugf("begin handling hooks with positive weights for trigger %s", trigger)
defer utils.TimeTrack(time.Now(), fmt.Sprintf("finished handling hooks with positive weights for trigger %s", trigger), log.WithPrefix("env").WithField("partition", env.id))
return env.handleHooks(workflow, trigger, func(w callable.HookWeight) bool { return w >= 0 })
}

// runTasksAsHooks returns a map of failed hook tasks and their respective error values.
// The returned map includes both critical and non-critical failures, and it's up to the caller
// to further filter as needed.
Expand Down
87 changes: 43 additions & 44 deletions core/environment/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,15 @@ var _ = Describe("calling hooks on FSM events", func() {
})

Context("activity-related timestamps", func() {
It("should set run_start_time_ms before before_START_ACTIVITY hooks", func() {
It("should set run_start_time_ms just after before_START_ACTIVITY<0 hooks are executed", func() {
env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{
workflow.NewCallRole(
"call",
"call1",
task.Traits{Trigger: "before_START_ACTIVITY-1", Timeout: "5s", Critical: true, Await: "before_START_ACTIVITY-1"},
"testplugin.TimestampObserver()",
""),
workflow.NewCallRole(
"call2",
task.Traits{Trigger: "before_START_ACTIVITY", Timeout: "5s", Critical: true, Await: "before_START_ACTIVITY"},
"testplugin.TimestampObserver()",
"")})
Expand All @@ -231,44 +236,45 @@ var _ = Describe("calling hooks on FSM events", func() {
err := env.Sm.Event(context.Background(), "START_ACTIVITY", NewDummyTransition("START_ACTIVITY", false))
Expect(err).NotTo(HaveOccurred())

v, ok := env.workflow.GetUserVars().Get("seen_run_start_time_ms")
_, ok := env.workflow.GetUserVars().Get("root.call1_saw_run_start_time_ms")
Expect(ok).To(BeFalse())
_, ok = env.workflow.GetUserVars().Get("root.call2_saw_run_start_time_ms")
Expect(ok).To(BeTrue())
Expect(v).To(Equal("true"))
_, ok = env.workflow.GetUserVars().Get("seen_run_start_completion_time_ms")
_, ok = env.workflow.GetUserVars().Get("root.call2_saw_run_start_completion_time_ms")
Expect(ok).To(BeFalse())
})
It("should set run_start_completion_time_ms after after_START_ACTIVITY hooks", func() {
It("should set run_start_completion_time_ms just after after_START_ACTIVITY<0 hooks are executed", func() {
env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{
workflow.NewCallRole(
"call1",
task.Traits{Trigger: "after_START_ACTIVITY", Timeout: "5s", Critical: true, Await: "after_START_ACTIVITY"},
task.Traits{Trigger: "after_START_ACTIVITY-1", Timeout: "5s", Critical: true, Await: "after_START_ACTIVITY-1"},
"testplugin.TimestampObserver()",
""),
workflow.NewCallRole(
"call2",
task.Traits{Trigger: "before_STOP_ACTIVITY", Timeout: "5s", Critical: true, Await: "before_STOP_ACTIVITY"},
task.Traits{Trigger: "after_START_ACTIVITY", Timeout: "5s", Critical: true, Await: "after_START_ACTIVITY"},
"testplugin.TimestampObserver()",
"")})
workflow.LinkChildrenToParents(env.workflow)
env.Sm.SetState("CONFIGURED")

err := env.Sm.Event(context.Background(), "START_ACTIVITY", NewDummyTransition("START_ACTIVITY", false))

Expect(err).NotTo(HaveOccurred())
_, ok := env.workflow.GetUserVars().Get("seen_run_start_completion_time_ms")
Expect(ok).To(BeFalse())

err = env.Sm.Event(context.Background(), "STOP_ACTIVITY", NewDummyTransition("STOP_ACTIVITY", false))

Expect(err).NotTo(HaveOccurred())
v, ok := env.workflow.GetUserVars().Get("seen_run_start_completion_time_ms")
_, ok := env.workflow.GetUserVars().Get("root.call1_saw_run_start_completion_time_ms")
Expect(ok).To(BeFalse())
_, ok = env.workflow.GetUserVars().Get("root.call2_saw_run_start_completion_time_ms")
Expect(ok).To(BeTrue())
Expect(v).To(Equal("true"))
})
It("should set run_end_time_ms before before_STOP_ACTIVITY hooks", func() {
It("should set run_end_time_ms just after before_STOP_ACTIVITY<0 hooks are executed", func() {
env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{
workflow.NewCallRole(
"call1",
task.Traits{Trigger: "before_STOP_ACTIVITY-1", Timeout: "5s", Critical: true, Await: "before_STOP_ACTIVITY-1"},
"testplugin.TimestampObserver()",
""),
workflow.NewCallRole(
"call2",
task.Traits{Trigger: "before_STOP_ACTIVITY", Timeout: "5s", Critical: true, Await: "before_STOP_ACTIVITY"},
"testplugin.TimestampObserver()",
"")})
Expand All @@ -280,22 +286,23 @@ var _ = Describe("calling hooks on FSM events", func() {
err = env.Sm.Event(context.Background(), "STOP_ACTIVITY", NewDummyTransition("STOP_ACTIVITY", false))
Expect(err).NotTo(HaveOccurred())

v, ok := env.workflow.GetUserVars().Get("seen_run_end_time_ms")
_, ok := env.workflow.GetUserVars().Get("root.call1_saw_run_end_time_ms")
Expect(ok).To(BeFalse())
_, ok = env.workflow.GetUserVars().Get("root.call2_saw_run_end_time_ms")
Expect(ok).To(BeTrue())
Expect(v).To(Equal("true"))
_, ok = env.workflow.GetUserVars().Get("seen_run_end_completion_time_ms")
_, ok = env.workflow.GetUserVars().Get("root.call2_saw_run_end_completion_time_ms")
Expect(ok).To(BeFalse())
})
It("should set run_end_completion_time_ms after after_STOP_ACTIVITY hooks", func() {
It("should set run_end_completion_time_ms just before after_STOP_ACTIVITY<0 hooks are executed", func() {
env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{
workflow.NewCallRole(
"call1",
task.Traits{Trigger: "after_STOP_ACTIVITY", Timeout: "5s", Critical: true, Await: "after_STOP_ACTIVITY"},
task.Traits{Trigger: "after_STOP_ACTIVITY-1", Timeout: "5s", Critical: true, Await: "after_STOP_ACTIVITY-1"},
"testplugin.TimestampObserver()",
""),
workflow.NewCallRole(
"call2",
task.Traits{Trigger: "before_RESET", Timeout: "5s", Critical: true, Await: "before_RESET"},
task.Traits{Trigger: "after_STOP_ACTIVITY", Timeout: "5s", Critical: true, Await: "after_STOP_ACTIVITY"},
"testplugin.TimestampObserver()",
"")})
workflow.LinkChildrenToParents(env.workflow)
Expand All @@ -306,18 +313,10 @@ var _ = Describe("calling hooks on FSM events", func() {
err = env.Sm.Event(context.Background(), "STOP_ACTIVITY", NewDummyTransition("STOP_ACTIVITY", false))
Expect(err).NotTo(HaveOccurred())

v, ok := env.workflow.GetUserVars().Get("seen_run_end_time_ms")
Expect(ok).To(BeTrue())
Expect(v).To(Equal("true"))
_, ok = env.workflow.GetUserVars().Get("seen_run_end_completion_time_ms")
_, ok := env.workflow.GetUserVars().Get("root.call1_saw_run_end_completion_time_ms")
Expect(ok).To(BeFalse())

err = env.Sm.Event(context.Background(), "RESET", NewDummyTransition("RESET", false))
Expect(err).NotTo(HaveOccurred())

v, ok = env.workflow.GetUserVars().Get("seen_run_end_completion_time_ms")
_, ok = env.workflow.GetUserVars().Get("root.call2_saw_run_end_completion_time_ms")
Expect(ok).To(BeTrue())
Expect(v).To(Equal("true"))
})
It("should clear timestamps from previous runs and set run_start_time_ms again before before_START_ACTIVITY hooks", func() {
env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{
Expand All @@ -334,21 +333,21 @@ var _ = Describe("calling hooks on FSM events", func() {
err = env.Sm.Event(context.Background(), "STOP_ACTIVITY", NewDummyTransition("STOP_ACTIVITY", false))
Expect(err).NotTo(HaveOccurred())

env.workflow.GetUserVars().Del("seen_run_start_time_ms")
env.workflow.GetUserVars().Del("seen_run_start_completion_time_ms")
env.workflow.GetUserVars().Del("seen_run_end_time_ms")
env.workflow.GetUserVars().Del("seen_run_end_completion_time_ms")
env.workflow.GetUserVars().Del("root.call_saw_run_start_time_ms")
env.workflow.GetUserVars().Del("root.call_saw_run_start_completion_time_ms")
env.workflow.GetUserVars().Del("root.call_saw_run_end_time_ms")
env.workflow.GetUserVars().Del("root.call_saw_run_end_completion_time_ms")
err = env.Sm.Event(context.Background(), "START_ACTIVITY", NewDummyTransition("START_ACTIVITY", false))
Expect(err).NotTo(HaveOccurred())

v, ok := env.workflow.GetUserVars().Get("seen_run_start_time_ms")
v, ok := env.workflow.GetUserVars().Get("root.call_saw_run_start_time_ms")
Expect(ok).To(BeTrue())
Expect(v).To(Equal("true"))
_, ok = env.workflow.GetUserVars().Get("seen_run_start_completion_time_ms")
_, ok = env.workflow.GetUserVars().Get("root.call_saw_run_start_completion_time_ms")
Expect(ok).To(BeFalse())
_, ok = env.workflow.GetUserVars().Get("seen_run_end_time_ms")
_, ok = env.workflow.GetUserVars().Get("root.call_saw_run_end_time_ms")
Expect(ok).To(BeFalse())
_, ok = env.workflow.GetUserVars().Get("seen_run_end_completion_time_ms")
_, ok = env.workflow.GetUserVars().Get("root.call_saw_run_end_completion_time_ms")
Expect(ok).To(BeFalse())
})
When("START_ACTIVITY transition fails", func() {
Expand Down Expand Up @@ -409,7 +408,7 @@ var _ = Describe("calling hooks on FSM events", func() {
It("should set both run end timestamps", func() {
env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{
workflow.NewCallRole(
"call1",
"call",
task.Traits{Trigger: "leave_RUNNING", Timeout: "5s", Critical: true, Await: "leave_RUNNING"},
"testplugin.TimestampObserver()",
"")})
Expand All @@ -421,10 +420,10 @@ var _ = Describe("calling hooks on FSM events", func() {
err = env.Sm.Event(context.Background(), "GO_ERROR", NewDummyTransition("GO_ERROR", false))
Expect(err).NotTo(HaveOccurred())

v, ok := env.workflow.GetUserVars().Get("seen_run_end_time_ms")
v, ok := env.workflow.GetUserVars().Get("root.call_saw_run_end_time_ms")
Expect(ok).To(BeTrue())
Expect(v).To(Equal("true"))
_, ok = env.workflow.GetUserVars().Get("seen_run_end_completion_time_ms")
_, ok = env.workflow.GetUserVars().Get("root.call_saw_run_end_completion_time_ms")
Expect(ok).To(BeFalse())
v, ok = env.workflow.GetUserVars().Get("run_end_completion_time_ms")
Expect(ok).To(BeTrue())
Expand Down
2 changes: 1 addition & 1 deletion core/environment/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error
Message: "workflow teardown ongoing",
})

err = env.handleHooks(env.Workflow(), "leave_"+env.CurrentState())
err = env.handleAllHooks(env.Workflow(), "leave_"+env.CurrentState())
if err != nil {
log.WithFields(logrus.Fields{
"partition": environmentId.String(),
Expand Down
2 changes: 1 addition & 1 deletion core/integration/testplugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
for _, key := range []string{"run_start_time_ms", "run_start_completion_time_ms", "run_end_time_ms", "run_end_completion_time_ms"} {
value, ok := varStack[key]
if ok && len(value) > 0 && value != "0" {
parentRole.SetGlobalRuntimeVar("seen_"+key, "true")
parentRole.SetGlobalRuntimeVar(rolePath+"_saw_"+key, "true")
}
}
return
Expand Down
Loading

0 comments on commit 29cef50

Please sign in to comment.