Skip to content

Commit

Permalink
[core] Adapt to current looplab/fsm package
Browse files Browse the repository at this point in the history
  • Loading branch information
teo committed Jul 18, 2023
1 parent 941cdcc commit f322c58
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 14 deletions.
11 changes: 6 additions & 5 deletions core/environment/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
package environment

import (
"context"
"errors"
"fmt"
"sort"
Expand Down Expand Up @@ -148,7 +149,7 @@ func newEnvironment(userVars map[string]string) (env *Environment, err error) {
{Name: "RECOVER", Src: []string{"ERROR"}, Dst: "DEPLOYED"},
},
fsm.Callbacks{
"before_event": func(e *fsm.Event) {
"before_event": func(_ context.Context, e *fsm.Event) {
// If the event is START_ACTIVITY, we set up and update variables relevant to plugins early on.
// This used to be done inside the transition_startactivity, but then the new RN isn't available to the
// before_START_ACTIVITY hooks. By setting it up here, we ensure the run number is available especially
Expand Down Expand Up @@ -229,7 +230,7 @@ func newEnvironment(userVars map[string]string) (env *Environment, err error) {
e.Cancel(errHooks)
}
},
"leave_state": func(e *fsm.Event) {
"leave_state": func(_ context.Context, e *fsm.Event) {
errHooks := env.handleHooks(env.Workflow(), fmt.Sprintf("leave_%s", e.Src))
if errHooks != nil {
e.Cancel(errHooks)
Expand All @@ -238,7 +239,7 @@ func newEnvironment(userVars map[string]string) (env *Environment, err error) {

env.handlerFunc()(e)
},
"enter_state": func(e *fsm.Event) {
"enter_state": func(_ context.Context, e *fsm.Event) {
enterStateTimeMs := strconv.FormatInt(time.Now().UnixMilli(), 10)
env.workflow.SetRuntimeVar("enter_state_time_ms", enterStateTimeMs)

Expand All @@ -255,7 +256,7 @@ func newEnvironment(userVars map[string]string) (env *Environment, err error) {
"partition": envId,
}).Debug("environment.sm entering state")
},
"after_event": func(e *fsm.Event) {
"after_event": func(_ context.Context, e *fsm.Event) {
errHooks := env.handleHooks(env.Workflow(), fmt.Sprintf("after_%s", e.Event))
if errHooks != nil {
e.Cancel(errHooks)
Expand Down Expand Up @@ -613,7 +614,7 @@ func (env *Environment) TryTransition(t Transition) (err error) {
if err != nil {
return
}
err = env.Sm.Event(t.eventName(), t)
err = env.Sm.Event(context.Background(), t.eventName(), t)
return
}

Expand Down
2 changes: 1 addition & 1 deletion core/task/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func runSchedulerController(ctx context.Context,
switch {
case receivedEvent == scheduler.Event_SUBSCRIBED:
if state.sm.Is("INITIAL") {
state.sm.Event("CONNECT")
state.sm.Event(context.Background(), "CONNECT")
}
}
}
Expand Down
16 changes: 8 additions & 8 deletions core/task/schedulerstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,34 +143,34 @@ func NewScheduler(taskman *Manager, fidStore store.Singleton, shutdown func()) (
{Name: "EXIT", Src: []string{"CONNECTED"}, Dst: "FINAL"},
},
fsm.Callbacks{
"before_event": func(e *fsm.Event) {
"before_event": func(_ context.Context, e *fsm.Event) {
log.WithFields(logrus.Fields{
"event": e.Event,
"src": e.Src,
"dst": e.Dst,
}).Debug("state.sm starting transition")
},
"enter_state": func(e *fsm.Event) {
"enter_state": func(_ context.Context, e *fsm.Event) {
log.WithFields(logrus.Fields{
"event": e.Event,
"src": e.Src,
"dst": e.Dst,
}).Debug("state.sm entering state")
},
"leave_CONNECTED": func(e *fsm.Event) {
"leave_CONNECTED": func(_ context.Context, e *fsm.Event) {
log.Debug("leave_CONNECTED")

},
"before_NEW_ENVIRONMENT": func(e *fsm.Event) {
"before_NEW_ENVIRONMENT": func(_ context.Context, e *fsm.Event) {
log.Debug("before_NEW_ENVIRONMENT")
e.Async() //transition frozen until the corresponding fsm.Transition call
},
"enter_CONNECTED": func(e *fsm.Event) {
"enter_CONNECTED": func(_ context.Context, e *fsm.Event) {
log.Debug("enter_CONNECTED")
log.WithField("level", infologger.IL_Support).
Info("scheduler connected")
},
"after_NEW_ENVIRONMENT": func(e *fsm.Event) {
"after_NEW_ENVIRONMENT": func(_ context.Context, e *fsm.Event) {
log.Debug("after_NEW_ENVIRONMENT")
},
},
Expand Down Expand Up @@ -206,10 +206,10 @@ func (state *schedulerState) Start(ctx context.Context) {
if state.err != nil {
err = state.err
log.WithField("error", err.Error()).Debug("scheduler quit with error, main state machine GO_ERROR")
state.sm.Event("GO_ERROR", err) //TODO: use error information in GO_ERROR
state.sm.Event(context.Background(), "GO_ERROR", err) //TODO: use error information in GO_ERROR
} else {
log.Debug("scheduler quit, no errors")
state.sm.Event("EXIT")
state.sm.Event(context.Background(), "EXIT")
}
}()
}

0 comments on commit f322c58

Please sign in to comment.