Skip to content

Commit

Permalink
some exp added
Browse files Browse the repository at this point in the history
  • Loading branch information
wildonion committed Sep 5, 2024
1 parent 5087c5d commit 3ee8b6c
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 24 deletions.
10 changes: 10 additions & 0 deletions src/tests/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@
https://medium.com/@maturationofthe/leveraging-rusts-tokio-library-for-asynchronous-actor-model-cf6d477afb19
https://www.reddit.com/r/rust/comments/xec77k/rayon_or_tokio_for_heavy_filesystem_io_workloads/
What is an Actor?
Actor is a threadpool or a single threaded structure which has its own mailbox and cron scheduler
to receive and execute tasks inside its thread of execution it can use tokio or os threads to execute
async io or cpu tasks, they talk through message sending patterns like mpsc in local and grpc remotely
the message or task execution can be happened by receiving the task from the actor eventloop which is
the receiver of the mailbox jobq mpsc channel in local or the grpc remotely then execute in a free
thread if it has threadpool or its own single thread of execution, the thread can be either a light
or os thread for executing async io or intensive cpu tasks.
runtime takes the async task and put it inside the thread queue
then at an appropriate time the scheduler pop the task out of the
thread queue to execute it and if a thread is free it tries to steal
Expand Down
88 changes: 74 additions & 14 deletions src/tests/task.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@


use std::sync::atomic::AtomicU64;

use std::{ops::DerefMut, sync::atomic::{AtomicU64, AtomicUsize}};
use crate::*;
use interfaces::task::TaskExt;
use tracing::span_enabled;
Expand All @@ -12,8 +12,8 @@ use types::Job;
/*
async tasks or jobs that will be executed in the background inside a lightweight thread of
execution using tokio::spawn() task scheduler and jobq based channels; actor workers will run
these tasks in their own execution context.
typically any instance of the Task must contains:
these tasks in their own execution context like what i've simulated here.
typically any instance of the Task which is kina actor must contains:
- the future job itself
- the sender to send the result of executed task to the channel for using outside of the thread
- a thread safe (Mutex) worker as the background worker thread to execute the future job in it
Expand All @@ -25,14 +25,25 @@ use types::Job;
then in switching the task or doing other heavy process check the lock that if the instance
is already locked or not also we should lock the worker if we want to execute something in the
background worker of the instance thread to tell obj caller that the worker is busy rn.
*/
// future object
// job tree to push the job into the current tree
// sender to broadcast or publish some data to a channel
// an eventloop to receive a data from the channel or the queue to execute it in the background worker thread
// background worker to run the job
// locker to lock the task instance when a task is being executed
more details:
locker, threadpool, worker, future io task, eventloop,
sender, signal condvar, job tree, dep inj future job:
- cron scheduler method
- execute in worker method
- receive from eventloop and exec in threadpool method
- instance locker method
future object
job tree to push the job into the current tree
sender to broadcast or publish some data to a channel
an eventloop to receive a data from the channel or the queue to execute it in the background worker thread
background worker to run the job
locker to lock the task instance when a task is being executed
worker thread of type joinHandle to execute task or job of type async io or cpu tasks
threadpool to execute each task when receives them from mpsc recevier eventloop
atomic syncing with channels and mutex
*/
// #[derive(Debug)] // don't implement this cause Pin doesn't implement Debug
pub struct Task<J: std::future::Future<Output = O>, S, O> where // J is a Future object and must be executed with Box::pin(job);
J: std::future::Future + Send + Sync + 'static + Clone,
Expand All @@ -54,18 +65,26 @@ pub struct Task<J: std::future::Future<Output = O>, S, O> where // J is a Future
pub job: J,
pub job_tree: Vec<Task<J, S, O>>,
pub sender: tokio::sync::mpsc::Sender<S>, // use this to send the result of the task into the channel to share between other lightweight thread workers
pub eventloop: std::sync::Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<S>>>, // use this as eventloop to execute tasks as they're coming from the channel in the background worker thread
pub eventloop_queue: std::sync::Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<S>>>, // use this as eventloop to execute tasks as they're coming from the channel in the background worker thread
pub pool: Vec<tokio::task::JoinHandle<O>>,
pub worker: std::sync::Mutex<tokio::task::JoinHandle<O>>, // execute the task inside the background worker, this is a thread which is safe to be mutated in other threads
pub lock: std::sync::Mutex<()>, // the task itself is locked and can't be used by other threads
}

// thread safe eventloop and queue: arc mutex vec T vs arc mutex receiver T
pub struct QueueAndEventLoop<T: Clone + Send + Sync + 'static>{
pub eventloop: std::sync::Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<T>>>,
pub queue: std::sync::Arc<tokio::sync::Mutex<Vec<T>>>,
}


impl<O, J: std::future::Future<Output = O> + Send + Sync + 'static + Clone, S: Sync + Send + 'static>
Task<J, S, O>
where O: std::any::Any + Send + Sync + 'static{

