Skip to content

Commit

Permalink
#349 Azure Service Bus correlation filters
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Pringle <richardpringle@gmail.com>
  • Loading branch information
EtherZa committed Dec 11, 2024
1 parent 2716054 commit 2610885
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 10 deletions.
4 changes: 2 additions & 2 deletions docs/provider_azure_servicebus.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ mbb.Consume<TMessage>(x => x
.MaxAutoLockRenewalDuration(TimeSpan.FromMinutes(7))
.SubQueue(SubQueue.DeadLetter)
.PrefetchCount(10)
.SubscriptionSqlFilter("1=1") // ASB subscription SQL filters can also be created - see topology creation section
.SubscriptionSqlFilter("1=1") // ASB subscription filters can also be created - see topology creation section
.Instances(1));
```

Expand Down Expand Up @@ -408,7 +408,7 @@ mbb.Consume<SomeMessage>(x => x
.Topic("some-topic")
.WithConsumer<SomeMessageConsumer>()
.SubscriptionName("some-service")
.SubscriptionSqlFilter("1=1") // this will create a rule with SQL filter
.SubscriptionCorrelationFilter(correlationId: "Sample") // this will create a rule to filter messages with a CorrelationId of 'Sample'
.CreateTopicOptions((options) =>
{
options.RequiresDuplicateDetection = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ static internal void SetMaxConcurrentSessions(this AbstractConsumerSettings cons
static internal int? GetMaxConcurrentSessions(this AbstractConsumerSettings consumerSettings)
=> consumerSettings.GetOrDefault<int?>(AsbProperties.MaxConcurrentSessionsKey);

static internal IDictionary<string, SubscriptionSqlRule> GetRules(this AbstractConsumerSettings consumerSettings, bool createIfNotExists = false)
static internal IDictionary<string, SubscriptionRule> GetRules(this AbstractConsumerSettings consumerSettings, bool createIfNotExists = false)
{
var filterByName = consumerSettings.GetOrDefault<IDictionary<string, SubscriptionSqlRule>>(AsbProperties.RulesKey);
var filterByName = consumerSettings.GetOrDefault<IDictionary<string, SubscriptionRule>>(AsbProperties.RulesKey);
if (filterByName == null && createIfNotExists)
{
filterByName = new Dictionary<string, SubscriptionSqlRule>();
filterByName = new Dictionary<string, SubscriptionRule>();
consumerSettings.Properties[AsbProperties.RulesKey] = filterByName;
}
return filterByName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,70 @@ public static TConsumerBuilder SubscriptionSqlFilter<TConsumerBuilder>(this TCon
return builder;
}

/// <summary>
/// Adds a named correlation filter to the subscription (Azure Service Bus). Setting relevant only if topology provisioning enabled.
/// </summary>
/// <typeparam name="TConsumerBuilder"></typeparam>
/// <param name="builder"></param>
/// <param name="ruleName">The name of the filter</param>
/// <param name="correlationId">Value to be applied as the 'CorrelationId' filter.<param>
/// <param name="messageId">Value to be applied as the 'MessageId' filter.<param>
/// <param name="to">Value to be applied as the 'To' filter.<param>
/// <param name="replyTo">Value to be applied as the 'ReplyTo' filter.<param>
/// <param name="subject">Value to be applied as the 'Subject' filter.<param>
/// <param name="sessionId">Value to be applied as the 'SessionId' filter.<param>
/// <param name="replyToSessionId">Value to be applied as the 'ReplyToSessionId' filter.<param>
/// <param name="contentType">Value to be applied as the 'ContentType' filter.<param></param>
/// <param name="applicationProperties">Filters to be applied to application specific properties.</param>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public static TConsumerBuilder SubscriptionCorrelationFilter<TConsumerBuilder>(
this TConsumerBuilder builder,
string ruleName = "default",
string correlationId = "",
string messageId = "",
string to = "",
string replyTo = "",
string subject = "",
string sessionId = "",
string replyToSessionId = "",
string contentType = "",
IDictionary<string, object> applicationProperties = null)
where TConsumerBuilder : IAbstractConsumerBuilder
{
if (builder is null) throw new ArgumentNullException(nameof(builder));

if (string.IsNullOrWhiteSpace(correlationId)
&& string.IsNullOrWhiteSpace(messageId)
&& string.IsNullOrWhiteSpace(to)
&& string.IsNullOrWhiteSpace(replyTo)
&& string.IsNullOrWhiteSpace(subject)
&& string.IsNullOrWhiteSpace(sessionId)
&& string.IsNullOrWhiteSpace(replyToSessionId)
&& string.IsNullOrWhiteSpace(contentType)
&& (applicationProperties == null || applicationProperties?.Count == 0))
{
throw new ArgumentException("At least one property must contain a value to use as a filter");
}

var filterByName = builder.ConsumerSettings.GetRules(createIfNotExists: true);
filterByName[ruleName] = new SubscriptionCorrelationRule
{
Name = ruleName,
CorrelationId = correlationId,
MessageId = messageId,
To = to,
ReplyTo = replyTo,
Subject = subject,
SessionId = sessionId,
ReplyToSessionId = replyToSessionId,
ContentType = contentType,
ApplicationProperties = applicationProperties
};

return builder;
}

/// <summary>
/// <see cref="CreateQueueOptions"/> when the ASB queue does not exist and needs to be created
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace SlimMessageBus.Host.AzureServiceBus;

public record SubscriptionCorrelationRule : SubscriptionRule
{
public string CorrelationId { get; set; }
public string MessageId { get; set; }
public string To { get; set; }
public string ReplyTo { get; set; }
public string Subject { get; set; }
public string SessionId { get; set; }
public string ReplyToSessionId { get; set; }
public string ContentType { get; set; }
public IDictionary<string, object> ApplicationProperties { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace SlimMessageBus.Host.AzureServiceBus;

public abstract record SubscriptionRule
{
public string Name { get; set; }
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
namespace SlimMessageBus.Host.AzureServiceBus;

public record SubscriptionSqlRule
public record SubscriptionSqlRule : SubscriptionRule
{
public string Name { get; set; }
public string SqlFilter { get; set; }
public string SqlAction { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,51 @@ IReadOnlyCollection<CreateRuleOptions> MergeFilters(string path, string subscrip
throw new ConfigurationMessageBusException($"All rules across the same path/subscription {path}/{subscriptionName} must have unique names (Duplicate: '{name}').");
}

RuleFilter filter;
RuleAction action = null;
switch (rule)
{
case SubscriptionSqlRule sqlRule:
filter = new SqlRuleFilter(sqlRule.SqlFilter);
if (!string.IsNullOrWhiteSpace(sqlRule.SqlAction))
{
action = new SqlRuleAction(sqlRule.SqlAction);
}

break;

case SubscriptionCorrelationRule correlationRule:
var correlationRuleFilter = new CorrelationRuleFilter
{
CorrelationId = correlationRule.CorrelationId,
MessageId = correlationRule.MessageId,
To = correlationRule.To,
ReplyTo = correlationRule.ReplyTo,
Subject = correlationRule.Subject,
SessionId = correlationRule.SessionId,
ReplyToSessionId = correlationRule.ReplyToSessionId,
ContentType = correlationRule.ContentType,
};

if (correlationRule.ApplicationProperties != null)
{
foreach (var (key, value) in correlationRule.ApplicationProperties)
{
correlationRuleFilter.ApplicationProperties.Add(key, value);
}
}

filter = correlationRuleFilter;
break;

default:
throw new NotSupportedException($"Filter of type '{rule.GetType()}' is not supported");
}

var createRuleOptions = new CreateRuleOptions(name)
{
Filter = new SqlRuleFilter(rule.SqlFilter),
Action = !string.IsNullOrWhiteSpace(rule.SqlAction) ? new SqlRuleAction(rule.SqlAction) : null
Filter = filter,
Action = action
};

_providerSettings.TopologyProvisioning?.CreateSubscriptionFilterOptions?.Invoke(createRuleOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,34 @@ public async Task When_FilterConfigurationDiffersWithServer_And_CannotReplaceSub
_mockAdminClient.Verify(x => x.UpdateRuleAsync(_topicName, _subscriptionName, It.IsAny<RuleProperties>(), It.IsAny<CancellationToken>()), Times.Never);
}

[Fact]
public async Task When_FilterTypeConfigurationDiffersWithServer_And_CannotReplaceSubscriptionFilters_Then_DoNotUpdateRule()
{
// arrange
const string ruleName = "rule-name";

var applicationProperties = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase)
{
{ "Sample", "Value"}
};

_defaultConsumerBuilder
.SubscriptionCorrelationFilter(ruleName, applicationProperties: applicationProperties);

ProviderBusSettings.TopologyProvisioning.CanConsumerReplaceSubscriptionFilters = false;

_mockAdminClient.Setup(x => x.TopicExistsAsync(_topicName, It.IsAny<CancellationToken>())).Returns(ResponseTask(true));
_mockAdminClient.Setup(x => x.SubscriptionExistsAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<CancellationToken>())).Returns(ResponseTask(true));
_mockAdminClient.Setup(x => x.GetRulesAsync(_topicName, _subscriptionName, It.IsAny<CancellationToken>())).Returns(AsyncPage(ServiceBusModelFactory.RuleProperties(ruleName, new SqlRuleFilter("1 = 1"))));
_mockAdminClient.Setup(x => x.UpdateRuleAsync(_topicName, _subscriptionName, It.IsAny<RuleProperties>(), It.IsAny<CancellationToken>())).Verifiable();

// act
await _target.ProvisionTopology();

// assert
_mockAdminClient.Verify(x => x.UpdateRuleAsync(_topicName, _subscriptionName, It.IsAny<RuleProperties>(), It.IsAny<CancellationToken>()), Times.Never);
}

[Fact]
public async Task When_FilterConfigurationDiffersWithServer_And_CanValidateSubscriptionFiltersOnly_Then_DoNotChangeAnyRules()
{
Expand Down Expand Up @@ -384,6 +412,17 @@ public async Task When_FilterConfigurationDiffersWithServer_And_CanValidateSubsc
_mockAdminClient.Verify(x => x.UpdateRuleAsync(_topicName, _subscriptionName, It.IsAny<RuleProperties>(), It.IsAny<CancellationToken>()), Times.Never);
_mockAdminClient.Verify(x => x.DeleteRuleAsync(_topicName, _subscriptionName, It.IsAny<string>(), It.IsAny<CancellationToken>()), Times.Never);
}

[Fact]
public void When_SubscriptionCorrelationFilter_ContainsNoConfiguration_ThrowException()
{
// act
var act = () => _defaultConsumerBuilder
.SubscriptionCorrelationFilter("rule-name");

// assert
act.Should().Throw<ArgumentException>();
}
}

private static Task<Response<T>> ResponseTask<T>(T value)
Expand Down

0 comments on commit 2610885

Please sign in to comment.