Skip to content

Commit

Permalink
feat: working on redis and kafka prod/cons pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
wildonion committed Sep 16, 2024
1 parent 413a7c4 commit 8cf0203
Showing 1 changed file with 172 additions and 5 deletions.
177 changes: 172 additions & 5 deletions src/workers/notif/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,17 @@
a sexy actor to produce/consume messages from different type of brokers
it uses RMQ, Redis and Kafka to produce and consume massive messages in
realtime, kindly it supports data AES256 encryption through producing
messages to the broker.
messages to the broker. we can send either producing or consuming message
to this actor to start producing or consuming in the background
brokering is all about queueing, sending and receiving messages way more faster,
safer and reliable than a simple eventloop or a tcp based channel.
all brokers contains message/task/job queue to handle communication between services
asyncly, they've been designed specially for decoupling tight long running services
due to their durability nature like giving predicting output of an ai model service
while the ai model is working on other input prediction in the background we can
receive the old outputs and pass them through the brokers to receive them in some
http service for responding the caller.
In rmq producer sends message to exchange the a consumer can bind its queue to
the exchange to receive the messages, routing key determines the pattern of receive
messages inside the bounded queue from the exchange
Expand All @@ -58,7 +65,6 @@
Offset in kafka is an strategy which determines the way of tracking the sequential
order of receiving messages by kafka topics it’s like routing key in rmq
BROKER TYPES:
---> KAFKA
---> REDIS PUBSUB
Expand Down Expand Up @@ -92,6 +98,7 @@
------------------- client ------------------
======================================================================================== */

use tokio::spawn;
use crate::*;
use deadpool_lapin::lapin::protocol::channel;
use deadpool_redis::redis::AsyncCommands;
Expand Down Expand Up @@ -124,6 +131,36 @@ use crate::models::event::*;
use crate::interfaces::crypter::Crypter;


#[derive(Message, Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
#[rtype(result = "()")]
pub struct PublishNotifToKafka{
pub topic: String,
pub local_spawn: bool,
pub notif_data: NotifData,
pub encryptionConfig: Option<CryptoConfig>
}

#[derive(Message, Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
#[rtype(result = "()")]
pub struct ConsumeNotifFromKafka{ // we must build a unique consumer per each consuming process
pub topic: String,
pub consumerId: String,
}

#[derive(Message, Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
#[rtype(result = "()")]
pub struct PublishNotifToRedis{
pub channel: String,
pub local_spawn: bool,
pub notif_data: NotifData,
pub encryptionConfig: Option<CryptoConfig>
}

#[derive(Message, Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
#[rtype(result = "()")]
pub struct ConsumeNotifFromRedis{
pub channel: String,
}

#[derive(Message, Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
#[rtype(result = "()")]
Expand All @@ -149,10 +186,9 @@ pub struct CryptoConfig{
pub unique_redis_id: String,
}


#[derive(Message, Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
#[rtype(result = "()")]
pub struct ConsumeNotif{
pub struct ConsumeNotif{ // we'll create a channel then start consuming by binding a queue to the exchange
/* -ˋˏ✄┈┈┈┈
following queue gets bounded to the passed in exchange type with its
routing key, when producer wants to produce notif data it sends them
Expand Down Expand Up @@ -239,6 +275,32 @@ impl Actor for NotifBrokerActor{

impl NotifBrokerActor{

/* ******************************************************************************* */
/* ***************************** PUBLISHING TO REDIS ***************************** */
/* ******************************************************************************* */
pub async fn publishToRedis(&self, channel: &str, notif_data: NotifData, encryptionConfig: Option<CryptoConfig>){

let storage = self.app_storage.clone();
let redis_pool = storage.as_ref().unwrap().get_redis_pool().await.unwrap();
let zerlog_producer_actor = self.clone().zerlog_producer_actor;


// publish easily to redis
// ...


}

/* ******************************************************************************** */
/* ***************************** CONSUMING FROM REDIS ***************************** */
/* ******************************************************************************** */
pub async fn consumeFromRedis(&self, channel: &str){

// start consuming from redis
// ...

}

/* ********************************************************************* */
/* ***************************** CONSUMING ***************************** */
/* ********************************************************************* */
Expand Down Expand Up @@ -1131,4 +1193,109 @@ impl ActixMessageHandler<HealthMsg> for NotifBrokerActor{
ctx.stop(); // stop the already running actor
}
}
}
}

/* **************************************************************************************** */
/* ***************************** REDIS PRODUCER NOTIF HANDLER ***************************** */
/* **************************************************************************************** */
impl ActixMessageHandler<PublishNotifToRedis> for NotifBrokerActor{

type Result = ();

fn handle(&mut self, msg: PublishNotifToRedis, ctx: &mut Self::Context) -> Self::Result {


let PublishNotifToRedis{
channel,
local_spawn,
notif_data,
encryptionConfig
} = msg.clone();

let this = self.clone();
let task = async move{

// await on the publishing task, tells runtime we need the result
// of this task, if it's ready runtime return the result back to
// the caller otherwise suspend the this.publishToRedis() function
// until the task is ready to be polled, meanwhile it executes other
// tasks (won't block the thread)
this.publishToRedis(&channel, notif_data, encryptionConfig).await;

};


// spawn the task in the background ligh thread or
// the actor thread itself.
// don't await on the spawn; let the task execute
// in the background unless you want to use select
// or tell the runtime someone needs the result right
// now but notify later once the task is completed
// and don't block the thread
if local_spawn{
task
.into_actor(self)
.spawn(ctx);
} else{
spawn(task);
}

}

}

/* **************************************************************************************** */
/* ***************************** REDIS CONSUMER NOTIF HANDLER ***************************** */
/* **************************************************************************************** */
impl ActixMessageHandler<ConsumeNotifFromRedis> for NotifBrokerActor{

type Result = ();

fn handle(&mut self, msg: ConsumeNotifFromRedis, ctx: &mut Self::Context) -> Self::Result {

let ConsumeNotifFromRedis { channel } = msg.clone();
let this = self.clone();

let task = async move{

// must store in db (mutator actor)
// cache on redis
// zerlog producer
// send to ws mpsc notif sender

};

// spawn the task in the background ligh thread
// don't await on the spawn; let the task execute
// in the background unless you want to use select
// or tell the runtime someone needs the result right
// now but notify later once the task is completed
// and don't block the thread
spawn(task);
}
}

/* **************************************************************************************** */
/* ***************************** KAFKA PRODUCER NOTIF HANDLER ***************************** */
/* **************************************************************************************** */
impl ActixMessageHandler<PublishNotifToKafka> for NotifBrokerActor{

type Result = ();

fn handle(&mut self, msg: PublishNotifToKafka, ctx: &mut Self::Context) -> Self::Result {


}
}

/* **************************************************************************************** */
/* ***************************** KAFKA CONSUMER NOTIF HANDLER ***************************** */
/* **************************************************************************************** */
impl ActixMessageHandler<ConsumeNotifFromKafka> for NotifBrokerActor{

type Result = ();

fn handle(&mut self, msg: ConsumeNotifFromKafka, ctx: &mut Self::Context) -> Self::Result {

}
}

0 comments on commit 8cf0203

Please sign in to comment.