Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added the capability to the Broker to perform label-based filtering of subscriptions #48

Merged
merged 2 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="6.0.5" />
<PackageReference Include="AspNetCore.HealthChecks.Uris" Version="6.0.3" />
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="7.0.0" />
<PackageReference Include="AspNetCore.HealthChecks.Uris" Version="7.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.Server" Version="7.0.9" />
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="7.0.9" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.18.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using CloudStreams.Broker.Application.Services;
using CloudStreams.Core.Infrastructure.Configuration;
using Hylo.Infrastructure;
using Hylo.Infrastructure.Services;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace CloudStreams.Broker.Api.Configuration;
Expand All @@ -38,8 +39,8 @@ public static ICloudStreamsApplicationBuilder UseBrokerApi(this ICloudStreamsApp

builder.WithServiceName(options.Name);
builder.Services.Configure<BrokerOptions>(builder.Configuration);
builder.Services.AddResourceController<Subscription>();
builder.Services.TryAddSingleton<SubscriptionManager>();
builder.Services.AddSingleton<IResourceController<Subscription>>(provider => provider.GetRequiredService<SubscriptionManager>());
builder.Services.AddHostedService(provider => provider.GetRequiredService<SubscriptionManager>());

return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

using CloudStreams.Broker.Application.Configuration;
using Hylo;
using Hylo.Infrastructure.Configuration;
using Hylo.Infrastructure.Services;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
Expand All @@ -25,97 +26,90 @@ namespace CloudStreams.Broker.Application.Services;
/// Represents a service used to manage <see cref="Subscription"/>s
/// </summary>
public class SubscriptionManager
: BackgroundService, IAsyncDisposable
: ResourceController<Subscription>
{

private bool _Disposed;

/// <summary>
/// Initializes a new <see cref="SubscriptionManager"/>
/// </summary>
/// <param name="serviceProvider">The current <see cref="IServiceProvider"/></param>
/// <param name="resources">The service used to manage resources</param>
/// <param name="subscriptionController">The service used to control <see cref="Subscription"/> resources</param>
/// <param name="options">The service used to access the current <see cref="Configuration.BrokerOptions"/></param>
public SubscriptionManager(IServiceProvider serviceProvider, IRepository resources, IResourceController<Subscription> subscriptionController, IOptions<BrokerOptions> options)
/// <param name="loggerFactory">The service used to create <see cref="ILogger"/>s</param>
/// <param name="controllerOptions">The service used to access the current <see cref="IOptions{TOptions}"/></param>
/// <param name="repository">The service used to manage <see cref="IResource"/>s</param>
/// <param name="brokerOptions">The service used to access the current <see cref="Configuration.BrokerOptions"/></param>
public SubscriptionManager(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<Subscription>> controllerOptions, IRepository repository, IOptions<BrokerOptions> brokerOptions)
: base(loggerFactory, controllerOptions, repository)
{
this.ServiceProvider = serviceProvider;
this.Resources = resources;
this.SubscriptionController = subscriptionController;
this.Options = options.Value;
this.BrokerOptions = brokerOptions.Value;
}

/// <summary>
/// Gets the current <see cref="IServiceProvider"/>
/// </summary>
protected IServiceProvider ServiceProvider { get; }

/// <summary>
/// Gets the service used to manage resources
/// </summary>
protected IRepository Resources { get; }

/// <summary>
/// Gets the service used to control <see cref="Subscription"/> resources
/// </summary>
protected IResourceController<Subscription> SubscriptionController { get; }

/// <summary>
/// Gets the running <see cref="Core.Data.Broker"/>'s options
/// </summary>
protected BrokerOptions Options { get; }
protected BrokerOptions BrokerOptions { get; }

/// <summary>
/// Gets a <see cref="ConcurrentDictionary{TKey, TValue}"/> containing the key/value mappings of handled <see cref="Subscription"/>s
/// </summary>
protected ConcurrentDictionary<string, SubscriptionHandler> Subscriptions { get; } = new();

/// <summary>
/// Gets the <see cref="SubscriptionManager"/>'s <see cref="System.Threading.CancellationTokenSource"/>
/// </summary>
protected CancellationTokenSource CancellationTokenSource { get; private set; } = null!;

/// <summary>
/// Gets the service used to monitor the current <see cref="Core.Data.Broker"/>
/// </summary>
protected IResourceMonitor<Core.Data.Broker>? Configuration { get; private set; }
protected IResourceMonitor<Core.Data.Broker>? Broker { get; private set; }

/// <summary>
/// Gets the <see cref="SubscriptionManager"/>'s <see cref="System.Threading.CancellationToken"/>
/// </summary>
protected CancellationToken CancellationToken => this.CancellationTokenSource.Token;

/// <inheritdoc/>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
public override async Task StartAsync(CancellationToken cancellationToken)
{
this.CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
await base.StartAsync(cancellationToken).ConfigureAwait(false);
Core.Data.Broker? broker = null;
try
{
broker = await this.Resources.GetAsync<Core.Data.Broker>(this.Options.Name, this.Options.Namespace, stoppingToken).ConfigureAwait(false);
broker = await this.Repository.GetAsync<Core.Data.Broker>(this.BrokerOptions.Name, this.BrokerOptions.Namespace, cancellationToken).ConfigureAwait(false);
}
catch (HyloException ex) when (ex.Problem.Status == (int)HttpStatusCode.NotFound) { }
finally
{
if (broker == null)
{
broker = new Core.Data.Broker(new ResourceMetadata(this.Options.Name, this.Options.Namespace), new BrokerSpec()
{
Dispatch = new()
{
broker = new Core.Data.Broker(new ResourceMetadata(this.BrokerOptions.Name, this.BrokerOptions.Namespace), new BrokerSpec()
{
Dispatch = new()
{
Sequencing = CloudEventSequencingConfiguration.Default
}
}
});
broker = await this.Resources.AddAsync(broker, false, stoppingToken).ConfigureAwait(false);
broker = await this.Repository.AddAsync(broker, false, cancellationToken).ConfigureAwait(false);
}
this.Configuration = await this.Resources.MonitorAsync<Core.Data.Broker>(this.Options.Name, this.Options.Namespace, false, this.CancellationToken).ConfigureAwait(false);
this.Broker = await this.Repository.MonitorAsync<Core.Data.Broker>(this.BrokerOptions.Name, this.BrokerOptions.Namespace, false, cancellationToken).ConfigureAwait(false);
}
foreach (var subscription in this.SubscriptionController.Resources.ToList())
foreach (var subscription in this.Resources.Values.ToList())
{
await this.OnSubscriptionCreatedAsync(subscription).ConfigureAwait(false);
}
this.SubscriptionController.Where(e => e.Type == ResourceWatchEventType.Created).Select(e => e.Resource).SubscribeAsync(this.OnSubscriptionCreatedAsync, stoppingToken);
this.SubscriptionController.Where(e => e.Type == ResourceWatchEventType.Deleted).Select(e => e.Resource).SubscribeAsync(this.OnSubscriptionDeletedAsync, stoppingToken);
this.Where(e => e.Type == ResourceWatchEventType.Created).Select(e => e.Resource).SubscribeAsync(this.OnSubscriptionCreatedAsync, cancellationToken);
this.Where(e => e.Type == ResourceWatchEventType.Updated).Select(s => s.Resource).DistinctUntilChanged(s => s.Metadata.Labels).SubscribeAsync(this.OnSubscriptionLabelChangedAsync, cancellationToken);
this.Where(e => e.Type == ResourceWatchEventType.Deleted).Select(e => e.Resource).SubscribeAsync(this.OnSubscriptionDeletedAsync, cancellationToken);
this.Broker!.Select(b => b.Resource.Spec.Selector).SubscribeAsync(this.OnBrokerSelectorChangedAsync, cancellationToken: cancellationToken);
await this.OnBrokerSelectorChangedAsync(this.Broker!.Resource.Spec.Selector).ConfigureAwait(false);
}

/// <inheritdoc/>
protected override Task ReconcileAsync(CancellationToken cancellationToken = default)
{
this.Options.LabelSelectors = this.Broker?.Resource.Spec.Selector?.Select(s => new LabelSelector(s.Key, LabelSelectionOperator.Equals, s.Value)).ToList();
return base.ReconcileAsync(cancellationToken);
}

/// <summary>
Expand All @@ -124,16 +118,53 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
/// <param name="name">The name of the resource to build a new cache key for</param>
/// <param name="namespace">The namespace the resource to build a new cache key for belongs to</param>
/// <returns>A new cache key</returns>
protected virtual string GetResourceCacheKey(string name, string? @namespace) => string.IsNullOrWhiteSpace(@namespace) ? name : $"{@namespace}.{name}";
protected virtual string GetSubscriptionHandlerCacheKey(string name, string? @namespace) => string.IsNullOrWhiteSpace(@namespace) ? name : $"{@namespace}.{name}";

/// <summary>
/// Handles changes to the current broker's subscription selector
/// </summary>
/// <param name="selector">A key/value mapping of the labels the subscriptions to select must define</param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual Task OnBrokerSelectorChangedAsync(IDictionary<string, string>? selector) => this.ReconcileAsync(this.CancellationToken);

/// <summary>
/// Handles changes to the specified subscription's labels
/// </summary>
/// <param name="subscription">The subscription to manage the changes of</param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task OnSubscriptionLabelChangedAsync(Subscription subscription)
{
if (this.Broker == null) return;
var key = this.GetSubscriptionHandlerCacheKey(subscription.GetName(), subscription.GetNamespace());
if (this.Options.LabelSelectors == null || this.Options.LabelSelectors.All(s => s.Selects(subscription)) == true)
{
if (this.Subscriptions.TryGetValue(key, out _)) return;
await this.OnSubscriptionCreatedAsync(subscription).ConfigureAwait(false);
}
else
{
if (!this.Subscriptions.TryGetValue(key, out _)) return;
await this.OnResourceDeletedAsync(subscription).ConfigureAwait(false);
}
}

/// <inheritdoc/>
protected override async Task OnResourceDeletedAsync(Subscription subscription, CancellationToken cancellationToken = default)
{
await base.OnResourceDeletedAsync(subscription, cancellationToken).ConfigureAwait(false);
var key = this.GetSubscriptionHandlerCacheKey(subscription.GetName(), subscription.GetNamespace());
if (!this.Subscriptions.TryRemove(key, out var handler) || handler == null) return;
await handler.DisposeAsync().ConfigureAwait(false);
}

/// <summary>
/// Handles the creation of a new <see cref="Subscription"/>
/// </summary>
/// <param name="subscription">The newly created <see cref="Subscription"/></param>
protected virtual async Task OnSubscriptionCreatedAsync(Subscription subscription)
{
var key = this.GetResourceCacheKey(subscription.GetName(), subscription.GetNamespace());
var handler = ActivatorUtilities.CreateInstance<SubscriptionHandler>(this.ServiceProvider, subscription, this.Configuration!);
var key = this.GetSubscriptionHandlerCacheKey(subscription.GetName(), subscription.GetNamespace());
var handler = ActivatorUtilities.CreateInstance<SubscriptionHandler>(this.ServiceProvider, subscription, this.Broker!);
await handler.InitializeAsync(this.CancellationToken).ConfigureAwait(false);
this.Subscriptions.AddOrUpdate(key, handler, (_, _) => handler);
}
Expand All @@ -144,34 +175,22 @@ protected virtual async Task OnSubscriptionCreatedAsync(Subscription subscriptio
/// <param name="subscription">The newly deleted <see cref="Subscription"/></param>
protected virtual async Task OnSubscriptionDeletedAsync(Subscription subscription)
{
var key = this.GetResourceCacheKey(subscription.GetName(), subscription.GetNamespace());
var key = this.GetSubscriptionHandlerCacheKey(subscription.GetName(), subscription.GetNamespace());
if (this.Subscriptions.Remove(key, out var handler) && handler != null) await handler.DisposeAsync().ConfigureAwait(false);
}

/// <summary>
/// Disposes of the <see cref="SubscriptionHandler"/>
/// </summary>
/// <param name="disposing">A boolean indicating whether or not the <see cref="SubscriptionHandler"/> is being disposed of</param>
protected virtual async ValueTask DisposeAsync(bool disposing)
{
if (!this._Disposed)
{
if (disposing)
{
this.CancellationTokenSource?.Dispose();
await this.Subscriptions.ToAsyncEnumerable().ForEachAsync(async s => await s.Value.DisposeAsync().ConfigureAwait(false)).ConfigureAwait(false);
this.Subscriptions.Clear();
if (this.Configuration != null) await this.Configuration.DisposeAsync().ConfigureAwait(false);
}
this._Disposed = true;
}
}

/// <inheritdoc/>
public async ValueTask DisposeAsync()
protected override async ValueTask DisposeAsync(bool disposing)
{
await this.DisposeAsync(disposing: true).ConfigureAwait(false);
GC.SuppressFinalize(this);
if (!disposing) return;
await base.DisposeAsync(disposing);
this.CancellationTokenSource?.Dispose();
await this.Subscriptions.ToAsyncEnumerable().ForEachAsync(async s => await s.Value.DisposeAsync().ConfigureAwait(false)).ConfigureAwait(false);
this.Subscriptions.Clear();
if (this.Broker != null) await this.Broker.DisposeAsync().ConfigureAwait(false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Hylo.Api.Application" Version="0.7.4" />
<PackageReference Include="Hylo.Core" Version="0.7.4" />
<PackageReference Include="Hylo.Api.Application" Version="0.7.5" />
<PackageReference Include="Hylo.Core" Version="0.7.5" />
<PackageReference Include="Microsoft.AspNetCore.SignalR.Client" Version="7.0.9" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="7.0.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="6.0.5" />
<PackageReference Include="AspNetCore.HealthChecks.Uris" Version="6.0.3" />
<PackageReference Include="AspNetCore.HealthChecks.UI.Client" Version="7.0.0" />
<PackageReference Include="AspNetCore.HealthChecks.Uris" Version="7.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Components.WebAssembly.Server" Version="7.0.9" />
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="7.0.9" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.18.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Hylo.Api.Http" Version="0.7.4" />
<PackageReference Include="Hylo.Api.Http" Version="0.7.5" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@

<ItemGroup>
<PackageReference Include="FluentValidation.DependencyInjectionExtensions" Version="11.6.0" />
<PackageReference Include="Hylo.Api.Application" Version="0.7.4" />
<PackageReference Include="Hylo.Core" Version="0.7.4" />
<PackageReference Include="Hylo.Api.Application" Version="0.7.5" />
<PackageReference Include="Hylo.Core" Version="0.7.5" />
<PackageReference Include="MediatR" Version="12.1.1" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.5.1" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol.Logs" Version="1.5.0-rc.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ public class ReadCloudEventStreamQueryHandler
: IQueryHandler<ReadEventStreamQuery, IAsyncEnumerable<object>>
{

readonly IEventStoreProvider _eventStoreProvider;

/// <inheritdoc/>
public ReadCloudEventStreamQueryHandler(IEventStoreProvider eventStoreProvider)
{
this._eventStoreProvider = eventStoreProvider;
}

IEventStoreProvider _eventStoreProvider;

/// <inheritdoc/>
public Task<ApiResponse<IAsyncEnumerable<object>>> Handle(ReadEventStreamQuery query, CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
<PackageReference Include="EventStore.Client.Grpc.PersistentSubscriptions" Version="23.0.0" />
<PackageReference Include="EventStore.Client.Grpc.ProjectionManagement" Version="23.0.0" />
<PackageReference Include="EventStore.Client.Grpc.Streams" Version="23.0.0" />
<PackageReference Include="Hylo.Core" Version="0.7.4" />
<PackageReference Include="Hylo.Core" Version="0.7.5" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="7.0.0" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Hylo.Infrastructure" Version="0.7.4" />
<PackageReference Include="Hylo.Providers.FileSystem" Version="0.7.4" />
<PackageReference Include="Hylo.Infrastructure" Version="0.7.5" />
<PackageReference Include="Hylo.Providers.FileSystem" Version="0.7.5" />
<PackageReference Include="JsonSchema.Net.Generation" Version="3.3.1" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="7.0.0" />
Expand Down
3 changes: 3 additions & 0 deletions src/core/CloudStreams.Core/Assets/Definitions/broker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ spec:
- attributeConflictResolution
- attributeName
- fallbackAttributeName
selector:
additionalProperties:
type: string
service:
type: object
properties:
Expand Down
2 changes: 1 addition & 1 deletion src/core/CloudStreams.Core/CloudStreams.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Hylo.Core" Version="0.7.4" />
<PackageReference Include="Hylo.Core" Version="0.7.5" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="7.0.0" />
</ItemGroup>

Expand Down
Loading
Loading