diff --git a/src/broker/CloudStreams.Broker.Application/Services/SubscriptionHandler.cs b/src/broker/CloudStreams.Broker.Application/Services/SubscriptionHandler.cs index 846b9512..cdbe9a78 100644 --- a/src/broker/CloudStreams.Broker.Application/Services/SubscriptionHandler.cs +++ b/src/broker/CloudStreams.Broker.Application/Services/SubscriptionHandler.cs @@ -273,7 +273,7 @@ protected virtual bool Filters(CloudEventRecord e, IDictionary { if (e == null) throw new ArgumentNullException(nameof(e)); if (attributeFilters == null) throw new ArgumentNullException(nameof(attributeFilters)); - var attributes = e.ToDictionary()!.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.ToString()); + var attributes = e.Metadata.ContextAttributes.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.ToString()); foreach (var attributeFilter in attributeFilters) { if (!attributes.TryGetValue(attributeFilter.Key, out var attributeValue) || string.IsNullOrWhiteSpace(attributeValue)) return false; @@ -374,6 +374,7 @@ protected virtual async Task DispatchAsync(CloudEventRecord e, bool retryOnError { if (e == null) throw new ArgumentNullException(nameof(e)); var cloudEvent = e.ToCloudEvent(this.Broker.Resource.Spec.Dispatch?.Sequencing); + if (!this.Filters(e)) return; cloudEvent = await this.MutateAsync(cloudEvent).ConfigureAwait(false); await this.DispatchAsync(cloudEvent, e.Sequence, retryOnError, catchUpWhenAvailable).ConfigureAwait(false); }