Skip to content

Commit

Permalink
Fix KafkaMessageBuffer purging with incorrect id (#46)
Browse files Browse the repository at this point in the history
KafkaMessageBuffer incorrectly used the id of the message being added rather than the one being kicked when there was no more space for concurrent split messages.

Also added minor improvements so can purge multiple items at the same time.

Using short for messageId in Merger as 4 byte ids are legacy and mean a short. Improves debuggability.
  • Loading branch information
peter-quix authored Apr 23, 2024
1 parent 4dfb5f6 commit ddffa39
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 20 deletions.
6 changes: 3 additions & 3 deletions builds/csharp/nuget/build_nugets.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import fileinput
from typing import List

version = "0.7.0.0"
informal_version = "0.7.0.0"
nuget_version = "0.7.0.0"
version = "0.7.1.0"
informal_version = "0.7.1.0-dev1"
nuget_version = "0.7.1.0-dev1"


def updatecsproj(projfilepath):
Expand Down
33 changes: 23 additions & 10 deletions src/QuixStreams.Kafka.Transport/SerDes/KafkaMessageBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ private BufferedValue GetOrCreateMessageBuffer(MergerBufferId bufferId, int tota
var kickOut = groupBuffers.OrderBy(x => x.LastUpdate).First();
indexToUse = Array.IndexOf(groupBuffers, kickOut);
this.logger.LogWarning("Concurrent split message track count reached, dropping oldest msg with segments. Group key: {0}, msg id: {1}", kickOut.BufferId.Key, kickOut.BufferId.MessageId);
this.OnMessagePurged?.Invoke(new MessagePurgedEventArgs(bufferId));
this.OnMessagePurged?.Invoke(new MessagePurgedEventArgs(kickOut.BufferId));
}

msgBuffer = new BufferedValue(bufferId, totalMessageCount);
Expand Down Expand Up @@ -237,12 +237,19 @@ private void PerformTtlCheck()
var insideCutoff = msgSegment.LastUpdate > cutoff;
if (insideCutoff && insideDelta) continue; // not old enough
msgGroupBuffer.Value[index] = null;
if (!insideCutoff) this.logger.LogWarning("Message segment expired, only a part of the message was received within allowed time. Group key: {0}, msg id: {1}.",
if (!insideCutoff)
{
this.logger.LogWarning(
"Message segment expired, only a part of the message was received within allowed time. Group key: {0}, msg id: {1}.",
msgSegment.BufferId.Key, msgSegment.BufferId.MessageId);
}
else
this.logger.LogWarning("Message segment expired, only a part of the message was received within offset delta {0}. Group key: {1}, msg id: {2}.",
{
this.logger.LogWarning(
"Message segment expired, only a part of the message was received within offset delta {0}. Group key: {1}, msg id: {2}.",
offsetDelta, msgSegment.BufferId.Key, msgSegment.BufferId.MessageId);

}

purged.Add(msgSegment.BufferId);
}
}
Expand All @@ -256,10 +263,7 @@ private void PerformTtlCheck()
}
}

foreach (var bufferId in purged)
{
this.OnMessagePurged?.Invoke(new MessagePurgedEventArgs(bufferId));
}
this.OnMessagePurged?.Invoke(new MessagePurgedEventArgs(purged));
}

/// <summary>
Expand Down Expand Up @@ -327,15 +331,24 @@ public class MessagePurgedEventArgs
/// <summary>
/// Message group key
/// </summary>
public readonly MergerBufferId BufferId;
public readonly ICollection<MergerBufferId> BufferIds;

/// <summary>
/// Initializes a new instance of <see cref="MessagePurgedEventArgs"/>
/// </summary>
/// <param name="bufferId">Message group key</param>
public MessagePurgedEventArgs(MergerBufferId bufferId)
{
this.BufferId = bufferId;
this.BufferIds = new MergerBufferId[] { bufferId };
}

