Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
tippmar-nr committed Nov 8, 2024
1 parent 9393eda commit 49a7159
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,51 +12,36 @@

namespace NewRelic.Providers.Wrapper.AzureServiceBus;

public class AzureServiceBusReceiveWrapper : IWrapper
public class AzureServiceBusReceiveWrapper : AzureServiceBusWrapperBase
{
private const string BrokerVendorName = "AzureServiceBus";
private static ConcurrentDictionary<Type, Func<object, object>> _getResultFromGenericTask = new();
private static readonly ConcurrentDictionary<Type, Func<object, object>> _getResultFromGenericTask = new();

public bool IsTransactionRequired => false;

public CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo)
public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo)
{
var canWrap = instrumentedMethodInfo.RequestedWrapperName.Equals(nameof(AzureServiceBusReceiveWrapper));
return new CanWrapResponse(canWrap);
}

public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction)
public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction)
{
dynamic serviceBusReceiver = instrumentedMethodCall.MethodCall.InvocationTarget;
string queueName = serviceBusReceiver.EntityPath;
string identifier = serviceBusReceiver.Identifier;
string fqns = serviceBusReceiver.FullyQualifiedNamespace;

// create a transaction
transaction = agent.CreateTransaction(
destinationType: MessageBrokerDestinationType.Queue,
brokerVendorName: BrokerVendorName,
destination: queueName);

if (instrumentedMethodCall.IsAsync)
{
transaction.AttachToAsync();
transaction.DetachFromPrimary(); //Remove from thread-local type storage
}
string queueName = serviceBusReceiver.EntityPath; // marty-test-queue
//string identifier = serviceBusReceiver.Identifier; // -9e860ed4-b16b-4d02-96e4-d8ed224ae24b
//string fqns = serviceBusReceiver.FullyQualifiedNamespace; // mt-test-servicebus.servicebus.windows.net

MessageBrokerAction action =
instrumentedMethodCall.MethodCall.Method.MethodName switch
{
"ReceiveMessagesAsync" => MessageBrokerAction.Consume,
"ReceiveDeferredMessagesAsync" => MessageBrokerAction.Consume,
"PeekMessagesInternalAsync" => MessageBrokerAction.Peek,
"AbandonMessageAsync" => MessageBrokerAction.Purge, // TODO is this correct ???,
"CompleteMessageAsync" => MessageBrokerAction.Consume,
"DeadLetterInternalAsync" => MessageBrokerAction.Purge, // TODO is this correct ???
"DeferMessageAsync" => MessageBrokerAction.Consume, // TODO is this correct or should we extend MessageBrokerAction with more values???
"RenewMessageLockAsync" => MessageBrokerAction.Consume, // TODO is this correct or should we extend MessageBrokerAction with more values???
_ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected method call: {instrumentedMethodCall.MethodCall.Method.MethodName}")
};
{
"ReceiveMessagesAsync" => MessageBrokerAction.Consume,
"ReceiveDeferredMessagesAsync" => MessageBrokerAction.Consume,
"PeekMessagesInternalAsync" => MessageBrokerAction.Peek,
"AbandonMessageAsync" => MessageBrokerAction.Purge, // TODO is this correct ???,
"CompleteMessageAsync" => MessageBrokerAction.Consume,
"DeadLetterInternalAsync" => MessageBrokerAction.Purge, // TODO is this correct ???
"DeferMessageAsync" => MessageBrokerAction.Consume, // TODO is this correct or should we extend MessageBrokerAction with more values???
"RenewMessageLockAsync" => MessageBrokerAction.Consume, // TODO is this correct or should we extend MessageBrokerAction with more values???
_ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodCall.MethodCall.Method.MethodName}")
};

// start a message broker segment
var segment = transaction.StartMessageBrokerSegment(
Expand All @@ -65,61 +50,89 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins
action,
BrokerVendorName, queueName);

// return an async delegate
return Delegates.GetAsyncDelegateFor<Task>(
agent,
segment,
true,
HandleResponse,
TaskContinuationOptions.ExecuteSynchronously);

void HandleResponse(Task responseTask)
if (instrumentedMethodCall.IsAsync)
{
try
// return an async delegate
return Delegates.GetAsyncDelegateFor<Task>(
agent,
segment,
false,
HandleResponse,
TaskContinuationOptions.ExecuteSynchronously);

void HandleResponse(Task responseTask)
{
if (responseTask.IsFaulted)
try
{
// TODO: handle error here?
return;
}

// if the response contains a list of messages,
// get the first message from the response and extract DT headers
dynamic resultObj = GetTaskResult(responseTask);
if (resultObj != null && resultObj.Count > 0) // TODO need to verify resultObj is of type IReadOnlyList<ServiceBusReceivedMessage>
{
var msg = resultObj[0];
if (msg.ApplicationProperties is ReadOnlyDictionary<string, object> applicationProperties)
if (responseTask.IsFaulted)
{
transaction.AcceptDistributedTraceHeaders(applicationProperties, ProcessHeaders, TransportType.Queue);
transaction.NoticeError(responseTask.Exception); // TODO ???
return;
}

var resultObj = GetTaskResultFromObject(responseTask);
ExtractDTHeadersIfAvailable(resultObj);
}
finally
{
segment.End();
}
}
finally
{
segment.End();
transaction.End();
}
}
}

