Skip to content

Commit

Permalink
[core] React to ODC state change during RUNNING with STOP->CONFIGURED
Browse files Browse the repository at this point in the history
  • Loading branch information
teo committed Aug 10, 2023
1 parent 33516e5 commit b1c1bb9
Showing 1 changed file with 87 additions and 38 deletions.
125 changes: 87 additions & 38 deletions core/environment/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/AliceO2Group/Control/common/system"
"github.com/AliceO2Group/Control/common/utils"
"github.com/AliceO2Group/Control/common/utils/uid"
event2 "github.com/AliceO2Group/Control/core/integration/odc/event"
"github.com/AliceO2Group/Control/core/task"
"github.com/AliceO2Group/Control/core/task/taskop"
"github.com/AliceO2Group/Control/core/the"
Expand All @@ -51,7 +52,7 @@ type Manager struct {
mu sync.RWMutex
m map[uid.ID]*Environment
taskman *task.Manager
incomingEventCh <-chan event.Event
incomingEventCh chan event.Event
pendingTeardownsCh map[uid.ID]chan *event.TasksReleasedEvent
pendingStateChangeCh map[uid.ID]chan *event.TasksStateChangedEvent
}
Expand All @@ -64,7 +65,7 @@ func ManagerInstance() *Manager {
return instance
}

