Skip to content

Commit

Permalink
Documenting worker pool (#104)
Browse files Browse the repository at this point in the history
* Documenting worker pool

* Update src/asynk/async_worker_pool.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_worker_pool.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_worker_pool.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_worker_pool.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_worker_pool.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* move comment to worker

* documenting blocking module

* Update src/asynk/async_worker.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_worker_pool.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/blocking/queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/blocking/queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_worker.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>
  • Loading branch information
pxp9 and ayrat555 authored Dec 22, 2022
1 parent aa4cd3f commit 01934e2
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 16 deletions.
11 changes: 4 additions & 7 deletions src/asynk/async_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{RetentionMode, SleepParams};
use log::error;
use typed_builder::TypedBuilder;

/// it executes tasks only of task_type type, it sleeps when there are no tasks in the queue
#[derive(TypedBuilder)]
pub struct AsyncWorker<AQueue>
where
Expand All @@ -28,11 +29,7 @@ impl<AQueue> AsyncWorker<AQueue>
where
AQueue: AsyncQueueable + Clone + Sync + 'static,
{
pub async fn run(
&mut self,
task: Task,
runnable: Box<dyn AsyncRunnable>,
) -> Result<(), FangError> {
async fn run(&mut self, task: Task, runnable: Box<dyn AsyncRunnable>) -> Result<(), FangError> {
let result = runnable.run(&mut self.queue).await;

match result {
Expand Down Expand Up @@ -86,13 +83,13 @@ where
Ok(())
}

pub async fn sleep(&mut self) {
async fn sleep(&mut self) {
self.sleep_params.maybe_increase_sleep_period();

tokio::time::sleep(self.sleep_params.sleep_period).await;
}

pub async fn run_tasks(&mut self) -> Result<(), FangError> {
pub(crate) async fn run_tasks(&mut self) -> Result<(), FangError> {
loop {
//fetch task
match self
Expand Down
13 changes: 10 additions & 3 deletions src/asynk/async_worker_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@ where
AQueue: AsyncQueueable + Clone + Sync + 'static,
{
#[builder(setter(into))]
/// the AsyncWorkerPool uses a queue to control the tasks that will be executed.
pub queue: AQueue,
/// sleep_params controls how much time a worker will sleep while waiting for tasks
#[builder(default, setter(into))]
pub sleep_params: SleepParams,
/// retention_mode controls if tasks should be persisted after execution
#[builder(default, setter(into))]
pub retention_mode: RetentionMode,
/// the number of workers of the AsyncWorkerPool.
#[builder(setter(into))]
pub number_of_workers: u32,
/// The type of tasks that will be executed by `AsyncWorkerPool`.
#[builder(default=DEFAULT_TASK_TYPE.to_string(), setter(into))]
pub task_type: String,
}
Expand All @@ -29,6 +34,8 @@ impl<AQueue> AsyncWorkerPool<AQueue>
where
AQueue: AsyncQueueable + Clone + Sync + 'static,
{
/// Starts the configured number of workers
/// This is necessary in order to execute tasks.
pub async fn start(&mut self) {
for idx in 0..self.number_of_workers {
let pool = self.clone();
Expand All @@ -37,7 +44,7 @@ where
}

#[async_recursion]
pub async fn supervise_task(pool: AsyncWorkerPool<AQueue>, restarts: u64, worker_number: u32) {
async fn supervise_task(pool: AsyncWorkerPool<AQueue>, restarts: u64, worker_number: u32) {
let restarts = restarts + 1;
let join_handle = Self::spawn_worker(
pool.queue.clone(),
Expand All @@ -56,7 +63,7 @@ where
}
}

pub async fn spawn_worker(
async fn spawn_worker(
queue: AQueue,
sleep_params: SleepParams,
retention_mode: RetentionMode,
Expand All @@ -66,7 +73,7 @@ where
Self::run_worker(queue, sleep_params, retention_mode, task_type).await
})
}
pub async fn run_worker(
async fn run_worker(
queue: AQueue,
sleep_params: SleepParams,
retention_mode: RetentionMode,
Expand Down
39 changes: 39 additions & 0 deletions src/blocking/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,29 +84,46 @@ impl From<cron::error::Error> for QueueError {
}
}

/// This trait defines operations for a synchronous queue.
/// The trait can be implemented for different storage backends.
/// For now, the trait is only implemented for PostgreSQL. More backends are planned to be implemented in the future.
pub trait Queueable {
/// This method should retrieve one task of the `task_type` type. If `task_type` is `None` it will try to
/// fetch a task of the type `common`. After fetching it should update the state of the task to
/// `FangTaskState::InProgress`.
fn fetch_and_touch_task(&self, task_type: String) -> Result<Option<Task>, QueueError>;

/// Enqueue a task to the queue, The task will be executed as soon as possible by the worker of the same type
/// created by an `WorkerPool`.
fn insert_task(&self, params: &dyn Runnable) -> Result<Task, QueueError>;

/// The method will remove all tasks from the queue
fn remove_all_tasks(&self) -> Result<usize, QueueError>;

/// Remove all tasks that are scheduled in the future.
fn remove_all_scheduled_tasks(&self) -> Result<usize, QueueError>;

/// Removes all tasks that have the specified `task_type`.
fn remove_tasks_of_type(&self, task_type: &str) -> Result<usize, QueueError>;

/// Remove a task by its id.
fn remove_task(&self, id: Uuid) -> Result<usize, QueueError>;

/// To use this function task has to be uniq. uniq() has to return true.
/// If task is not uniq this function will not do anything.
/// Remove a task by its metadata (struct fields values)
fn remove_task_by_metadata(&self, task: &dyn Runnable) -> Result<usize, QueueError>;

fn find_task_by_id(&self, id: Uuid) -> Option<Task>;

/// Update the state field of the specified task
/// See the `FangTaskState` enum for possible states.
fn update_task_state(&self, task: &Task, state: FangTaskState) -> Result<Task, QueueError>;

/// Update the state of a task to `FangTaskState::Failed` and set an error_message.
fn fail_task(&self, task: &Task, error: &str) -> Result<Task, QueueError>;

/// Schedule a task.
fn schedule_task(&self, task: &dyn Runnable) -> Result<Task, QueueError>;

fn schedule_retry(
Expand All @@ -117,6 +134,27 @@ pub trait Queueable {
) -> Result<Task, QueueError>;
}

/// An async queue that can be used to enqueue tasks.
/// It uses a PostgreSQL storage. It must be connected to perform any operation.
/// To connect a `Queue` to the PostgreSQL database call the `get_connection` method.
/// A Queue can be created with the TypedBuilder.
///
/// ```rust
/// // Set DATABASE_URL enviroment variable if you would like to try this function.
/// pub fn connection_pool(pool_size: u32) -> r2d2::Pool<r2d2::ConnectionManager<PgConnection>> {
/// let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
///
/// let manager = r2d2::ConnectionManager::<PgConnection>::new(database_url);
///
/// r2d2::Pool::builder()
/// .max_size(pool_size)
/// .build(manager)
/// .unwrap()
/// }
///
/// let queue = Queue::builder().connection_pool(connection_pool(3)).build();
/// ```
///
#[derive(Clone, TypedBuilder)]
pub struct Queue {
#[builder(setter(into))]
Expand Down Expand Up @@ -208,6 +246,7 @@ impl Queueable for Queue {
}

impl Queue {
/// Connect to the db if not connected
pub fn get_connection(&self) -> Result<PoolConnection, QueueError> {
let result = self.connection_pool.get();

Expand Down
22 changes: 22 additions & 0 deletions src/blocking/runnable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,48 @@ use crate::Scheduled;
pub const COMMON_TYPE: &str = "common";
pub const RETRIES_NUMBER: i32 = 20;

/// Implement this trait to run your custom tasks.
#[typetag::serde(tag = "type")]
pub trait Runnable {
/// Execute the task. This method should define its logic
fn run(&self, _queueable: &dyn Queueable) -> Result<(), FangError>;

/// Define the type of the task.
/// The `common` task type is used by default
fn task_type(&self) -> String {
COMMON_TYPE.to_string()
}

/// If set to true, no new tasks with the same metadata will be inserted
/// By default it is set to false.
fn uniq(&self) -> bool {
false
}

/// This method defines if a task is periodic or it should be executed once in the future.
///
/// Be careful it works only with the UTC timezone.
/**
```rust
fn cron(&self) -> Option<Scheduled> {
let expression = "0/20 * * * Aug-Sep * 2022/1";
Some(Scheduled::CronPattern(expression.to_string()))
}
```
*/
/// In order to schedule a task once, use the `Scheduled::ScheduleOnce` enum variant.
fn cron(&self) -> Option<Scheduled> {
None
}

/// Define the maximum number of retries the task will be retried.
/// By default the number of retries is 20.
fn max_retries(&self) -> i32 {
RETRIES_NUMBER
}

/// Define the backoff mode
/// By default, it is exponential, 2^(attempt)
fn backoff(&self, attempt: u32) -> u32 {
u32::pow(2, attempt)
}
Expand Down
6 changes: 4 additions & 2 deletions src/blocking/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use log::error;
use std::thread;
use typed_builder::TypedBuilder;

/// A executioner of tasks, it executes tasks only of one given task_type, it sleeps when they are
/// not tasks to be executed.
#[derive(TypedBuilder)]
pub struct Worker<BQueue>
where
Expand Down Expand Up @@ -52,7 +54,7 @@ where
}
}

pub fn run_tasks(&mut self) -> Result<(), FangError> {
pub(crate) fn run_tasks(&mut self) -> Result<(), FangError> {
loop {
match self.queue.fetch_and_touch_task(self.task_type.clone()) {
Ok(Some(task)) => {
Expand Down Expand Up @@ -114,7 +116,7 @@ where
self.sleep_params.maybe_reset_sleep_period();
}

pub fn sleep(&mut self) {
fn sleep(&mut self) {
self.sleep_params.maybe_increase_sleep_period();

thread::sleep(self.sleep_params.sleep_period);
Expand Down
16 changes: 12 additions & 4 deletions src/blocking/worker_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,22 @@ pub struct WorkerPool<BQueue>
where
BQueue: Queueable + Clone + Sync + Send + 'static,
{
#[builder(setter(into))]
pub number_of_workers: u32,
/// the AsyncWorkerPool uses a queue to control the tasks that will be executed.
#[builder(setter(into))]
pub queue: BQueue,
#[builder(setter(into), default)]
pub task_type: String,
/// sleep_params controls how much time a worker will sleep while waiting for tasks
/// execute.
#[builder(setter(into), default)]
pub sleep_params: SleepParams,
/// retention_mode controls if tasks should be persisted after execution
#[builder(setter(into), default)]
pub retention_mode: RetentionMode,
/// the number of workers of the AsyncWorkerPool.
#[builder(setter(into))]
pub number_of_workers: u32,
/// The type of tasks that will be executed by `AsyncWorkerPool`.
#[builder(setter(into), default)]
pub task_type: String,
}

#[derive(Clone, TypedBuilder)]
Expand All @@ -46,6 +52,8 @@ impl<BQueue> WorkerPool<BQueue>
where
BQueue: Queueable + Clone + Sync + Send + 'static,
{
/// Starts the configured number of workers
/// This is necessary in order to execute tasks.
pub fn start(&mut self) -> Result<(), FangError> {
for idx in 1..self.number_of_workers + 1 {
let name = format!("worker_{}{}", self.task_type, idx);
Expand Down

0 comments on commit 01934e2

Please sign in to comment.