private IEnumerable<string> ProcessHeaders(ReadOnlyDictionary<string, object> applicationProperties, string key)
{
var headerValues = new List<string>();
foreach (var item in applicationProperties)
return Delegates.GetDelegateFor<object>(
onFailure: transaction.NoticeError,
onComplete: () => segment.End(),
onSuccess: ExtractDTHeadersIfAvailable);


void ExtractDTHeadersIfAvailable(object resultObj)
{
if (item.Key.Equals(key, StringComparison.OrdinalIgnoreCase))
if (resultObj != null)
{
headerValues.Add(item.Value as string);
switch (instrumentedMethodCall.MethodCall.Method.MethodName)
{
case "ReceiveMessagesAsync":
case "ReceiveDeferredMessagesAsync":
case "PeekMessagesInternalAsync":
// if the response contains a list of messages,
// get the first message from the response and extract DT headers
dynamic messages = resultObj;
if (messages.Count > 0)
{
var msg = messages[0];
if (msg.ApplicationProperties is ReadOnlyDictionary<string, object> applicationProperties)
{
transaction.AcceptDistributedTraceHeaders(applicationProperties, ProcessHeaders, TransportType.Queue);
}
}
break;
}
}
}
IEnumerable<string> ProcessHeaders(ReadOnlyDictionary<string, object> applicationProperties, string key)
{
var headerValues = new List<string>();
foreach (var item in applicationProperties)
{
if (item.Key.Equals(key, StringComparison.OrdinalIgnoreCase))
{
headerValues.Add(item.Value as string);
}
}

return headerValues;
return headerValues;
}
}
}

private static object GetTaskResult(object task)
private static object GetTaskResultFromObject(object taskObj)
{
if (((Task)task).IsFaulted)
var task = taskObj as Task;
if (task == null)
{
return null;
}
if (task.IsFaulted)
{
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,36 @@
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
using NewRelic.Agent.Api;
using NewRelic.Agent.Extensions.Providers.Wrapper;
using NewRelic.Reflection;

namespace NewRelic.Providers.Wrapper.AzureServiceBus;

public class AzureServiceBusSendWrapper : IWrapper
public class AzureServiceBusSendWrapper : AzureServiceBusWrapperBase
{
private const string BrokerVendorName = "AzureServiceBus";
private static ConcurrentDictionary<Type, Func<object, object>> _getResultFromGenericTask = new();

public bool IsTransactionRequired => true;

public CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo)
public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo)
{
var canWrap = instrumentedMethodInfo.RequestedWrapperName.Equals(nameof(AzureServiceBusSendWrapper));
return new CanWrapResponse(canWrap);
}

public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction)
public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction)
{
dynamic serviceBusReceiver = instrumentedMethodCall.MethodCall.InvocationTarget;
string queueName = serviceBusReceiver.EntityPath;
string identifier = serviceBusReceiver.Identifier;
string fqns = serviceBusReceiver.FullyQualifiedNamespace;

// ???
if (instrumentedMethodCall.IsAsync)
{
transaction.AttachToAsync();
transaction.DetachFromPrimary(); //Remove from thread-local type storage
}
string queueName = serviceBusReceiver.EntityPath; // marty-test-queue
//string identifier = serviceBusReceiver.Identifier; // -9e860ed4-b16b-4d02-96e4-d8ed224ae24b
//string fqns = serviceBusReceiver.FullyQualifiedNamespace; // mt-test-servicebus.servicebus.windows.net

// determine message broker action based on method name
MessageBrokerAction action =
instrumentedMethodCall.MethodCall.Method.MethodName switch
{
"SendMessagesAsync" => MessageBrokerAction.Produce,
"ScheduleMessagesAsync" => MessageBrokerAction.Produce,
"CancelScheduledMessagesAsync" => MessageBrokerAction.Purge, // TODO is this correct ???,
_ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected method call: {instrumentedMethodCall.MethodCall.Method.MethodName}")
_ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodCall.MethodCall.Method.MethodName}")
};

// start a message broker segment
Expand All @@ -58,6 +44,7 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins
if (action == MessageBrokerAction.Produce)
{
dynamic messages = instrumentedMethodCall.MethodCall.MethodArguments[0];

// iterate all messages that are being sent,
// insert DT headers into each message
foreach (var message in messages)
Expand All @@ -69,10 +56,10 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins

// return an async delegate
return Delegates.GetAsyncDelegateFor<Task>(agent, segment);
}

private void ProcessHeaders(IDictionary<string, object> applicationProperties, string key, string value)
{
applicationProperties.Add(key, value);
void ProcessHeaders(IDictionary<string, object> applicationProperties, string key, string value)
{
applicationProperties.Add(key, value);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2020 New Relic, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

using NewRelic.Agent.Api;
using NewRelic.Agent.Extensions.Providers.Wrapper;

namespace NewRelic.Providers.Wrapper.AzureServiceBus
{
public abstract class AzureServiceBusWrapperBase : IWrapper
{
protected const string BrokerVendorName = "AzureServiceBus";

public bool IsTransactionRequired => true; // only instrument service bus methods if we're already in a transaction
public abstract CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo);

public abstract AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent,ITransaction transaction);

}
}

0 comments on commit 49a7159

Please sign in to comment.