diff --git a/docs/provider_azure_servicebus.md b/docs/provider_azure_servicebus.md index 8cab0796..856a4be9 100644 --- a/docs/provider_azure_servicebus.md +++ b/docs/provider_azure_servicebus.md @@ -216,7 +216,7 @@ mbb.Consume(x => x .MaxAutoLockRenewalDuration(TimeSpan.FromMinutes(7)) .SubQueue(SubQueue.DeadLetter) .PrefetchCount(10) - .SubscriptionSqlFilter("1=1") // ASB subscription SQL filters can also be created - see topology creation section + .SubscriptionSqlFilter("1=1") // ASB subscription filters can also be created - see topology creation section .Instances(1)); ``` @@ -408,7 +408,7 @@ mbb.Consume(x => x .Topic("some-topic") .WithConsumer() .SubscriptionName("some-service") - .SubscriptionSqlFilter("1=1") // this will create a rule with SQL filter + .SubscriptionCorrelationFilter(correlationId: "Sample") // this will create a rule to filter messages with a CorrelationId of 'Sample' .CreateTopicOptions((options) => { options.RequiresDuplicateDetection = false; diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbAbstractConsumerSettingsExtensions.cs b/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbAbstractConsumerSettingsExtensions.cs index eeaab5e1..5ab28840 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbAbstractConsumerSettingsExtensions.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbAbstractConsumerSettingsExtensions.cs @@ -55,12 +55,12 @@ static internal void SetMaxConcurrentSessions(this AbstractConsumerSettings cons static internal int? GetMaxConcurrentSessions(this AbstractConsumerSettings consumerSettings) => consumerSettings.GetOrDefault(AsbProperties.MaxConcurrentSessionsKey); - static internal IDictionary GetRules(this AbstractConsumerSettings consumerSettings, bool createIfNotExists = false) + static internal IDictionary GetRules(this AbstractConsumerSettings consumerSettings, bool createIfNotExists = false) { - var filterByName = consumerSettings.GetOrDefault>(AsbProperties.RulesKey); + var filterByName = consumerSettings.GetOrDefault>(AsbProperties.RulesKey); if (filterByName == null && createIfNotExists) { - filterByName = new Dictionary(); + filterByName = new Dictionary(); consumerSettings.Properties[AsbProperties.RulesKey] = filterByName; } return filterByName; diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbConsumerBuilderExtensions.cs b/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbConsumerBuilderExtensions.cs index dca52ffa..217d3b1c 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbConsumerBuilderExtensions.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/Config/AsbConsumerBuilderExtensions.cs @@ -160,6 +160,70 @@ public static TConsumerBuilder SubscriptionSqlFilter(this TCon return builder; } + /// + /// Adds a named correlation filter to the subscription (Azure Service Bus). Setting relevant only if topology provisioning enabled. + /// + /// + /// + /// The name of the filter + /// Value to be applied as the 'CorrelationId' filter. + /// Value to be applied as the 'MessageId' filter. + /// Value to be applied as the 'To' filter. + /// Value to be applied as the 'ReplyTo' filter. + /// Value to be applied as the 'Subject' filter. + /// Value to be applied as the 'SessionId' filter. + /// Value to be applied as the 'ReplyToSessionId' filter. + /// Value to be applied as the 'ContentType' filter. + /// Filters to be applied to application specific properties. + /// + /// + public static TConsumerBuilder SubscriptionCorrelationFilter( + this TConsumerBuilder builder, + string ruleName = "default", + string correlationId = "", + string messageId = "", + string to = "", + string replyTo = "", + string subject = "", + string sessionId = "", + string replyToSessionId = "", + string contentType = "", + IDictionary applicationProperties = null) + where TConsumerBuilder : IAbstractConsumerBuilder + { + if (builder is null) throw new ArgumentNullException(nameof(builder)); + + if (string.IsNullOrWhiteSpace(correlationId) + && string.IsNullOrWhiteSpace(messageId) + && string.IsNullOrWhiteSpace(to) + && string.IsNullOrWhiteSpace(replyTo) + && string.IsNullOrWhiteSpace(subject) + && string.IsNullOrWhiteSpace(sessionId) + && string.IsNullOrWhiteSpace(replyToSessionId) + && string.IsNullOrWhiteSpace(contentType) + && (applicationProperties == null || applicationProperties?.Count == 0)) + { + throw new ArgumentException("At least one property must contain a value to use as a filter"); + } + + var filterByName = builder.ConsumerSettings.GetRules(createIfNotExists: true); + filterByName[ruleName] = new SubscriptionCorrelationRule + { + Name = ruleName, + CorrelationId = correlationId, + MessageId = messageId, + To = to, + ReplyTo = replyTo, + Subject = subject, + SessionId = sessionId, + ReplyToSessionId = replyToSessionId, + ContentType = contentType, + ApplicationProperties = applicationProperties + }; + + return builder; + } + /// /// when the ASB queue does not exist and needs to be created /// diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Config/SubscriptionCorrelationRule.cs b/src/SlimMessageBus.Host.AzureServiceBus/Config/SubscriptionCorrelationRule.cs new file mode 100644 index 00000000..6acfb898 --- /dev/null +++ b/src/SlimMessageBus.Host.AzureServiceBus/Config/SubscriptionCorrelationRule.cs @@ -0,0 +1,14 @@ +namespace SlimMessageBus.Host.AzureServiceBus; + +public record SubscriptionCorrelationRule : SubscriptionRule +{ + public string CorrelationId { get; set; } + public string MessageId { get; set; } + public string To { get; set; } + public string ReplyTo { get; set; } + public string Subject { get; set; } + public string SessionId { get; set; } + public string ReplyToSessionId { get; set; } + public string ContentType { get; set; } + public IDictionary ApplicationProperties { get; set; } +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Config/SubscriptionRule.cs b/src/SlimMessageBus.Host.AzureServiceBus/Config/SubscriptionRule.cs new file mode 100644 index 00000000..1b1d1256 --- /dev/null +++ b/src/SlimMessageBus.Host.AzureServiceBus/Config/SubscriptionRule.cs @@ -0,0 +1,6 @@ +namespace SlimMessageBus.Host.AzureServiceBus; + +public abstract record SubscriptionRule +{ + public string Name { get; set; } +} diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Config/SubscriptionSqlFilter.cs b/src/SlimMessageBus.Host.AzureServiceBus/Config/SubscriptionSqlFilter.cs index e7f3be22..0055a9c7 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/Config/SubscriptionSqlFilter.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/Config/SubscriptionSqlFilter.cs @@ -1,8 +1,7 @@ namespace SlimMessageBus.Host.AzureServiceBus; -public record SubscriptionSqlRule +public record SubscriptionSqlRule : SubscriptionRule { - public string Name { get; set; } public string SqlFilter { get; set; } public string SqlAction { get; set; } -} \ No newline at end of file +} diff --git a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs index e9a1568e..8a86b742 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs @@ -331,10 +331,51 @@ IReadOnlyCollection MergeFilters(string path, string subscrip throw new ConfigurationMessageBusException($"All rules across the same path/subscription {path}/{subscriptionName} must have unique names (Duplicate: '{name}')."); } + RuleFilter filter; + RuleAction action = null; + switch (rule) + { + case SubscriptionSqlRule sqlRule: + filter = new SqlRuleFilter(sqlRule.SqlFilter); + if (!string.IsNullOrWhiteSpace(sqlRule.SqlAction)) + { + action = new SqlRuleAction(sqlRule.SqlAction); + } + + break; + + case SubscriptionCorrelationRule correlationRule: + var correlationRuleFilter = new CorrelationRuleFilter + { + CorrelationId = correlationRule.CorrelationId, + MessageId = correlationRule.MessageId, + To = correlationRule.To, + ReplyTo = correlationRule.ReplyTo, + Subject = correlationRule.Subject, + SessionId = correlationRule.SessionId, + ReplyToSessionId = correlationRule.ReplyToSessionId, + ContentType = correlationRule.ContentType, + }; + + if (correlationRule.ApplicationProperties != null) + { + foreach (var (key, value) in correlationRule.ApplicationProperties) + { + correlationRuleFilter.ApplicationProperties.Add(key, value); + } + } + + filter = correlationRuleFilter; + break; + + default: + throw new NotSupportedException($"Filter of type '{rule.GetType()}' is not supported"); + } + var createRuleOptions = new CreateRuleOptions(name) { - Filter = new SqlRuleFilter(rule.SqlFilter), - Action = !string.IsNullOrWhiteSpace(rule.SqlAction) ? new SqlRuleAction(rule.SqlAction) : null + Filter = filter, + Action = action }; _providerSettings.TopologyProvisioning?.CreateSubscriptionFilterOptions?.Invoke(createRuleOptions); diff --git a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusTopologyServiceTests.cs b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusTopologyServiceTests.cs index 7c44da35..bbb815c3 100644 --- a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusTopologyServiceTests.cs +++ b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusTopologyServiceTests.cs @@ -344,6 +344,34 @@ public async Task When_FilterConfigurationDiffersWithServer_And_CannotReplaceSub _mockAdminClient.Verify(x => x.UpdateRuleAsync(_topicName, _subscriptionName, It.IsAny(), It.IsAny()), Times.Never); } + [Fact] + public async Task When_FilterTypeConfigurationDiffersWithServer_And_CannotReplaceSubscriptionFilters_Then_DoNotUpdateRule() + { + // arrange + const string ruleName = "rule-name"; + + var applicationProperties = new Dictionary(StringComparer.OrdinalIgnoreCase) + { + { "Sample", "Value"} + }; + + _defaultConsumerBuilder + .SubscriptionCorrelationFilter(ruleName, applicationProperties: applicationProperties); + + ProviderBusSettings.TopologyProvisioning.CanConsumerReplaceSubscriptionFilters = false; + + _mockAdminClient.Setup(x => x.TopicExistsAsync(_topicName, It.IsAny())).Returns(ResponseTask(true)); + _mockAdminClient.Setup(x => x.SubscriptionExistsAsync(It.IsAny(), It.IsAny(), It.IsAny())).Returns(ResponseTask(true)); + _mockAdminClient.Setup(x => x.GetRulesAsync(_topicName, _subscriptionName, It.IsAny())).Returns(AsyncPage(ServiceBusModelFactory.RuleProperties(ruleName, new SqlRuleFilter("1 = 1")))); + _mockAdminClient.Setup(x => x.UpdateRuleAsync(_topicName, _subscriptionName, It.IsAny(), It.IsAny())).Verifiable(); + + // act + await _target.ProvisionTopology(); + + // assert + _mockAdminClient.Verify(x => x.UpdateRuleAsync(_topicName, _subscriptionName, It.IsAny(), It.IsAny()), Times.Never); + } + [Fact] public async Task When_FilterConfigurationDiffersWithServer_And_CanValidateSubscriptionFiltersOnly_Then_DoNotChangeAnyRules() { @@ -384,6 +412,17 @@ public async Task When_FilterConfigurationDiffersWithServer_And_CanValidateSubsc _mockAdminClient.Verify(x => x.UpdateRuleAsync(_topicName, _subscriptionName, It.IsAny(), It.IsAny()), Times.Never); _mockAdminClient.Verify(x => x.DeleteRuleAsync(_topicName, _subscriptionName, It.IsAny(), It.IsAny()), Times.Never); } + + [Fact] + public void When_SubscriptionCorrelationFilter_ContainsNoConfiguration_ThrowException() + { + // act + var act = () => _defaultConsumerBuilder + .SubscriptionCorrelationFilter("rule-name"); + + // assert + act.Should().Throw(); + } } private static Task> ResponseTask(T value)