func NewEnvManager(tm *task.Manager, incomingEventCh <-chan event.Event) *Manager {
func NewEnvManager(tm *task.Manager, incomingEventCh chan event.Event) *Manager {
instance = &Manager{
m: make(map[uid.ID]*Environment),
taskman: tm,
Expand All @@ -81,6 +82,9 @@ func NewEnvManager(tm *task.Manager, incomingEventCh <-chan event.Event) *Manage
case event.DeviceEvent:
instance.handleDeviceEvent(typedEvent)

case event.IntegratedServiceEvent:
instance.handleIntegratedServiceEvent(typedEvent)

case *event.ExecutorFailedEvent:
envIdsAffected := tm.HandleExecutorFailed(typedEvent)
for envId := range envIdsAffected {
Expand Down Expand Up @@ -174,6 +178,10 @@ func NewEnvManager(tm *task.Manager, incomingEventCh <-chan event.Event) *Manage
return instance
}

// NotifyIntegratedServiceEvent pushes the given integrated service event onto
// the manager's incoming event channel. This is a plain channel send, so the
// caller blocks until the channel accepts the value.
func (envs *Manager) NotifyIntegratedServiceEvent(event event.IntegratedServiceEvent) {
	incoming := envs.incomingEventCh
	incoming <- event
}

func (envs *Manager) GetActiveDetectors() system.IDMap {
envs.mu.RLock()
defer envs.mu.RUnlock()
Expand Down Expand Up @@ -628,6 +636,45 @@ func (envs *Manager) loadWorkflow(workflowPath string, parent workflow.Updatable
return workflow.Load(workflowPath, parent, envs.taskman, workflowUserVars, baseConfigStack)
}

// handleIntegratedServiceEvent reacts to an event emitted by an integrated
// service. Only events whose service name is "ODC" are considered, and of
// those only an OdcPartitionStateChangeEvent reporting state "ERROR". When the
// affected environment can be found and is currently RUNNING, a stop-activity
// transition is attempted in a separate goroutine; failures are only logged.
// Every other event is ignored.
func (envs *Manager) handleIntegratedServiceEvent(evt event.IntegratedServiceEvent) {
	if evt == nil {
		log.Error("cannot handle null IntegratedServiceEvent")
		return
	}

	// for now we only handle ODC events
	if evt.GetServiceName() != "ODC" {
		return
	}

	// Among ODC events, only a partition state change to ERROR is acted upon.
	stateChange, isStateChange := evt.(*event2.OdcPartitionStateChangeEvent)
	if !isStateChange || stateChange.GetState() != "ERROR" {
		return
	}

	partitionId := stateChange.GetEnvironmentId()
	targetEnv, lookupErr := envs.environment(partitionId)
	if lookupErr != nil {
		log.WithPrefix("scheduler").
			WithField("partition", partitionId.String()).
			WithField("odcState", stateChange.GetState()).
			WithError(lookupErr).
			Error("cannot find environment for OdcPartitionStateChangeEvent")
		return
	}

	log.WithPrefix("scheduler").
		WithField("partition", partitionId.String()).
		WithField("odcState", stateChange.GetState()).
		WithField("envState", targetEnv.CurrentState()).
		Debug("received ODC_PARTITION_STATE_CHANGE event from ODC, trying to stop the run")

	// The stop is only attempted while the environment reports RUNNING.
	if targetEnv.CurrentState() != "RUNNING" {
		return
	}

	// The transition is started in its own goroutine so this handler returns
	// without waiting for it to complete.
	go func() {
		stopErr := targetEnv.TryTransition(NewStopActivityTransition(envs.taskman))
		if stopErr == nil {
			return
		}
		log.WithPrefix("scheduler").
			WithField("partition", partitionId.String()).
			WithError(stopErr).
			Error("cannot stop run after ODC_PARTITION_STATE_CHANGE ERROR event")
	}()
}

func (envs *Manager) handleDeviceEvent(evt event.DeviceEvent) {
if evt == nil {
log.Error("cannot handle null DeviceEvent")
Expand Down Expand Up @@ -715,25 +762,26 @@ func (envs *Manager) handleDeviceEvent(evt event.DeviceEvent) {
WithField("taskId", taskId.Value).
WithError(err).
Error("cannot find environment for DeviceEvent")
}
log.WithPrefix("scheduler").
WithField("partition", envId.String()).
WithField("taskId", taskId.Value).
WithField("envState", env.CurrentState()).
Debug("received END_OF_STREAM event from task, trying to stop the run")
if env.CurrentState() == "RUNNING" {
t.SetSafeToStop(true) // we mark this specific task as ok to STOP
go func() {
if env.IsSafeToStop() { // but then we ask the env whether *all* of them are
err = env.TryTransition(NewStopActivityTransition(envs.taskman))
if err != nil {
log.WithPrefix("scheduler").
WithField("partition", envId.String()).
WithError(err).
Error("cannot stop run after END_OF_STREAM event")
} else {
log.WithPrefix("scheduler").
WithField("partition", envId.String()).
WithField("taskId", taskId.Value).
WithField("envState", env.CurrentState()).
Debug("received END_OF_STREAM event from task, trying to stop the run")
if env.CurrentState() == "RUNNING" {
t.SetSafeToStop(true) // we mark this specific task as ok to STOP
go func() {
if env.IsSafeToStop() { // but then we ask the env whether *all* of them are
err = env.TryTransition(NewStopActivityTransition(envs.taskman))
if err != nil {
log.WithPrefix("scheduler").
WithField("partition", envId.String()).
WithError(err).
Error("cannot stop run after END_OF_STREAM event")
}
}
}
}()
}()
}
}

case pb.DeviceEventType_TASK_INTERNAL_ERROR:
Expand All @@ -754,24 +802,25 @@ func (envs *Manager) handleDeviceEvent(evt event.DeviceEvent) {
WithField("taskId", taskId.Value).
WithError(err).
Error("cannot find environment for DeviceEvent")
}
log.WithPrefix("scheduler").
WithField("partition", envId.String()).
WithField("taskId", taskId.Value).
WithField("taskRole", t.GetParentRolePath()).
WithField("envState", env.CurrentState()).
Debug("received TASK_INTERNAL_ERROR event from task, trying to stop the run")
if env.CurrentState() == "RUNNING" {
go func() {
t.GetParent().UpdateState(task.ERROR)
err = env.TryTransition(NewStopActivityTransition(envs.taskman))
if err != nil {
log.WithPrefix("scheduler").
WithField("partition", envId.String()).
WithError(err).
Error("cannot stop run after END_OF_STREAM event")
}
}()
} else {
log.WithPrefix("scheduler").
WithField("partition", envId.String()).
WithField("taskId", taskId.Value).
WithField("taskRole", t.GetParentRolePath()).
WithField("envState", env.CurrentState()).
Debug("received TASK_INTERNAL_ERROR event from task, trying to stop the run")
if env.CurrentState() == "RUNNING" {
go func() {
t.GetParent().UpdateState(task.ERROR)
err = env.TryTransition(NewStopActivityTransition(envs.taskman))
if err != nil {
log.WithPrefix("scheduler").
WithField("partition", envId.String()).
WithError(err).
Error("cannot stop run after END_OF_STREAM event")
}
}()
}
}

}
Expand Down

0 comments on commit b1c1bb9

Please sign in to comment.