From cdd7fa3d49eb00b77b54304af21dd269ca49c202 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Thu, 27 Jun 2024 17:08:11 +0200 Subject: [PATCH] test FSM and calling hooks --- Makefile | 2 +- core/environment/environment.go | 4 + core/environment/environment_test.go | 12 ++ core/environment/fsm_test.go | 288 ++++++++++++++++++++++++++ core/environment/fsm_test.yaml | 12 ++ core/environment/transition.go | 7 +- core/integration/testplugin/plugin.go | 39 ++-- core/workflow/aggregatorrole.go | 13 ++ core/workflow/callrole.go | 15 ++ 9 files changed, 370 insertions(+), 22 deletions(-) create mode 100644 core/environment/environment_test.go create mode 100644 core/environment/fsm_test.go create mode 100644 core/environment/fsm_test.yaml diff --git a/Makefile b/Makefile index 0766d7f7f..2f1699132 100644 --- a/Makefile +++ b/Makefile @@ -70,7 +70,7 @@ INSTALL_WHAT:=$(patsubst %, install_%, $(WHAT)) GENERATE_DIRS := ./apricot ./coconut/cmd ./common ./common/runtype ./common/system ./core ./core/integration/ccdb ./core/integration/dcs ./core/integration/ddsched ./core/integration/kafka ./core/integration/odc ./executor ./walnut ./core/integration/trg ./core/integration/bookkeeping SRC_DIRS := ./apricot ./cmd/* ./core ./coconut ./executor ./common ./configuration ./occ/peanut ./walnut -TEST_DIRS := ./apricot/local ./configuration/cfgbackend ./configuration/componentcfg ./configuration/template ./core/task/sm ./core/workflow ./core/integration/odc/fairmq ./core/integration +TEST_DIRS := ./apricot/local ./configuration/cfgbackend ./configuration/componentcfg ./configuration/template ./core/task/sm ./core/workflow ./core/integration/odc/fairmq ./core/integration ./core/environment GO_TEST_DIRS := ./core/repos ./core/integration/dcs coverage:COVERAGE_PREFIX := ./coverage_results diff --git a/core/environment/environment.go b/core/environment/environment.go index 86148c9c1..7b56162c1 100644 --- a/core/environment/environment.go +++ b/core/environment/environment.go @@ -907,6 
+907,10 @@ func (env *Environment) handlerFunc() func(e *fsm.Event) { "partition": env.id.String(), }).Debug("environment.sm starting transition") + if len(e.Args) == 0 { + e.Cancel(errors.New("transition missing in FSM event")) + return + } transition, ok := e.Args[0].(Transition) if !ok { e.Cancel(errors.New("transition wrapping error")) diff --git a/core/environment/environment_test.go b/core/environment/environment_test.go new file mode 100644 index 000000000..c8e80bd11 --- /dev/null +++ b/core/environment/environment_test.go @@ -0,0 +1,12 @@ +package environment + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "testing" +) + +func TestCoreEnvironment(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Core Environment Test Suite") +} diff --git a/core/environment/fsm_test.go b/core/environment/fsm_test.go new file mode 100644 index 000000000..f9c28943f --- /dev/null +++ b/core/environment/fsm_test.go @@ -0,0 +1,288 @@ +package environment + +import ( + "context" + "github.com/AliceO2Group/Control/common/utils/uid" + "github.com/AliceO2Group/Control/core/integration" + "github.com/AliceO2Group/Control/core/integration/testplugin" + "github.com/AliceO2Group/Control/core/task" + "github.com/AliceO2Group/Control/core/workflow" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/spf13/viper" + "io" + "os" + "time" +) + +func NewDummyTransition(transition string) Transition { + return &DummyTransition{ + baseTransition: baseTransition{ + name: transition, + taskman: nil, + }, + } +} + +type DummyTransition struct { + baseTransition +} + +func (t DummyTransition) do(env *Environment) (err error) { + return nil +} + +const fsmTestConfig = "fsm_test.yaml" + +var tmpDir *string + +var _ = BeforeSuite(func() { + var err error + tmpDir = new(string) + *tmpDir, err = os.MkdirTemp("", "o2control-core-environment") + Expect(err).NotTo(HaveOccurred()) + + // copy config files + configFiles := []string{fsmTestConfig} + for _, configFile := range configFiles { + from, err := os.Open("./" + configFile) + Expect(err).NotTo(HaveOccurred()) + defer from.Close() + + to, err := os.OpenFile(*tmpDir+"/"+configFile, os.O_RDWR|os.O_CREATE, 0666) + Expect(err).NotTo(HaveOccurred()) + defer to.Close() + + _, err = io.Copy(to, from) + Expect(err).NotTo(HaveOccurred()) + } + + integration.Reset() + integration.RegisterPlugin("testplugin", "testPluginEndpoint", testplugin.NewPlugin) + + viper.Reset() // start from a clean configuration; every viper.Set below must come after this call + viper.Set("coreWorkingDir", *tmpDir) // used by NewRunNumber with YAML backend + viper.Set("integrationPlugins", []string{"testplugin"}) + viper.Set("testPluginEndpoint", "http://example.com") + viper.Set("config_endpoint", "file://"+*tmpDir+"/"+fsmTestConfig) +}) + +var _ = AfterSuite(func() { + os.RemoveAll(*tmpDir) +}) + +var _ = Describe("environment FSM", func() { + + Describe("allowed states and transitions", func() { + var env *Environment + BeforeEach(func() { + envId, err := uid.FromString("2oDvieFrVTi") + Expect(err).NotTo(HaveOccurred()) + + env, err = newEnvironment(nil, envId) + Expect(err).NotTo(HaveOccurred()) + Expect(env).NotTo(BeNil()) + }) + When("FSM is created", func() { + It("should be in STANDBY", func() { + Expect(env.Sm.Current()).To(Equal("STANDBY")) + }) + }) + When("FSM is in STANDBY", func() { + 
It("should allow for DEPLOY, GO_ERROR and EXIT transitions", func() { + env.Sm.SetState("STANDBY") + Expect(env.Sm.Can("DEPLOY")).To(BeTrue()) + Expect(env.Sm.Can("GO_ERROR")).To(BeTrue()) + Expect(env.Sm.Can("EXIT")).To(BeTrue()) + }) + It("should not allow for other transitions", func() { + env.Sm.SetState("STANDBY") + Expect(env.Sm.Cannot("CONFIGURE")).To(BeTrue()) + Expect(env.Sm.Cannot("RESET")).To(BeTrue()) + Expect(env.Sm.Cannot("START_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("STOP_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("RECOVER")).To(BeTrue()) + }) + }) + When("FSM is in DEPLOYED", func() { + It("should allow for CONFIGURE, GO_ERROR and EXIT transitions", func() { + env.Sm.SetState("DEPLOYED") + Expect(env.Sm.Can("CONFIGURE")).To(BeTrue()) + Expect(env.Sm.Can("GO_ERROR")).To(BeTrue()) + Expect(env.Sm.Can("EXIT")).To(BeTrue()) + }) + It("should not allow for other transitions", func() { + env.Sm.SetState("DEPLOYED") + Expect(env.Sm.Cannot("DEPLOY")).To(BeTrue()) + Expect(env.Sm.Cannot("RESET")).To(BeTrue()) + Expect(env.Sm.Cannot("START_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("STOP_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("RECOVER")).To(BeTrue()) + }) + }) + When("FSM is in CONFIGURED", func() { + It("should allow for START_ACTIVITY, RESET, GO_ERROR and EXIT transitions", func() { + env.Sm.SetState("CONFIGURED") + Expect(env.Sm.Can("START_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Can("RESET")).To(BeTrue()) + Expect(env.Sm.Can("GO_ERROR")).To(BeTrue()) + Expect(env.Sm.Can("EXIT")).To(BeTrue()) + }) + It("should not allow for other transitions", func() { + env.Sm.SetState("CONFIGURED") + Expect(env.Sm.Cannot("DEPLOY")).To(BeTrue()) + Expect(env.Sm.Cannot("CONFIGURE")).To(BeTrue()) + Expect(env.Sm.Cannot("STOP_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("RECOVER")).To(BeTrue()) + }) + }) + When("FSM is in RUNNING", func() { + It("should allow for STOP_ACTIVITY and GO_ERROR transitions", func() { + env.Sm.SetState("RUNNING") + 
Expect(env.Sm.Can("STOP_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Can("GO_ERROR")).To(BeTrue()) + }) + It("should not allow for other transitions", func() { + env.Sm.SetState("RUNNING") + Expect(env.Sm.Cannot("DEPLOY")).To(BeTrue()) + Expect(env.Sm.Cannot("RESET")).To(BeTrue()) + Expect(env.Sm.Cannot("CONFIGURE")).To(BeTrue()) + Expect(env.Sm.Cannot("START_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("RECOVER")).To(BeTrue()) + Expect(env.Sm.Cannot("EXIT")).To(BeTrue()) + }) + }) + When("FSM is in ERROR", func() { + It("should allow for RECOVER transition", func() { + env.Sm.SetState("ERROR") + Expect(env.Sm.Can("RECOVER")).To(BeTrue()) + // fixme: is it correct that we do not allow EXIT from ERROR? + // in principle, in TeardownEnvironment we ignore the FSM and do it ourselves, + // but I don't see a reason not to allow it + // fixme: shouldn't we also add DESTROY and TEARDOWN to the FSM? + // is DESTROY even different from TEARDOWN? + }) + It("should not allow for other transitions", func() { + env.Sm.SetState("ERROR") + Expect(env.Sm.Cannot("GO_ERROR")).To(BeTrue()) + Expect(env.Sm.Cannot("DEPLOY")).To(BeTrue()) + Expect(env.Sm.Cannot("RESET")).To(BeTrue()) + Expect(env.Sm.Cannot("CONFIGURE")).To(BeTrue()) + Expect(env.Sm.Cannot("START_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("STOP_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("EXIT")).To(BeTrue()) // should it be? 
(see the comment above) + }) + }) + When("FSM is in DONE", func() { + It("should not allow for any transitions", func() { + env.Sm.SetState("DONE") + Expect(env.Sm.Cannot("GO_ERROR")).To(BeTrue()) + Expect(env.Sm.Cannot("DEPLOY")).To(BeTrue()) + Expect(env.Sm.Cannot("RESET")).To(BeTrue()) + Expect(env.Sm.Cannot("CONFIGURE")).To(BeTrue()) + Expect(env.Sm.Cannot("START_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("STOP_ACTIVITY")).To(BeTrue()) + Expect(env.Sm.Cannot("RECOVER")).To(BeTrue()) + Expect(env.Sm.Cannot("EXIT")).To(BeTrue()) + }) + }) + + }) + + Describe("calling hooks on FSM events", func() { + var env *Environment + BeforeEach(func() { + envId, err := uid.FromString("2oDvieFrVTi") + Expect(err).NotTo(HaveOccurred()) + + env, err = newEnvironment(map[string]string{}, envId) + Expect(err).NotTo(HaveOccurred()) + Expect(env).NotTo(BeNil()) + }) + It("should execute the requested plugin call without errors", func() { + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ + workflow.NewCallRole( + "call", + task.Traits{Trigger: "before_CONFIGURE", Timeout: "5s", Critical: true, Await: "before_CONFIGURE"}, + "testplugin.Test()", + "")}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("DEPLOYED") + env.workflow.GetUserVars().Del("root.call_called") + + err := env.Sm.Event(context.Background(), "CONFIGURE", NewDummyTransition("CONFIGURE")) + + Expect(err).NotTo(HaveOccurred()) + v, ok := env.workflow.GetUserVars().Get("root.call_called") + Expect(ok).To(BeTrue()) + Expect(v).To(Equal("true")) + }) + + It("should return an error if a critical hook fails", func() { + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ + workflow.NewCallRole( + "call", + task.Traits{Trigger: "before_CONFIGURE", Timeout: "5s", Critical: true, Await: "before_CONFIGURE"}, + "testplugin.Test()", + "")}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("DEPLOYED") + env.workflow.GetUserVars().Del("root.call_called") + 
env.workflow.GetUserVars().Set("testplugin_fail", "true") + + err := env.Sm.Event(context.Background(), "CONFIGURE", NewDummyTransition("CONFIGURE")) + + Expect(err).To(HaveOccurred()) + v, ok := env.workflow.GetUserVars().Get("root.call_called") + Expect(ok).To(BeTrue()) + Expect(v).To(Equal("true")) + }) + + It("should not return an error if an non-critical hook fails", func() { + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ + workflow.NewCallRole( + "call", + task.Traits{Trigger: "before_CONFIGURE", Timeout: "5s", Critical: false, Await: "before_CONFIGURE"}, + "testplugin.Test()", + "")}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("DEPLOYED") + env.workflow.GetUserVars().Del("root.call_called") + env.workflow.GetUserVars().Set("testplugin_fail", "true") + + err := env.Sm.Event(context.Background(), "CONFIGURE", NewDummyTransition("CONFIGURE")) + + Expect(err).NotTo(HaveOccurred()) + v, ok := env.workflow.GetUserVars().Get("root.call_called") + Expect(ok).To(BeTrue()) + Expect(v).To(Equal("true")) + }) + + It("should return an error if a critical hook times out", func(ctx SpecContext) { + // fixme: this fails + env.workflow = workflow.NewAggregatorRole("root", []workflow.Role{ + workflow.NewCallRole( + "call", + task.Traits{Trigger: "before_CONFIGURE", Timeout: "1s", Critical: true, Await: "before_CONFIGURE"}, + "testplugin.Test()", + "")}) + workflow.LinkChildrenToParents(env.workflow) + env.Sm.SetState("DEPLOYED") + env.workflow.GetUserVars().Del("root.call_called") + env.workflow.GetUserVars().Set("testplugin_hang", "true") + + err := env.Sm.Event(context.Background(), "CONFIGURE", NewDummyTransition("CONFIGURE")) + + Expect(err).To(HaveOccurred()) + v, ok := env.workflow.GetUserVars().Get("root.call_called") + Expect(ok).To(BeTrue()) + Expect(v).To(Equal("true")) + }, SpecTimeout(10*time.Second)) + + /// TODO + // It should write the SOSOR, EOSOR, SOEOR and EOEOR timestamps correctly and clear the unneeded ones + // 
the above incl. GO_ERROR + // Await within the same transition should work. + // Await across different transitions should work + }) +}) diff --git a/core/environment/fsm_test.yaml b/core/environment/fsm_test.yaml new file mode 100644 index 000000000..204a292d2 --- /dev/null +++ b/core/environment/fsm_test.yaml @@ -0,0 +1,12 @@ +o2: + components: + qc: + TECHNICAL: + any: + entry: "config" + runtime: + aliecs: + defaults: + key1: value1 + vars: + key2: value2 \ No newline at end of file diff --git a/core/environment/transition.go b/core/environment/transition.go index 947195c29..a821e3dfa 100644 --- a/core/environment/transition.go +++ b/core/environment/transition.go @@ -22,7 +22,6 @@ * Intergovernmental Organization or submit itself to any jurisdiction. */ - package environment import ( @@ -61,13 +60,13 @@ func MakeTransition(taskman *task.Manager, optype pb.ControlEnvironmentRequest_O } type baseTransition struct { - taskman *task.Manager - name string + taskman *task.Manager + name string } func (t baseTransition) check() (err error) { if t.taskman == nil { - err = errors.New("cannot configure environment with nil roleman") + err = errors.New("cannot transition environment with nil taskman") } return } diff --git a/core/integration/testplugin/plugin.go b/core/integration/testplugin/plugin.go index a79bdce49..93f608035 100644 --- a/core/integration/testplugin/plugin.go +++ b/core/integration/testplugin/plugin.go @@ -126,6 +126,15 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { doFail = false } + doHangS, ok := varStack["testplugin_hang"] + if !ok { + doHangS = "false" + } + doHang, convErr := strconv.ParseBool(doHangS) + if convErr != nil { + doHang = false + } + stack = make(map[string]interface{}) stack["Noop"] = func() (out string) { // must formally return string even when we return nothing log.WithField("partition", envId). 
@@ -166,30 +175,26 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { WithField("await", call.GetTraits().Await). Infof("executing testplugin.Test call: %s", message) - rn := varStack["run_number"] - var ( - runNumber64 int64 - err error - ) - runNumber64, err = strconv.ParseInt(rn, 10, 32) - if err != nil { - runNumber64 = 0 + rolePath, ok := varStack["__call_rolepath"] + if !ok { + call.VarStack["__call_error"] = "could not get __call_rolepath" } - timeout := callable.AcquireTimeout(TESTPLUGIN_GENERAL_OP_TIMEOUT, varStack, "Test", envId) - defer log.WithField("partition", envId). - WithField("level", infologger.IL_Ops). - WithField("rolepath", call.GetParentRolePath()). - WithField("trigger", call.GetTraits().Trigger). - WithField("await", call.GetTraits().Await). - WithField("run", runNumber64). - Infof("executed testplugin.Test call in %s", timeout) + parentRole, ok := call.GetParentRole().(callable.ParentRole) + if ok { + parentRole.SetGlobalRuntimeVar(rolePath+"_called", "true") + } - time.Sleep(timeout) if doFail { call.VarStack["__call_error"] = "error triggered in testplugin.Test call" } + if doHang { + for { + time.Sleep(time.Second) + } + } + return } diff --git a/core/workflow/aggregatorrole.go b/core/workflow/aggregatorrole.go index 4446e5dcc..f3d287205 100644 --- a/core/workflow/aggregatorrole.go +++ b/core/workflow/aggregatorrole.go @@ -26,6 +26,7 @@ package workflow import ( "errors" + "github.com/AliceO2Group/Control/common/gera" "strings" "sync" texttemplate "text/template" @@ -49,6 +50,18 @@ type aggregatorRole struct { aggregator } +func NewAggregatorRole(name string, roles []Role) (r Role) { + return &aggregatorRole{ + roleBase: roleBase{ + Name: name, + Defaults: gera.MakeStringMap(), + Vars: gera.MakeStringMap(), + UserVars: gera.MakeStringMap(), + }, + aggregator: aggregator{Roles: roles}, + } +} + func (r *aggregatorRole) UnmarshalYAML(unmarshal func(interface{}) error) (err error) { // NOTE: see NOTE in 
roleBase.UnmarshalYAML diff --git a/core/workflow/callrole.go b/core/workflow/callrole.go index ddbb06c19..8d7b67c48 100644 --- a/core/workflow/callrole.go +++ b/core/workflow/callrole.go @@ -26,6 +26,7 @@ package workflow import ( "errors" + "github.com/AliceO2Group/Control/common/gera" "strings" texttemplate "text/template" "time" @@ -51,6 +52,20 @@ type callRole struct { ReturnVar string `yaml:"-,omitempty"` } +func NewCallRole(name string, traits task.Traits, funcCall string, returnVar string) (r Role) { + return &callRole{ + roleBase: roleBase{ + Name: name, + Defaults: gera.MakeStringMap(), + Vars: gera.MakeStringMap(), + UserVars: gera.MakeStringMap(), + }, + Traits: traits, + FuncCall: funcCall, + ReturnVar: returnVar, + } +} + func (t *callRole) UnmarshalYAML(unmarshal func(interface{}) error) (err error) { aux := struct { Call struct {