Skip to content

Commit

Permalink
[executor] Add executorId in critical log messages
Browse files Browse the repository at this point in the history
  • Loading branch information
teo committed Dec 13, 2023
1 parent 0a5e009 commit 3da3af0
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 11 deletions.
7 changes: 4 additions & 3 deletions core/task/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,20 +627,20 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
targetExecutorId.Value = uid.New().String()
log.WithField("executorId", targetExecutorId.Value).
WithField("offerHost", offer.GetHostname()).
WithField("level", infologger.IL_Devel).
WithField("level", infologger.IL_Support).
Info("received offer without executor ID, will start new executor if accepted")
} else {
targetExecutorId.Value = offer.ExecutorIDs[0].Value
if len(offer.ExecutorIDs) == 1 {
log.WithField("executorId", targetExecutorId.Value).
WithField("offerHost", offer.GetHostname()).
WithField("level", infologger.IL_Devel).
WithField("level", infologger.IL_Support).
Warn("received offer with one executor ID, will use existing executor")
} else if len(offer.ExecutorIDs) > 1 {
log.WithField("executorId", targetExecutorId.Value).
WithField("executorIds", offer.ExecutorIDs).
WithField("offerHost", offer.GetHostname()).
WithField("level", infologger.IL_Devel).
WithField("level", infologger.IL_Support).
Warn("received offer with more than one executor ID, will use first one")
}
}
Expand Down Expand Up @@ -965,6 +965,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han
WithField("detector", detector).
WithField("level", infologger.IL_Support).
WithField("offerHost", offer.Hostname).
WithField("executorId", targetExecutorId.Value).
Infof("launch request sent to %s: %d tasks", offer.Hostname, n)
for _, taskInfo := range taskInfosToLaunchForCurrentOffer {
log.WithPrefix("scheduler").
Expand Down
6 changes: 4 additions & 2 deletions executor/executable/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ func NewTask(taskInfo mesos.TaskInfo, sendStatusFunc SendStatusFunc, sendDeviceE
log.WithField("level", infologger.IL_Support).
WithField("partition", envId.String()).
WithField("detector", detector).
WithField("level", infologger.IL_Devel).
Infof("launching task %s: %s", taskInfo.TaskID.GetValue(), rawCommand)
Infof("launching task %s on executorId %s: %s",
taskInfo.TaskID.GetValue(),
taskInfo.GetExecutor().GetExecutorID().Value,
rawCommand)
} else {
if err != nil {
log.WithError(err).
Expand Down
22 changes: 16 additions & 6 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,43 +168,53 @@ func Run(cfg config.Config) {
// tasks and updates are empty.
subscribe := calls.Subscribe(unacknowledgedTasks(state), unacknowledgedUpdates(state))

log.Debug("subscribing to agent for events")
log.WithField("executorId", state.executor.GetExecutorID().Value).
Debug("subscribing to agent for events")
// ↓ empty context ↓ we get a plain RequestFunc from the executor.Call value
resp, err := subscriber.Send(context.TODO(), calls.NonStreaming(subscribe))
if resp != nil {
defer resp.Close()
}
if err == nil {
log.WithField("level", infologger.IL_Support).
WithField("executorId", state.executor.GetExecutorID().Value).
Info("executor subscribed, ready to receive events")
// We're officially connected, start decoding events
err = eventLoop(state, resp, handler)
// If we're out of the eventLoop, means a disconnect happened, willingly or not.
disconnected = time.Now()
log.Debug("event loop finished")
log.WithField("level", infologger.IL_Support).
WithField("executorId", state.executor.GetExecutorID().Value).
Info("event loop finished")
}
if err != nil && err != io.EOF {
log.WithField("error", err).
WithField("executorId", state.executor.GetExecutorID().Value).
Error("executor disconnected with error")
} else {
log.Info("executor disconnected")
log.WithField("executorId", state.executor.GetExecutorID().Value).
Info("executor disconnected")
}
}()
if state.shouldQuit {
log.Info("gracefully shutting down on request")
log.WithField("executorId", state.executor.GetExecutorID().Value).
Info("gracefully shutting down on request")
return
}
// The purpose of checkpointing is to handle recovery when mesos-agent exits.
if !cfg.Checkpoint {
log.Info("gracefully exiting because framework checkpointing is not enabled")
log.WithField("executorId", state.executor.GetExecutorID().Value).
Info("gracefully exiting because framework checkpointing is not enabled")
return
}
if time.Now().Sub(disconnected) > cfg.RecoveryTimeout {
log.WithField("timeout", cfg.RecoveryTimeout).
WithField("executorId", state.executor.GetExecutorID().Value).
Error("failed to re-establish subscription with agent within recovery timeout, aborting")
return
}
log.Debug("waiting for reconnect timeout")
log.WithField("executorId", state.executor.GetExecutorID().Value).
Debug("waiting for reconnect timeout")
<-shouldReconnect // wait for some amount of time before retrying subscription
}
}
Expand Down
2 changes: 2 additions & 0 deletions executor/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func handleMessageEvent(state *internalState, data []byte) (err error) {
"message": string(data[:]),
"error": err.Error(),
}).
WithField("executorId", state.executor.GetExecutorID().Value).
Error("no task for incoming MESSAGE")
return
}
Expand Down Expand Up @@ -168,6 +169,7 @@ func handleMessageEvent(state *internalState, data []byte) (err error) {
"message": string(data[:]),
"error": err.Error(),
}).
WithField("executorId", state.executor.GetExecutorID().Value).
Error("no task for incoming MESSAGE")
return
}
Expand Down

0 comments on commit 3da3af0

Please sign in to comment.