Skip to content

Commit

Permalink
Merge pull request #48 from neuroglia-io/feat-broker-filter-subs-by-l…
Browse files Browse the repository at this point in the history
…abel

Added the capability to the Broker to perform label-based filtering of subscriptions
  • Loading branch information
cdavernas authored Jul 31, 2023
2 parents 191a66e + d1691f5 commit 86f52b5
Show file tree
Hide file tree
Showing 15 changed files with 114 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="6.0.5" />
<PackageReference Include="AspNetCore.HealthChecks.Uris" Version="6.0.3" />
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="7.0.0" />
<PackageReference Include="AspNetCore.HealthChecks.Uris" Version="7.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.Server" Version="7.0.9" />
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="7.0.9" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.18.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using CloudStreams.Broker.Application.Services;
using CloudStreams.Core.Infrastructure.Configuration;
using Hylo.Infrastructure;
using Hylo.Infrastructure.Services;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace CloudStreams.Broker.Api.Configuration;
Expand All @@ -38,8 +39,8 @@ public static ICloudStreamsApplicationBuilder UseBrokerApi(this ICloudStreamsApp

builder.WithServiceName(options.Name);
builder.Services.Configure<BrokerOptions>(builder.Configuration);
builder.Services.AddResourceController<Subscription>();
builder.Services.TryAddSingleton<SubscriptionManager>();
builder.Services.AddSingleton<IResourceController<Subscription>>(provider => provider.GetRequiredService<SubscriptionManager>());
builder.Services.AddHostedService(provider => provider.GetRequiredService<SubscriptionManager>());

return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

using CloudStreams.Broker.Application.Configuration;
using Hylo;
using Hylo.Infrastructure.Configuration;
using Hylo.Infrastructure.Services;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
Expand All @@ -25,97 +26,90 @@ namespace CloudStreams.Broker.Application.Services;
/// Represents a service used to manage <see cref="Subscription"/>s
/// </summary>
public class SubscriptionManager
: BackgroundService, IAsyncDisposable
: ResourceController<Subscription>
{

private bool _Disposed;

/// <summary>
/// Initializes a new <see cref="SubscriptionManager"/>
/// </summary>
/// <param name="serviceProvider">The current <see cref="IServiceProvider"/></param>
/// <param name="resources">The service used to manage resources</param>
/// <param name="subscriptionController">The service used to control <see cref="Subscription"/> resources</param>
/// <param name="options">The service used to access the current <see cref="Configuration.BrokerOptions"/></param>
public SubscriptionManager(IServiceProvider serviceProvider, IRepository resources, IResourceController<Subscription> subscriptionController, IOptions<BrokerOptions> options)
/// <param name="loggerFactory">The service used to create <see cref="ILogger"/>s</param>
/// <param name="controllerOptions">The service used to access the current <see cref="IOptions{TOptions}"/></param>
/// <param name="repository">The service used to manage <see cref="IResource"/>s</param>
/// <param name="brokerOptions">The service used to access the current <see cref="Configuration.BrokerOptions"/></param>
public SubscriptionManager(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<Subscription>> controllerOptions, IRepository repository, IOptions<BrokerOptions> brokerOptions)
: base(loggerFactory, controllerOptions, repository)
{
this.ServiceProvider = serviceProvider;
this.Resources = resources;
this.SubscriptionController = subscriptionController;
this.Options = options.Value;
this.BrokerOptions = brokerOptions.Value;
}

/// <summary>
/// Gets the current <see cref="IServiceProvider"/>
/// </summary>
protected IServiceProvider ServiceProvider { get; }

/// <summary>
/// Gets the service used to manage resources
/// </summary>
protected IRepository Resources { get; }

/// <summary>
/// Gets the service used to control <see cref="Subscription"/> resources
/// </summary>
protected IResourceController<Subscription> SubscriptionController { get; }

/// <summary>
/// Gets the running <see cref="Core.Data.Broker"/>'s options
/// </summary>
protected BrokerOptions Options { get; }
protected BrokerOptions BrokerOptions { get; }

/// <summary>
/// Gets a <see cref="ConcurrentDictionary{TKey, TValue}"/> containing the key/value mappings of handled <see cref="Subscription"/>s
/// </summary>
protected ConcurrentDictionary<string, SubscriptionHandler> Subscriptions { get; } = new();

/// <summary>
/// Gets the <see cref="SubscriptionManager"/>'s <see cref="System.Threading.CancellationTokenSource"/>
/// </summary>
protected CancellationTokenSource CancellationTokenSource { get; private set; } = null!;

/// <summary>
/// Gets the service used to monitor the current <see cref="Core.Data.Broker"/>
/// </summary>
protected IResourceMonitor<Core.Data.Broker>? Configuration { get; private set; }
protected IResourceMonitor<Core.Data.Broker>? Broker { get; private set; }

/// <summary>
/// Gets the <see cref="SubscriptionManager"/>'s <see cref="System.Threading.CancellationToken"/>
/// </summary>
protected CancellationToken CancellationToken => this.CancellationTokenSource.Token;

/// <inheritdoc/>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
public override async Task StartAsync(CancellationToken cancellationToken)
{
this.CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
await base.StartAsync(cancellationToken).ConfigureAwait(false);
Core.Data.Broker? broker = null;
try
{
broker = await this.Resources.GetAsync<Core.Data.Broker>(this.Options.Name, this.Options.Namespace, stoppingToken).ConfigureAwait(false);
broker = await this.Repository.GetAsync<Core.Data.Broker>(this.BrokerOptions.Name, this.BrokerOptions.Namespace, cancellationToken).ConfigureAwait(false);
}
catch (HyloException ex) when (ex.Problem.Status == (int)HttpStatusCode.NotFound) { }
finally
{
if (broker == null)
{
broker = new Core.Data.Broker(new ResourceMetadata(this.Options.Name, this.Options.Namespace), new BrokerSpec()
{
Dispatch = new()
{
broker = new Core.Data.Broker(new ResourceMetadata(this.BrokerOptions.Name, this.BrokerOptions.Namespace), new BrokerSpec()
{
Dispatch = new()
{
Sequencing = CloudEventSequencingConfiguration.Default
}
}
});
broker = await this.Resources.AddAsync(broker, false, stoppingToken).ConfigureAwait(false);
broker = await this.Repository.AddAsync(broker, false, cancellationToken).ConfigureAwait(false);
}
this.Configuration = await this.Resources.MonitorAsync<Core.Data.Broker>(this.Options.Name, this.Options.Namespace, false, this.CancellationToken).ConfigureAwait(false);
this.Broker = await this.Repository.MonitorAsync<Core.Data.Broker>(this.BrokerOptions.Name, this.BrokerOptions.Namespace, false, cancellationToken).ConfigureAwait(false);
}
foreach (var subscription in this.SubscriptionController.Resources.ToList())
foreach (var subscription in this.Resources.Values.ToList())
{
await this.OnSubscriptionCreatedAsync(subscription).ConfigureAwait(false);
}
this.SubscriptionController.Where(e => e.Type == ResourceWatchEventType.Created).Select(e => e.Resource).SubscribeAsync(this.OnSubscriptionCreatedAsync, stoppingToken);
this.SubscriptionController.Where(e => e.Type == ResourceWatchEventType.Deleted).Select(e => e.Resource).SubscribeAsync(this.OnSubscriptionDeletedAsync, stoppingToken);
this.Where(e => e.Type == ResourceWatchEventType.Created).Select(e => e.Resource).SubscribeAsync(this.OnSubscriptionCreatedAsync, cancellationToken);
this.Where(e => e.Type == ResourceWatchEventType.Updated).Select(s => s.Resource).DistinctUntilChanged(s => s.Metadata.Labels).SubscribeAsync(this.OnSubscriptionLabelChangedAsync, cancellationToken);
this.Where(e => e.Type == ResourceWatchEventType.Deleted).Select(e => e.Resource).SubscribeAsync(this.OnSubscriptionDeletedAsync, cancellationToken);
this.Broker!.Select(b => b.Resource.Spec.Selector).SubscribeAsync(this.OnBrokerSelectorChangedAsync, cancellationToken: cancellationToken);
await this.OnBrokerSelectorChangedAsync(this.Broker!.Resource.Spec.Selector).ConfigureAwait(false);
}

/// <inheritdoc/>
protected override Task ReconcileAsync(CancellationToken cancellationToken = default)
{
this.Options.LabelSelectors = this.Broker?.Resource.Spec.Selector?.Select(s => new LabelSelector(s.Key, LabelSelectionOperator.Equals, s.Value)).ToList();
return base.ReconcileAsync(cancellationToken);
}

/// <summary>
Expand All @@ -124,16 +118,53 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
/// <param name="name">The name of the resource to build a new cache key for</param>
/// <param name="namespace">The namespace the resource to build a new cache key for belongs to</param>
/// <returns>A new cache key</returns>
protected virtual string GetResourceCacheKey(string name, string? @namespace) => string.IsNullOrWhiteSpace(@namespace) ? name : $"{@namespace}.{name}";
protected virtual string GetSubscriptionHandlerCacheKey(string name, string? @namespace) => string.IsNullOrWhiteSpace(@namespace) ? name : $"{@namespace}.{name}";

/// <summary>
/// Handles changes to the current broker's subscription selector
/// </summary>
/// <param name="selector">A key/value mapping of the labels the subscriptions to select must define</param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual Task OnBrokerSelectorChangedAsync(IDictionary<string, string>? selector) => this.ReconcileAsync(this.CancellationToken);

/// <summary>
/// Handles changes to the specified subscription's labels
/// </summary>
/// <param name="subscription">The subscription to manage the changes of</param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task OnSubscriptionLabelChangedAsync(Subscription subscription)
{
if (this.Broker == null) return;
var key = this.GetSubscriptionHandlerCacheKey(subscription.GetName(), subscription.GetNamespace());
if (this.Options.LabelSelectors == null || this.Options.LabelSelectors.All(s => s.Selects(subscription)) == true)
{
if (this.Subscriptions.TryGetValue(key, out _)) return;
await this.OnSubscriptionCreatedAsync(subscription).ConfigureAwait(false);
}
else
{
if (!this.Subscriptions.TryGetValue(key, out _)) return;
await this.OnResourceDeletedAsync(subscription).ConfigureAwait(false);
}
}

/// <inheritdoc/>
protected override async Task OnResourceDeletedAsync(Subscription subscription, CancellationToken cancellationToken = default)
{
await base.OnResourceDeletedAsync(subscription, cancellationToken).ConfigureAwait(false);
var key = this.GetSubscriptionHandlerCacheKey(subscription.GetName(), subscription.GetNamespace());
if (!this.Subscriptions.TryRemove(key, out var handler) || handler == null) return;
await handler.DisposeAsync().ConfigureAwait(false);
}

/// <summary>
/// Handles the creation of a new <see cref="Subscription"/>
/// </summary>
/// <param name="subscription">The newly created <see cref="Subscription"/></param>
protected virtual async Task OnSubscriptionCreatedAsync(Subscription subscription)
{
var key = this.GetResourceCacheKey(subscription.GetName(), subscription.GetNamespace());
var handler = ActivatorUtilities.CreateInstance<SubscriptionHandler>(this.ServiceProvider, subscription, this.Configuration!);
var key = this.GetSubscriptionHandlerCacheKey(subscription.GetName(), subscription.GetNamespace());
var handler = ActivatorUtilities.CreateInstance<SubscriptionHandler>(this.ServiceProvider, subscription, this.Broker!);
await handler.InitializeAsync(this.CancellationToken).ConfigureAwait(false);
this.Subscriptions.AddOrUpdate(key, handler, (_, _) => handler);
}
Expand All @@ -144,34 +175,22 @@ protected virtual async Task OnSubscriptionCreatedAsync(Subscription subscriptio
/// <param name="subscription">The newly deleted <see cref="Subscription"/></param>
protected virtual async Task OnSubscriptionDeletedAsync(Subscription subscription)
{
var key = this.GetResourceCacheKey(subscription.GetName(), subscription.GetNamespace());
var key = this.GetSubscriptionHandlerCacheKey(subscription.GetName(), subscription.GetNamespace());
if (this.Subscriptions.Remove(key, out var handler) && handler != null) await handler.DisposeAsync().ConfigureAwait(false);
}

/// <summary>
/// Disposes of the <see cref="SubscriptionHandler"/>
/// </summary>
/// <param name="disposing">A boolean indicating whether or not the <see cref="SubscriptionHandler"/> is being disposed of</param>
protected virtual async ValueTask DisposeAsync(bool disposing)
{
if (!this._Disposed)
{
if (disposing)
{
this.CancellationTokenSource?.Dispose();
await this.Subscriptions.ToAsyncEnumerable().ForEachAsync(async s => await s.Value.DisposeAsync().ConfigureAwait(false)).ConfigureAwait(false);
this.Subscriptions.Clear();
if (this.Configuration != null) await this.Configuration.DisposeAsync().ConfigureAwait(false);
}
this._Disposed = true;
}
}

/// <inheritdoc/>
public async ValueTask DisposeAsync()
protected override async ValueTask DisposeAsync(bool disposing)
{
await this.DisposeAsync(disposing: true).ConfigureAwait(false);
GC.SuppressFinalize(this);
if (!disposing) return;
await base.DisposeAsync(disposing);
this.CancellationTokenSource?.Dispose();
await this.Subscriptions.ToAsyncEnumerable().ForEachAsync(async s => await s.Value.DisposeAsync().ConfigureAwait(false)).ConfigureAwait(false);
this.Subscriptions.Clear();
if (this.Broker != null) await this.Broker.DisposeAsync().ConfigureAwait(false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Hylo.Api.Application" Version="0.7.4" />
<PackageReference Include="Hylo.Core" Version="0.7.4" />
<PackageReference Include="Hylo.Api.Application" Version="0.7.5" />
<PackageReference Include="Hylo.Core" Version="0.7.5" />
<PackageReference Include="Microsoft.AspNetCore.SignalR.Client" Version="7.0.9" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="7.0.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="6.0.5" />
<PackageReference Include="AspNetCore.HealthChecks.Uris" Version="6.0.3" />
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="7.0.0" />
<PackageReference Include="AspNetCore.HealthChecks.Uris" Version="7.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.Server" Version="7.0.9" />
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="7.0.9" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.18.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Hylo.Api.Http" Version="0.7.4" />
<PackageReference Include="Hylo.Api.Http" Version="0.7.5" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@

<ItemGroup>
<PackageReference Include="FluentValidation.DependencyInjectionExtensions" Version="11.6.0" />
<PackageReference Include="Hylo.Api.Application" Version="0.7.4" />
<PackageReference Include="Hylo.Core" Version="0.7.4" />
<PackageReference Include="Hylo.Api.Application" Version="0.7.5" />
<PackageReference Include="Hylo.Core" Version="0.7.5" />
<PackageReference Include="MediatR" Version="12.1.1" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.5.1" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol.Logs" Version="1.5.0-rc.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ public class ReadCloudEventStreamQueryHandler
: IQueryHandler<ReadEventStreamQuery, IAsyncEnumerable<object>>
{

readonly IEventStoreProvider _eventStoreProvider;

/// <inheritdoc/>
public ReadCloudEventStreamQueryHandler(IEventStoreProvider eventStoreProvider)
{
this._eventStoreProvider = eventStoreProvider;
}

IEventStoreProvider _eventStoreProvider;

/// <inheritdoc/>
public Task<ApiResponse<IAsyncEnumerable<object>>> Handle(ReadEventStreamQuery query, CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
<PackageReference Include="EventStore.Client.Grpc.PersistentSubscriptions" Version="23.0.0" />
<PackageReference Include="EventStore.Client.Grpc.ProjectionManagement" Version="23.0.0" />
<PackageReference Include="EventStore.Client.Grpc.Streams" Version="23.0.0" />
<PackageReference Include="Hylo.Core" Version="0.7.4" />
<PackageReference Include="Hylo.Core" Version="0.7.5" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="7.0.0" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Hylo.Infrastructure" Version="0.7.4" />
<PackageReference Include="Hylo.Providers.FileSystem" Version="0.7.4" />
<PackageReference Include="Hylo.Infrastructure" Version="0.7.5" />
<PackageReference Include="Hylo.Providers.FileSystem" Version="0.7.5" />
<PackageReference Include="JsonSchema.Net.Generation" Version="3.3.1" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="7.0.0" />
Expand Down
3 changes: 3 additions & 0 deletions src/core/CloudStreams.Core/Assets/Definitions/broker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ spec:
- attributeConflictResolution
- attributeName
- fallbackAttributeName
selector:
additionalProperties:
type: string
service:
type: object
properties:
Expand Down
2 changes: 1 addition & 1 deletion src/core/CloudStreams.Core/CloudStreams.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Hylo.Core" Version="0.7.4" />
<PackageReference Include="Hylo.Core" Version="0.7.5" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="7.0.0" />
</ItemGroup>

Expand Down
Loading

0 comments on commit 86f52b5

Please sign in to comment.