diff --git a/packages/talos_messenger_core/src/services/inbound_service.rs b/packages/talos_messenger_core/src/services/inbound_service.rs index 8ff5c3f..72573c2 100644 --- a/packages/talos_messenger_core/src/services/inbound_service.rs +++ b/packages/talos_messenger_core/src/services/inbound_service.rs @@ -66,20 +66,28 @@ where Ok(is_completed) if is_completed => { self.suffix.set_item_state(version, SuffixItemState::Complete(reason)); - // Pruning of suffix. - self.suffix.update_prune_index_from_version(version); + // Update the prune index in suffix if applicable. + let prune_index = self.suffix.update_prune_index_from_version(version); + + // If there is a prune_index, it is safe to assume, all messages prioir to this are decided + on_commit actions are actioned. + // Therefore, it is safe to commit till that offset/version. + if let Some(index) = prune_index { + let prune_item_option = self.suffix.messages.get(index); + + if let Some(Some(prune_item)) = prune_item_option { + let commit_offset = prune_item.item_ver + 1; + debug!("[Commit] Updating tpl to version .. {commit_offset}"); + let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64); + + self.message_receiver.commit_async(); + } + } debug!("[Actions] All actions for version {version} completed!"); // Check prune eligibility by looking at the prune meta info. if let Some(index_to_prune) = self.suffix.get_safe_prune_index() { // Call prune method on suffix. let _ = self.suffix.prune_till_index(index_to_prune); - - // let commit_offset = version + 1; - // debug!("[Commit] Updating tpl to version .. {commit_offset}"); - // let _ = self.message_receiver.update_offset_to_commit(commit_offset as i64); - - // self.message_receiver.commit_async(); } } _ => {}