diff --git a/src/broker/CloudStreams.Broker.Api.Server/CloudStreams.Broker.Api.Server.csproj b/src/broker/CloudStreams.Broker.Api.Server/CloudStreams.Broker.Api.Server.csproj index cbbfb447..224d6a48 100644 --- a/src/broker/CloudStreams.Broker.Api.Server/CloudStreams.Broker.Api.Server.csproj +++ b/src/broker/CloudStreams.Broker.Api.Server/CloudStreams.Broker.Api.Server.csproj @@ -33,8 +33,8 @@ - - + + diff --git a/src/broker/CloudStreams.Broker.Api/Configuration/ICloudStreamsApiBuilderExtensions.cs b/src/broker/CloudStreams.Broker.Api/Configuration/ICloudStreamsApiBuilderExtensions.cs index deff0563..778a4e9a 100644 --- a/src/broker/CloudStreams.Broker.Api/Configuration/ICloudStreamsApiBuilderExtensions.cs +++ b/src/broker/CloudStreams.Broker.Api/Configuration/ICloudStreamsApiBuilderExtensions.cs @@ -15,6 +15,7 @@ using CloudStreams.Broker.Application.Services; using CloudStreams.Core.Infrastructure.Configuration; using Hylo.Infrastructure; +using Hylo.Infrastructure.Services; using Microsoft.Extensions.DependencyInjection.Extensions; namespace CloudStreams.Broker.Api.Configuration; @@ -38,8 +39,8 @@ public static ICloudStreamsApplicationBuilder UseBrokerApi(this ICloudStreamsApp builder.WithServiceName(options.Name); builder.Services.Configure(builder.Configuration); - builder.Services.AddResourceController(); builder.Services.TryAddSingleton(); + builder.Services.AddSingleton>(provider => provider.GetRequiredService()); builder.Services.AddHostedService(provider => provider.GetRequiredService()); return builder; diff --git a/src/broker/CloudStreams.Broker.Application/Services/SubscriptionManager.cs b/src/broker/CloudStreams.Broker.Application/Services/SubscriptionManager.cs index 9ed73a40..c494ce31 100644 --- a/src/broker/CloudStreams.Broker.Application/Services/SubscriptionManager.cs +++ b/src/broker/CloudStreams.Broker.Application/Services/SubscriptionManager.cs @@ -13,6 +13,7 @@ using CloudStreams.Broker.Application.Configuration; using Hylo; +using Hylo.Infrastructure.Configuration; using Hylo.Infrastructure.Services; using Microsoft.Extensions.Options; using System.Collections.Concurrent; @@ -25,24 +26,22 @@ namespace CloudStreams.Broker.Application.Services; /// Represents a service used to manage s /// public class SubscriptionManager - : BackgroundService, IAsyncDisposable + : ResourceController { - private bool _Disposed; - /// /// Initializes a new /// /// The current - /// The service used to manage resources - /// The service used to control resources - /// The service used to access the current - public SubscriptionManager(IServiceProvider serviceProvider, IRepository resources, IResourceController subscriptionController, IOptions options) + /// The service used to create s + /// The service used to access the current + /// The service used to manage s + /// The service used to access the current + public SubscriptionManager(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions> controllerOptions, IRepository repository, IOptions brokerOptions) + : base(loggerFactory, controllerOptions, repository) { this.ServiceProvider = serviceProvider; - this.Resources = resources; - this.SubscriptionController = subscriptionController; - this.Options = options.Value; + this.BrokerOptions = brokerOptions.Value; } /// @@ -50,35 +49,20 @@ public SubscriptionManager(IServiceProvider serviceProvider, IRepository resourc /// protected IServiceProvider ServiceProvider { get; } - /// - /// Gets the service used to manage resources - /// - protected IRepository Resources { get; } - - /// - /// Gets the service used to control resources - /// - protected IResourceController SubscriptionController { get; } - /// /// Gets the running 's options /// - protected BrokerOptions Options { get; } + protected BrokerOptions BrokerOptions { get; } /// /// Gets a containing the key/value mappings of handled s /// protected ConcurrentDictionary Subscriptions { get; } = new(); - /// - /// Gets the 's - /// - protected CancellationTokenSource CancellationTokenSource { get; private set; } = null!; - /// /// Gets the service used to monitor the current /// - protected IResourceMonitor? Configuration { get; private set; } + protected IResourceMonitor? Broker { get; private set; } /// /// Gets the 's @@ -86,36 +70,46 @@ public SubscriptionManager(IServiceProvider serviceProvider, IRepository resourc protected CancellationToken CancellationToken => this.CancellationTokenSource.Token; /// - protected override async Task ExecuteAsync(CancellationToken stoppingToken) + public override async Task StartAsync(CancellationToken cancellationToken) { - this.CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken); + await base.StartAsync(cancellationToken).ConfigureAwait(false); Core.Data.Broker? broker = null; try { - broker = await this.Resources.GetAsync(this.Options.Name, this.Options.Namespace, stoppingToken).ConfigureAwait(false); + broker = await this.Repository.GetAsync(this.BrokerOptions.Name, this.BrokerOptions.Namespace, cancellationToken).ConfigureAwait(false); } catch (HyloException ex) when (ex.Problem.Status == (int)HttpStatusCode.NotFound) { } finally { if (broker == null) { - broker = new Core.Data.Broker(new ResourceMetadata(this.Options.Name, this.Options.Namespace), new BrokerSpec() - { - Dispatch = new() - { + broker = new Core.Data.Broker(new ResourceMetadata(this.BrokerOptions.Name, this.BrokerOptions.Namespace), new BrokerSpec() + { + Dispatch = new() + { Sequencing = CloudEventSequencingConfiguration.Default - } + } }); - broker = await this.Resources.AddAsync(broker, false, stoppingToken).ConfigureAwait(false); + broker = await this.Repository.AddAsync(broker, false, cancellationToken).ConfigureAwait(false); } - this.Configuration = await this.Resources.MonitorAsync(this.Options.Name, this.Options.Namespace, false, this.CancellationToken).ConfigureAwait(false); + this.Broker = await this.Repository.MonitorAsync(this.BrokerOptions.Name, this.BrokerOptions.Namespace, false, cancellationToken).ConfigureAwait(false); } - foreach (var subscription in this.SubscriptionController.Resources.ToList()) + foreach (var subscription in this.Resources.Values.ToList()) { await this.OnSubscriptionCreatedAsync(subscription).ConfigureAwait(false); } - this.SubscriptionController.Where(e => e.Type == ResourceWatchEventType.Created).Select(e => e.Resource).SubscribeAsync(this.OnSubscriptionCreatedAsync, stoppingToken); - this.SubscriptionController.Where(e => e.Type == ResourceWatchEventType.Deleted).Select(e => e.Resource).SubscribeAsync(this.OnSubscriptionDeletedAsync, stoppingToken); + this.Where(e => e.Type == ResourceWatchEventType.Created).Select(e => e.Resource).SubscribeAsync(this.OnSubscriptionCreatedAsync, cancellationToken); + this.Where(e => e.Type == ResourceWatchEventType.Updated).Select(s => s.Resource).DistinctUntilChanged(s => s.Metadata.Labels).SubscribeAsync(this.OnSubscriptionLabelChangedAsync, cancellationToken); + this.Where(e => e.Type == ResourceWatchEventType.Deleted).Select(e => e.Resource).SubscribeAsync(this.OnSubscriptionDeletedAsync, cancellationToken); + this.Broker!.Select(b => b.Resource.Spec.Selector).SubscribeAsync(this.OnBrokerSelectorChangedAsync, cancellationToken: cancellationToken); + await this.OnBrokerSelectorChangedAsync(this.Broker!.Resource.Spec.Selector).ConfigureAwait(false); + } + + /// + protected override Task ReconcileAsync(CancellationToken cancellationToken = default) + { + this.Options.LabelSelectors = this.Broker?.Resource.Spec.Selector?.Select(s => new LabelSelector(s.Key, LabelSelectionOperator.Equals, s.Value)).ToList(); + return base.ReconcileAsync(cancellationToken); } /// @@ -124,7 +118,44 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) /// The name of the resource to build a new cache key for /// The namespace the resource to build a new cache key for belongs to /// A new cache key - protected virtual string GetResourceCacheKey(string name, string? @namespace) => string.IsNullOrWhiteSpace(@namespace) ? name : $"{@namespace}.{name}"; + protected virtual string GetSubscriptionHandlerCacheKey(string name, string? @namespace) => string.IsNullOrWhiteSpace(@namespace) ? name : $"{@namespace}.{name}"; + + /// + /// Handles changes to the current broker's subscription selector + /// + /// A key/value mapping of the labels the subscriptions to select must define + /// A new awaitable + protected virtual Task OnBrokerSelectorChangedAsync(IDictionary? selector) => this.ReconcileAsync(this.CancellationToken); + + /// + /// Handles changes to the specified subscription's labels + /// + /// The subscription to manage the changes of + /// A new awaitable + protected virtual async Task OnSubscriptionLabelChangedAsync(Subscription subscription) + { + if (this.Broker == null) return; + var key = this.GetSubscriptionHandlerCacheKey(subscription.GetName(), subscription.GetNamespace()); + if (this.Options.LabelSelectors == null || this.Options.LabelSelectors.All(s => s.Selects(subscription)) == true) + { + if (this.Subscriptions.TryGetValue(key, out _)) return; + await this.OnSubscriptionCreatedAsync(subscription).ConfigureAwait(false); + } + else + { + if (!this.Subscriptions.TryGetValue(key, out _)) return; + await this.OnResourceDeletedAsync(subscription).ConfigureAwait(false); + } + } + + /// + protected override async Task OnResourceDeletedAsync(Subscription subscription, CancellationToken cancellationToken = default) + { + await base.OnResourceDeletedAsync(subscription, cancellationToken).ConfigureAwait(false); + var key = this.GetSubscriptionHandlerCacheKey(subscription.GetName(), subscription.GetNamespace()); + if (!this.Subscriptions.TryRemove(key, out var handler) || handler == null) return; + await handler.DisposeAsync().ConfigureAwait(false); + } /// /// Handles the creation of a new @@ -132,8 +163,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) /// The newly created protected virtual async Task OnSubscriptionCreatedAsync(Subscription subscription) { - var key = this.GetResourceCacheKey(subscription.GetName(), subscription.GetNamespace()); - var handler = ActivatorUtilities.CreateInstance(this.ServiceProvider, subscription, this.Configuration!); + var key = this.GetSubscriptionHandlerCacheKey(subscription.GetName(), subscription.GetNamespace()); + var handler = ActivatorUtilities.CreateInstance(this.ServiceProvider, subscription, this.Broker!); await handler.InitializeAsync(this.CancellationToken).ConfigureAwait(false); this.Subscriptions.AddOrUpdate(key, handler, (_, _) => handler); } @@ -144,7 +175,7 @@ protected virtual async Task OnSubscriptionCreatedAsync(Subscription subscriptio /// The newly deleted protected virtual async Task OnSubscriptionDeletedAsync(Subscription subscription) { - var key = this.GetResourceCacheKey(subscription.GetName(), subscription.GetNamespace()); + var key = this.GetSubscriptionHandlerCacheKey(subscription.GetName(), subscription.GetNamespace()); if (this.Subscriptions.Remove(key, out var handler) && handler != null) await handler.DisposeAsync().ConfigureAwait(false); } @@ -152,26 +183,14 @@ protected virtual async Task OnSubscriptionDeletedAsync(Subscription subscriptio /// Disposes of the /// /// A boolean indicating whether or not the is being disposed of - protected virtual async ValueTask DisposeAsync(bool disposing) - { - if (!this._Disposed) - { - if (disposing) - { - this.CancellationTokenSource?.Dispose(); - await this.Subscriptions.ToAsyncEnumerable().ForEachAsync(async s => await s.Value.DisposeAsync().ConfigureAwait(false)).ConfigureAwait(false); - this.Subscriptions.Clear(); - if (this.Configuration != null) await this.Configuration.DisposeAsync().ConfigureAwait(false); - } - this._Disposed = true; - } - } - - /// - public async ValueTask DisposeAsync() + protected override async ValueTask DisposeAsync(bool disposing) { - await this.DisposeAsync(disposing: true).ConfigureAwait(false); - GC.SuppressFinalize(this); + if (!disposing) return; + await base.DisposeAsync(disposing); + this.CancellationTokenSource?.Dispose(); + await this.Subscriptions.ToAsyncEnumerable().ForEachAsync(async s => await s.Value.DisposeAsync().ConfigureAwait(false)).ConfigureAwait(false); + this.Subscriptions.Clear(); + if (this.Broker != null) await this.Broker.DisposeAsync().ConfigureAwait(false); } } diff --git a/src/core/CloudStreams.Core.Api.Client/CloudStreams.Core.Api.Client.csproj b/src/core/CloudStreams.Core.Api.Client/CloudStreams.Core.Api.Client.csproj index 318aa1be..b2a647e6 100644 --- a/src/core/CloudStreams.Core.Api.Client/CloudStreams.Core.Api.Client.csproj +++ b/src/core/CloudStreams.Core.Api.Client/CloudStreams.Core.Api.Client.csproj @@ -34,8 +34,8 @@ - - + + diff --git a/src/core/CloudStreams.Core.Api.Server/CloudStreams.Core.Api.Server.csproj b/src/core/CloudStreams.Core.Api.Server/CloudStreams.Core.Api.Server.csproj index 811039c5..498dd518 100644 --- a/src/core/CloudStreams.Core.Api.Server/CloudStreams.Core.Api.Server.csproj +++ b/src/core/CloudStreams.Core.Api.Server/CloudStreams.Core.Api.Server.csproj @@ -23,8 +23,8 @@ - - + + diff --git a/src/core/CloudStreams.Core.Api/CloudStreams.Core.Api.csproj b/src/core/CloudStreams.Core.Api/CloudStreams.Core.Api.csproj index 0640f97d..edd16f9b 100644 --- a/src/core/CloudStreams.Core.Api/CloudStreams.Core.Api.csproj +++ b/src/core/CloudStreams.Core.Api/CloudStreams.Core.Api.csproj @@ -36,7 +36,7 @@ - + diff --git a/src/core/CloudStreams.Core.Application/CloudStreams.Core.Application.csproj b/src/core/CloudStreams.Core.Application/CloudStreams.Core.Application.csproj index 9714d232..ad0ddd90 100644 --- a/src/core/CloudStreams.Core.Application/CloudStreams.Core.Application.csproj +++ b/src/core/CloudStreams.Core.Application/CloudStreams.Core.Application.csproj @@ -37,8 +37,8 @@ - - + + diff --git a/src/core/CloudStreams.Core.Application/Queries/Streams/ReadEventStreamQuery.cs b/src/core/CloudStreams.Core.Application/Queries/Streams/ReadEventStreamQuery.cs index 7326abdd..cacaa87e 100644 --- a/src/core/CloudStreams.Core.Application/Queries/Streams/ReadEventStreamQuery.cs +++ b/src/core/CloudStreams.Core.Application/Queries/Streams/ReadEventStreamQuery.cs @@ -48,14 +48,14 @@ public class ReadCloudEventStreamQueryHandler : IQueryHandler> { + readonly IEventStoreProvider _eventStoreProvider; + /// public ReadCloudEventStreamQueryHandler(IEventStoreProvider eventStoreProvider) { this._eventStoreProvider = eventStoreProvider; } - IEventStoreProvider _eventStoreProvider; - /// public Task>> Handle(ReadEventStreamQuery query, CancellationToken cancellationToken) { diff --git a/src/core/CloudStreams.Core.Infrastructure.EventSourcing.EventStore/CloudStreams.Core.Infrastructure.EventSourcing.EventStore.csproj b/src/core/CloudStreams.Core.Infrastructure.EventSourcing.EventStore/CloudStreams.Core.Infrastructure.EventSourcing.EventStore.csproj index 82e8f195..da521702 100644 --- a/src/core/CloudStreams.Core.Infrastructure.EventSourcing.EventStore/CloudStreams.Core.Infrastructure.EventSourcing.EventStore.csproj +++ b/src/core/CloudStreams.Core.Infrastructure.EventSourcing.EventStore/CloudStreams.Core.Infrastructure.EventSourcing.EventStore.csproj @@ -54,7 +54,7 @@ - + diff --git a/src/core/CloudStreams.Core.Infrastructure/CloudStreams.Core.Infrastructure.csproj b/src/core/CloudStreams.Core.Infrastructure/CloudStreams.Core.Infrastructure.csproj index 2a3ec8f5..9ba994c3 100644 --- a/src/core/CloudStreams.Core.Infrastructure/CloudStreams.Core.Infrastructure.csproj +++ b/src/core/CloudStreams.Core.Infrastructure/CloudStreams.Core.Infrastructure.csproj @@ -34,8 +34,8 @@ - - + + diff --git a/src/core/CloudStreams.Core/Assets/Definitions/broker.yaml b/src/core/CloudStreams.Core/Assets/Definitions/broker.yaml index 901d6cc2..7221ba0a 100644 --- a/src/core/CloudStreams.Core/Assets/Definitions/broker.yaml +++ b/src/core/CloudStreams.Core/Assets/Definitions/broker.yaml @@ -125,6 +125,9 @@ spec: - attributeConflictResolution - attributeName - fallbackAttributeName + selector: + additionalProperties: + type: string service: type: object properties: diff --git a/src/core/CloudStreams.Core/CloudStreams.Core.csproj b/src/core/CloudStreams.Core/CloudStreams.Core.csproj index 994fff29..46bfa062 100644 --- a/src/core/CloudStreams.Core/CloudStreams.Core.csproj +++ b/src/core/CloudStreams.Core/CloudStreams.Core.csproj @@ -34,7 +34,7 @@ - + diff --git a/src/core/CloudStreams.Core/Data/BrokerSpec.cs b/src/core/CloudStreams.Core/Data/BrokerSpec.cs index b7ab8111..d847b948 100644 --- a/src/core/CloudStreams.Core/Data/BrokerSpec.cs +++ b/src/core/CloudStreams.Core/Data/BrokerSpec.cs @@ -38,12 +38,19 @@ public BrokerSpec(BrokerDispatchConfiguration dispatch) /// Gets/sets an object used to configure the way the broker should dispatch cloud events /// [DataMember(Order = 1, Name = "dispatch"), JsonPropertyOrder(1), JsonPropertyName("dispatch"), YamlMember(Order = 1, Alias = "dispatch")] - public virtual BrokerDispatchConfiguration? Dispatch { get; set; } = null!; + public virtual BrokerDispatchConfiguration? Dispatch { get; set; } + + /// + /// Gets/sets a key/value mapping of the labels to select subscriptions by. + /// If not set, the broker will attempt to pick up all inactive subscriptions + /// + [DataMember(Order = 2, Name = "selector"), JsonPropertyOrder(2), JsonPropertyName("selector"), YamlMember(Order = 2, Alias = "selector")] + public virtual IDictionary? Selector { get; set; } /// /// Gets/sets an object used to configure the broker service, if any /// - [DataMember(Order = 2, Name = "service"), JsonPropertyOrder(2), JsonPropertyName("service"), YamlMember(Order = 2, Alias = "service")] + [DataMember(Order = 3, Name = "service"), JsonPropertyOrder(3), JsonPropertyName("service"), YamlMember(Order = 3, Alias = "service")] public virtual ServiceConfiguration? Service { get; set; } } diff --git a/src/gateway/CloudStreams.Gateway.Api.Server/CloudStreams.Gateway.Api.Server.csproj b/src/gateway/CloudStreams.Gateway.Api.Server/CloudStreams.Gateway.Api.Server.csproj index 372073d6..be939921 100644 --- a/src/gateway/CloudStreams.Gateway.Api.Server/CloudStreams.Gateway.Api.Server.csproj +++ b/src/gateway/CloudStreams.Gateway.Api.Server/CloudStreams.Gateway.Api.Server.csproj @@ -21,8 +21,8 @@ - - + + diff --git a/src/gateway/CloudStreams.Gateway.Api/CloudStreams.Gateway.Api.csproj b/src/gateway/CloudStreams.Gateway.Api/CloudStreams.Gateway.Api.csproj index 7f23677b..1c935912 100644 --- a/src/gateway/CloudStreams.Gateway.Api/CloudStreams.Gateway.Api.csproj +++ b/src/gateway/CloudStreams.Gateway.Api/CloudStreams.Gateway.Api.csproj @@ -40,7 +40,7 @@ - +