diff --git a/infra/api.http.json b/infra/api.http.json index 97c10ce..3dd37fb 100755 --- a/infra/api.http.json +++ b/infra/api.http.json @@ -180,12 +180,12 @@ "header": [ { "key": "token_time", - "value": "124kUmD39YUwJRKxaQqTkTRZGDr9L6fszxLz6xKvXrqJSJQNo9vAMjYs5tbbiRPskTGUgRegd" + "value": "124kUmD39YUwJRKxaQqTkFidRW3uAzhcFniiky7b2HjFPME8R5ZLdFgs9z6WncoLbPh72EPNX" } ], "body": { "mode": "raw", - "raw": "{\n \"producer_info\": {\n \"Redis\":{\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"notif_data\": { \n \"receiver_info\": \"1\",\n \"id\": \"unqie-id0\",\n \"action_data\": {\n \"pid\": 200.4\n }, \n \"actioner_info\": \"2\", \n \"action_type\": \"ProductPurchased\", \n \"fired_at\": 1714316645, \n \"is_seen\": false\n }, \n \"channel\": \"channel-to-produce-msg-to\",\n \"encryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Rmq\": null,\n \"Kafka\": null\n },\n \"consumer_info\": null\n}", + "raw": "{\n \"producer_info\": {\n \"Redis\":{\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"notif_data\": { \n \"receiver_info\": \"1\",\n \"id\": \"unqie-id0\",\n \"action_data\": {\n \"pid\": 200.4\n }, \n \"actioner_info\": \"2\", \n \"action_type\": \"ProductPurchased\", \n \"fired_at\": 1714316645, \n \"is_seen\": false\n }, \n \"channel\": \"savege channel\",\n \"encryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Rmq\": null,\n \"Kafka\": null\n },\n \"consumer_info\": null\n}", "options": { "raw": { "language": "json" @@ -246,12 +246,12 @@ "header": [ { "key": "token_time", - "value": "124kUmD39YUwJRKxaQqTkRUAJQtFUBmfjrpL99YjTeaJ1jBEDgt7sCVdKKdvoUi4n6Ennr8e2" + "value": "124kUmD39YUwJRKxaQqTkFidRW3uAzhcFniiky7b2HjFPME8R5ZLdFgs9z6WncoLbPh72EPNX" } ], "body": { "mode": "raw", - "raw": "{\n \"producer_info\": null,\n \"consumer_info\": {\n \"Redis\": {\n \"channel\": \"channel-to-consume-msg-from\",\n \"redis_cache_exp\": 300, // this can be 0 to not to store data on redis\n \"decryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Rmq\": null,\n \"Kafka\": null\n }\n}", + "raw": "{\n \"producer_info\": null,\n \"consumer_info\": {\n \"Redis\": {\n \"channel\": \"savege channel\",\n \"redis_cache_exp\": 300, // this can be 0 to not to store data on redis\n \"decryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Rmq\": null,\n \"Kafka\": null\n }\n}", "options": { "raw": { "language": "json" diff --git a/logs/error-kind/zerlog.log b/logs/error-kind/zerlog.log index 022cfcd..40ea0b1 100755 --- a/logs/error-kind/zerlog.log +++ b/logs/error-kind/zerlog.log @@ -58,3 +58,5 @@ code: 65535 | message: Message production error: UnknownPartition (Local: Unknow | time: 1726767152352 | method name: NotifBrokerActor.publishToKafka.deliveryStatus code: 65531 | message: invalid number at line 1 column 2 | due to: Serde Error | time: 1726771961937 | method name: NotifBrokerActor.consumeFromKafka.decode_serde +code: 65531 | message: invalid number at line 1 column 2 | due to: Serde Error + | time: 1726773899166 | method name: NotifBrokerActor.consumeFromRedis.decode_serde diff --git a/src/workers/notif/mod.rs b/src/workers/notif/mod.rs index 42cde99..814f6ee 100644 --- a/src/workers/notif/mod.rs +++ b/src/workers/notif/mod.rs @@ -3,36 +3,26 @@ /* ======================================================================================== REALTIME NOTIF EVENT STREAMING DESIGN PATTERN (README files inside docs folder) ======================================================================================== + MAKE SURE YOU'VE STARTED CONSUMERS BEFORE PRODUCING + THEY MUST BE READY FOR CONSUMING WHILE PRODUCERS ARE + SENDING MESSAGES TO THE BROKER. - concurrency tools & notes : - → an eventloop is a thread safe receiver queue of the mpsc channel which receives tasks and execute them in free background thread - → actor with Box::pin(async{}), tokio::select, tokio::spawn, tokio::sync::{Mutex, mpsc, RwLock}, std::sync::{Condvar, Arc, Mutex} - → cpu tasks are graph and geo calculations as well as cryptography algorithms which are resource intensive - → async io tasks are io and networking calls which must be handled simultaneously in order to scale resources - → async io task execution inside light threadpool: wait on the task but don't block the thread, continue with executing other tasks - → cpu task execution inside os threadpool: suspend the thread of execution by blocking it, avoid executing other tasks - → use await on the async io task to not to block the thread and let the thread execute other tasks meanwhile the task is waiting to be solved - → await pauses and suspends the function execution not the thread and tells the eventloop to pop out another task while the function is awaited - → use join on the cpu task to block the thread to suspend the thread execution with all tasks and wait for the result of the thread - → use Condvar to wait and block the thread until some data changes then notify the blocked thread - → don't use os threadpool in the context of light threadpool, they block the execution thread as well as the entire async runtime - → std mutex block the caller thread if the lock is busy it stops the thread from executing all tasks until it acquires the lock - → tokio mutex suspend the async task if the lock is busy it suspend the io task instead of blocking the executor thread - → std stuffs block and suspend the thread and stop it from executing other tasks while it doing some heavy operations inside the thread like mutex logics - → tokio stuffs suspend the async io task process instead of blocking the thread and allows the thread executing other tasks simultaneously - → use channels for atomic syncing between threads instead of using mutex in both async and none async context, send the mutated/updated data to channel instead of using mutex or condvar - → if we want some result of an either async io or cpu task we have the options of either using of mutex, channels or joining on the thread (would block cpu threads) - → as soon as the future or async io task is ready to yeild a value the runtime meanwhile of handling other tasks would notify the caller about the result - → as soon as the the result of the task is ready to be returned from the os thread the os thread will be stopped blocking and continue with executing other tasks - → actors have their own os or ligh thread of execution which uses to spawn tasks they've received via message passing channels or mailbox - → actors receive messages asyncly using their receiver eventloop of their jobq mpsc mailbox, they execute them one at a time to ensure the internal state remains consistent cause there is no mutex - → to share a data between threads it must be Send Sync and live valid - → initialize storage and actors data structures once and pack them in AppContext struct then share this between threads - → what are objects: are an isolated thread objects contains light thread for executing tasks, cron scheudling and jobq mailbox - → talk between two objects using job/task/msg queue with mpsc and rpc based channels like rmq, redis, kafka - → receive tasks from the channel by streaming over eventloop with while let Some() = rx.recv().await{} - → what eventloop does: executing received tasks inside a light thread of execution - → stream is an eventloop receiver channel of some jobq that can be iterated over to get data as they're coming from the channel + NotifBrokerActor is the worker of handling the process of publishing and consuming + messages through rmq, redis and kafka, talking to the NotifBrokerActor can be done + by sending it a message contains the setup either to publish or consume something + to and from an specific broker, so generally it's a sexy actor to produce/consume + messages from different type of brokers it uses RMQ, Redis and Kafka to produce and + consume massive messages in realtime, kindly it supports data AES256 encryption + through producing messages to the broker. we can send either producing or consuming + message to this actor to start producing or consuming in the background. + + ************************************************************************************ + it's notable that for realtime push notif streaming we MUST start consuming from + the specified broker passed in to the message structure when talking with actor, in + a place where the application logic which is likely a server is being started. + + ************************************************************************************ + ======================================================================================== */ @@ -50,6 +40,7 @@ use rdkafka::ClientConfig; use rdkafka::Message; use redis_async::resp::FromResp; use tokio::spawn; +use workers::scheduler::CronScheduler; use crate::*; use deadpool_lapin::lapin::protocol::channel; use deadpool_redis::redis::AsyncCommands; @@ -83,22 +74,6 @@ use crate::interfaces::crypter::Crypter; /* ======================================================================================== - NotifBrokerActor is the worker of handling the process of publishing and consuming - messages through rmq, redis and kafka, talking to the NotifBrokerActor can be done - by sending it a message contains the setup either to publish or consume something - to and from an specific broker, so generally it's a sexy actor to produce/consume - messages from different type of brokers it uses RMQ, Redis and Kafka to produce and - consume massive messages in realtime, kindly it supports data AES256 encryption - through producing messages to the broker. we can send either producing or consuming - message to this actor to start producing or consuming in the background. - - ************************************************************************************ - it's notable that for realtime push notif streaming we MUST start consuming from - the specified broker passed in to the message structure when talking with actor, in - a place where the application logic which is likely a server is being started. - MAKE SURE YOU'VE STARTED CONSUMING BEFORE PRODUCING - ************************************************************************************ - brokering is all about queueing, sending and receiving messages way more faster, safer and reliable than a simple eventloop or a tcp based channel. all brokers contains message/task/job queue to handle communication between services @@ -219,7 +194,7 @@ pub struct PublishNotifToRedis{ pub struct ConsumeNotifFromRedis{ pub channel: String, pub redis_cache_exp: u64, - pub decryption_config: Option + pub decryptionConfig: Option } #[derive(Message, Clone, Serialize, Deserialize, Debug, Default, ToSchema)] @@ -390,7 +365,20 @@ impl NotifBrokerActor{ dataString }; - let _: () = redis_conn.publish(channel.clone(), finalData).await.unwrap(); + // we should keep sending until a consumer receive the data! + tokio::spawn(async move{ + let mut int = tokio::time::interval(tokio::time::Duration::from_secs(1)); + loop{ + int.tick().await; + let getSubs: RedisResult = redis_conn.publish(channel.clone(), finalData.clone()).await; + let subs = getSubs.unwrap(); + if subs >= 1{ + log::info!("Message has been published to Redis PubSub Channel"); + break; + } + } + }); + }); }, @@ -472,7 +460,9 @@ impl NotifBrokerActor{ while let Some(message) = pubsubstream.next().await{ let resp_val = message.unwrap(); let mut channelData = String::from_resp(resp_val).unwrap(); // this is the expired key - + + log::info!("Message has been Received from Redis PubSub Channel: {}", channelData); + // ===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>> // ===>>>===>>>===>>>===>>>===>>> data decryption logic ===>>>===>>>===>>>===>>>===>>> // ===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>>>===>> @@ -2096,7 +2086,7 @@ impl ActixMessageHandler for NotifBrokerActor{ fn handle(&mut self, msg: ConsumeNotifFromRedis, ctx: &mut Self::Context) -> Self::Result { - let ConsumeNotifFromRedis { channel, decryption_config, redis_cache_exp } = msg.clone(); + let ConsumeNotifFromRedis { channel, decryptionConfig, redis_cache_exp } = msg.clone(); let this = self.clone(); let task = async move{ @@ -2106,7 +2096,7 @@ impl ActixMessageHandler for NotifBrokerActor{ // the caller otherwise suspend the this.publishToRedis() function // until the task is ready to be polled, meanwhile it executes other // tasks (won't block the thread) - this.consumeFromRedis(&channel, decryption_config, redis_cache_exp).await; + this.consumeFromRedis(&channel, decryptionConfig, redis_cache_exp).await; };