[core] OCTRL-920 safer concurrency in KillTasks
Parallel attempts to kill tasks were found to be the primary cause of stuck auto-environments.
The root cause was that the channels in ackKilledTasks (and the code handling them) did not expect multiple listeners: with two concurrent kill attempts, one attempt would block forever waiting for an acknowledgment that the other had already consumed, or the acknowledging side would block because its acknowledgment was never received.
Either way, KillTasks would hang indefinitely, blocking the main auto-environment code path.
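To illustrate the failure mode, here is a minimal, self-contained Go sketch (hypothetical names, not the project's actual code): one unbuffered acknowledgment channel is registered and fired once, but two goroutines race to consume it, so whichever loses the race blocks forever, just as a second concurrent KillTasks call would.

    package main

    import (
    	"fmt"
    	"time"
    )

    // Minimal sketch of the race, with hypothetical names: the ack
    // channel is fired exactly once, but two concurrent kill attempts
    // both wait on it. Only one receive can succeed; the other
    // goroutine blocks forever, like a stuck KillTasks call.
    func main() {
    	ack := make(chan struct{})

    	for i := 0; i < 2; i++ {
    		go func(n int) {
    			<-ack // only one of the two receives is ever satisfied
    			fmt.Printf("kill attempt %d got the ack\n", n)
    		}(i)
    	}

    	ack <- struct{}{} // the dying task acknowledges exactly once
    	time.Sleep(100 * time.Millisecond)
    	// Exactly one attempt printed; the other is blocked for good.
    }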
knopers8 committed Sep 18, 2024
1 parent e20e0f6 commit a5b0ccf
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions core/task/manager.go
@@ -100,6 +100,7 @@ type Manager struct {
schedulerState *schedulerState
internalEventCh chan<- event.Event
ackKilledTasks *safeAcks
killTasksMu sync.Mutex // to avoid races when attempting to kill the same tasks in different goroutines
}

func NewManager(shutdown func(), internalEventCh chan<- event.Event) (taskman *Manager, err error) {
@@ -1042,7 +1043,7 @@ func (m *Manager) Cleanup() (killed Tasks, running Tasks, err error) {
// If the task list includes locked tasks, TaskNotFoundError is returned.
func (m *Manager) KillTasks(taskIds []string) (killed Tasks, running Tasks, err error) {
taskCanBeKilledFilter := func(t *Task) bool {
if t.IsLocked() {
if t.IsLocked() || m.ackKilledTasks.contains(t.taskId) {
return false
}
for _, id := range taskIds {
@@ -1053,20 +1054,27 @@ func (m *Manager) KillTasks(taskIds []string) (killed Tasks, running Tasks, err error) {
return false
}

if !m.killTasksMu.TryLock() {
log.WithField("level", infologger.IL_Support).Warnf("Scheduling killing tasks was delayed until another goroutine finishes doing so")
m.killTasksMu.Lock()
log.WithField("level", infologger.IL_Support).Infof("Scheduling killing tasks is resumed")
}
// TODO: use grouping instead of 2 passes of filtering for performance
toKill := m.roster.filtered(taskCanBeKilledFilter)
unkillable := m.roster.filtered(func(t *Task) bool { return !taskCanBeKilledFilter(t) })

if len(toKill) < len(taskIds) {
unkillable := m.roster.filtered(func(t *Task) bool { return !taskCanBeKilledFilter(t) })
log.WithField("taskIds", strings.Join(unkillable.GetTaskIds(), ", ")).
Debugf("some tasks cannot be physically killed (already dead?), will instead only be removed from roster")
Debugf("some tasks cannot be physically killed (already dead or being killed in another goroutine?), will instead only be removed from roster")
}

for _, id := range toKill.GetTaskIds() {
m.ackKilledTasks.addAckChannel(id)
}

killed, running, err = m.doKillTasks(toKill)
m.killTasksMu.Unlock()

for _, id := range killed.GetTaskIds() {
ack, ok := m.ackKilledTasks.getValue(id)
if ok {
@@ -1088,6 +1096,7 @@ func (m *Manager) doKillTasks(tasks Tasks) (killed Tasks, running Tasks, err error) {
inactiveTasks := tasks.Filtered(func(task *Task) bool {
return task.status != ACTIVE
})

// Remove from the roster the tasks which are also in the inactiveTasks list to delete
m.roster.updateTasks(m.roster.filtered(func(task *Task) bool {
return !inactiveTasks.Contains(func(t *Task) bool {
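The serialization added in KillTasks follows a standard Go idiom: try the lock first so contention can be logged before blocking on it. A standalone sketch of that idiom (hypothetical names; sync.Mutex.TryLock requires Go 1.18+):

    package main

    import (
    	"log"
    	"sync"
    )

    var killMu sync.Mutex // hypothetical stand-in for killTasksMu

    // killSerialized sketches the TryLock-then-Lock idiom used in the
    // patch: if another goroutine already holds the lock, log that the
    // operation is delayed, then block until the lock is acquired.
    func killSerialized(doKill func()) {
    	if !killMu.TryLock() {
    		log.Println("killing tasks delayed: another goroutine is already doing so")
    		killMu.Lock() // blocks until the other goroutine finishes
    		log.Println("killing tasks resumed")
    	}
    	defer killMu.Unlock()
    	doKill()
    }

    func main() {
    	killSerialized(func() { log.Println("tasks killed") })
    }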
