From b1676d972a05638141211973240f3f254987f9f7 Mon Sep 17 00:00:00 2001 From: Teo Mrnjavac Date: Thu, 28 Mar 2024 11:22:57 +0100 Subject: [PATCH] [core] Emit role events to aliecs.role topic --- core/workflow/aggregatorrole.go | 22 ++++++++++++++++++++++ core/workflow/callrole.go | 23 +++++++++++++++++++++++ core/workflow/taskrole.go | 23 +++++++++++++++++++++++ 3 files changed, 68 insertions(+) diff --git a/core/workflow/aggregatorrole.go b/core/workflow/aggregatorrole.go index 72217f19..418f4a66 100644 --- a/core/workflow/aggregatorrole.go +++ b/core/workflow/aggregatorrole.go @@ -31,6 +31,8 @@ import ( texttemplate "text/template" "github.com/AliceO2Group/Control/common/event" + "github.com/AliceO2Group/Control/common/event/topic" + pb "github.com/AliceO2Group/Control/common/protos" "github.com/AliceO2Group/Control/configuration/template" "github.com/AliceO2Group/Control/core/repos" "github.com/AliceO2Group/Control/core/task" @@ -234,6 +236,7 @@ func (r *aggregatorRole) updateStatus(s task.Status) { if r == nil { return } + oldStatus := r.status.get() log.WithFields(logrus.Fields{ "child status": s.String(), "aggregator status": r.status.get().String(), @@ -243,6 +246,15 @@ func (r *aggregatorRole) updateStatus(s task.Status) { Trace("aggregator role about to merge incoming child status") r.status.merge(s, r) log.WithField("new status", r.status.get()).Trace("status merged") + + if oldStatus != r.status.get() { + the.EventWriterWithTopic(topic.Role).WriteEvent(&pb.Ev_RoleEvent{ + Name: r.Name, + Status: r.status.get().String(), + RolePath: r.GetPath(), + EnvironmentId: r.GetEnvironmentId().String(), + }) + } r.SendEvent(&event.RoleEvent{Name: r.Name, Status: r.status.get().String(), RolePath: r.GetPath()}) if r.parent != nil { r.parent.updateStatus(r.status.get()) @@ -253,10 +265,20 @@ func (r *aggregatorRole) updateState(s task.State) { if r == nil { return } + oldState := r.state.get() r.state.merge(s, r) log.WithField("role", r.Name). WithField("partition", r.GetEnvironmentId().String()). Tracef("updated state to %s upon input state %s", r.state.get().String(), s.String()) + + if oldState != r.state.get() { + the.EventWriterWithTopic(topic.Role).WriteEvent(&pb.Ev_RoleEvent{ + Name: r.Name, + State: r.state.get().String(), + RolePath: r.GetPath(), + EnvironmentId: r.GetEnvironmentId().String(), + }) + } r.SendEvent(&event.RoleEvent{Name: r.Name, State: r.state.get().String(), RolePath: r.GetPath()}) if r.parent != nil { r.parent.updateState(r.state.get()) diff --git a/core/workflow/callrole.go b/core/workflow/callrole.go index 745ef291..4070fb7f 100644 --- a/core/workflow/callrole.go +++ b/core/workflow/callrole.go @@ -31,6 +31,8 @@ import ( "time" "github.com/AliceO2Group/Control/common/event" + "github.com/AliceO2Group/Control/common/event/topic" + pb "github.com/AliceO2Group/Control/common/protos" "github.com/AliceO2Group/Control/configuration/template" "github.com/AliceO2Group/Control/core/repos" "github.com/AliceO2Group/Control/core/task" @@ -207,17 +209,29 @@ func (t *callRole) UpdateState(s task.State) { } func (t *callRole) updateStatus(s task.Status) { + oldStatus := t.status.get() if t.parent == nil { log.WithField("status", s.String()). WithField("partition", t.GetEnvironmentId().String()). Error("cannot update status with nil parent") } t.status.merge(s, t) + + if oldStatus != t.status.get() { + the.EventWriterWithTopic(topic.Role).WriteEvent(&pb.Ev_RoleEvent{ + Name: t.Name, + Status: t.status.get().String(), + RolePath: t.GetPath(), + EnvironmentId: t.GetEnvironmentId().String(), + }) + } t.SendEvent(&event.RoleEvent{Name: t.Name, Status: t.status.get().String(), RolePath: t.GetPath()}) + t.parent.updateStatus(s) } func (t *callRole) updateState(s task.State) { + oldState := t.state.get() if t.parent == nil { log.WithField("state", s.String()). WithField("partition", t.GetEnvironmentId().String()). @@ -227,6 +241,15 @@ func (t *callRole) updateState(s task.State) { log.WithField("role", t.Name). WithField("partition", t.GetEnvironmentId().String()). Tracef("updated state to %s upon input state %s", t.state.get().String(), s.String()) + + if oldState != t.state.get() { + the.EventWriterWithTopic(topic.Role).WriteEvent(&pb.Ev_RoleEvent{ + Name: t.Name, + State: t.state.get().String(), + RolePath: t.GetPath(), + EnvironmentId: t.GetEnvironmentId().String(), + }) + } t.SendEvent(&event.RoleEvent{Name: t.Name, State: t.state.get().String(), RolePath: t.GetPath()}) if t.Critical == true { diff --git a/core/workflow/taskrole.go b/core/workflow/taskrole.go index 6e3cb1db..ceac8687 100644 --- a/core/workflow/taskrole.go +++ b/core/workflow/taskrole.go @@ -31,6 +31,8 @@ import ( "time" "github.com/AliceO2Group/Control/common/event" + "github.com/AliceO2Group/Control/common/event/topic" + pb "github.com/AliceO2Group/Control/common/protos" "github.com/AliceO2Group/Control/configuration/template" "github.com/AliceO2Group/Control/core/repos" "github.com/AliceO2Group/Control/core/task" @@ -211,17 +213,29 @@ func (t *taskRole) UpdateState(s task.State) { } func (t *taskRole) updateStatus(s task.Status) { + oldStatus := t.status.get() if t.parent == nil { log.WithField("status", s.String()). WithField("partition", t.GetEnvironmentId().String()). Error("cannot update status with nil parent") } t.status.merge(s, t) + + if oldStatus != t.status.get() { + the.EventWriterWithTopic(topic.Role).WriteEvent(&pb.Ev_RoleEvent{ + Name: t.Name, + Status: t.status.get().String(), + RolePath: t.GetPath(), + EnvironmentId: t.GetEnvironmentId().String(), + }) + } t.SendEvent(&event.RoleEvent{Name: t.Name, Status: t.status.get().String(), RolePath: t.GetPath()}) + t.parent.updateStatus(s) } func (t *taskRole) updateState(s task.State) { + oldState := t.state.get() if t.parent == nil { log.WithField("state", s.String()). WithField("partition", t.GetEnvironmentId().String()). @@ -231,6 +245,15 @@ func (t *taskRole) updateState(s task.State) { log.WithField("role", t.Name). WithField("partition", t.GetEnvironmentId().String()). Tracef("updated state to %s upon input state %s", t.state.get().String(), s.String()) + + if oldState != t.state.get() { + the.EventWriterWithTopic(topic.Role).WriteEvent(&pb.Ev_RoleEvent{ + Name: t.Name, + State: t.state.get().String(), + RolePath: t.GetPath(), + EnvironmentId: t.GetEnvironmentId().String(), + }) + } t.SendEvent(&event.RoleEvent{Name: t.Name, State: t.state.get().String(), RolePath: t.GetPath()}) if t.Critical == true {