Skip to content

Commit

Permalink
[core] Emit role events to aliecs.role topic
Browse files Browse the repository at this point in the history
  • Loading branch information
teo committed Mar 28, 2024
1 parent 1dd0b01 commit b1676d9
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 0 deletions.
22 changes: 22 additions & 0 deletions core/workflow/aggregatorrole.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
texttemplate "text/template"

"github.com/AliceO2Group/Control/common/event"
"github.com/AliceO2Group/Control/common/event/topic"
pb "github.com/AliceO2Group/Control/common/protos"
"github.com/AliceO2Group/Control/configuration/template"
"github.com/AliceO2Group/Control/core/repos"
"github.com/AliceO2Group/Control/core/task"
Expand Down Expand Up @@ -234,6 +236,7 @@ func (r *aggregatorRole) updateStatus(s task.Status) {
if r == nil {
return
}
oldStatus := r.status.get()
log.WithFields(logrus.Fields{
"child status": s.String(),
"aggregator status": r.status.get().String(),
Expand All @@ -243,6 +246,15 @@ func (r *aggregatorRole) updateStatus(s task.Status) {
Trace("aggregator role about to merge incoming child status")
r.status.merge(s, r)
log.WithField("new status", r.status.get()).Trace("status merged")

if oldStatus != r.status.get() {
the.EventWriterWithTopic(topic.Role).WriteEvent(&pb.Ev_RoleEvent{
Name: r.Name,
Status: r.status.get().String(),
RolePath: r.GetPath(),
EnvironmentId: r.GetEnvironmentId().String(),
})
}
r.SendEvent(&event.RoleEvent{Name: r.Name, Status: r.status.get().String(), RolePath: r.GetPath()})
if r.parent != nil {
r.parent.updateStatus(r.status.get())
Expand All @@ -253,10 +265,20 @@ func (r *aggregatorRole) updateState(s task.State) {
if r == nil {
return
}
oldState := r.state.get()
r.state.merge(s, r)
log.WithField("role", r.Name).
WithField("partition", r.GetEnvironmentId().String()).
Tracef("updated state to %s upon input state %s", r.state.get().String(), s.String())

if oldState != r.state.get() {
the.EventWriterWithTopic(topic.Role).WriteEvent(&pb.Ev_RoleEvent{
Name: r.Name,
State: r.state.get().String(),
RolePath: r.GetPath(),
EnvironmentId: r.GetEnvironmentId().String(),
})
}
r.SendEvent(&event.RoleEvent{Name: r.Name, State: r.state.get().String(), RolePath: r.GetPath()})
if r.parent != nil {
r.parent.updateState(r.state.get())
Expand Down
23 changes: 23 additions & 0 deletions core/workflow/callrole.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"time"

"github.com/AliceO2Group/Control/common/event"
"github.com/AliceO2Group/Control/common/event/topic"
pb "github.com/AliceO2Group/Control/common/protos"
"github.com/AliceO2Group/Control/configuration/template"
"github.com/AliceO2Group/Control/core/repos"
"github.com/AliceO2Group/Control/core/task"
Expand Down Expand Up @@ -207,17 +209,29 @@ func (t *callRole) UpdateState(s task.State) {
}

func (t *callRole) updateStatus(s task.Status) {
oldStatus := t.status.get()
if t.parent == nil {
log.WithField("status", s.String()).
WithField("partition", t.GetEnvironmentId().String()).
Error("cannot update status with nil parent")
}
t.status.merge(s, t)

if oldStatus != t.status.get() {
the.EventWriterWithTopic(topic.Role).WriteEvent(&pb.Ev_RoleEvent{
Name: t.Name,
Status: t.status.get().String(),
RolePath: t.GetPath(),
EnvironmentId: t.GetEnvironmentId().String(),
})
}
t.SendEvent(&event.RoleEvent{Name: t.Name, Status: t.status.get().String(), RolePath: t.GetPath()})

t.parent.updateStatus(s)
}

func (t *callRole) updateState(s task.State) {
oldState := t.state.get()
if t.parent == nil {
log.WithField("state", s.String()).
WithField("partition", t.GetEnvironmentId().String()).
Expand All @@ -227,6 +241,15 @@ func (t *callRole) updateState(s task.State) {
log.WithField("role", t.Name).
WithField("partition", t.GetEnvironmentId().String()).
Tracef("updated state to %s upon input state %s", t.state.get().String(), s.String())

if oldState != t.state.get() {
the.EventWriterWithTopic(topic.Role).WriteEvent(&pb.Ev_RoleEvent{
Name: t.Name,
State: t.state.get().String(),
RolePath: t.GetPath(),
EnvironmentId: t.GetEnvironmentId().String(),
})
}
t.SendEvent(&event.RoleEvent{Name: t.Name, State: t.state.get().String(), RolePath: t.GetPath()})

if t.Critical == true {
Expand Down
23 changes: 23 additions & 0 deletions core/workflow/taskrole.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"time"

"github.com/AliceO2Group/Control/common/event"
"github.com/AliceO2Group/Control/common/event/topic"
pb "github.com/AliceO2Group/Control/common/protos"
"github.com/AliceO2Group/Control/configuration/template"
"github.com/AliceO2Group/Control/core/repos"
"github.com/AliceO2Group/Control/core/task"
Expand Down Expand Up @@ -211,17 +213,29 @@ func (t *taskRole) UpdateState(s task.State) {
}

func (t *taskRole) updateStatus(s task.Status) {
oldStatus := t.status.get()
if t.parent == nil {
log.WithField("status", s.String()).
WithField("partition", t.GetEnvironmentId().String()).
Error("cannot update status with nil parent")
}
t.status.merge(s, t)

if oldStatus != t.status.get() {
the.EventWriterWithTopic(topic.Role).WriteEvent(&pb.Ev_RoleEvent{
Name: t.Name,
Status: t.status.get().String(),
RolePath: t.GetPath(),
EnvironmentId: t.GetEnvironmentId().String(),
})
}
t.SendEvent(&event.RoleEvent{Name: t.Name, Status: t.status.get().String(), RolePath: t.GetPath()})

t.parent.updateStatus(s)
}

func (t *taskRole) updateState(s task.State) {
oldState := t.state.get()
if t.parent == nil {
log.WithField("state", s.String()).
WithField("partition", t.GetEnvironmentId().String()).
Expand All @@ -231,6 +245,15 @@ func (t *taskRole) updateState(s task.State) {
log.WithField("role", t.Name).
WithField("partition", t.GetEnvironmentId().String()).
Tracef("updated state to %s upon input state %s", t.state.get().String(), s.String())

if oldState != t.state.get() {
the.EventWriterWithTopic(topic.Role).WriteEvent(&pb.Ev_RoleEvent{
Name: t.Name,
State: t.state.get().String(),
RolePath: t.GetPath(),
EnvironmentId: t.GetEnvironmentId().String(),
})
}
t.SendEvent(&event.RoleEvent{Name: t.Name, State: t.state.get().String(), RolePath: t.GetPath()})

if t.Critical == true {
Expand Down

0 comments on commit b1676d9

Please sign in to comment.