From d34ecb6dd74f4b12230fdb80ec55c16e115b6884 Mon Sep 17 00:00:00 2001 From: wildonion Date: Sat, 21 Sep 2024 14:06:48 +0330 Subject: [PATCH] feat(streamer): add secure cell validation logic using redis --- infra/api.http.json | 24 +++--- src/workers/notif/mod.rs | 154 ++++++++++++++++++++++++++++++++------- 2 files changed, 139 insertions(+), 39 deletions(-) diff --git a/infra/api.http.json b/infra/api.http.json index 5848ac7..38e99a0 100755 --- a/infra/api.http.json +++ b/infra/api.http.json @@ -114,12 +114,12 @@ "header": [ { "key": "token_time", - "value": "124kUmD39YUwJRKxaQqTkhhPRLLaieQJWkV2VpXLgRNFuXheXocqxVVT8M6Zxcce5ZKEArkkE" + "value": "124kUmD39YUwJRKxaQqTkFGmzvpLdYvq4fofcvJ2i9VJ9hUNX5fYQRot2QK8XC5wrfA1cc4fo" } ], "body": { "mode": "raw", - "raw": "{\n \"producer_info\": {\n \"Rmq\":{\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"notif_data\": { \n \"receiver_info\": \"1\",\n \"id\": \"unqie-id0\",\n \"action_data\": {\n \"pid\": 200.4\n }, \n \"actioner_info\": \"2\", \n \"action_type\": \"ProductPurchased\", \n \"fired_at\": 1714316645, \n \"is_seen\": false\n }, \n \"exchange_name\": \"SavageEx\",\n \"exchange_type\": \"fanout\", // amq.topic for pubsub\n \"routing_key\": \"\", // routing pattern or key - will be ignored if type is fanout\n \"encryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Kafka\": null,\n \"Redis\": null\n },\n \"consumer_info\": null\n}", + "raw": "{\n \"producer_info\": {\n \"Rmq\":{\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"notif_data\": { \n \"receiver_info\": \"1\",\n \"id\": \"unqie-id0\",\n \"action_data\": {\n \"pid\": 200.4\n }, \n \"actioner_info\": \"2\", \n \"action_type\": \"ProductPurchased\", \n \"fired_at\": 1714316645, \n \"is_seen\": false\n }, \n \"exchange_name\": \"SavageEx\",\n \"exchange_type\": \"fanout\", // amq.topic for pubsub\n \"routing_key\": \"\", // routing pattern or key - will be ignored if type is fanout\n \"encryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\",\n \"unique_key\": \"123\" // use a new UUID per each notif data\n }\n },\n \"Kafka\": null,\n \"Redis\": null\n },\n \"consumer_info\": null\n}", "options": { "raw": { "language": "json" @@ -147,12 +147,12 @@ "header": [ { "key": "token_time", - "value": "124kUmD39YUwJRKxaQqTkhhPRLLaieQJWkV2VpXLgRNFuXheXocqxVVT8M6Zxcce5ZKEArkkE" + "value": "124kUmD39YUwJRKxaQqTkFGmzvpLdYvq4fofcvJ2i9VJ9hUNX5fYQRot2QK8XC5wrfA1cc4fo" } ], "body": { "mode": "raw", - "raw": "{\n \"producer_info\": {\n \"Kafka\":{\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"notif_data\": { \n \"receiver_info\": \"1\",\n \"id\": \"unqie-id0\",\n \"action_data\": {\n \"pid\": 200.4\n }, \n \"actioner_info\": \"2\", \n \"action_type\": \"ProductPurchased\", \n \"fired_at\": 1714316645, \n \"is_seen\": false\n }, \n \"brokers\": \"localhost:9092\", // localhost:29092,localhost:39092,localhost:49092\n \"headers\": [{\n \"key\": \"wildonionKey\", \n \"val\": \"wildonionValue\"\n }],\n \"partitions\": 4,\n \"topic\": \"SavageTopic\",\n \"encryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Rmq\": null,\n \"Redis\": null\n },\n \"consumer_info\": null\n}", + "raw": "{\n \"producer_info\": {\n \"Kafka\":{\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"notif_data\": { \n \"receiver_info\": \"1\",\n \"id\": \"unqie-id0\",\n \"action_data\": {\n \"pid\": 200.4\n }, \n \"actioner_info\": \"2\", \n \"action_type\": \"ProductPurchased\", \n \"fired_at\": 1714316645, \n \"is_seen\": false\n }, \n \"brokers\": \"localhost:9092\", // localhost:29092,localhost:39092,localhost:49092\n \"headers\": [{\n \"key\": \"wildonionKey\", \n \"val\": \"wildonionValue\"\n }],\n \"partitions\": 4,\n \"topic\": \"SavageTopic\",\n \"encryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12341\",\n \"unique_key\": \"123\" // use a new UUID per each notif data\n }\n },\n \"Rmq\": null,\n \"Redis\": null\n },\n \"consumer_info\": null\n}", "options": { "raw": { "language": "json" @@ -180,12 +180,12 @@ "header": [ { "key": "token_time", - "value": "124kUmD39YUwJRKxaQqTkhhPRLLaieQJWkV2VpXLgRNFuXheXocqxVVT8M6Zxcce5ZKEArkkE" + "value": "124kUmD39YUwJRKxaQqTkFGmzvpLdYvq4fofcvJ2i9VJ9hUNX5fYQRot2QK8XC5wrfA1cc4fo" } ], "body": { "mode": "raw", - "raw": "{\n \"producer_info\": {\n \"Redis\":{\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"notif_data\": { \n \"receiver_info\": \"1\",\n \"id\": \"unqie-id0\",\n \"action_data\": {\n \"pid\": 200.4\n }, \n \"actioner_info\": \"2\", \n \"action_type\": \"ProductPurchased\", \n \"fired_at\": 1714316645, \n \"is_seen\": false\n }, \n \"channel\": \"savege channel\",\n \"encryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Rmq\": null,\n \"Kafka\": null\n },\n \"consumer_info\": null\n}", + "raw": "{\n \"producer_info\": {\n \"Redis\":{\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"notif_data\": { \n \"receiver_info\": \"1\",\n \"id\": \"unqie-id0\",\n \"action_data\": {\n \"pid\": 200.4\n }, \n \"actioner_info\": \"2\", \n \"action_type\": \"ProductPurchased\", \n \"fired_at\": 1714316645, \n \"is_seen\": false\n }, \n \"channel\": \"savege channel\",\n \"encryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@123452\",\n \"unique_key\": \"123\" // use a new UUID per each notif data\n }\n },\n \"Rmq\": null,\n \"Kafka\": null\n },\n \"consumer_info\": null\n}", "options": { "raw": { "language": "json" @@ -213,12 +213,12 @@ "header": [ { "key": "token_time", - "value": "124kUmD39YUwJRKxaQqTkhhPRLLaieQJWkV2VpXLgRNFuXheXocqxVVT8M6Zxcce5ZKEArkkE" + "value": "124kUmD39YUwJRKxaQqTkFGmzvpLdYvq4fofcvJ2i9VJ9hUNX5fYQRot2QK8XC5wrfA1cc4fo" } ], "body": { "mode": "raw", - "raw": "{\n \"producer_info\": null,\n \"consumer_info\": {\n \"Rmq\": {\n \"queue\": \"TestOnion\",\n \"exchange_name\": \"SavageEx\",\n \"routing_key\": \"\",\n \"tag\": \"cons_tag0\",\n \"redis_cache_exp\": 300, // this can be 0 to not to store data on redis\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"store_in_db\": true,\n \"decryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Kafka\": null,\n \"Redis\": null\n }\n}", + "raw": "{\n \"producer_info\": null,\n \"consumer_info\": {\n \"Rmq\": {\n \"queue\": \"TestOnion\",\n \"exchange_name\": \"SavageEx\",\n \"routing_key\": \"\",\n \"tag\": \"cons_tag0\",\n \"redis_cache_exp\": 300, // this can be 0 to not to store data on redis\n \"local_spawn\": true, // spawn the task into the actor thread itself\n \"store_in_db\": true,\n \"decryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\",\n \"unique_key\": \"123\" // use a new UUID per each notif data\n }\n },\n \"Kafka\": null,\n \"Redis\": null\n }\n}", "options": { "raw": { "language": "json" @@ -246,12 +246,12 @@ "header": [ { "key": "token_time", - "value": "124kUmD39YUwJRKxaQqTkhhPRLLaieQJWkV2VpXLgRNFuXheXocqxVVT8M6Zxcce5ZKEArkkE" + "value": "124kUmD39YUwJRKxaQqTkFGmzvpLdYvq4fofcvJ2i9VJ9hUNX5fYQRot2QK8XC5wrfA1cc4fo" } ], "body": { "mode": "raw", - "raw": "{\n \"producer_info\": null,\n \"consumer_info\": {\n \"Redis\": {\n \"channel\": \"savege channel\",\n \"redis_cache_exp\": 300, // this can be 0 to not to store data on redis\n \"decryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Rmq\": null,\n \"Kafka\": null\n }\n}", + "raw": "{\n \"producer_info\": null,\n \"consumer_info\": {\n \"Redis\": {\n \"channel\": \"savege channel\",\n \"redis_cache_exp\": 300, // this can be 0 to not to store data on redis\n \"decryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@123452\",\n \"unique_key\": \"123\" // use a new UUID per each notif data\n }\n },\n \"Rmq\": null,\n \"Kafka\": null\n }\n}", "options": { "raw": { "language": "json" @@ -279,12 +279,12 @@ "header": [ { "key": "token_time", - "value": "124kUmD39YUwJRKxaQqTkhhPRLLaieQJWkV2VpXLgRNFuXheXocqxVVT8M6Zxcce5ZKEArkkE" + "value": "124kUmD39YUwJRKxaQqTkFGmzvpLdYvq4fofcvJ2i9VJ9hUNX5fYQRot2QK8XC5wrfA1cc4fo" } ], "body": { "mode": "raw", - "raw": "{\n \"producer_info\": null,\n \"consumer_info\": {\n \"Kafka\": {\n \"brokers\": \"localhost:9092\", // localhost:29092,localhost:39092,localhost:49092\n \"topics\": [\"SavageTopic\"],\n \"consumerGroupId\": \"cid\", // this can be UUID\n \"redis_cache_exp\": 300, // this can be 0 to not to store data on redis\n \"decryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12345\"\n }\n },\n \"Rmq\": null,\n \"Redis\": null\n }\n}", + "raw": "{\n \"producer_info\": null,\n \"consumer_info\": {\n \"Kafka\": {\n \"brokers\": \"localhost:9092\", // localhost:29092,localhost:39092,localhost:49092\n \"topics\": [\"SavageTopic\"],\n \"consumerGroupId\": \"cid\", // this can be UUID\n \"redis_cache_exp\": 300, // this can be 0 to not to store data on redis\n \"decryptionConfig\": { // this can be null\n \"secret\": \"wildonion@1234\",\n \"passphrase\": \"wildonion@12341\",\n \"unique_key\": \"123\" // use a new UUID per each notif data\n }\n },\n \"Rmq\": null,\n \"Redis\": null\n }\n}", "options": { "raw": { "language": "json" diff --git a/src/workers/notif/mod.rs b/src/workers/notif/mod.rs index 7abd989..10ea97e 100644 --- a/src/workers/notif/mod.rs +++ b/src/workers/notif/mod.rs @@ -233,6 +233,7 @@ pub struct ConsumerInfo{ pub struct CryptoConfig{ pub secret: String, pub passphrase: String, + pub unique_key: String } #[derive(Message, Clone, Serialize, Deserialize, Debug, Default, ToSchema)] @@ -350,12 +351,18 @@ impl NotifBrokerActor{ let mut dataString = serde_json::to_string(¬if_data).unwrap(); let finalData = if encryptionConfig.is_some(){ - let CryptoConfig{ secret, passphrase } = encryptionConfig.unwrap(); + let CryptoConfig{ secret, passphrase, unique_key } = encryptionConfig.unwrap(); let mut secure_cell_config = &mut wallexerr::misc::SecureCellConfig{ secret_key: hex::encode(secret), passphrase: hex::encode(passphrase), data: vec![], }; + + // store the config on redis for future validation in consumer + let mut redis_conn = redis_pool.get().await.unwrap(); // TODO - handle the error properly + let redisKey = format!("Redis_SecureCellConfig-{}", unique_key); + let configString = serde_json::to_string(&secure_cell_config).unwrap(); + let _: () = redis_conn.set(redisKey, configString).await.unwrap(); // after calling encrypt method dataString has changed and contains the hex encrypted data dataString.encrypt(secure_cell_config); @@ -421,20 +428,22 @@ impl NotifBrokerActor{ // try to create the secure cell config and // do passphrase and secret key validation logic before // consuming messages - let mut secure_cell_config = if let Some(mut config) = decryption_config.clone(){ + let secure_cell_config_unique_key = if let Some(mut config) = decryption_config.clone(){ config.secret = hex::encode(config.secret); config.passphrase = hex::encode(config.passphrase); // return the loaded instance from redis - SecureCellConfig{ - secret_key: config.secret, - passphrase: config.passphrase, + let secureCellConfig = SecureCellConfig{ + secret_key: config.clone().secret, + passphrase: config.clone().passphrase, data: vec![], - } + }; + + (secureCellConfig, config.unique_key) } else{ - SecureCellConfig::default() + (SecureCellConfig::default(), String::from("")) }; // use redis async for handling realtime streaming of events @@ -459,10 +468,32 @@ impl NotifBrokerActor{ tokio::spawn(async move{ // realtime streaming over redis channel to receive emitted notifs while let Some(message) = pubsubstream.next().await{ + + /* WE SHOULD DO THE VALIDATION IN EVERY ITERATION */ + let mut secure_cell_config = secure_cell_config_unique_key.clone().0; + let redisUniqueKey = secure_cell_config_unique_key.clone().1; + let mut redis_conn = redis_pool.get().await.unwrap(); // TODO - handle the error properly + let redisKey = format!("Redis_SecureCellConfig-{}", redisUniqueKey); + let isKeyThere: bool = redis_conn.exists(&redisKey).await.unwrap(); + if isKeyThere{ + let getConfig: String = redis_conn.get(redisKey).await.unwrap(); + let mut redisConfig = serde_json::from_str::(&getConfig).unwrap(); + // validating... + if + redisConfig.passphrase != secure_cell_config.passphrase || + redisConfig.secret_key != secure_cell_config.secret_key + { + log::error!("invalid secure cell config, CHANNEL IS NOT SAFE!"); + return; + } + + } else{ + log::error!("Redis configs are not set properly on Redis"); + } + let resp_val = message.unwrap(); let mut channelData = String::from_resp(resp_val).unwrap(); // this is the expired key - - let mut secure_cell_config = secure_cell_config.clone(); + let cloned_notif_data_sender_channel = cloned_notif_data_sender_channel.clone(); let zerlog_producer_actor = zerlog_producer_actor.clone(); let notif_mutator_actor = notif_mutator_actor.clone(); @@ -724,12 +755,18 @@ impl NotifBrokerActor{ let mut dataString = serde_json::to_string(¬if_data).unwrap(); let finalData = if encryptionConfig.is_some(){ - let CryptoConfig{ secret, passphrase } = encryptionConfig.clone().unwrap(); + let CryptoConfig{ secret, passphrase, unique_key } = encryptionConfig.clone().unwrap(); let mut secure_cell_config = &mut wallexerr::misc::SecureCellConfig{ secret_key: hex::encode(secret), passphrase: hex::encode(passphrase), data: vec![], }; + + // store the config on redis for future validation in consumer + let mut redis_conn = redis_pool.get().await.unwrap(); // TODO - handle the error properly + let redisKey = format!("Kafka_SecureCellConfig-{}", unique_key); + let configString = serde_json::to_string(&secure_cell_config).unwrap(); + let _: () = redis_conn.set(redisKey, configString).await.unwrap(); // after calling encrypt method dataString has changed and contains the hex encrypted data dataString.encrypt(secure_cell_config); @@ -889,20 +926,22 @@ impl NotifBrokerActor{ match redis_pool.get().await{ Ok(mut redis_conn) => { - let mut secure_cell_config = if let Some(mut config) = decryptionConfig.clone(){ + let secure_cell_config_unique_key = if let Some(mut config) = decryptionConfig.clone(){ config.secret = hex::encode(config.secret); config.passphrase = hex::encode(config.passphrase); // return the loaded instance from redis - SecureCellConfig{ - secret_key: config.secret, - passphrase: config.passphrase, + let secureCellConfig = SecureCellConfig{ + secret_key: config.clone().secret, + passphrase: config.clone().passphrase, data: vec![], - } + }; + + (secureCellConfig, config.unique_key) } else{ - SecureCellConfig::default() + (SecureCellConfig::default(), String::from("")) }; let cloned_notif_data_sender_channel = notif_data_sender.clone(); @@ -949,6 +988,28 @@ impl NotifBrokerActor{ // streaming over the consumer to receive messages from the topics while let Ok(message) = consumer.recv().await{ + /* WE SHOULD DO THE VALIDATION IN EVERY ITERATION */ + let mut secure_cell_config = secure_cell_config_unique_key.clone().0; + let redisUniqueKey = secure_cell_config_unique_key.clone().1; + let mut redis_conn = redis_pool.get().await.unwrap(); // TODO - handle the error properly + let redisKey = format!("Kafka_SecureCellConfig-{}", redisUniqueKey); + let isKeyThere: bool = redis_conn.exists(&redisKey).await.unwrap(); + if isKeyThere{ + let getConfig: String = redis_conn.get(redisKey).await.unwrap(); + let mut redisConfig = serde_json::from_str::(&getConfig).unwrap(); + // validating... + if + redisConfig.passphrase != secure_cell_config.passphrase || + redisConfig.secret_key != secure_cell_config.secret_key + { + log::error!("invalid secure cell config, CHANNEL IS NOT SAFE!"); + return; + } + + } else{ + log::error!("Kafka configs are not set properly on Redis"); + } + // it uses std::str::from_utf8() to convert utf8 bytes into string let mut consumedBuffer = message.payload().unwrap(); let hexed_data = std::str::from_utf8(consumedBuffer).unwrap(); @@ -1295,20 +1356,22 @@ impl NotifBrokerActor{ // try to create the secure cell config and // do passphrase and secret key validation logic before // consuming messages - let mut secure_cell_config = if let Some(mut config) = decryptionConfig.clone(){ + let secure_cell_config_uniqueKey = if let Some(mut config) = decryptionConfig.clone(){ config.secret = hex::encode(config.secret); config.passphrase = hex::encode(config.passphrase); // return the loaded instance from redis - SecureCellConfig{ - secret_key: config.secret, - passphrase: config.passphrase, + let secureCellConfig = SecureCellConfig{ + secret_key: config.clone().secret, + passphrase: config.clone().passphrase, data: vec![], - } + }; + + (secureCellConfig, config.unique_key) } else{ - SecureCellConfig::default() + (SecureCellConfig::default(), String::from("")) }; // -ˋˏ✄┈┈┈┈ consuming from the queue owned by this consumer @@ -1335,12 +1398,33 @@ impl NotifBrokerActor{ match delivery{ Ok(delv) => { + /* WE SHOULD DO THE VALIDATION IN EVERY ITERATION */ + let mut secure_cell_config = secure_cell_config_uniqueKey.0.clone(); + let redisUniqueKey = secure_cell_config_uniqueKey.clone().1; + let mut redis_conn = redis_pool.get().await.unwrap(); // TODO - handle the error properly + let redisKey = format!("Rmq_SecureCellConfig-{}", redisUniqueKey); + let isKeyThere: bool = redis_conn.exists(&redisKey).await.unwrap(); + if isKeyThere{ + let getConfig: String = redis_conn.get(redisKey).await.unwrap(); + let mut redisConfig = serde_json::from_str::(&getConfig).unwrap(); + // validating... + if + redisConfig.passphrase != secure_cell_config.passphrase || + redisConfig.secret_key != secure_cell_config.secret_key + { + log::error!("invalid secure cell config, CHANNEL IS NOT SAFE!"); + return; + } + + } else{ + log::error!("Rmq configs are not set properly on Redis"); + } + log::info!("[*] received delivery from queue#<{}>", q.name()); let consumedBuffer = delv.data.clone(); let hexed_data = std::str::from_utf8(&consumedBuffer).unwrap(); let mut payload = hexed_data.to_string(); - let mut secure_cell_config = secure_cell_config.clone(); let cloned_notif_data_sender_channel = cloned_notif_data_sender_channel.clone(); let redis_pool = redis_pool.clone(); let notif_mutator_actor = notif_mutator_actor.clone(); @@ -1633,7 +1717,8 @@ impl NotifBrokerActor{ /* ********************************************************************* */ /* ***************************** PRODUCING ***************************** */ /* ********************************************************************* */ - pub async fn publishToRmq(&self, data: &str, exchange: &str, routing_key: &str, exchange_type: &str){ + pub async fn publishToRmq(&self, data: &str, exchange: &str, routing_key: &str, + exchange_type: &str, secureCellConfig: SecureCellConfig, redisUniqueKey: &str){ let zerlog_producer_actor = self.clone().zerlog_producer_actor; let this = self.clone(); @@ -1646,6 +1731,7 @@ impl NotifBrokerActor{ let routing_key = routing_key.to_string(); let exchange_type = exchange_type.to_string(); let data = data.to_string(); + let redisUniqueKey = redisUniqueKey.to_string(); tokio::spawn(async move{ @@ -1653,6 +1739,14 @@ impl NotifBrokerActor{ let rmq_pool = storage.as_ref().unwrap().get_lapin_pool().await.unwrap(); let redis_pool = storage.as_ref().unwrap().get_redis_pool().await.unwrap(); + + // store the config on redis for future validation in consumer + let mut redis_conn = redis_pool.get().await.unwrap(); // TODO - handle the error properly + let redisKey = format!("Rmq_SecureCellConfig-{}", redisUniqueKey); + let configString = serde_json::to_string(&secureCellConfig).unwrap(); + let _: () = redis_conn.set(redisKey, configString).await.unwrap(); + + // trying to ge a connection from the pool match rmq_pool.get().await{ Ok(pool) => { @@ -1836,15 +1930,21 @@ impl ActixMessageHandler for NotifBrokerActor{ let this = self.clone(); let mut stringData = serde_json::to_string(¬if_data).unwrap(); + let mut scc = SecureCellConfig::default(); + let mut ruk = String::from(""); + let finalData = if encryptionConfig.is_some(){ - let CryptoConfig{ secret, passphrase } = encryptionConfig.clone().unwrap(); + let CryptoConfig{ secret, passphrase, unique_key } = encryptionConfig.clone().unwrap(); let mut secure_cell_config = &mut wallexerr::misc::SecureCellConfig{ secret_key: hex::encode(secret), passphrase: hex::encode(passphrase), data: vec![], }; + scc = secure_cell_config.clone(); + ruk = unique_key; + // after calling encrypt method stringData has changed and contains the hex encrypted data stringData.encrypt(secure_cell_config); @@ -1860,13 +1960,13 @@ impl ActixMessageHandler for NotifBrokerActor{ // every actor has its own thread of execution. if local_spawn{ async move{ - this.publishToRmq(&finalData, &exchange_name, &routing_key, &exchange_type).await; + this.publishToRmq(&finalData, &exchange_name, &routing_key, &exchange_type, scc, &ruk).await; } .into_actor(self) // convert the future into an actor future of type NotifBrokerActor .spawn(ctx); // spawn the future object into this actor context thread } else{ // spawn the future in the background into the tokio lightweight thread tokio::spawn(async move{ - this.publishToRmq(&finalData, &exchange_name, &routing_key, &exchange_type).await; + this.publishToRmq(&finalData, &exchange_name, &routing_key, &exchange_type, scc, &ruk).await; }); }