Skip to content

Commit

Permalink
Added workload id support (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
Carael authored May 15, 2024
1 parent a39d2cc commit f147474
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 20 deletions.
4 changes: 3 additions & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="AutoMapper.Extensions.Microsoft.DependencyInjection" Version="12.0.1" />
<PackageVersion Include="Azure.Messaging.ServiceBus" Version="7.17.4" />
<PackageVersion Include="Azure.Identity" Version="1.11.3" />
<PackageVersion Include="Azure.Messaging.ServiceBus" Version="7.17.5" />
<PackageVersion Include="coverlet.collector" Version="6.0.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down Expand Up @@ -33,6 +34,7 @@
<PackageVersion Include="Snapshooter.Xunit" Version="0.14.1" />
<PackageVersion Include="Squadron.Mongo" Version="0.18.0" />
<PackageVersion Include="StrongGrid" Version="0.106.0" />
<PackageVersion Include="System.Configuration.ConfigurationManager" Version="8.0.0" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.5.7">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
7 changes: 3 additions & 4 deletions src/Messaging.AzureServiceBus/AzureServiceBusOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ namespace Magnet.Messaging.AzureServiceBus;

public class AzureServiceBusOptions
{
public string ConnectionString { get; set; }

public string Topic { get; set; }

public string? ConnectionString { get; set; }
public string? Url { get; set; }
public string Topic { get; set; } = "magnet";
public TimeSpan ReceiveTimeout { get; set; } = TimeSpan.FromMinutes(5);
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Configuration;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

Expand All @@ -10,8 +11,22 @@ public static MagnetServerBuilder AddAzureServiceBus(
this MagnetServerBuilder builder,
IConfiguration configuration)
{
IConfigurationSection section = configuration.GetSection("Magnet:ServiceBus");
AzureServiceBusOptions azureOptions = section.Get<AzureServiceBusOptions>();
AzureServiceBusOptions? azureOptions = configuration
.GetSection("Magnet:ServiceBus")
.Get<AzureServiceBusOptions>();

if (azureOptions == null)
{
throw new ConfigurationErrorsException(
"Magnet:ServiceBus section is missing in the configuration");
}

if (azureOptions.ConnectionString == null && azureOptions.Url == null)
{
throw new ConfigurationErrorsException(
"ConnectionString or Url is required for Azure Service Bus.");
}

builder.AddAzureServiceBus(azureOptions);
return builder;
}
Expand All @@ -23,6 +38,7 @@ public static MagnetServerBuilder AddAzureServiceBus(
{
builder.Services.AddSingleton(options);
builder.Services.AddSingleton<IMessageBus, MessageBus>();

return builder;
}

Expand All @@ -33,6 +49,7 @@ public static MagnetServerBuilder AddAzureServiceBus(
{
var options = new AzureServiceBusOptions();
setup.Invoke(options);

return builder.AddAzureServiceBus(options);
}

Expand Down
55 changes: 42 additions & 13 deletions src/Messaging.AzureServiceBus/MessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Azure.Identity;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using Microsoft.Extensions.Logging;
Expand All @@ -19,15 +20,46 @@ public MessageBus(AzureServiceBusOptions options, ILogger<MessageBus> logger)
{
_options = options;
_logger = logger;
_client = new ServiceBusClient(options.ConnectionString);
_adminClient = new ServiceBusAdministrationClient(
options.ConnectionString,
new ServiceBusAdministrationClientOptions
{
Diagnostics = { IsDistributedTracingEnabled = true }
});
_client = GetServiceBusClient(options);
_adminClient = GetServiceBusAdministrationClient(options);
}

private ServiceBusClient GetServiceBusClient(AzureServiceBusOptions options) => options switch
{
{ ConnectionString: { } connectionString } => new ServiceBusClient(connectionString),
{ Url: { } url } => new ServiceBusClient(url, new WorkloadIdentityCredential()),
_ => throw new ArgumentException(
"ConnectionString or Url is required for Azure Service Bus.")
};

private ServiceBusAdministrationClient GetServiceBusAdministrationClient(
AzureServiceBusOptions options) => options switch
{
{ ConnectionString: { } connectionString } =>
new ServiceBusAdministrationClient(
connectionString,
new ServiceBusAdministrationClientOptions
{
Diagnostics =
{
IsDistributedTracingEnabled = true
}
}),
{ Url: { } url } =>
new ServiceBusAdministrationClient(
url,
new WorkloadIdentityCredential(),
new ServiceBusAdministrationClientOptions
{
Diagnostics =
{
IsDistributedTracingEnabled = true
}
}),
_ => throw new ArgumentException(
"ConnectionString or Url is required for Azure Service Bus.")
};

public async Task<string> PublishAsync(MagnetMessage message)
{
ServiceBusSender sender = _client.CreateSender(_options.Topic);
Expand All @@ -43,8 +75,7 @@ private static ServiceBusMessage CreateMessage(MagnetMessage message)
var jsonMessage = JsonSerializer.Serialize(message);
var sbMessage = new ServiceBusMessage(jsonMessage)
{
MessageId = message.Id.ToString("N"),
Subject = message.Type
MessageId = message.Id.ToString("N"), Subject = message.Type
};

return sbMessage;
Expand All @@ -61,8 +92,7 @@ public async Task<MagnetMessage> GetNextAsync(
name,
new ServiceBusReceiverOptions
{
PrefetchCount = 1,
ReceiveMode = ServiceBusReceiveMode.PeekLock,
PrefetchCount = 1, ReceiveMode = ServiceBusReceiveMode.PeekLock,
});

ServiceBusReceivedMessage message = default;
Expand Down Expand Up @@ -138,8 +168,7 @@ public async Task<string> SubscribeAsync(
_options.Topic,
SubscriptionName.Create(name))
{
AutoDeleteOnIdle = TimeSpan.FromHours(1),
RequiresSession = false
AutoDeleteOnIdle = TimeSpan.FromHours(1), RequiresSession = false
};

SubscriptionProperties properties = await _adminClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
<RootNamespace>Magnet</RootNamespace>
<PackageId>Magnet.Messaging.AzureServiceBus</PackageId>
<IsPackable>true</IsPackable>
<Nullable>enable</Nullable>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
Expand All @@ -19,11 +20,13 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Identity" />
<PackageReference Include="Azure.Messaging.ServiceBus" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" />
<PackageReference Include="System.Configuration.ConfigurationManager" />
</ItemGroup>

<ItemGroup>
Expand Down

0 comments on commit f147474

Please sign in to comment.