Skip to content

Commit

Permalink
Merge pull request #96 from kaleido-io/one-ack-per-batch
Browse files Browse the repository at this point in the history
Only send one ack per batch
  • Loading branch information
peterbroadhurst authored Sep 12, 2022
2 parents 5b2c2a3 + 8239732 commit 6a933e7
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions src/eventstream-proxy/eventstream-proxy.base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,21 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase {
return;
}

this.logger.log(`Received ack ${data.id}`);
if (this.socket !== undefined && this.awaitingAck.find(msg => msg.id === data.id)) {
const firstAck = this.awaitingAck.find(msg => msg.id === data.id);
const inflight = this.awaitingAck.find(msg => msg.id === data.id)
this.logger.log(`Received ack ${data.id} inflight=${!!inflight}`);
if (this.socket !== undefined && inflight !== undefined) {
this.awaitingAck = this.awaitingAck.filter(msg => msg.id !== data.id);
if (firstAck) {
this.socket.ack(firstAck.batchNumber);
if (
// If nothing is left awaiting an ack - then we clearly need to ack
this.awaitingAck.length === 0 ||
(
// Or if we have a batch number associated with this ID, then we can only ack if there
// are no other messages in-flight with the same batch number.
inflight.batchNumber !== undefined && !this.awaitingAck.find(msg => msg.batchNumber === inflight.batchNumber)
)
) {
this.logger.log(`In-flight batch complete (batchNumber=${inflight.batchNumber})`);
this.socket.ack(inflight.batchNumber);
}
}
}
Expand Down

0 comments on commit 6a933e7

Please sign in to comment.