pub async fn new(job: J,
pub async fn new(job: J, num_threads: usize,
sender: tokio::sync::mpsc::Sender<S>,
eventloop: std::sync::Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<S>>>,
eventloop_queue: std::sync::Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<S>>>,
fut_output: O) -> Self{

// sender and receiver
Expand All @@ -77,7 +96,12 @@ impl<O, J: std::future::Future<Output = O> + Send + Sync + 'static + Clone, S: S
job: job.clone(),
dep_injection_fut_obj: Box::pin(async move{ fut_output }), // pinning the future into the ram with the output of type O
sender,
eventloop,
pool: {
(0..num_threads)
.map(|_| tokio::spawn(job.clone()))
.collect::<Vec<tokio::task::JoinHandle<O>>>()
},
eventloop_queue,
job_tree: vec![],
worker: { // this is the worker that can execute the task inside of itself, it's basically a lightweight thread
std::sync::Mutex::new( // lock the worker
Expand Down Expand Up @@ -148,6 +172,42 @@ impl<O, J: std::future::Future<Output = O> + Send + Sync + 'static + Clone, S: S
self.status = TaskStatus::Hanlted;
}

/* -------------------------------------------------------------
since futures are object safe trait hence they have all traits
features we can pass them to the functions in an static or dynamic
dispatch way using Arc or Box or impl Future or event as the return
type of a closure trait method:
std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync + 'static>
Arc<dyn Fn() -> R + Send + Sync + 'static> where R: std::future::Future<Output = ()> + Send + Sync + 'static
Box<dyn Fn() -> R + Send + Sync + 'static> where R: std::future::Future<Output = ()> + Send + Sync + 'static
Arc<Mutex<dyn Fn() -> R + Send + Sync + 'static>> where R: std::future::Future<Output = ()> + Send + Sync + 'static
F: std::future::Future<Output = ()> + Send + Sync + 'static
param: impl std::future::Future<Output = ()> + Send + Sync + 'static
NOTE: mutex requires the type to be Sized and since traits are
not sized at compile time we should annotate them with dyn keyword
and put them behind a pointer with valid lifetime or Box and Arc smart pointers
so for the mutexed_job we must wrap the whole mutex inside an Arc or annotate it
with something like &'valid tokio::sync::Mutex<dyn Fn() -> R + Send + Sync + 'static>
the reason is that Mutex is a guard and not an smart pointer which can hanlde
an automatic pointer with lifetime
*/
pub async fn cron_scheduler<F, R>(&mut self,
boxed_job: Box<dyn Fn() -> R + Send + Sync + 'static>,
mutexed_job: std::sync::Arc<tokio::sync::Mutex<dyn Fn() -> R + Send + Sync + 'static>>,
arced_job: std::sync::Arc<dyn Fn() -> R + Send + Sync + 'static>)
where
R: std::future::Future<Output = O> + Send + Sync + 'static{

let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
tokio::spawn(async move{
loop{
interval.tick().await;
arced_job().await;
}
});
}

}

// once the task gets dropped drop any incomplete futures inside the worker
Expand Down
22 changes: 21 additions & 1 deletion src/tests/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,28 @@ pub async fn solidDesignPattern(){
dep injection : combine all of the aboves
pass different types to method through a single interface using trait
handling dep injection using Box<dyn and Mutex<dyn
handling dep injection using Box<dyn and Arc<Mutex<dyn
trait for stat dyn disptach and dep injection and polymorphism (pass multiple type to a method)
task is an io future job of one of the following types
since futures are object safe trait hence they have all traits
features we can pass them to the functions in an static or dynamic
dispatch way using Arc or Box or impl Future or event as the return
type of a closure trait method:
std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync + 'static>
Arc<dyn Fn() -> R + Send + Sync + 'static> where R: std::future::Future<Output = ()> + Send + Sync + 'static
Box<dyn Fn() -> R + Send + Sync + 'static> where R: std::future::Future<Output = ()> + Send + Sync + 'static
Arc<Mutex<dyn Fn() -> R + Send + Sync + 'static>> where R: std::future::Future<Output = ()> + Send + Sync + 'static
F: std::future::Future<Output = ()> + Send + Sync + 'static
param: impl std::future::Future<Output = ()> + Send + Sync + 'static
NOTE: mutex requires the type to be Sized and since traits are
not sized at compile time we should annotate them with dyn keyword
and put them behind a pointer with valid lifetime or Box and Arc smart pointers
so for the mutexed_job we must wrap the whole mutex inside an Arc or annotate it
with something like &'valid tokio::sync::Mutex<dyn Fn() -> R + Send + Sync + 'static>
the reason is that Mutex is a guard and not an smart pointer which can hanlde
an automatic pointer with lifetime
*/

// traits:
Expand Down
23 changes: 18 additions & 5 deletions src/workers/notif/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
thread safe eventloop queue : the receiver of each broker or the mpsc channel like Arc<Mutex<Recevier<Data>>>
queue : buffer of thread safe event objects like Buffer<Event>{data:Arc<Mutex<Vec<Event>>>}
syntax : while let Some(notif_data) = mpscReceiverEventloopOrRmqOrKafkaOrRedisSubscriber.recv().await{}
CronScheduler : an actor background worker to call io periodically using ctx.run_interval(), loop{} and tokio time and spawn or redis key space notifications pubsub
CronSchedulerActor : an actor background worker to call io periodically using ctx.run_interval(), loop{} and tokio time and spawn or redis key space notifications pubsub
storing and caching : store and cache received notif data in pg db and on redis
node talking : local talking with mailbox & remote talking with (JSON/G/Capnp)RPC
websocket config : send received notif data through the ws mpsc sender / receive notif data in ws server scope
Expand All @@ -18,6 +18,7 @@
→ transactionpool service prodcons actor => receive transaction object
→ product service prodcons actor => receive order object to purchase atomically
concurrency tools & notes :
→ an eventloop is a thread safe receiver queue of the mpsc channel which receives tasks and execute them in free background thread
→ tokio::select, tokio::spawn, tokio::sync::{Mutex, mpsc, RwLock}, std::sync::{Condvar, Arc, Mutex}
→ cpu tasks are graph and geo calculations as well as cryptography algorithms which are resource intensive
→ async io tasks are io and networking calls which must be handled simultaneously in order to scale resources
Expand All @@ -33,17 +34,29 @@
→ std stuffs block and suspend the thread and stop it from executing other tasks while it doing some heavy operations inside the thread like mutex logics
→ tokio stuffs suspend the async io task process instead of blocking the thread and allows the thread executing other tasks simultaneously
→ use channels for atomic syncing between threads instead of using mutex in both async and none async context
→ if we want some result of an either async io or cpu task we have the options of either using of mutex, channels or joining on the thread (would block cpu threads)
→ as soon as the future or async io task is ready to yeild a value the runtime meanwhile of handling other tasks would notify the caller about the result
→ as soon as the the result of the task is ready to be returned from the os thread the os thread will be stopped blocking and continue with executing other tasks
→ actors have their own os or ligh thread of execution which uses to spawn tasks they've received via message passing channels or mailbox
→ to share a data between threads it must be Send Sync and live valid
→ initialize storage and actors data structures once and pack them in AppContext struct then share this between threads
========================================================================================
a sexy actor to produce/consume messages from different type of brokers
it uses RMQ, Redis and Kafka to produce and consume massive messages in
realtime, kindly it supports data AES256 encryption through producing
messages to the broker.
a sexy actor to produce/consume messages from different type of brokers
it uses RMQ, Redis and Kafka to produce and consume massive messages in
realtime, kindly it supports data AES256 encryption through producing
messages to the broker.
brokering is all about queueing, sending and receiving messages way more faster,
safer and reliable than a simple eventloop or a tcp based channel.
In rmq producer sends message to exchange the a consumer can bind its queue to
the exchange to receive the messages, routing key determines the pattern of receive
messages inside the bounded queue from the exchange
In kafka producer sends messages to topic the consumer can receives data from
the topic, Rmq adds an extra layer on top of msg handling logic which is creating
queue per each consumer.
Offset in kafka is an strategy which determines the way of tracking the sequential
order of receiving messages by kafka topics it’s like routing key in rmq
BROKER TYPES:
Expand Down
24 changes: 20 additions & 4 deletions src/workers/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,26 @@ impl CronScheduler{
}
}

// task is an io future job of one of the following types:
// std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync + 'static>
// Arc<dyn Fn() -> R + Send + Sync + 'static> where R: std::future::Future<Output = ()> + Send + Sync + 'static
// F: std::future::Future<Output = ()> + Send + Sync + 'static
/* -------------------------------------------------------------
since futures are object safe trait hence they have all traits
features we can pass them to the functions in an static or dynamic
dispatch way using Arc or Box or impl Future or event as the return
type of a closure trait method:
std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync + 'static>
Arc<dyn Fn() -> R + Send + Sync + 'static> where R: std::future::Future<Output = ()> + Send + Sync + 'static
Box<dyn Fn() -> R + Send + Sync + 'static> where R: std::future::Future<Output = ()> + Send + Sync + 'static
Arc<Mutex<dyn Fn() -> R + Send + Sync + 'static>> where R: std::future::Future<Output = ()> + Send + Sync + 'static
F: std::future::Future<Output = ()> + Send + Sync + 'static
param: impl std::future::Future<Output = ()> + Send + Sync + 'static
NOTE: mutex requires the type to be Sized and since traits are
not sized at compile time we should annotate them with dyn keyword
and put them behind a pointer with valid lifetime or Box and Arc smart pointers
so for the mutexed_job we must wrap the whole mutex inside an Arc or annotate it
with something like &'valid tokio::sync::Mutex<dyn Fn() -> R + Send + Sync + 'static>
the reason is that Mutex is a guard and not an smart pointer which can hanlde
an automatic pointer with lifetime
*/
pub async fn startCronScheduler<F, R>(seconds: u64,
// make the future cloneable in each iteration and tokio scope
// as well as safe to be shared between threads
Expand Down

0 comments on commit 3ee8b6c

Please sign in to comment.