diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaConsumerWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaConsumerWrapper.cs index 752f556561..5991e07698 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaConsumerWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaConsumerWrapper.cs @@ -73,8 +73,7 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins if (messageAsObject is MessageMetadata messageMetaData) { headersSize = GetHeadersSize(messageMetaData.Headers); - - transaction.InsertDistributedTraceHeaders(messageMetaData.Headers, DistributedTraceHeadersSetter); + transaction.InsertDistributedTraceHeaders(messageMetaData, DistributedTraceHeadersSetter); } ReportSizeMetrics(agent, transaction, topic, headersSize, messageAsObject); @@ -134,10 +133,14 @@ private static Func GetKeyAccessorFunc(Type t) => private static Func GetValueAccessorFunc(Type t) => VisibilityBypasser.Instance.GeneratePropertyAccessor(t, "Value"); - private static void DistributedTraceHeadersSetter(Headers carrier, string key, string value) + private static void DistributedTraceHeadersSetter(MessageMetadata carrier, string key, string value) { - carrier ??= new Headers(); - carrier.Add(key, Encoding.ASCII.GetBytes(value)); + carrier.Headers ??= new Headers(); + if (!string.IsNullOrEmpty(key)) + { + carrier.Headers.Remove(key); + carrier.Headers.Add(key, Encoding.ASCII.GetBytes(value)); + } } private static long TryGetSize(object obj) diff --git a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaProducerWrapper.cs b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaProducerWrapper.cs index d454838a4c..00732a61d6 100644 --- a/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaProducerWrapper.cs +++ b/src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Kafka/KafkaProducerWrapper.cs @@ -29,15 +29,19 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins var segment = transaction.StartMessageBrokerSegment(instrumentedMethodCall.MethodCall, MessageBrokerDestinationType.Topic, MessageBrokerAction.Produce, BrokerVendorName, topicPartition.Topic); - transaction.InsertDistributedTraceHeaders(messageMetadata.Headers, DistributedTraceHeadersSetter); + transaction.InsertDistributedTraceHeaders(messageMetadata, DistributedTraceHeadersSetter); return instrumentedMethodCall.MethodCall.Method.MethodName == "Produce" ? Delegates.GetDelegateFor(segment) : Delegates.GetAsyncDelegateFor(agent, segment); } - private static void DistributedTraceHeadersSetter(Headers carrier, string key, string value) + private static void DistributedTraceHeadersSetter(MessageMetadata carrier, string key, string value) { - carrier ??= new Headers(); - carrier.Add(key, Encoding.ASCII.GetBytes(value)); + carrier.Headers ??= new Headers(); + if (!string.IsNullOrEmpty(key)) + { + carrier.Headers.Remove(key); + carrier.Headers.Add(key, Encoding.ASCII.GetBytes(value)); + } } } diff --git a/tests/Agent/IntegrationTests/ContainerIntegrationTests/Tests/KafkaTests.cs b/tests/Agent/IntegrationTests/ContainerIntegrationTests/Tests/KafkaTests.cs index c7a1c4b24c..5acf079e96 100644 --- a/tests/Agent/IntegrationTests/ContainerIntegrationTests/Tests/KafkaTests.cs +++ b/tests/Agent/IntegrationTests/ContainerIntegrationTests/Tests/KafkaTests.cs @@ -91,7 +91,7 @@ public void Test() () => Assert.True(produceSpan.IntrinsicAttributes.ContainsKey("parentId")), () => Assert.NotNull(consumeTxnSpan), () => Assert.True(consumeTxnSpan.UserAttributes.ContainsKey("kafka.consume.byteCount")), - () => Assert.InRange((long)consumeTxnSpan.UserAttributes["kafka.consume.byteCount"], 20, 30), // usually is 24 - 26 + () => Assert.InRange((long)consumeTxnSpan.UserAttributes["kafka.consume.byteCount"], 460, 470), // includes headers () => Assert.True(consumeTxnSpan.IntrinsicAttributes.ContainsKey("traceId")), () => Assert.False(consumeTxnSpan.IntrinsicAttributes.ContainsKey("parentId")) );