From 61732eca4fdb53e028ef28fe649fe1e850c39a29 Mon Sep 17 00:00:00 2001 From: Spencer Ferris <3319370+spencewenski@users.noreply.github.com> Date: Fri, 26 Apr 2024 00:42:12 -0700 Subject: [PATCH] Use a `JoinSet` to manage the processor's tasks instead of a `Vec` With a `Vec`, the handles are awaited in the order they are inserted. This means that if a tasks exits with an error, it won't be logged until the tasks ahead of it in the `Vec` complete, which is typically not until the entire application exits. Tokio provides an alternative structure to track tasks -- a `JoinSet`. When iterating over a `JoinSet`, task results are returned in the order that the tasks are completed instead of the order they're spawned. This could potentially be used to optionally shutdown the entire processor if any of its tasks exit with an error (this would be a new config). --- src/processor.rs | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/processor.rs b/src/processor.rs index 0d5f450..85f3bf8 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -6,6 +6,7 @@ use crate::{ use std::collections::BTreeMap; use std::sync::Arc; use tokio::select; +use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info}; @@ -202,11 +203,11 @@ impl Processor { /// Takes self to consume the processor. This is for life-cycle management, not /// memory safety because you can clone processor pretty easily. pub async fn run(self) { - let mut handles = vec![]; + let mut join_set: JoinSet<()> = JoinSet::new(); // Start worker routines. for i in 0..self.config.num_workers { - handles.push(tokio::spawn({ + join_set.spawn({ let mut processor = self.clone(); let cancellation_token = self.cancellation_token.clone(); @@ -223,11 +224,11 @@ impl Processor { debug!("Broke out of loop for worker {}", i); } - })); + }); } // Start sidekiq-web metrics publisher. - handles.push(tokio::spawn({ + join_set.spawn({ let redis = self.redis.clone(); let queues = self.human_readable_queues.clone(); let busy_jobs = self.busy_jobs.clone(); @@ -257,10 +258,10 @@ impl Processor { debug!("Broke out of loop web metrics"); } - })); + }); // Start retry and scheduled routines. - handles.push(tokio::spawn({ + join_set.spawn({ let redis = self.redis.clone(); let cancellation_token = self.cancellation_token.clone(); async move { @@ -283,10 +284,10 @@ impl Processor { debug!("Broke out of loop for retry and scheduled"); } - })); + }); // Watch for periodic jobs and enqueue jobs. - handles.push(tokio::spawn({ + join_set.spawn({ let redis = self.redis.clone(); let cancellation_token = self.cancellation_token.clone(); async move { @@ -308,11 +309,11 @@ impl Processor { debug!("Broke out of loop for periodic"); } - })); + }); - for handle in handles { - if let Err(err) = handle.await { - error!("Processor had a spawned task return an error: {}", &err); + while let Some(result) = join_set.join_next().await { + if let Err(err) = result { + error!("Processor had a spawned task return an error: {}", err); } } }