/// <summary>
/// Initializes a new instance of <see cref="MessagePurgedEventArgs"/>
/// </summary>
/// <param name="bufferIds">Message group keys</param>
public MessagePurgedEventArgs(ICollection<MergerBufferId> bufferIds)
{
this.BufferIds = bufferIds;
}
}
}
Expand Down
23 changes: 21 additions & 2 deletions src/QuixStreams.Kafka.Transport/SerDes/KafkaMessageMerger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public class KafkaMessageMerger
public KafkaMessageMerger(KafkaMessageBuffer messageBuffer)
{
this.helper = new KafkaMessageMergerHelper(messageBuffer, this.logger);
this.helper.OnMessageSegmentsPurged += (bufferId) =>
this.helper.OnMessageSegmentsPurged += (bufferIds) =>
{
if (this.RemoveFromBuffer(bufferId, false))
if (this.RemoveFromBuffer(bufferIds, false) > 0)
{
RaiseNextPackageIfReady().GetAwaiter().GetResult();
}
Expand Down Expand Up @@ -215,9 +215,28 @@ private bool RemoveFromBuffer(MergerBufferId bufferId, bool purge = true)
{
if (bufferId.Equals(default)) return false;
if (!pendingMessages.TryRemove(bufferId, out _)) return false;

// Remove from order entry. In general this is expected to be early in the list so no full scan
var orderEntry = this.packageOrder.FirstOrDefault(y => y.Value.Equals(bufferId));
if (!orderEntry.Equals(default(KeyValuePair<long, MergerBufferId>)))
{
this.packageOrder.Remove(orderEntry.Key);
if (this.packageOrder.Count == 0) bufferCounter = 1;
}
if (purge) this.helper.Purge(bufferId);
return true;
}

private int RemoveFromBuffer(ICollection<MergerBufferId> bufferIds, bool purge = true)
{
var counter = 0;
foreach (var bufferId in bufferIds)
{
counter += this.RemoveFromBuffer(bufferId, purge) ? 1 : 0;
}

return counter;
}

public void HandleRevoked(RevokedEventArgs args)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ internal KafkaMessageMergerHelper(KafkaMessageBuffer buffer, ILogger logger)
this.logger = logger ?? NullLogger.Instance;
this.buffer.OnMessagePurged += (ea) =>
{
this.OnMessageSegmentsPurged?.Invoke(ea.BufferId);
this.OnMessageSegmentsPurged?.Invoke(ea.BufferIds);
};
}

Expand Down Expand Up @@ -69,7 +69,10 @@ public MessageMergeResult TryMerge(KafkaMessage messageSegment, out MergerBuffer
return MessageMergeResult.Unmerged;
}

var messageId = Convert.ToBase64String(messageIdBytes); // Whether it is guid or not, doesn't matter, uniqueness matters

var messageId = messageIdBytes.Length == 4
? BitConverter.ToInt16(messageIdBytes, 0).ToString()
: Convert.ToBase64String(messageIdBytes); // Whether it is guid or not, doesn't matter, uniqueness matters
var messageIndex = BitConverter.ToInt32(messageIndexBytes, 0);
var messageCount = BitConverter.ToInt32(messageCountBytes, 0);

Expand Down Expand Up @@ -137,7 +140,7 @@ public void Purge(MergerBufferId bufferId)
}

/// <inheritdoc />
public event Action<MergerBufferId> OnMessageSegmentsPurged;
public event Action<ICollection<MergerBufferId>> OnMessageSegmentsPurged;

private MergedKafkaMessage AssembleMessage(MergerBufferId bufferId, ref MergerBufferId messageGroupId)
{
Expand Down
4 changes: 2 additions & 2 deletions src/QuixStreams.Kafka/KafkaMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ public class KafkaMessage
protected internal Headers ConfluentHeaders { get; protected set; }

/// <summary>
/// The estimated message size including header, key, value
/// The estimated message size in bytes including header, key, value
/// </summary>
public int MessageSize { get; protected set; }

/// <summary>
/// The estimated (worst case) size of the message headers
/// The estimated (worst case) size of the message headers in bytes
/// </summary>
public int HeaderSize { get; protected set; }

Expand Down

0 comments on commit ddffa39

Please sign in to comment.