Skip to content

Commit

Permalink
Merge pull request #45 from spencewenski/processor-joinset
Browse files Browse the repository at this point in the history
Use a `JoinSet` to manage the processor's tasks instead of a `Vec`
  • Loading branch information
film42 authored Apr 26, 2024
2 parents a91324f + 61732ec commit 7dc968e
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
use std::collections::BTreeMap;
use std::sync::Arc;
use tokio::select;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};

Expand Down Expand Up @@ -202,11 +203,11 @@ impl Processor {
/// Takes self to consume the processor. This is for life-cycle management, not
/// memory safety because you can clone processor pretty easily.
pub async fn run(self) {
let mut handles = vec![];
let mut join_set: JoinSet<()> = JoinSet::new();

// Start worker routines.
for i in 0..self.config.num_workers {
handles.push(tokio::spawn({
join_set.spawn({
let mut processor = self.clone();
let cancellation_token = self.cancellation_token.clone();

Expand All @@ -223,11 +224,11 @@ impl Processor {

debug!("Broke out of loop for worker {}", i);
}
}));
});
}

// Start sidekiq-web metrics publisher.
handles.push(tokio::spawn({
join_set.spawn({
let redis = self.redis.clone();
let queues = self.human_readable_queues.clone();
let busy_jobs = self.busy_jobs.clone();
Expand Down Expand Up @@ -257,10 +258,10 @@ impl Processor {

debug!("Broke out of loop web metrics");
}
}));
});

// Start retry and scheduled routines.
handles.push(tokio::spawn({
join_set.spawn({
let redis = self.redis.clone();
let cancellation_token = self.cancellation_token.clone();
async move {
Expand All @@ -283,10 +284,10 @@ impl Processor {

debug!("Broke out of loop for retry and scheduled");
}
}));
});

// Watch for periodic jobs and enqueue jobs.
handles.push(tokio::spawn({
join_set.spawn({
let redis = self.redis.clone();
let cancellation_token = self.cancellation_token.clone();
async move {
Expand All @@ -308,11 +309,11 @@ impl Processor {

debug!("Broke out of loop for periodic");
}
}));
});

for handle in handles {
if let Err(err) = handle.await {
error!("Processor had a spawned task return an error: {}", &err);
while let Some(result) = join_set.join_next().await {
if let Err(err) = result {
error!("Processor had a spawned task return an error: {}", err);
}
}
}
Expand Down

0 comments on commit 7dc968e

Please sign in to comment.