diff --git a/core/task/scheduler.go b/core/task/scheduler.go index 3ade292f..a736fe4c 100644 --- a/core/task/scheduler.go +++ b/core/task/scheduler.go @@ -627,20 +627,20 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han targetExecutorId.Value = uid.New().String() log.WithField("executorId", targetExecutorId.Value). WithField("offerHost", offer.GetHostname()). - WithField("level", infologger.IL_Devel). + WithField("level", infologger.IL_Support). Info("received offer without executor ID, will start new executor if accepted") } else { targetExecutorId.Value = offer.ExecutorIDs[0].Value if len(offer.ExecutorIDs) == 1 { log.WithField("executorId", targetExecutorId.Value). WithField("offerHost", offer.GetHostname()). - WithField("level", infologger.IL_Devel). + WithField("level", infologger.IL_Support). Warn("received offer with one executor ID, will use existing executor") } else if len(offer.ExecutorIDs) > 1 { log.WithField("executorId", targetExecutorId.Value). WithField("executorIds", offer.ExecutorIDs). WithField("offerHost", offer.GetHostname()). - WithField("level", infologger.IL_Devel). + WithField("level", infologger.IL_Support). Warn("received offer with more than one executor ID, will use first one") } } @@ -965,6 +965,7 @@ func (state *schedulerState) resourceOffers(fidStore store.Singleton) events.Han WithField("detector", detector). WithField("level", infologger.IL_Support). WithField("offerHost", offer.Hostname). + WithField("executorId", targetExecutorId.Value). Infof("launch request sent to %s: %d tasks", offer.Hostname, n) for _, taskInfo := range taskInfosToLaunchForCurrentOffer { log.WithPrefix("scheduler"). diff --git a/executor/executable/task.go b/executor/executable/task.go index c24cd70b..337be37f 100644 --- a/executor/executable/task.go +++ b/executor/executable/task.go @@ -107,8 +107,10 @@ func NewTask(taskInfo mesos.TaskInfo, sendStatusFunc SendStatusFunc, sendDeviceE log.WithField("level", infologger.IL_Support). WithField("partition", envId.String()). WithField("detector", detector). - WithField("level", infologger.IL_Devel). - Infof("launching task %s: %s", taskInfo.TaskID.GetValue(), rawCommand) + Infof("launching task %s on executorId %s: %s", + taskInfo.TaskID.GetValue(), + taskInfo.GetExecutor().GetExecutorID().Value, + rawCommand) } else { if err != nil { log.WithError(err). diff --git a/executor/executor.go b/executor/executor.go index fa16cb14..3ba4989a 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -168,7 +168,8 @@ func Run(cfg config.Config) { // tasks and updates are empty. subscribe := calls.Subscribe(unacknowledgedTasks(state), unacknowledgedUpdates(state)) - log.Debug("subscribing to agent for events") + log.WithField("executorId", state.executor.GetExecutorID().Value). + Debug("subscribing to agent for events") // ↓ empty context ↓ we get a plain RequestFunc from the executor.Call value resp, err := subscriber.Send(context.TODO(), calls.NonStreaming(subscribe)) if resp != nil { @@ -176,35 +177,44 @@ func Run(cfg config.Config) { } if err == nil { log.WithField("level", infologger.IL_Support). + WithField("executorId", state.executor.GetExecutorID().Value). Info("executor subscribed, ready to receive events") // We're officially connected, start decoding events err = eventLoop(state, resp, handler) // If we're out of the eventLoop, means a disconnect happened, willingly or not. disconnected = time.Now() - log.Debug("event loop finished") + log.WithField("level", infologger.IL_Support). + WithField("executorId", state.executor.GetExecutorID().Value). + Info("event loop finished") } if err != nil && err != io.EOF { log.WithField("error", err). + WithField("executorId", state.executor.GetExecutorID().Value). Error("executor disconnected with error") } else { - log.Info("executor disconnected") + log.WithField("executorId", state.executor.GetExecutorID().Value). + Info("executor disconnected") } }() if state.shouldQuit { - log.Info("gracefully shutting down on request") + log.WithField("executorId", state.executor.GetExecutorID().Value). + Info("gracefully shutting down on request") return } // The purpose of checkpointing is to handle recovery when mesos-agent exits. if !cfg.Checkpoint { - log.Info("gracefully exiting because framework checkpointing is not enabled") + log.WithField("executorId", state.executor.GetExecutorID().Value). + Info("gracefully exiting because framework checkpointing is not enabled") return } if time.Now().Sub(disconnected) > cfg.RecoveryTimeout { log.WithField("timeout", cfg.RecoveryTimeout). + WithField("executorId", state.executor.GetExecutorID().Value). Error("failed to re-establish subscription with agent within recovery timeout, aborting") return } - log.Debug("waiting for reconnect timeout") + log.WithField("executorId", state.executor.GetExecutorID().Value). + Debug("waiting for reconnect timeout") <-shouldReconnect // wait for some amount of time before retrying subscription } } diff --git a/executor/handlers.go b/executor/handlers.go index 4ad30f26..8f5c16ac 100644 --- a/executor/handlers.go +++ b/executor/handlers.go @@ -80,6 +80,7 @@ func handleMessageEvent(state *internalState, data []byte) (err error) { "message": string(data[:]), "error": err.Error(), }). + WithField("executorId", state.executor.GetExecutorID().Value). Error("no task for incoming MESSAGE") return } @@ -168,6 +169,7 @@ func handleMessageEvent(state *internalState, data []byte) (err error) { "message": string(data[:]), "error": err.Error(), }). + WithField("executorId", state.executor.GetExecutorID().Value). Error("no task for incoming MESSAGE") return }