Fix Heartbeat compatibility, Add HeartBeatLastPushedAt to Manager Stats #85

Open · wants to merge 6 commits into master
16 changes: 9 additions & 7 deletions api_stats.go
@@ -3,6 +3,7 @@ package workers
 import (
     "encoding/json"
     "net/http"
+    "time"
 )

 func (s *apiServer) Stats(w http.ResponseWriter, req *http.Request) {
@@ -24,14 +25,15 @@ func (s *apiServer) Stats(w http.ResponseWriter, req *http.Request) {
     enc.Encode(allStats)
 }

-// Stats containts current stats for a manager
+// Stats contains current stats for a manager
 type Stats struct {
-    Name       string                 `json:"manager_name"`
-    Processed  int64                  `json:"processed"`
-    Failed     int64                  `json:"failed"`
-    Jobs       map[string][]JobStatus `json:"jobs"`
-    Enqueued   map[string]int64       `json:"enqueued"`
-    RetryCount int64                  `json:"retry_count"`
+    Name                  string                 `json:"manager_name"`
+    Processed             int64                  `json:"processed"`
+    Failed                int64                  `json:"failed"`
+    Jobs                  map[string][]JobStatus `json:"jobs"`
+    Enqueued              map[string]int64       `json:"enqueued"`
+    RetryCount            int64                  `json:"retry_count"`
+    HeartbeatLastPushedAt time.Time              `json:"heartbeat_last_pushed_at"`
 }

 // JobStatus contains the status and data for active jobs of a manager
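Since the new field is serialized by the stats endpoint above, a monitoring client can use it to spot a manager whose heartbeat loop has stalled. A minimal sketch, assuming the API is served at a hypothetical localhost:8080/stats and returns a JSON array of Stats (the address and the staleness threshold are illustrative, not part of this PR):

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

// managerStats mirrors just the Stats fields this check needs.
type managerStats struct {
    Name                  string    `json:"manager_name"`
    HeartbeatLastPushedAt time.Time `json:"heartbeat_last_pushed_at"`
}

func main() {
    resp, err := http.Get("http://localhost:8080/stats") // hypothetical address
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    var all []managerStats
    if err := json.NewDecoder(resp.Body).Decode(&all); err != nil {
        panic(err)
    }
    for _, s := range all {
        // A heartbeat that hasn't been pushed recently suggests a stalled manager.
        if time.Since(s.HeartbeatLastPushedAt) > time.Minute {
            fmt.Printf("manager %q heartbeat stale, last pushed at %s\n", s.Name, s.HeartbeatLastPushedAt)
        }
    }
}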
37 changes: 37 additions & 0 deletions heartbeat.go
@@ -53,6 +53,7 @@ func GenerateProcessNonce() (string, error) {

 func (m *Manager) buildHeartbeat(heartbeatTime time.Time, ttl time.Duration) (*storage.Heartbeat, error) {
     queues := []string{}
+    msgs := map[string]string{}

     concurrency := 0
     busy := 0
@@ -67,6 +68,41 @@ func (m *Manager) buildHeartbeat(heartbeatTime time.Time, ttl time.Duration) (*storage.Heartbeat, error) {

     w.runnersLock.Lock()
     for _, r := range w.runners {
+
+        msg := r.inProgressMessage()
+        if msg == nil {
+            continue
+        }
+
+        workerMsg := &HeartbeatWorkerMsg{
+            Retry:      1,
+            Queue:      w.queue,
+            Backtrace:  false,
+            Class:      msg.Class(),
+            Args:       msg.Args(),
+            Jid:        msg.Jid(),
+            CreatedAt:  msg.startedAt, // not actually started at
+            EnqueuedAt: time.Now().UTC().Unix(),
+        }
+
+        jsonMsg, err := json.Marshal(workerMsg)
+        if err != nil {
+            return nil, err
+        }
+
+        wrapper := &HeartbeatWorkerMsgWrapper{
+            Queue:   w.queue,
+            Payload: string(jsonMsg),
+            RunAt:   msg.startedAt,
+        }
+
+        jsonWrapper, err := json.Marshal(wrapper)
+        if err != nil {
+            return nil, err
+        }
+
+        msgs[r.tid] = string(jsonWrapper)
+
         workerHeartbeat := storage.WorkerHeartbeat{
             Pid: pid,
             Tid: r.tid,
@@ -125,6 +161,7 @@ func (m *Manager) buildHeartbeat(heartbeatTime time.Time, ttl time.Duration) (*storage.Heartbeat, error) {
         ActiveManager:    m.IsActive(),
         WorkerHeartbeats: workerHeartbeats,
         Ttl:              ttl,
+        WorkerMessages:   msgs,
     }
     if m.opts.Heartbeat != nil && m.opts.Heartbeat.PrioritizedManager != nil {
         heartbeat.ManagerPriority = m.opts.Heartbeat.PrioritizedManager.ManagerPriority
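To make the payload shape concrete, here is a standalone sketch that builds one worker message by hand and prints the doubly JSON-encoded wrapper that ends up keyed by thread id. The struct definitions and JSON tags below are assumptions for illustration only; the real HeartbeatWorkerMsg and HeartbeatWorkerMsgWrapper are defined elsewhere in this package:

package main

import (
    "encoding/json"
    "fmt"
    "time"
)

// Assumed shapes of the types used in buildHeartbeat (tags are guesses).
type heartbeatWorkerMsg struct {
    Retry      int    `json:"retry"`
    Queue      string `json:"queue"`
    Backtrace  bool   `json:"backtrace"`
    Class      string `json:"class"`
    Args       string `json:"args"`
    Jid        string `json:"jid"`
    CreatedAt  int64  `json:"created_at"`
    EnqueuedAt int64  `json:"enqueued_at"`
}

type heartbeatWorkerMsgWrapper struct {
    Queue   string `json:"queue"`
    Payload string `json:"payload"`
    RunAt   int64  `json:"run_at"`
}

func main() {
    now := time.Now().UTC().Unix()
    msg, _ := json.Marshal(heartbeatWorkerMsg{
        Retry: 1, Queue: "default", Class: "ExampleJob",
        Args: `["42"]`, Jid: "abc123", CreatedAt: now, EnqueuedAt: now,
    })
    // The message JSON is embedded as a string, so the stored value is
    // JSON containing JSON, mirroring Payload: string(jsonMsg) above.
    wrapper, _ := json.Marshal(heartbeatWorkerMsgWrapper{
        Queue: "default", Payload: string(msg), RunAt: now,
    })
    fmt.Println(string(wrapper))
}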
69 changes: 38 additions & 31 deletions manager.go
@@ -15,19 +15,20 @@ import (

 // Manager coordinates work, workers, and signaling needed for job processing
 type Manager struct {
-    uuid             string
-    opts             Options
-    schedule         *scheduledWorker
-    workers          []*worker
-    lock             sync.Mutex
-    signal           chan os.Signal
-    running          bool
-    stop             chan bool
-    active           bool
-    logger           *log.Logger
-    startedAt        time.Time
-    processNonce     string
-    heartbeatChannel chan bool
+    uuid                  string
+    opts                  Options
+    schedule              *scheduledWorker
+    workers               []*worker
+    lock                  sync.Mutex
+    signal                chan os.Signal
+    running               bool
+    stop                  chan bool
+    active                bool
+    logger                *log.Logger
+    startedAt             time.Time
+    processNonce          string
+    heartbeatChannel      chan bool
+    heartbeatLastPushedAt time.Time

     beforeStartHooks []func()
     duringDrainHooks []func()
@@ -245,9 +246,10 @@ func (m *Manager) Producer() *Producer {
 // GetStats returns the set of stats for the manager
 func (m *Manager) GetStats() (Stats, error) {
     stats := Stats{
-        Jobs:     map[string][]JobStatus{},
-        Enqueued: map[string]int64{},
-        Name:     m.opts.ManagerDisplayName,
+        Jobs:                  map[string][]JobStatus{},
+        Enqueued:              map[string]int64{},
+        Name:                  m.opts.ManagerDisplayName,
+        HeartbeatLastPushedAt: m.heartbeatLastPushedAt,
     }
     var q []string

@@ -321,24 +323,25 @@ func (m *Manager) startHeartbeat() error {
                m.logger.Println("ERR: Failed to get heartbeat time", err)
                return err
            }
-           heartbeat, err := m.sendHeartbeat(heartbeatTime)
+           _, err = m.sendHeartbeat(heartbeatTime)
            if err != nil {
                m.logger.Println("ERR: Failed to send heartbeat", err)
                return err
            }
-           expireTS := heartbeatTime.Add(-m.opts.Heartbeat.HeartbeatTTL).Unix()
-           staleMessageUpdates, err := m.handleAllExpiredHeartbeats(context.Background(), expireTS)
-           if err != nil {
-               m.logger.Println("ERR: error expiring heartbeat identities", err)
-               return err
-           }
-           for _, afterHeartbeatHook := range m.afterHeartbeatHooks {
-               err := afterHeartbeatHook(heartbeat, m, staleMessageUpdates)
-               if err != nil {
-                   m.logger.Println("ERR: Failed to execute after heartbeat hook", err)
-                   return err
-               }
-           }

+           //expireTS := heartbeatTime.Add(-m.opts.Heartbeat.HeartbeatTTL).Unix()
Collaborator:
However, what should be done with the code that was commented out? Do you recommend moving it into a new afterHeartbeatHook? Without that code, work in progress from workers with expired heartbeats will never be enqueued back for processing.

Contributor (author):

I think we need to separate the visual heartbeat UI code from the recoverable heartbeat data (e.g., what a node is working on).

From what I've observed of the Sidekiq Ruby behavior (at least v6):

  • Heartbeats are for the visual presentation of what a node is working on and utilize Redis TTL expiration; AFAIK they are never read for consumption and resumption. Utilizing this in a mixed Ruby/Sidekiq environment is very useful for at-a-glance observability.

  • A specially named queue is created by each node to contain the in-progress work, and it is looked for upon boot in order to properly recover a killed process. BRPOPLPUSH (now BLMOVE) should be utilized to move the item from the queue into the specific worker's queue; if the process is then killed, that worker-specific queue is read upon boot, and any items present are assumed to be incomplete and should be restarted (along with the base assumption that all jobs are idempotent); see the sketch below. This is where the additional logic that was attached to the heartbeat code could live. I will need to reread what the goals of the heartbeat hook logic are intended for (as my current mental model is more event-based than poll-based).
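A minimal sketch of that recovery pattern, assuming go-redis v9 and hypothetical key names (`queue:default` for the shared queue, a per-process `inprogress:<identity>` list); this illustrates the BLMOVE approach described above, not this library's actual implementation:

package reliablefetch

import (
    "context"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
)

// fetchJob atomically moves one job from the shared queue into this
// process's private in-progress list, so a crash cannot lose the job.
func fetchJob(ctx context.Context, rdb *redis.Client, queue, inProgress string) (string, error) {
    // Blocks up to 5s waiting for work, then moves it in a single step.
    return rdb.BLMove(ctx, queue, inProgress, "RIGHT", "LEFT", 5*time.Second).Result()
}

// recoverOrphans runs at boot: anything still in the private list is
// assumed incomplete (jobs are idempotent) and is pushed back for processing.
func recoverOrphans(ctx context.Context, rdb *redis.Client, queue, inProgress string) error {
    for {
        job, err := rdb.LMove(ctx, inProgress, queue, "RIGHT", "LEFT").Result()
        if err == redis.Nil {
            return nil // private list drained
        }
        if err != nil {
            return err
        }
        fmt.Println("re-enqueued orphaned job:", job)
    }
}

After a worker finishes a job it would LREM the entry from the in-progress list, so whatever remains at boot can be treated as orphaned.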

Contributor:

Thank you for looking into Sidekiq's behavior. When you mention "after which if the process is killed, the specific queue is read upon boot and present items are assumed to be incomplete and should be restarted," does this mean that work will resume when the corresponding manager of the queue is restarted? We have a use case where we have a cluster of managers, and if a given manager goes down with work in its active queue for a prolonged period of time, we would like that work to be picked up by another manager.

For what it's worth, our current stance is that we are okay with deviating from an exact port of Sidekiq, which is why we introduced a use case for reading the heartbeat. That said, if there are things we can do to maintain compatibility with the Sidekiq UI, we agree that's preferable. Is there anything in the new heartbeat changes, such as expiring via polling instead of TTL, that breaks compatibility with the UI somehow?

Contributor (author):

From what I understand of how Sidekiq works:

The current jobs that a given process is processing should be atomically moved into another data structure (not part of the heartbeat) that can be read when the process is restarted. Having the heartbeat separated from the manager's queue makes it easier to check for orphaned jobs: if the heartbeat has expired but a cron process discovers a LIST of jobs still present, you can assume that those jobs should be restarted by another manager.

This also has the added benefit of maintaining compatibility with the Sidekiq UI, as the current jobs for a given process do not live in the same key as the heartbeat and current job details.

Looking into the docs around super_fetch, there is logic for orphaned job discovery and reprocessing. I believe it does a SCAN looking for job sets from any host whose accompanying heartbeat has expired (by TTL), as this indicates that there are orphaned jobs to be picked up.

https://github.com/sidekiq/sidekiq/wiki/Reliability#recovering-jobs

I can refactor this to a more middle-ground approach, but my end goal is to allow for discovery of orphaned jobs from persistent hosts (same name upon restart) as well as dynamically named hosts (e.g., replicas in k8s).
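A sketch of that discovery pass, reusing the hypothetical `inprogress:<identity>` / `heartbeat:<identity>` key layout from the earlier example and go-redis v9; super_fetch's real implementation differs, this only illustrates the expired-heartbeat check:

package reliablefetch

import (
    "context"
    "strings"

    "github.com/redis/go-redis/v9"
)

// findOrphanedQueues scans for private in-progress lists whose owning
// process no longer has a live heartbeat key (its TTL has expired).
func findOrphanedQueues(ctx context.Context, rdb *redis.Client) ([]string, error) {
    var orphaned []string
    iter := rdb.Scan(ctx, 0, "inprogress:*", 100).Iterator()
    for iter.Next(ctx) {
        listKey := iter.Val()
        identity := strings.TrimPrefix(listKey, "inprogress:")
        // EXISTS returns 0 once the heartbeat TTL has lapsed, which
        // indicates the process is gone and its jobs are orphaned.
        alive, err := rdb.Exists(ctx, "heartbeat:"+identity).Result()
        if err != nil {
            return nil, err
        }
        if alive == 0 {
            orphaned = append(orphaned, listKey)
        }
    }
    return orphaned, iter.Err()
}

Each returned list could then be drained back onto its source queue with LMOVE by whichever manager discovers it first.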

+           //staleMessageUpdates, err := m.handleAllExpiredHeartbeats(context.Background(), expireTS)
+           //if err != nil {
+           //  m.logger.Println("ERR: error expiring heartbeat identities", err)
+           //  return err
+           //}
+           //for _, afterHeartbeatHook := range m.afterHeartbeatHooks {
+           //  err := afterHeartbeatHook(heartbeat, m, staleMessageUpdates)
+           //  if err != nil {
+           //      m.logger.Println("ERR: Failed to execute after heartbeat hook", err)
+           //      return err
+           //  }
+           //}
+           m.heartbeatLastPushedAt = time.Now()
        case <-m.heartbeatChannel:
            return nil
        }
@@ -418,10 +421,14 @@ func (m *Manager) stopHeartbeat() {
 func (m *Manager) sendHeartbeat(heartbeatTime time.Time) (*storage.Heartbeat, error) {
     heartbeat, err := m.buildHeartbeat(heartbeatTime, m.opts.Heartbeat.HeartbeatTTL)
     if err != nil {
+        m.logger.Println("ERR: Failed to build heartbeat", err)
         return heartbeat, err
     }

     err = m.opts.store.SendHeartbeat(context.Background(), heartbeat)
+    if err != nil {
+        m.logger.Println("ERR: Failed to send heartbeat", err)
+    }
     return heartbeat, err
 }
16 changes: 16 additions & 0 deletions storage/redis.go
@@ -160,6 +160,22 @@ func (r *redisStore) SendHeartbeat(ctx context.Context, heartbeat *Heartbeat) error {
         "active_manager", heartbeat.ActiveManager,
         "worker_heartbeats", workerHeartbeats)

+    // ensure the heartbeat is automatically cleaned up
+    pipe.Expire(ctx, managerKey, heartbeat.Ttl)
+
+    // delete the worker key just in case our set is empty
+    pipe.Del(ctx, GetWorkersKey(managerKey))
+
+    // send all job message heartbeats
+    for tid, msg := range heartbeat.WorkerMessages {
+        // fake the sidekiq thread id
+        fakeThreadId := fmt.Sprintf("%d-%s", heartbeat.Pid, tid)
+        pipe.HSet(ctx, GetWorkersKey(managerKey), fakeThreadId, msg)
+    }
+
+    // make sure the worker is cleaned up
+    pipe.Expire(ctx, GetWorkersKey(managerKey), heartbeat.Ttl)
+
     _, err = pipe.Exec(ctx)
     if err != nil && err != redis.Nil {
         return err
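For illustration, a sketch of reading that hash back with go-redis v9; the `workersKey` value here is a placeholder, since the real key comes from GetWorkersKey:

package main

import (
    "context"
    "fmt"

    "github.com/redis/go-redis/v9"
)

func main() {
    ctx := context.Background()
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

    // Placeholder for GetWorkersKey(managerKey).
    workersKey := "example-manager:workers"

    // Each field is pid-tid (the fake Sidekiq thread id); each value is the
    // JSON wrapper written by SendHeartbeat. The whole hash shares the TTL.
    entries, err := rdb.HGetAll(ctx, workersKey).Result()
    if err != nil {
        panic(err)
    }
    for fakeThreadId, wrapper := range entries {
        fmt.Printf("%s => %s\n", fakeThreadId, wrapper)
    }
}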
19 changes: 10 additions & 9 deletions storage/storage.go
@@ -39,15 +39,16 @@ type Retries struct {
 type Heartbeat struct {
     Identity string `json:"identity"`

-    Beat            int64  `json:"beat,string"`
-    Quiet           bool   `json:"quiet,string"`
-    Busy            int    `json:"busy,string"`
-    RttUS           int    `json:"rtt_us,string"`
-    RSS             int64  `json:"rss,string"`
-    Info            string `json:"info"`
-    Pid             int    `json:"pid,string"`
-    ManagerPriority int    `json:"manager_priority,string"`
-    ActiveManager   bool   `json:"active_manager,string"`
+    Beat            int64             `json:"beat,string"`
+    Quiet           bool              `json:"quiet,string"`
+    Busy            int               `json:"busy,string"`
+    RttUS           int               `json:"rtt_us,string"`
+    RSS             int64             `json:"rss,string"`
+    Info            string            `json:"info"`
+    Pid             int               `json:"pid,string"`
+    ManagerPriority int               `json:"manager_priority,string"`
+    ActiveManager   bool              `json:"active_manager,string"`
+    WorkerMessages  map[string]string `json:"worker_messages"`

     Ttl time.Duration
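A side note on the `,string` tags above: Go's encoding/json `,string` option wraps numeric and boolean values in JSON strings, which is presumably what keeps these fields round-trippable with the string values the Ruby Sidekiq side writes. A standalone sketch of the effect (not part of this PR):

package main

import (
    "encoding/json"
    "fmt"
)

type beat struct {
    Beat  int64 `json:"beat,string"`
    Quiet bool  `json:"quiet,string"`
    Busy  int   `json:"busy,string"`
}

func main() {
    out, _ := json.Marshal(beat{Beat: 1700000000, Quiet: false, Busy: 3})
    fmt.Println(string(out)) // {"beat":"1700000000","quiet":"false","busy":"3"}
}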