Skip to content

Commit

Permalink
fix: messenger publish order and hang state (#102)
Browse files Browse the repository at this point in the history
* fix: ordering of on_commit kafka messages

* fix: hanging of messenger due to unexpected message format

* chore: use debug level logging for consumer messages
  • Loading branch information
gk-kindred authored Aug 12, 2024
1 parent 78b9df3 commit c080256
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 35 deletions.
51 changes: 32 additions & 19 deletions packages/talos_messenger_actions/src/kafka/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::sync::Arc;

use async_trait::async_trait;
use log::{error, info};
use futures_util::future::join_all;
use log::{debug, error, info};
use tokio::sync::mpsc;

use talos_messenger_core::{
Expand Down Expand Up @@ -31,31 +32,43 @@ where
info!("Running Kafka Publisher service!!");
loop {
tokio::select! {
Some(actions) = self.rx_actions_channel.recv() => {
let MessengerCommitActions {version, commit_actions, headers } = actions;
actions_result = self.rx_actions_channel.recv() => {

if let Some(publish_actions_for_type) = commit_actions.get(&self.publisher.get_publish_type().to_string()){
match get_actions_deserialised::<Vec<KafkaAction>>(publish_actions_for_type) {
Ok(actions) => {
match actions_result {
Some(actions) => {

let total_len = actions.len() as u32;
let MessengerCommitActions {version, commit_actions, headers } = actions;

let headers_cloned = headers.clone();
for action in actions {
let publisher = self.publisher.clone();
let headers = headers_cloned.clone();
// Publish the message
tokio::spawn(async move {
publisher.send(version, action, headers, total_len ).await;
});
if let Some(publish_actions_for_type) = commit_actions.get(&self.publisher.get_publish_type().to_string()){
match get_actions_deserialised::<Vec<KafkaAction>>(publish_actions_for_type) {
Ok(actions) => {

let total_len = actions.len() as u32;

let headers_cloned = headers.clone();

let publish_vec = actions.into_iter().map(|action| {
let publisher = self.publisher.clone();
let headers = headers_cloned.clone();
async move {
publisher.send(version, action, headers, total_len ).await;
}
});
join_all(publish_vec).await;
},
Err(err) => {
error!("Failed to deserialise for version={version} key={} for data={:?} with error={:?}",&self.publisher.get_publish_type(), err.data, err.reason )
},
}
},
Err(err) => {
error!("Failed to deserialise for version={version} key={} for data={:?} with error={:?}",&self.publisher.get_publish_type(), err.data, err.reason )
},
}
},
None=> {
debug!("No actions to process..")
}

}


}
}
}
Expand Down
45 changes: 29 additions & 16 deletions packages/talos_messenger_core/src/services/inbound_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ where
// Call prune method on suffix.
let _ = self.suffix.prune_till_index(index_to_prune);

let commit_offset = version + 1;
debug!("[Commit] Updating tpl to version .. {commit_offset}");
let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64);
// let commit_offset = version + 1;
// debug!("[Commit] Updating tpl to version .. {commit_offset}");
// let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64);

self.message_receiver.commit_async();
// self.message_receiver.commit_async();
}
}
_ => {}
Expand Down Expand Up @@ -145,14 +145,14 @@ where
loop {
tokio::select! {
// 1. Consume message.
Ok(Some(msg)) = self.message_receiver.consume_message() => {
// Ok(Some(msg)) = self.message_receiver.consume_message() => {
reciever_result = self.message_receiver.consume_message() => {

// 2. Add/update to suffix.
match msg {
match reciever_result {
// 2.1 For CM - Install messages on the version
ChannelMessage::Candidate(candidate) => {

Ok(Some(ChannelMessage::Candidate(candidate))) => {
let version = candidate.message.version;
debug!("Candidate version received is {version}");
if version > 0 {
// insert item to suffix
let _ = self.suffix.insert(version, candidate.message.into());
Expand All @@ -176,10 +176,9 @@ where
} else {
warn!("Version 0 will not be inserted into suffix.")
}

},
// 2.2 For DM - Update the decision with outcome + safepoint.
ChannelMessage::Decision(decision) => {
Ok(Some(ChannelMessage::Decision(decision))) => {
let version = decision.message.get_candidate_version();
info!("[Decision Message] Version received = {} and {}", decision.decision_version, version);

Expand All @@ -190,21 +189,35 @@ where
self.process_next_actions().await?;

},
Ok(None) => {
info!("No message to process..");
},
Err(error) => {
// Catch the propagated error and, if it carries a version, mark that suffix item as complete.
if let Some(version) = error.version {
if let Some(item_to_update) = self.suffix.get_mut(version){
item_to_update.item.set_state(SuffixItemState::Complete(SuffixItemCompleteStateReason::ErrorProcessing));
}
}
error!("error consuming message....{:?}", error);
},
}

}
// Receive feedback from publisher.
Some(feedback) = self.rx_feedback_channel.recv() => {
match feedback {
MessengerChannelFeedback::Error(version, key, message_error) => {
feedback_result = self.rx_feedback_channel.recv() => {
match feedback_result {
Some(MessengerChannelFeedback::Error(version, key, message_error)) => {
error!("Failed to process version={version} with error={message_error:?}");
self.handle_action_failed(version, &key);

},
MessengerChannelFeedback::Success(version, key) => {
Some(MessengerChannelFeedback::Success(version, key)) => {
info!("Successfully processed version={version} with action_key={key}");
self.handle_action_success(version, &key);
},
None => {
debug!("No feedback message to process..");
}
}
// Process the next items with commit actions
self.process_next_actions().await?
Expand Down

0 comments on commit c080256

Please sign in to comment.