fix(streamer): first process data then ack
wildonion committed Sep 20, 2024
1 parent f6afb6f commit 6771c45
Showing 7 changed files with 824 additions and 689 deletions.
156 changes: 0 additions & 156 deletions README.md
@@ -55,162 +55,6 @@ i'm hoopoe, a realtime social event platform allows your hoop get heard!
<img src="https://github.com/wildonion/hoopoe/blob/main/infra/arch.png">
</p>

### proper way to write and handle async task execution!

> [!IMPORTANT]
> every async task must be spawned onto a lightweight io thread using a background worker, which can be done via `tokio::spawn()`; it's like creating a worker per async task. async tasks or jobs are non-blocking io operations, so they need a non-blocking scope to execute in.
```rust
async fn heavy_io_process_() -> String{
    // it can be one of the following tasks:
    // --> consuming task: send consume message to consumer actor
    // --> producing task: send produce message to producer actor
    // --> storing in db: send data to mutator actor
    // --> storing in redis: cache on redis with exp key
    // --> locking logic
    // --> db operations
    String::from("stringified result of the io task")
}

let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(1024); // channel for stringified data
let tx = tx.clone();
// spawn in the background and use the channel to receive
// the data whenever it gets sent to the channel
tokio::spawn(async move{
    let res = heavy_io_process_().await;
    let _ = tx.send(res).await;
});
while let Some(data) = rx.recv().await{
    // do the rest of the logics in here
    // whenever we receive data
    // ...
}

// ------------
// or
// ------------

// spawn in the background but wait for the task to get solved,
// once it's solved we then proceed with the rest of the flow
// and cancel the other branches or async tasks
let task = tokio::spawn(async move{
    let res = heavy_io_process_().await;
    let _ = tx.send(res).await;
});

tokio::select! {
    // this branch is taken if the spawned task completes first
    _ = task => {
        // proceed with this flow
        // ...
    },
    // this branch is taken if a message arrives on the channel first
    data = rx.recv() => {
        // proceed with this flow
        // ...
    }
}
```

### proper way to produce and consume data from RMQ broker

> [!IMPORTANT]
> start producing or consuming in the background by sending the related message to their actors inside a `tokio::spawn()` scope; this approach can be used to execute any async task or job gently on background threads via the tokio scheduler.
#### to produce data in the background:

```rust
#[derive(Clone)]
struct SomeData{}
let data = SomeData{};
tokio::spawn( // running the producing notif job in the background in a free thread
    {
        let cloned_app_state = app_state.clone();
        // field values are illustrative; use the actual ProduceNotif/NotifData
        // definitions of the crate
        let cloned_notif = ProduceNotif{
            local_spawn: true,
            notif_data: NotifData{
                receiver_info: String::from("1"),
                id: String::from("unique-id0"),
                action_data: serde_json::json!({
                    "pid": 200.4
                }),
                actioner_info: String::from("2"),
                action_type: ActionType::ProductPurchased,
                fired_at: 1714316645,
                is_seen: false,
            },
            exchange_name: String::from("SavageEx"),
            exchange_type: String::from("fanout"), // amq.topic for pubsub
            routing_key: String::from(""), // routing pattern or key - will be ignored if type is fanout
        };
        async move{
            match cloned_app_state.clone().unwrap().actors.as_ref().unwrap()
                .broker_actors.notif_actor.send(cloned_notif).await
                {
                    Ok(_) => {

                        // in here you could access the notif for an owner using
                        // a redis key like: notif_owner:3 which retrieves all data
                        // on the redis for the receiver with id 3
                        ()

                    },
                    Err(e) => {
                        let source = &e.source().unwrap().to_string(); // every error type implements the Error trait, so we can call the source() method on it
                        let err_instance = crate::error::HoopoeErrorResponse::new(
                            *MAILBOX_CHANNEL_ERROR_CODE, // error hex (u16) code
                            source.as_bytes().to_vec(), // text of error source in form of utf8 bytes
                            crate::error::ErrorKind::Actor(crate::error::ActixMailBoxError::Mailbox(e)), // the actual source of the error caused at runtime
                            &String::from("register_notif.producer_actor.notif_actor.send"), // current method name
                            Some(&zerlog_producer_actor)
                        ).await;
                        return;
                    }
                }
        }
    }
);
```

#### to consume data in the background:
```rust
tokio::spawn( // running the consuming notif job in the background in a free thread
    {
        let cloned_app_state = app_state.clone();
        // field values are illustrative; use the actual ConsumeNotif definition of the crate
        let cloned_notif = ConsumeNotif{
            queue: String::from("TestOnion"),
            exchange_name: String::from("SavageEx"),
            routing_key: String::from(""),
            tag: String::from("cons_tag0"),
            redis_cache_exp: 300,
            local_spawn: true,
            cache_on_redis: true,
            store_in_db: true,
        };
        async move{
            // consuming notif by sending the ConsumeNotif message to
            // the consumer actor
            match cloned_app_state.clone().unwrap().actors.as_ref().unwrap()
                .broker_actors.notif_actor.send(cloned_notif).await
                {
                    Ok(_) => { () },
                    Err(e) => {
                        let source = &e.source().unwrap().to_string(); // every error type implements the Error trait, so we can call the source() method on it
                        let err_instance = crate::error::HoopoeErrorResponse::new(
                            *MAILBOX_CHANNEL_ERROR_CODE, // error hex (u16) code
                            source.as_bytes().to_vec(), // text of error source in form of utf8 bytes
                            crate::error::ErrorKind::Actor(crate::error::ActixMailBoxError::Mailbox(e)), // the actual source of the error caused at runtime
                            &String::from("register_notif.consumer_actor.notif_actor.send"), // current method name
                            Some(&zerlog_producer_actor)
                        ).await;
                        return;
                    }
                }
        }
    }
);
```

## routes and apis

231 changes: 230 additions & 1 deletion docs/Rmq.md
@@ -35,4 +35,233 @@ the messages stay in the queue until they are handled by a consumer. if a queue

## Final words

we can declare as many queues per consumer as we want and bind each one to a specific exchange routing key. this lets every consumer consume messages coming from an exchange through its own queue, since that queue is already bound to the exchange routing key. in general, any queue bound to a specific routing key receives the messages published to that exchange with that key. so for any exchange pattern, build a queue per consumer and bind it to the routing key: the routing key decides which queues a message gets delivered to, and this also works when multiple queues with different names are bound to the same routing key on the same exchange.
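
To make that binding rule concrete, here is a minimal `lapin` sketch (the exchange, queue and routing-key names are made up for illustration, this is not code from the repo): it declares one queue per consumer and binds every queue to the same exchange and routing key, so each consumer reads its own copy of every message routed with that key.

```rust
use lapin::{
    options::{ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions},
    types::FieldTable, Channel, ExchangeKind,
};

// hypothetical helper: declare a dedicated queue for one consumer and bind it
// to the shared exchange using the shared routing key
async fn bind_consumer_queue(channel: &Channel, queue: &str) -> Result<(), lapin::Error> {
    channel
        .exchange_declare("SavageEx", ExchangeKind::Topic,
                          ExchangeDeclareOptions::default(), FieldTable::default())
        .await?;
    channel
        .queue_declare(queue, QueueDeclareOptions::default(), FieldTable::default())
        .await?;
    // every queue bound with this routing key receives its own copy of each matching message
    channel
        .queue_bind(queue, "SavageEx", "notif.product",
                    QueueBindOptions::default(), FieldTable::default())
        .await?;
    Ok(())
}

// usage: one queue per consumer, all bound to the same exchange and routing key
// bind_consumer_queue(&channel, "consumer1_queue").await?;
// bind_consumer_queue(&channel, "consumer2_queue").await?;
```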

## Handling RPC Communication with RMQ

To handle **RPC (Remote Procedure Call)** queues in RabbitMQ using **`deadpool-lapin`** in Rust, you'll need to follow a few steps. RabbitMQ supports RPC via the use of a **"reply-to" queue** where a client sends a message with the expectation of receiving a response on that queue.

In RabbitMQ, the basic flow for **RPC** involves:
1. The **client** sends a message to a queue and specifies a **reply-to** queue.
2. The **server** consumes the message, processes it, and sends a response back to the **reply-to** queue specified by the client.
3. The **client** listens for the response on the **reply-to** queue.

Here’s how you can implement an RPC queue handler using `deadpool-lapin` in Rust.

### **Step 1: Set Up Deadpool-Lapin in `Cargo.toml`**
Make sure you have the following dependencies in your `Cargo.toml` file:

```toml
[dependencies]
deadpool-lapin = "0.11"
lapin = "2.0" # or the latest version of lapin
tokio = { version = "1", features = ["full"] }
futures = "0.3"
uuid = { version = "1", features = ["v4"] } # used by the client example to generate correlation IDs
```

### **Step 2: RabbitMQ RPC Flow Using `deadpool-lapin`**

#### **Client Side: Sending the RPC Request**
The client sends a message with a `reply-to` property so that the server knows where to send the response.

Here’s an example of how you can implement an RPC client using `deadpool-lapin`:

```rust
use deadpool_lapin::{Config, Runtime};
use lapin::{
    options::{BasicPublishOptions, QueueDeclareOptions, BasicConsumeOptions, BasicAckOptions},
    types::FieldTable, BasicProperties,
};
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Step 1: Configure and create the RabbitMQ pool
    let config = Config::default();
    let pool = config.create_pool(Some(Runtime::Tokio1)).unwrap();
    let connection = pool.get().await?;

    // Step 2: Open a channel and declare the queues
    let channel = connection.create_channel().await?;
    let request_queue = "rpc_queue";

    // Declare the queue (ensure the queue exists)
    let _ = channel
        .queue_declare(
            request_queue,
            QueueDeclareOptions::default(),
            FieldTable::default(),
        )
        .await?;

    // Step 3: Declare a temporary queue for the reply-to messages (auto-delete)
    let reply_queue = channel
        .queue_declare(
            "",
            QueueDeclareOptions {
                exclusive: true,
                auto_delete: true,
                ..QueueDeclareOptions::default()
            },
            FieldTable::default(),
        )
        .await?;

    let correlation_id = uuid::Uuid::new_v4().to_string(); // Use a unique correlation ID

    // Step 4: Publish the request message
    let request_payload = b"Request Payload";

    channel
        .basic_publish(
            "",
            request_queue,
            BasicPublishOptions::default(),
            request_payload, // lapin 2.x takes the payload as &[u8]
            BasicProperties::default()
                .with_reply_to(reply_queue.name().clone()) // Set reply-to queue
                .with_correlation_id(correlation_id.clone().into()),
        )
        .await?;

    println!("Request sent. Waiting for the response...");

    // Step 5: Consume the reply message from the reply-to queue
    let mut consumer = channel
        .basic_consume(
            reply_queue.name().as_str(),
            "",
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await?;

    while let Some(Ok(delivery)) = consumer.next().await {
        if let Some(corr_id) = delivery.properties.correlation_id().as_ref() {
            if corr_id.as_str() == correlation_id {
                // Response received with the matching correlation_id
                println!("Received RPC response: {:?}", String::from_utf8_lossy(&delivery.data));
                delivery.ack(BasicAckOptions::default()).await?;
                break;
            }
        }
    }

    Ok(())
}
```

#### **Server Side: Processing the Request and Sending the Response**
The server listens for requests on the queue, processes the messages, and sends a response back to the client using the `reply-to` queue specified by the client.

Here’s an example of how to implement the RPC server:

```rust
use deadpool_lapin::{Config, Runtime};
use lapin::{
    options::{BasicAckOptions, BasicPublishOptions, BasicConsumeOptions, QueueDeclareOptions},
    types::FieldTable, BasicProperties,
};
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Step 1: Configure and create the RabbitMQ pool
    let config = Config::default();
    let pool = config.create_pool(Some(Runtime::Tokio1)).unwrap();
    let connection = pool.get().await?;

    // Step 2: Open a channel and declare the queue
    let channel = connection.create_channel().await?;
    let request_queue = "rpc_queue";

    let _ = channel
        .queue_declare(
            request_queue,
            QueueDeclareOptions::default(),
            FieldTable::default(),
        )
        .await?;

    // Step 3: Start consuming messages from the queue
    let mut consumer = channel
        .basic_consume(
            request_queue,
            "",
            BasicConsumeOptions::default(),
            FieldTable::default(),
        )
        .await?;

    while let Some(Ok(delivery)) = consumer.next().await {
        let reply_to = delivery.properties.reply_to().clone();
        let correlation_id = delivery.properties.correlation_id().clone();

        // Process the message (in this case, we just echo back the received payload)
        println!("Received RPC request: {:?}", String::from_utf8_lossy(&delivery.data));

        // Step 4: Send the response back to the reply-to queue
        if let (Some(reply_to), Some(correlation_id)) = (reply_to, correlation_id) {
            let response_payload = b"Response Payload";
            channel
                .basic_publish(
                    "",
                    reply_to.as_str(),
                    BasicPublishOptions::default(),
                    response_payload, // lapin 2.x takes the payload as &[u8]
                    BasicProperties::default()
                        .with_correlation_id(correlation_id), // Attach the same correlation ID
                )
                .await?;
        }

        // Acknowledge the message
        delivery.ack(BasicAckOptions::default()).await?;
    }

    Ok(())
}
```

### **Explanation of Key Components**

1. **Reply-to Queue**:
- The **client** declares a temporary (or persistent) queue to receive the RPC response.
- This is done using:
```rust
let reply_queue = channel
    .queue_declare(
        "", // Empty name to create a temporary exclusive queue
        QueueDeclareOptions {
            exclusive: true,
            auto_delete: true,
            ..QueueDeclareOptions::default()
        },
        FieldTable::default(),
    )
    .await?;
```

2. **Correlation ID**:
- This is used to uniquely identify the request/response pair. The client generates a **UUID** for the correlation ID and sets it in the message properties when publishing the request.
- The **server** echoes this correlation ID back in the response to allow the client to match the response with the original request.

3. **Client-Side Logic**:
- The client sends a request and then consumes messages from the **reply-to** queue, checking the `correlation_id` to ensure the response matches the original request.

4. **Server-Side Logic**:
- The server consumes messages from the **request queue** (in this case, `rpc_queue`) and sends the response back to the `reply-to` queue specified in the message properties.

### **Handling Multiple RPC Requests**

Since RabbitMQ queues are shared, you can handle multiple clients and requests by:
1. Using **unique correlation IDs** for each request.
2. Each client can declare its own exclusive **reply-to queue** to avoid interference from other clients.

Alternatively, you could use a **shared reply-to queue** but rely on **correlation IDs** to properly route responses to the correct clients.
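
Here is a minimal sketch of that shared-reply-queue variant (it is not part of the examples above; the dispatcher function, the `Pending` map and the queue/consumer names are illustrative). The client registers a `oneshot` sender per correlation ID before publishing, a single consumer reads the shared reply queue, and each response is forwarded to whichever pending request registered the matching ID:

```rust
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{oneshot, Mutex};
use futures::StreamExt;
use lapin::{options::{BasicAckOptions, BasicConsumeOptions}, types::FieldTable, Channel};

// map of correlation ID -> sender for the task awaiting that response
type Pending = Arc<Mutex<HashMap<String, oneshot::Sender<Vec<u8>>>>>;

// hypothetical dispatcher: one consumer on the shared reply queue routes every
// response to the request that registered the matching correlation ID
async fn run_reply_dispatcher(channel: Channel, reply_queue: &str, pending: Pending)
    -> Result<(), lapin::Error>
{
    let mut consumer = channel
        .basic_consume(reply_queue, "shared_reply_consumer",
                       BasicConsumeOptions::default(), FieldTable::default())
        .await?;

    while let Some(Ok(delivery)) = consumer.next().await {
        if let Some(corr_id) = delivery.properties.correlation_id().as_ref() {
            // hand the payload to the task awaiting this correlation ID, if any
            if let Some(tx) = pending.lock().await.remove(corr_id.as_str()) {
                let _ = tx.send(delivery.data.clone());
            }
        }
        delivery.ack(BasicAckOptions::default()).await?;
    }
    Ok(())
}

// on the request side, register the correlation ID before publishing:
// let (tx, rx) = oneshot::channel();
// pending.lock().await.insert(correlation_id.clone(), tx);
// ... basic_publish the request with that correlation_id, then: let response = rx.await?;
```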

### **Conclusion**

This example shows how to implement an **RPC pattern** using **`deadpool-lapin`** in Rust for RabbitMQ. The client sends a message with a `reply-to` queue and a unique `correlation_id`, and the server processes the request and sends the response back to the specified `reply-to` queue, allowing the client to consume the response asynchronously. This pattern is useful for implementing request-response communication in distributed systems where the client expects a response for each request sent to RabbitMQ.