diff --git a/src/broker/CloudStreams.Broker.Api.Server/CloudStreams.Broker.Api.Server.csproj b/src/broker/CloudStreams.Broker.Api.Server/CloudStreams.Broker.Api.Server.csproj
index cbbfb447..224d6a48 100644
--- a/src/broker/CloudStreams.Broker.Api.Server/CloudStreams.Broker.Api.Server.csproj
+++ b/src/broker/CloudStreams.Broker.Api.Server/CloudStreams.Broker.Api.Server.csproj
@@ -33,8 +33,8 @@
-
-
+
+
diff --git a/src/broker/CloudStreams.Broker.Api/Configuration/ICloudStreamsApiBuilderExtensions.cs b/src/broker/CloudStreams.Broker.Api/Configuration/ICloudStreamsApiBuilderExtensions.cs
index deff0563..778a4e9a 100644
--- a/src/broker/CloudStreams.Broker.Api/Configuration/ICloudStreamsApiBuilderExtensions.cs
+++ b/src/broker/CloudStreams.Broker.Api/Configuration/ICloudStreamsApiBuilderExtensions.cs
@@ -15,6 +15,7 @@
using CloudStreams.Broker.Application.Services;
using CloudStreams.Core.Infrastructure.Configuration;
using Hylo.Infrastructure;
+using Hylo.Infrastructure.Services;
using Microsoft.Extensions.DependencyInjection.Extensions;
namespace CloudStreams.Broker.Api.Configuration;
@@ -38,8 +39,8 @@ public static ICloudStreamsApplicationBuilder UseBrokerApi(this ICloudStreamsApp
builder.WithServiceName(options.Name);
builder.Services.Configure(builder.Configuration);
- builder.Services.AddResourceController();
builder.Services.TryAddSingleton();
+ builder.Services.AddSingleton>(provider => provider.GetRequiredService());
builder.Services.AddHostedService(provider => provider.GetRequiredService());
return builder;
diff --git a/src/broker/CloudStreams.Broker.Application/Services/SubscriptionManager.cs b/src/broker/CloudStreams.Broker.Application/Services/SubscriptionManager.cs
index 9ed73a40..c494ce31 100644
--- a/src/broker/CloudStreams.Broker.Application/Services/SubscriptionManager.cs
+++ b/src/broker/CloudStreams.Broker.Application/Services/SubscriptionManager.cs
@@ -13,6 +13,7 @@
using CloudStreams.Broker.Application.Configuration;
using Hylo;
+using Hylo.Infrastructure.Configuration;
using Hylo.Infrastructure.Services;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
@@ -25,24 +26,22 @@ namespace CloudStreams.Broker.Application.Services;
/// Represents a service used to manage s
///
public class SubscriptionManager
- : BackgroundService, IAsyncDisposable
+ : ResourceController
{
- private bool _Disposed;
-
///
/// Initializes a new
///
/// The current
- /// The service used to manage resources
- /// The service used to control resources
- /// The service used to access the current
- public SubscriptionManager(IServiceProvider serviceProvider, IRepository resources, IResourceController subscriptionController, IOptions options)
+ /// The service used to create s
+ /// The service used to access the current
+ /// The service used to manage s
+ /// The service used to access the current
+ public SubscriptionManager(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions> controllerOptions, IRepository repository, IOptions brokerOptions)
+ : base(loggerFactory, controllerOptions, repository)
{
this.ServiceProvider = serviceProvider;
- this.Resources = resources;
- this.SubscriptionController = subscriptionController;
- this.Options = options.Value;
+ this.BrokerOptions = brokerOptions.Value;
}
///
@@ -50,35 +49,20 @@ public SubscriptionManager(IServiceProvider serviceProvider, IRepository resourc
///
protected IServiceProvider ServiceProvider { get; }
- ///
- /// Gets the service used to manage resources
- ///
- protected IRepository Resources { get; }
-
- ///
- /// Gets the service used to control resources
- ///
- protected IResourceController SubscriptionController { get; }
-
///
/// Gets the running 's options
///
- protected BrokerOptions Options { get; }
+ protected BrokerOptions BrokerOptions { get; }
///
/// Gets a containing the key/value mappings of handled s
///
protected ConcurrentDictionary Subscriptions { get; } = new();
- ///
- /// Gets the 's
- ///
- protected CancellationTokenSource CancellationTokenSource { get; private set; } = null!;
-
///
/// Gets the service used to monitor the current
///
- protected IResourceMonitor? Configuration { get; private set; }
+ protected IResourceMonitor? Broker { get; private set; }
///
/// Gets the 's
@@ -86,36 +70,46 @@ public SubscriptionManager(IServiceProvider serviceProvider, IRepository resourc
protected CancellationToken CancellationToken => this.CancellationTokenSource.Token;
///
- protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ public override async Task StartAsync(CancellationToken cancellationToken)
{
- this.CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
+ await base.StartAsync(cancellationToken).ConfigureAwait(false);
Core.Data.Broker? broker = null;
try
{
- broker = await this.Resources.GetAsync(this.Options.Name, this.Options.Namespace, stoppingToken).ConfigureAwait(false);
+ broker = await this.Repository.GetAsync(this.BrokerOptions.Name, this.BrokerOptions.Namespace, cancellationToken).ConfigureAwait(false);
}
catch (HyloException ex) when (ex.Problem.Status == (int)HttpStatusCode.NotFound) { }
finally
{
if (broker == null)
{
- broker = new Core.Data.Broker(new ResourceMetadata(this.Options.Name, this.Options.Namespace), new BrokerSpec()
- {
- Dispatch = new()
- {
+ broker = new Core.Data.Broker(new ResourceMetadata(this.BrokerOptions.Name, this.BrokerOptions.Namespace), new BrokerSpec()
+ {
+ Dispatch = new()
+ {
Sequencing = CloudEventSequencingConfiguration.Default
- }
+ }
});
- broker = await this.Resources.AddAsync(broker, false, stoppingToken).ConfigureAwait(false);
+ broker = await this.Repository.AddAsync(broker, false, cancellationToken).ConfigureAwait(false);
}
- this.Configuration = await this.Resources.MonitorAsync(this.Options.Name, this.Options.Namespace, false, this.CancellationToken).ConfigureAwait(false);
+ this.Broker = await this.Repository.MonitorAsync(this.BrokerOptions.Name, this.BrokerOptions.Namespace, false, cancellationToken).ConfigureAwait(false);
}
- foreach (var subscription in this.SubscriptionController.Resources.ToList())
+ foreach (var subscription in this.Resources.Values.ToList())
{
await this.OnSubscriptionCreatedAsync(subscription).ConfigureAwait(false);
}
- this.SubscriptionController.Where(e => e.Type == ResourceWatchEventType.Created).Select(e => e.Resource).SubscribeAsync(this.OnSubscriptionCreatedAsync, stoppingToken);
- this.SubscriptionController.Where(e => e.Type == ResourceWatchEventType.Deleted).Select(e => e.Resource).SubscribeAsync(this.OnSubscriptionDeletedAsync, stoppingToken);
+ this.Where(e => e.Type == ResourceWatchEventType.Created).Select(e => e.Resource).SubscribeAsync(this.OnSubscriptionCreatedAsync, cancellationToken);
+ this.Where(e => e.Type == ResourceWatchEventType.Updated).Select(s => s.Resource).DistinctUntilChanged(s => s.Metadata.Labels).SubscribeAsync(this.OnSubscriptionLabelChangedAsync, cancellationToken);
+ this.Where(e => e.Type == ResourceWatchEventType.Deleted).Select(e => e.Resource).SubscribeAsync(this.OnSubscriptionDeletedAsync, cancellationToken);
+ this.Broker!.Select(b => b.Resource.Spec.Selector).SubscribeAsync(this.OnBrokerSelectorChangedAsync, cancellationToken: cancellationToken);
+ await this.OnBrokerSelectorChangedAsync(this.Broker!.Resource.Spec.Selector).ConfigureAwait(false);
+ }
+
+ ///
+ protected override Task ReconcileAsync(CancellationToken cancellationToken = default)
+ {
+ this.Options.LabelSelectors = this.Broker?.Resource.Spec.Selector?.Select(s => new LabelSelector(s.Key, LabelSelectionOperator.Equals, s.Value)).ToList();
+ return base.ReconcileAsync(cancellationToken);
}
///
@@ -124,7 +118,44 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
/// The name of the resource to build a new cache key for
/// The namespace the resource to build a new cache key for belongs to
/// A new cache key
- protected virtual string GetResourceCacheKey(string name, string? @namespace) => string.IsNullOrWhiteSpace(@namespace) ? name : $"{@namespace}.{name}";
+ protected virtual string GetSubscriptionHandlerCacheKey(string name, string? @namespace) => string.IsNullOrWhiteSpace(@namespace) ? name : $"{@namespace}.{name}";
+
+ ///
+ /// Handles changes to the current broker's subscription selector
+ ///
+ /// A key/value mapping of the labels the subscriptions to select must define
+ /// A new awaitable
+ protected virtual Task OnBrokerSelectorChangedAsync(IDictionary? selector) => this.ReconcileAsync(this.CancellationToken);
+
+ ///
+ /// Handles changes to the specified subscription's labels
+ ///
+ /// The subscription to manage the changes of
+ /// A new awaitable
+ protected virtual async Task OnSubscriptionLabelChangedAsync(Subscription subscription)
+ {
+ if (this.Broker == null) return;
+ var key = this.GetSubscriptionHandlerCacheKey(subscription.GetName(), subscription.GetNamespace());
+ if (this.Options.LabelSelectors == null || this.Options.LabelSelectors.All(s => s.Selects(subscription)) == true)
+ {
+ if (this.Subscriptions.TryGetValue(key, out _)) return;
+ await this.OnSubscriptionCreatedAsync(subscription).ConfigureAwait(false);
+ }
+ else
+ {
+ if (!this.Subscriptions.TryGetValue(key, out _)) return;
+ await this.OnResourceDeletedAsync(subscription).ConfigureAwait(false);
+ }
+ }
+
+ ///
+ protected override async Task OnResourceDeletedAsync(Subscription subscription, CancellationToken cancellationToken = default)
+ {
+ await base.OnResourceDeletedAsync(subscription, cancellationToken).ConfigureAwait(false);
+ var key = this.GetSubscriptionHandlerCacheKey(subscription.GetName(), subscription.GetNamespace());
+ if (!this.Subscriptions.TryRemove(key, out var handler) || handler == null) return;
+ await handler.DisposeAsync().ConfigureAwait(false);
+ }
///
/// Handles the creation of a new
@@ -132,8 +163,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
/// The newly created
protected virtual async Task OnSubscriptionCreatedAsync(Subscription subscription)
{
- var key = this.GetResourceCacheKey(subscription.GetName(), subscription.GetNamespace());
- var handler = ActivatorUtilities.CreateInstance(this.ServiceProvider, subscription, this.Configuration!);
+ var key = this.GetSubscriptionHandlerCacheKey(subscription.GetName(), subscription.GetNamespace());
+ var handler = ActivatorUtilities.CreateInstance(this.ServiceProvider, subscription, this.Broker!);
await handler.InitializeAsync(this.CancellationToken).ConfigureAwait(false);
this.Subscriptions.AddOrUpdate(key, handler, (_, _) => handler);
}
@@ -144,7 +175,7 @@ protected virtual async Task OnSubscriptionCreatedAsync(Subscription subscriptio
/// The newly deleted
protected virtual async Task OnSubscriptionDeletedAsync(Subscription subscription)
{
- var key = this.GetResourceCacheKey(subscription.GetName(), subscription.GetNamespace());
+ var key = this.GetSubscriptionHandlerCacheKey(subscription.GetName(), subscription.GetNamespace());
if (this.Subscriptions.Remove(key, out var handler) && handler != null) await handler.DisposeAsync().ConfigureAwait(false);
}
@@ -152,26 +183,14 @@ protected virtual async Task OnSubscriptionDeletedAsync(Subscription subscriptio
/// Disposes of the
///
/// A boolean indicating whether or not the is being disposed of
- protected virtual async ValueTask DisposeAsync(bool disposing)
- {
- if (!this._Disposed)
- {
- if (disposing)
- {
- this.CancellationTokenSource?.Dispose();
- await this.Subscriptions.ToAsyncEnumerable().ForEachAsync(async s => await s.Value.DisposeAsync().ConfigureAwait(false)).ConfigureAwait(false);
- this.Subscriptions.Clear();
- if (this.Configuration != null) await this.Configuration.DisposeAsync().ConfigureAwait(false);
- }
- this._Disposed = true;
- }
- }
-
- ///
- public async ValueTask DisposeAsync()
+ protected override async ValueTask DisposeAsync(bool disposing)
{
- await this.DisposeAsync(disposing: true).ConfigureAwait(false);
- GC.SuppressFinalize(this);
+ if (!disposing) return;
+ await base.DisposeAsync(disposing);
+ this.CancellationTokenSource?.Dispose();
+ await this.Subscriptions.ToAsyncEnumerable().ForEachAsync(async s => await s.Value.DisposeAsync().ConfigureAwait(false)).ConfigureAwait(false);
+ this.Subscriptions.Clear();
+ if (this.Broker != null) await this.Broker.DisposeAsync().ConfigureAwait(false);
}
}
diff --git a/src/core/CloudStreams.Core.Api.Client/CloudStreams.Core.Api.Client.csproj b/src/core/CloudStreams.Core.Api.Client/CloudStreams.Core.Api.Client.csproj
index 318aa1be..b2a647e6 100644
--- a/src/core/CloudStreams.Core.Api.Client/CloudStreams.Core.Api.Client.csproj
+++ b/src/core/CloudStreams.Core.Api.Client/CloudStreams.Core.Api.Client.csproj
@@ -34,8 +34,8 @@
-
-
+
+
diff --git a/src/core/CloudStreams.Core.Api.Server/CloudStreams.Core.Api.Server.csproj b/src/core/CloudStreams.Core.Api.Server/CloudStreams.Core.Api.Server.csproj
index 811039c5..498dd518 100644
--- a/src/core/CloudStreams.Core.Api.Server/CloudStreams.Core.Api.Server.csproj
+++ b/src/core/CloudStreams.Core.Api.Server/CloudStreams.Core.Api.Server.csproj
@@ -23,8 +23,8 @@
-
-
+
+
diff --git a/src/core/CloudStreams.Core.Api/CloudStreams.Core.Api.csproj b/src/core/CloudStreams.Core.Api/CloudStreams.Core.Api.csproj
index 0640f97d..edd16f9b 100644
--- a/src/core/CloudStreams.Core.Api/CloudStreams.Core.Api.csproj
+++ b/src/core/CloudStreams.Core.Api/CloudStreams.Core.Api.csproj
@@ -36,7 +36,7 @@
-
+
diff --git a/src/core/CloudStreams.Core.Application/CloudStreams.Core.Application.csproj b/src/core/CloudStreams.Core.Application/CloudStreams.Core.Application.csproj
index 9714d232..ad0ddd90 100644
--- a/src/core/CloudStreams.Core.Application/CloudStreams.Core.Application.csproj
+++ b/src/core/CloudStreams.Core.Application/CloudStreams.Core.Application.csproj
@@ -37,8 +37,8 @@
-
-
+
+
diff --git a/src/core/CloudStreams.Core.Application/Queries/Streams/ReadEventStreamQuery.cs b/src/core/CloudStreams.Core.Application/Queries/Streams/ReadEventStreamQuery.cs
index 7326abdd..cacaa87e 100644
--- a/src/core/CloudStreams.Core.Application/Queries/Streams/ReadEventStreamQuery.cs
+++ b/src/core/CloudStreams.Core.Application/Queries/Streams/ReadEventStreamQuery.cs
@@ -48,14 +48,14 @@ public class ReadCloudEventStreamQueryHandler
: IQueryHandler>
{
+ readonly IEventStoreProvider _eventStoreProvider;
+
///
public ReadCloudEventStreamQueryHandler(IEventStoreProvider eventStoreProvider)
{
this._eventStoreProvider = eventStoreProvider;
}
- IEventStoreProvider _eventStoreProvider;
-
///
public Task>> Handle(ReadEventStreamQuery query, CancellationToken cancellationToken)
{
diff --git a/src/core/CloudStreams.Core.Infrastructure.EventSourcing.EventStore/CloudStreams.Core.Infrastructure.EventSourcing.EventStore.csproj b/src/core/CloudStreams.Core.Infrastructure.EventSourcing.EventStore/CloudStreams.Core.Infrastructure.EventSourcing.EventStore.csproj
index 82e8f195..da521702 100644
--- a/src/core/CloudStreams.Core.Infrastructure.EventSourcing.EventStore/CloudStreams.Core.Infrastructure.EventSourcing.EventStore.csproj
+++ b/src/core/CloudStreams.Core.Infrastructure.EventSourcing.EventStore/CloudStreams.Core.Infrastructure.EventSourcing.EventStore.csproj
@@ -54,7 +54,7 @@
-
+
diff --git a/src/core/CloudStreams.Core.Infrastructure/CloudStreams.Core.Infrastructure.csproj b/src/core/CloudStreams.Core.Infrastructure/CloudStreams.Core.Infrastructure.csproj
index 2a3ec8f5..9ba994c3 100644
--- a/src/core/CloudStreams.Core.Infrastructure/CloudStreams.Core.Infrastructure.csproj
+++ b/src/core/CloudStreams.Core.Infrastructure/CloudStreams.Core.Infrastructure.csproj
@@ -34,8 +34,8 @@
-
-
+
+
diff --git a/src/core/CloudStreams.Core/Assets/Definitions/broker.yaml b/src/core/CloudStreams.Core/Assets/Definitions/broker.yaml
index 901d6cc2..7221ba0a 100644
--- a/src/core/CloudStreams.Core/Assets/Definitions/broker.yaml
+++ b/src/core/CloudStreams.Core/Assets/Definitions/broker.yaml
@@ -125,6 +125,9 @@ spec:
- attributeConflictResolution
- attributeName
- fallbackAttributeName
+ selector:
+ additionalProperties:
+ type: string
service:
type: object
properties:
diff --git a/src/core/CloudStreams.Core/CloudStreams.Core.csproj b/src/core/CloudStreams.Core/CloudStreams.Core.csproj
index 994fff29..46bfa062 100644
--- a/src/core/CloudStreams.Core/CloudStreams.Core.csproj
+++ b/src/core/CloudStreams.Core/CloudStreams.Core.csproj
@@ -34,7 +34,7 @@
-
+
diff --git a/src/core/CloudStreams.Core/Data/BrokerSpec.cs b/src/core/CloudStreams.Core/Data/BrokerSpec.cs
index b7ab8111..d847b948 100644
--- a/src/core/CloudStreams.Core/Data/BrokerSpec.cs
+++ b/src/core/CloudStreams.Core/Data/BrokerSpec.cs
@@ -38,12 +38,19 @@ public BrokerSpec(BrokerDispatchConfiguration dispatch)
/// Gets/sets an object used to configure the way the broker should dispatch cloud events
///
[DataMember(Order = 1, Name = "dispatch"), JsonPropertyOrder(1), JsonPropertyName("dispatch"), YamlMember(Order = 1, Alias = "dispatch")]
- public virtual BrokerDispatchConfiguration? Dispatch { get; set; } = null!;
+ public virtual BrokerDispatchConfiguration? Dispatch { get; set; }
+
+ ///
+ /// Gets/sets a key/value mapping of the labels to select subscriptions by.
+ /// If not set, the broker will attempt to pick up all inactive subscriptions
+ ///
+ [DataMember(Order = 2, Name = "selector"), JsonPropertyOrder(2), JsonPropertyName("selector"), YamlMember(Order = 2, Alias = "selector")]
+ public virtual IDictionary? Selector { get; set; }
///
/// Gets/sets an object used to configure the broker service, if any
///
- [DataMember(Order = 2, Name = "service"), JsonPropertyOrder(2), JsonPropertyName("service"), YamlMember(Order = 2, Alias = "service")]
+ [DataMember(Order = 3, Name = "service"), JsonPropertyOrder(3), JsonPropertyName("service"), YamlMember(Order = 3, Alias = "service")]
public virtual ServiceConfiguration? Service { get; set; }
}
diff --git a/src/gateway/CloudStreams.Gateway.Api.Server/CloudStreams.Gateway.Api.Server.csproj b/src/gateway/CloudStreams.Gateway.Api.Server/CloudStreams.Gateway.Api.Server.csproj
index 372073d6..be939921 100644
--- a/src/gateway/CloudStreams.Gateway.Api.Server/CloudStreams.Gateway.Api.Server.csproj
+++ b/src/gateway/CloudStreams.Gateway.Api.Server/CloudStreams.Gateway.Api.Server.csproj
@@ -21,8 +21,8 @@
-
-
+
+
diff --git a/src/gateway/CloudStreams.Gateway.Api/CloudStreams.Gateway.Api.csproj b/src/gateway/CloudStreams.Gateway.Api/CloudStreams.Gateway.Api.csproj
index 7f23677b..1c935912 100644
--- a/src/gateway/CloudStreams.Gateway.Api/CloudStreams.Gateway.Api.csproj
+++ b/src/gateway/CloudStreams.Gateway.Api/CloudStreams.Gateway.Api.csproj
@@ -40,7 +40,7 @@
-
+