From 3ee8b6c5cbd03ff089b129366830f5613411a497 Mon Sep 17 00:00:00 2001 From: wildonion Date: Thu, 5 Sep 2024 09:18:42 +0330 Subject: [PATCH] some exp added --- src/tests/actor.rs | 10 ++++ src/tests/task.rs | 88 ++++++++++++++++++++++++++++++------ src/tests/tx.rs | 22 ++++++++- src/workers/notif/mod.rs | 23 ++++++++-- src/workers/scheduler/mod.rs | 24 ++++++++-- 5 files changed, 143 insertions(+), 24 deletions(-) diff --git a/src/tests/actor.rs b/src/tests/actor.rs index 1e9ba89..bbe267c 100644 --- a/src/tests/actor.rs +++ b/src/tests/actor.rs @@ -8,6 +8,16 @@ https://medium.com/@maturationofthe/leveraging-rusts-tokio-library-for-asynchronous-actor-model-cf6d477afb19 https://www.reddit.com/r/rust/comments/xec77k/rayon_or_tokio_for_heavy_filesystem_io_workloads/ + + What is an Actor? + Actor is a threadpool or a single threaded structure which has its own mailbox and cron scheduler + to receive and execute tasks inside its thread of execution it can use tokio or os threads to execute + async io or cpu tasks, they talk through message sending patterns like mpsc in local and grpc remotely + the message or task execution can be happened by receiving the task from the actor eventloop which is + the receiver of the mailbox jobq mpsc channel in local or the grpc remotely then execute in a free + thread if it has threadpool or its own single thread of execution, the thread can be either a light + or os thread for executing async io or intensive cpu tasks. + runtime takes the async task and put it inside the thread queue then at an appropriate time the scheduler pop the task out of the thread queue to execute it and if a thread is free it tries to steal diff --git a/src/tests/task.rs b/src/tests/task.rs index 0815cb1..cff0287 100644 --- a/src/tests/task.rs +++ b/src/tests/task.rs @@ -1,7 +1,7 @@ -use std::sync::atomic::AtomicU64; +use std::{ops::DerefMut, sync::atomic::{AtomicU64, AtomicUsize}}; use crate::*; use interfaces::task::TaskExt; use tracing::span_enabled; @@ -12,8 +12,8 @@ use types::Job; /* async tasks or jobs that will be executed in the background inside a lightweight thread of execution using tokio::spawn() task scheduler and jobq based channels; actor workers will run - these tasks in their own execution context. - typically any instance of the Task must contains: + these tasks in their own execution context like what i've simulated here. + typically any instance of the Task which is kina actor must contains: - the future job itself - the sender to send the result of executed task to the channel for using outside of the thread - a thread safe (Mutex) worker as the background worker thread to execute the future job in it @@ -25,14 +25,25 @@ use types::Job; then in switching the task or doing other heavy process check the lock that if the instance is already locked or not also we should lock the worker if we want to execute something in the background worker of the instance thread to tell obj caller that the worker is busy rn. -*/ -// future object -// job tree to push the job into the current tree -// sender to broadcast or publish some data to a channel -// an eventloop to receive a data from the channel or the queue to execute it in the background worker thread -// background worker to run the job -// locker to lock the task instance when a task is being executed + more details: + locker, threadpool, worker, future io task, eventloop, + sender, signal condvar, job tree, dep inj future job: + - cron scheduler method + - execute in worker method + - receive from eventloop and exec in threadpool method + - instance locker method + + future object + job tree to push the job into the current tree + sender to broadcast or publish some data to a channel + an eventloop to receive a data from the channel or the queue to execute it in the background worker thread + background worker to run the job + locker to lock the task instance when a task is being executed + worker thread of type joinHandle to execute task or job of type async io or cpu tasks + threadpool to execute each task when receives them from mpsc recevier eventloop + atomic syncing with channels and mutex +*/ // #[derive(Debug)] // don't implement this cause Pin doesn't implement Debug pub struct Task, S, O> where // J is a Future object and must be executed with Box::pin(job); J: std::future::Future + Send + Sync + 'static + Clone, @@ -54,18 +65,26 @@ pub struct Task, S, O> where // J is a Future pub job: J, pub job_tree: Vec>, pub sender: tokio::sync::mpsc::Sender, // use this to send the result of the task into the channel to share between other lightweight thread workers - pub eventloop: std::sync::Arc>>, // use this as eventloop to execute tasks as they're coming from the channel in the background worker thread + pub eventloop_queue: std::sync::Arc>>, // use this as eventloop to execute tasks as they're coming from the channel in the background worker thread + pub pool: Vec>, pub worker: std::sync::Mutex>, // execute the task inside the background worker, this is a thread which is safe to be mutated in other threads pub lock: std::sync::Mutex<()>, // the task itself is locked and can't be used by other threads } +// thread safe eventloop and queue: arc mutex vec T vs arc mutex receiver T +pub struct QueueAndEventLoop{ + pub eventloop: std::sync::Arc>>, + pub queue: std::sync::Arc>>, +} + + impl + Send + Sync + 'static + Clone, S: Sync + Send + 'static> Task where O: std::any::Any + Send + Sync + 'static{ - pub async fn new(job: J, + pub async fn new(job: J, num_threads: usize, sender: tokio::sync::mpsc::Sender, - eventloop: std::sync::Arc>>, + eventloop_queue: std::sync::Arc>>, fut_output: O) -> Self{ // sender and receiver @@ -77,7 +96,12 @@ impl + Send + Sync + 'static + Clone, S: S job: job.clone(), dep_injection_fut_obj: Box::pin(async move{ fut_output }), // pinning the future into the ram with the output of type O sender, - eventloop, + pool: { + (0..num_threads) + .map(|_| tokio::spawn(job.clone())) + .collect::>>() + }, + eventloop_queue, job_tree: vec![], worker: { // this is the worker that can execute the task inside of itself, it's basically a lightweight thread std::sync::Mutex::new( // lock the worker @@ -148,6 +172,42 @@ impl + Send + Sync + 'static + Clone, S: S self.status = TaskStatus::Hanlted; } + /* ------------------------------------------------------------- + since futures are object safe trait hence they have all traits + features we can pass them to the functions in an static or dynamic + dispatch way using Arc or Box or impl Future or event as the return + type of a closure trait method: + std::pin::Pin + Send + Sync + 'static> + Arc R + Send + Sync + 'static> where R: std::future::Future + Send + Sync + 'static + Box R + Send + Sync + 'static> where R: std::future::Future + Send + Sync + 'static + Arc R + Send + Sync + 'static>> where R: std::future::Future + Send + Sync + 'static + F: std::future::Future + Send + Sync + 'static + param: impl std::future::Future + Send + Sync + 'static + + NOTE: mutex requires the type to be Sized and since traits are + not sized at compile time we should annotate them with dyn keyword + and put them behind a pointer with valid lifetime or Box and Arc smart pointers + so for the mutexed_job we must wrap the whole mutex inside an Arc or annotate it + with something like &'valid tokio::sync::Mutex R + Send + Sync + 'static> + the reason is that Mutex is a guard and not an smart pointer which can hanlde + an automatic pointer with lifetime + */ + pub async fn cron_scheduler(&mut self, + boxed_job: Box R + Send + Sync + 'static>, + mutexed_job: std::sync::Arc R + Send + Sync + 'static>>, + arced_job: std::sync::Arc R + Send + Sync + 'static>) + where + R: std::future::Future + Send + Sync + 'static{ + + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10)); + tokio::spawn(async move{ + loop{ + interval.tick().await; + arced_job().await; + } + }); + } + } // once the task gets dropped drop any incomplete futures inside the worker diff --git a/src/tests/tx.rs b/src/tests/tx.rs index 5f732e9..790c6d8 100644 --- a/src/tests/tx.rs +++ b/src/tests/tx.rs @@ -490,8 +490,28 @@ pub async fn solidDesignPattern(){ dep injection : combine all of the aboves pass different types to method through a single interface using trait - handling dep injection using Box + Send + Sync + 'static> + Arc R + Send + Sync + 'static> where R: std::future::Future + Send + Sync + 'static + Box R + Send + Sync + 'static> where R: std::future::Future + Send + Sync + 'static + Arc R + Send + Sync + 'static>> where R: std::future::Future + Send + Sync + 'static + F: std::future::Future + Send + Sync + 'static + param: impl std::future::Future + Send + Sync + 'static + + NOTE: mutex requires the type to be Sized and since traits are + not sized at compile time we should annotate them with dyn keyword + and put them behind a pointer with valid lifetime or Box and Arc smart pointers + so for the mutexed_job we must wrap the whole mutex inside an Arc or annotate it + with something like &'valid tokio::sync::Mutex R + Send + Sync + 'static> + the reason is that Mutex is a guard and not an smart pointer which can hanlde + an automatic pointer with lifetime */ // traits: diff --git a/src/workers/notif/mod.rs b/src/workers/notif/mod.rs index b8f4f8c..5b35288 100644 --- a/src/workers/notif/mod.rs +++ b/src/workers/notif/mod.rs @@ -7,7 +7,7 @@ thread safe eventloop queue : the receiver of each broker or the mpsc channel like Arc>> queue : buffer of thread safe event objects like Buffer{data:Arc>>} syntax : while let Some(notif_data) = mpscReceiverEventloopOrRmqOrKafkaOrRedisSubscriber.recv().await{} - CronScheduler : an actor background worker to call io periodically using ctx.run_interval(), loop{} and tokio time and spawn or redis key space notifications pubsub + CronSchedulerActor : an actor background worker to call io periodically using ctx.run_interval(), loop{} and tokio time and spawn or redis key space notifications pubsub storing and caching : store and cache received notif data in pg db and on redis node talking : local talking with mailbox & remote talking with (JSON/G/Capnp)RPC websocket config : send received notif data through the ws mpsc sender / receive notif data in ws server scope @@ -18,6 +18,7 @@ → transactionpool service prodcons actor => receive transaction object → product service prodcons actor => receive order object to purchase atomically concurrency tools & notes : + → an eventloop is a thread safe receiver queue of the mpsc channel which receives tasks and execute them in free background thread → tokio::select, tokio::spawn, tokio::sync::{Mutex, mpsc, RwLock}, std::sync::{Condvar, Arc, Mutex} → cpu tasks are graph and geo calculations as well as cryptography algorithms which are resource intensive → async io tasks are io and networking calls which must be handled simultaneously in order to scale resources @@ -33,6 +34,7 @@ → std stuffs block and suspend the thread and stop it from executing other tasks while it doing some heavy operations inside the thread like mutex logics → tokio stuffs suspend the async io task process instead of blocking the thread and allows the thread executing other tasks simultaneously → use channels for atomic syncing between threads instead of using mutex in both async and none async context + → if we want some result of an either async io or cpu task we have the options of either using of mutex, channels or joining on the thread (would block cpu threads) → as soon as the future or async io task is ready to yeild a value the runtime meanwhile of handling other tasks would notify the caller about the result → as soon as the the result of the task is ready to be returned from the os thread the os thread will be stopped blocking and continue with executing other tasks → actors have their own os or ligh thread of execution which uses to spawn tasks they've received via message passing channels or mailbox @@ -40,10 +42,21 @@ → initialize storage and actors data structures once and pack them in AppContext struct then share this between threads ======================================================================================== - a sexy actor to produce/consume messages from different type of brokers - it uses RMQ, Redis and Kafka to produce and consume massive messages in - realtime, kindly it supports data AES256 encryption through producing - messages to the broker. + a sexy actor to produce/consume messages from different type of brokers + it uses RMQ, Redis and Kafka to produce and consume massive messages in + realtime, kindly it supports data AES256 encryption through producing + messages to the broker. + + brokering is all about queueing, sending and receiving messages way more faster, + safer and reliable than a simple eventloop or a tcp based channel. + In rmq producer sends message to exchange the a consumer can bind its queue to + the exchange to receive the messages, routing key determines the pattern of receive + messages inside the bounded queue from the exchange + In kafka producer sends messages to topic the consumer can receives data from + the topic, Rmq adds an extra layer on top of msg handling logic which is creating + queue per each consumer. + Offset in kafka is an strategy which determines the way of tracking the sequential + order of receiving messages by kafka topics it’s like routing key in rmq BROKER TYPES: diff --git a/src/workers/scheduler/mod.rs b/src/workers/scheduler/mod.rs index c89e603..ce607f6 100644 --- a/src/workers/scheduler/mod.rs +++ b/src/workers/scheduler/mod.rs @@ -56,10 +56,26 @@ impl CronScheduler{ } } - // task is an io future job of one of the following types: - // std::pin::Pin + Send + Sync + 'static> - // Arc R + Send + Sync + 'static> where R: std::future::Future + Send + Sync + 'static - // F: std::future::Future + Send + Sync + 'static + /* ------------------------------------------------------------- + since futures are object safe trait hence they have all traits + features we can pass them to the functions in an static or dynamic + dispatch way using Arc or Box or impl Future or event as the return + type of a closure trait method: + std::pin::Pin + Send + Sync + 'static> + Arc R + Send + Sync + 'static> where R: std::future::Future + Send + Sync + 'static + Box R + Send + Sync + 'static> where R: std::future::Future + Send + Sync + 'static + Arc R + Send + Sync + 'static>> where R: std::future::Future + Send + Sync + 'static + F: std::future::Future + Send + Sync + 'static + param: impl std::future::Future + Send + Sync + 'static + + NOTE: mutex requires the type to be Sized and since traits are + not sized at compile time we should annotate them with dyn keyword + and put them behind a pointer with valid lifetime or Box and Arc smart pointers + so for the mutexed_job we must wrap the whole mutex inside an Arc or annotate it + with something like &'valid tokio::sync::Mutex R + Send + Sync + 'static> + the reason is that Mutex is a guard and not an smart pointer which can hanlde + an automatic pointer with lifetime + */ pub async fn startCronScheduler(seconds: u64, // make the future cloneable in each iteration and tokio scope // as well as safe to be shared between threads