Skip to content

Commit

Permalink
Merge pull request #49 from neuroglia-io/fix-subscription-filter
Browse files Browse the repository at this point in the history
Fixed the SubscriptionHandler
  • Loading branch information
cdavernas authored Jul 31, 2023
2 parents 618daf6 + 14789b8 commit acf5db2
Showing 1 changed file with 2 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ protected virtual bool Filters(CloudEventRecord e, IDictionary<string, string?>
{
if (e == null) throw new ArgumentNullException(nameof(e));
if (attributeFilters == null) throw new ArgumentNullException(nameof(attributeFilters));
var attributes = e.ToDictionary()!.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.ToString());
var attributes = e.Metadata.ContextAttributes.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.ToString());
foreach (var attributeFilter in attributeFilters)
{
if (!attributes.TryGetValue(attributeFilter.Key, out var attributeValue) || string.IsNullOrWhiteSpace(attributeValue)) return false;
Expand Down Expand Up @@ -374,6 +374,7 @@ protected virtual async Task DispatchAsync(CloudEventRecord e, bool retryOnError
{
if (e == null) throw new ArgumentNullException(nameof(e));
var cloudEvent = e.ToCloudEvent(this.Broker.Resource.Spec.Dispatch?.Sequencing);
if (!this.Filters(e)) return;
cloudEvent = await this.MutateAsync(cloudEvent).ConfigureAwait(false);
await this.DispatchAsync(cloudEvent, e.Sequence, retryOnError, catchUpWhenAvailable).ConfigureAwait(false);
}
Expand Down

0 comments on commit acf5db2

Please sign in to comment.