Skip to content

Commit

Permalink
feat: update the messenger kafka commit frequency
Browse files Browse the repository at this point in the history
  • Loading branch information
gk-kindred committed Aug 16, 2024
1 parent 77b0b4f commit 09f232e
Showing 1 changed file with 16 additions and 8 deletions.
24 changes: 16 additions & 8 deletions packages/talos_messenger_core/src/services/inbound_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,28 @@ where
Ok(is_completed) if is_completed => {
self.suffix.set_item_state(version, SuffixItemState::Complete(reason));

// Pruning of suffix.
self.suffix.update_prune_index_from_version(version);
// Update the prune index in suffix if applicable.
let prune_index = self.suffix.update_prune_index_from_version(version);

// If there is a prune_index, it is safe to assume, all messages prioir to this are decided + on_commit actions are actioned.
// Therefore, it is safe to commit till that offset/version.
if let Some(index) = prune_index {
let prune_item_option = self.suffix.messages.get(index);

if let Some(Some(prune_item)) = prune_item_option {
let commit_offset = prune_item.item_ver + 1;
debug!("[Commit] Updating tpl to version .. {commit_offset}");
let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64);

self.message_receiver.commit_async();
}
}

debug!("[Actions] All actions for version {version} completed!");
// Check prune eligibility by looking at the prune meta info.
if let Some(index_to_prune) = self.suffix.get_safe_prune_index() {
// Call prune method on suffix.
let _ = self.suffix.prune_till_index(index_to_prune);

// let commit_offset = version + 1;
// debug!("[Commit] Updating tpl to version .. {commit_offset}");
// let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64);

// self.message_receiver.commit_async();
}
}
_ => {}
Expand Down

0 comments on commit 09f232e

Please sign in to comment.