From fb831175ec9fcdb5559e3e3a65d1e1defaf81253 Mon Sep 17 00:00:00 2001 From: Charles d'Avernas Date: Tue, 9 Apr 2024 14:18:58 +0200 Subject: [PATCH 1/2] fix(Core): Added new ICollection and Object related extensions fix(Core): Scoped ICollection extensions in a new Neuroglia.Collections namespace --- .../Extensions/ICollectionExtensions.cs | 2 +- .../Extensions/ObjectExtensions.cs | 25 + src/Neuroglia.Core/PropertyPath.cs | 2 +- .../EventStoreServiceCollectionExtensions.cs | 37 -- ...astructure.EventSourcing.EventStore.csproj | 2 + .../Services/ESEventStore.cs | 557 ------------------ .../Services/ESEventStoreFactory.cs | 53 -- .../JsonPatchTypeInfo.cs | 2 +- .../Services/JsonSchemaResolver.cs | 2 +- .../Extensions/ControllerBaseExtensions.cs | 6 +- .../EventStores/ESEventStoreTests.cs | 44 -- .../EventSourcingRepositoryTests.cs | 2 +- 12 files changed, 35 insertions(+), 699 deletions(-) delete mode 100644 src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Extensions/EventStoreServiceCollectionExtensions.cs delete mode 100644 src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs delete mode 100644 src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStoreFactory.cs delete mode 100644 test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStores/ESEventStoreTests.cs diff --git a/src/Neuroglia.Core/Extensions/ICollectionExtensions.cs b/src/Neuroglia.Core/Extensions/ICollectionExtensions.cs index 34430d3fd..0d125e1f4 100644 --- a/src/Neuroglia.Core/Extensions/ICollectionExtensions.cs +++ b/src/Neuroglia.Core/Extensions/ICollectionExtensions.cs @@ -13,7 +13,7 @@ using System.Collections; -namespace Neuroglia; +namespace Neuroglia.Collections; /// /// Defines extensions for s diff --git a/src/Neuroglia.Core/Extensions/ObjectExtensions.cs b/src/Neuroglia.Core/Extensions/ObjectExtensions.cs index ed7df2a39..91cdca468 100644 --- a/src/Neuroglia.Core/Extensions/ObjectExtensions.cs +++ b/src/Neuroglia.Core/Extensions/ObjectExtensions.cs @@ -47,4 +47,29 @@ public static class ObjectExtensions return expando; } + /// + /// Gets the value returned by the specified property + /// + /// The extended object + /// The name of the property to get + /// The value of the specified property + /// This method is used to dynamically get the property of an object, specifically when building an expression, which does not allow dynamic operations + public static object GetProperty(this object source, string name) + { + ArgumentNullException.ThrowIfNull(source); + ArgumentException.ThrowIfNullOrWhiteSpace(name); + var property = source.GetType().GetProperty(name) ?? throw new MissingMemberException($"Failed to find a property with the specified name '{name}'", name); + return property.GetValue(source)!; + } + + /// + /// Gets the value returned by the specified property + /// + /// The type of the property to get + /// The extended object + /// The name of the property to get + /// The value of the specified property + /// This method is used to dynamically get the property of an object, specifically when building an expression, which does not allow dynamic operations + public static T GetProperty(this object source, string name) => (T)source.GetProperty(name); + } \ No newline at end of file diff --git a/src/Neuroglia.Core/PropertyPath.cs b/src/Neuroglia.Core/PropertyPath.cs index 227ddf674..15bb47d5e 100644 --- a/src/Neuroglia.Core/PropertyPath.cs +++ b/src/Neuroglia.Core/PropertyPath.cs @@ -72,7 +72,7 @@ public MemberExpression ToExpression(Expression target) /// /// The input to parse /// A new based on the specified input - public static PropertyPath Parse(string input) => new PropertyPath(input); + public static PropertyPath Parse(string input) => new(input); /// /// Attempts to parse the specified input into a new diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Extensions/EventStoreServiceCollectionExtensions.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Extensions/EventStoreServiceCollectionExtensions.cs deleted file mode 100644 index 236a52608..000000000 --- a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Extensions/EventStoreServiceCollectionExtensions.cs +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright © 2021-Present Neuroglia SRL. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"), -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using Microsoft.Extensions.DependencyInjection; -using Neuroglia.Data.Infrastructure.EventSourcing.Services; - -namespace Neuroglia.Data.Infrastructure.EventSourcing; - -/// -/// Defines extensions for s -/// -public static class EventStoreServiceCollectionExtensions -{ - - /// - /// Adds and configures a - /// - /// The to configure - /// An used to configure the - /// The configured - public static IServiceCollection AddESEventStore(this IServiceCollection services, Action? setup = null) - { - services.AddEventStore(setup); - return services; - } - -} diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Neuroglia.Data.Infrastructure.EventSourcing.EventStore.csproj b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Neuroglia.Data.Infrastructure.EventSourcing.EventStore.csproj index 5838a735b..7d38e2dc0 100644 --- a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Neuroglia.Data.Infrastructure.EventSourcing.EventStore.csproj +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Neuroglia.Data.Infrastructure.EventSourcing.EventStore.csproj @@ -30,7 +30,9 @@ + + diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs deleted file mode 100644 index 2a644fa68..000000000 --- a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs +++ /dev/null @@ -1,557 +0,0 @@ -// Copyright © 2021-Present Neuroglia SRL. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"), -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using EventStore.Client; -using Grpc.Core; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Neuroglia.Data.Infrastructure.EventSourcing.Configuration; -using Neuroglia.Data.Infrastructure.EventSourcing.EventStore.Services; -using Neuroglia.Data.Infrastructure.EventSourcing.Services; -using Neuroglia.Plugins; -using Neuroglia.Serialization; -using System.Reactive.Linq; -using System.Reactive.Subjects; -using System.Runtime.CompilerServices; -using ESStreamPosition = EventStore.Client.StreamPosition; - -namespace Neuroglia.Data.Infrastructure.EventSourcing; - -/// -/// Represents the default Event Store implementation of the interface -/// -[Plugin(Tags = ["event-store"]), Factory(typeof(ESEventStoreFactory))] -public class ESEventStore - : IEventStore -{ - - /// - /// Initializes a new - /// - /// The service used to perform logging - /// The options used to configure the - /// The service used to provide s - /// The service used to interact with the remove Event Store service - /// The service used to interact with the remove Event Store service, exclusively for persistent subscriptions - public ESEventStore(ILogger logger, IOptions options, ISerializerProvider serializerProvider, EventStoreClient eventStoreClient, EventStorePersistentSubscriptionsClient eventStorePersistentSubscriptionsClient) - { - this.Logger = logger; - this.Options = options.Value; - this.Serializer = serializerProvider.GetSerializers().First(s => this.Options.SerializerType == null || s.GetType() == this.Options.SerializerType); - this.EventStoreClient = eventStoreClient; - this.EventStorePersistentSubscriptionsClient = eventStorePersistentSubscriptionsClient; - } - - /// - /// Gets the service used to perform logging - /// - protected virtual ILogger Logger { get; } - - /// - /// Gets the options used to configure the - /// - protected virtual EventStoreOptions Options { get; } - - /// - /// Gets the service used to interact with the remove Event Store service - /// - protected virtual EventStoreClient EventStoreClient { get; } - - /// - /// Gets the service used to interact with the remove Event Store service, exclusively for persistent subscriptions - /// - protected virtual EventStorePersistentSubscriptionsClient EventStorePersistentSubscriptionsClient { get; } - - /// - /// Gets the service used to serialize and deserialize s - /// - protected virtual ISerializer Serializer { get; } - - /// - public virtual async Task StreamExistsAsync(string streamId, CancellationToken cancellationToken = default) - { - if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); - return (await this.GetAsync(streamId, cancellationToken).ConfigureAwait(false)) != null; - } - - /// - public virtual async Task GetAsync(string streamId, CancellationToken cancellationToken = default) - { - if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); - var qualifiedStreamId = this.GetQualifiedStreamId(streamId); - - var streamMetadataResult = await this.EventStoreClient.GetStreamMetadataAsync(qualifiedStreamId, cancellationToken: cancellationToken).ConfigureAwait(false); - if (streamMetadataResult.StreamDeleted) throw new StreamNotFoundException(streamId); - var offset = streamMetadataResult.Metadata.TruncateBefore ?? StreamPosition.StartOfStream; - - var readResult = this.EventStoreClient.ReadStreamAsync(Direction.Forwards, qualifiedStreamId, offset, 1, cancellationToken: cancellationToken); - ReadState? readState; - - try { readState = await readResult.ReadState.ConfigureAwait(false); } - catch { throw new StreamNotFoundException(streamId); } - if (readState == ReadState.StreamNotFound) - { - if (streamId.StartsWith("$ce-")) return new EventStreamDescriptor(streamId, 0, null, null); - else throw new StreamNotFoundException(streamId); - } - var firstEvent = await readResult.FirstAsync(cancellationToken).ConfigureAwait(false); - readResult = this.EventStoreClient.ReadStreamAsync(Direction.Backwards, qualifiedStreamId, ESStreamPosition.End, 1, cancellationToken: cancellationToken); - var lastEvent = await readResult.FirstAsync(cancellationToken).ConfigureAwait(false); - - return new EventStreamDescriptor(streamId, lastEvent.Event.EventNumber.ToInt64() + 1 - offset.ToInt64(), firstEvent.Event.Created, lastEvent.Event.Created); - } - - /// - public virtual async Task AppendAsync(string streamId, IEnumerable events, long? expectedVersion = null, CancellationToken cancellationToken = default) - { - if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); - if (events == null || !events.Any()) throw new ArgumentNullException(nameof(events)); - if (expectedVersion < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(expectedVersion)); - streamId = this.GetQualifiedStreamId(streamId); - - var readResult = this.EventStoreClient.ReadStreamAsync(Direction.Backwards, streamId, ESStreamPosition.End, 1, cancellationToken: cancellationToken); - var shouldThrowIfNotExists = expectedVersion.HasValue && expectedVersion != StreamPosition.StartOfStream && expectedVersion != StreamPosition.EndOfStream; - try { if (await readResult.ReadState.ConfigureAwait(false) == ReadState.StreamNotFound && shouldThrowIfNotExists) throw new OptimisticConcurrencyException(expectedVersion, null); } - catch (StreamDeletedException) { if(shouldThrowIfNotExists) throw new OptimisticConcurrencyException(expectedVersion, null); } - - var eventsData = events.Select(e => - { - var metadata = e.Metadata ?? new Dictionary(); - metadata[EventRecordMetadata.ClrTypeName] = e.Data?.GetType().AssemblyQualifiedName!; - return new EventData(Uuid.NewUuid(), e.Type, this.Serializer.SerializeToByteArray(e.Data), this.Serializer.SerializeToByteArray(metadata)); - }); - var writeResult = expectedVersion.HasValue - ? await this.EventStoreClient.AppendToStreamAsync(streamId, StreamRevision.FromInt64(expectedVersion.Value), eventsData, cancellationToken: cancellationToken).ConfigureAwait(false) - : await this.EventStoreClient.AppendToStreamAsync(streamId, StreamState.Any, eventsData, cancellationToken: cancellationToken).ConfigureAwait(false); - return writeResult.NextExpectedStreamRevision.ToUInt64(); - } - - /// - public virtual IAsyncEnumerable ReadAsync(string? streamId, StreamReadDirection readDirection, long offset, ulong? length = null, CancellationToken cancellationToken = default) - { - if (string.IsNullOrWhiteSpace(streamId)) - { - if(string.IsNullOrWhiteSpace(this.Options.DatabaseName)) return this.ReadFromAllAsync(readDirection, offset, length, cancellationToken); - else return this.ReadFromStreamAsync(this.GetDatabaseStreamId()!, readDirection, offset, length, cancellationToken); - } - else return this.ReadFromStreamAsync(this.GetQualifiedStreamId(streamId), readDirection, offset, length, cancellationToken); - } - - /// - /// Reads events recorded on the specified stream - /// - /// The id of the stream to read events from - /// The direction in which to read the stream - /// The offset starting from which to read events - /// The amount of events to read - /// A - /// A new containing the events read from the store - protected virtual async IAsyncEnumerable ReadFromStreamAsync(string streamId, StreamReadDirection readDirection, long offset, ulong? length = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); - - ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream); - - var direction = readDirection switch - { - StreamReadDirection.Backwards => Direction.Backwards, - StreamReadDirection.Forwards => Direction.Forwards, - _ => throw new NotSupportedException($"The specified {nameof(StreamReadDirection)} '{readDirection}' is not supported") - }; - - var streamMetadataResult = await this.EventStoreClient.GetStreamMetadataAsync(streamId, cancellationToken: cancellationToken).ConfigureAwait(false); - if (streamMetadataResult.StreamDeleted) throw new StreamNotFoundException(streamId); - if (streamMetadataResult.Metadata.TruncateBefore.HasValue && offset != StreamPosition.EndOfStream && offset < streamMetadataResult.Metadata.TruncateBefore.Value.ToInt64()) offset = streamMetadataResult.Metadata.TruncateBefore.Value.ToInt64(); - - if (readDirection == StreamReadDirection.Forwards && offset == StreamPosition.EndOfStream) yield break; - else if (readDirection == StreamReadDirection.Backwards && offset == StreamPosition.StartOfStream) yield break; - - var readResult = this.EventStoreClient.ReadStreamAsync(direction, streamId, ESStreamPosition.FromInt64(offset), length.HasValue ? (long)length.Value : long.MaxValue, true, cancellationToken: cancellationToken); - try { if (await readResult.ReadState.ConfigureAwait(false) == ReadState.StreamNotFound) throw new StreamNotFoundException(streamId); } - catch (StreamDeletedException) { throw new StreamNotFoundException(streamId); } - - await foreach (var e in readResult) yield return this.DeserializeEventRecord(e); - } - - /// - /// Reads recorded events across all streams - /// - /// The direction in which to read events - /// The offset starting from which to read events - /// The amount of events to read - /// A - /// A new containing the events read from the store - protected virtual async IAsyncEnumerable ReadFromAllAsync(StreamReadDirection readDirection, long offset, ulong? length = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - var direction = readDirection switch - { - StreamReadDirection.Backwards => Direction.Backwards, - StreamReadDirection.Forwards => Direction.Forwards, - _ => throw new NotSupportedException($"The specified {nameof(StreamReadDirection)} '{readDirection}' is not supported") - }; - - if (readDirection == StreamReadDirection.Forwards && offset == StreamPosition.EndOfStream) yield break; - else if (readDirection == StreamReadDirection.Backwards && offset == StreamPosition.StartOfStream) yield break; - - var position = offset switch - { - StreamPosition.StartOfStream => Position.Start, - StreamPosition.EndOfStream => Position.End, - _ => readDirection == StreamReadDirection.Backwards ? Position.End : Position.Start - }; - var events = this.EventStoreClient.ReadAllAsync(direction, position, length.HasValue ? (long)length.Value : long.MaxValue, cancellationToken: cancellationToken); - var streamOffset = 0; - await foreach (var e in events.Where(e => !e.Event.EventType.StartsWith('$'))) - { - if (readDirection == StreamReadDirection.Forwards ? streamOffset >= offset : streamOffset < (offset == StreamPosition.EndOfStream ? int.MaxValue : offset + 1)) yield return this.DeserializeEventRecord(e); - streamOffset++; - } - } - - /// - public virtual Task> ObserveAsync(string? streamId, long offset = StreamPosition.EndOfStream, string? consumerGroup = null, CancellationToken cancellationToken = default) - { - if (string.IsNullOrWhiteSpace(streamId)) - { - if (string.IsNullOrWhiteSpace(this.Options.DatabaseName)) return this.ObserveAllAsync(offset, consumerGroup, cancellationToken); - else return this.ObserveStreamAsync(this.GetDatabaseStreamId()!, offset, consumerGroup, cancellationToken); - } - else return this.ObserveStreamAsync(streamId, offset, consumerGroup, cancellationToken); - } - - /// - /// Subscribes to events of the specified stream - /// - /// The id of the stream, if any, to subscribe to. If not set, subscribes to all events - /// The offset starting from which to receive events. Defaults to - /// The name of the consumer group, if any, in case the subscription is persistent - /// A - /// A new used to observe events - protected virtual async Task> ObserveStreamAsync(string streamId, long offset = StreamPosition.EndOfStream, string? consumerGroup = null, CancellationToken cancellationToken = default) - { - if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); - ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream); - if (!await this.StreamExistsAsync(streamId, cancellationToken).ConfigureAwait(false)) throw new StreamNotFoundException(streamId); - var qualifiedStreamId = this.GetQualifiedStreamId(streamId); - - var subject = new Subject(); - if (string.IsNullOrWhiteSpace(consumerGroup)) - { - var position = offset == StreamPosition.EndOfStream ? FromStream.End : FromStream.After(ESStreamPosition.FromInt64(offset)); - var records = new List(); - if (position != FromStream.End) records = await this.ReadAsync(streamId, StreamReadDirection.Forwards, offset, cancellationToken: cancellationToken).ToListAsync(cancellationToken).ConfigureAwait(false); - var subscription = await this.EventStoreClient.SubscribeToStreamAsync(qualifiedStreamId, FromStream.End, (sub, e, token) => this.OnEventConsumedAsync(subject, sub, e, token), true, (sub, reason, ex) => this.OnSubscriptionDropped(subject, sub, reason, ex), cancellationToken: cancellationToken).ConfigureAwait(false); - return Observable.StartWith(Observable.Using(() => subscription, watch => subject), records); - } - else - { - var position = offset == StreamPosition.EndOfStream ? ESStreamPosition.End : ESStreamPosition.FromInt64(offset); - var settings = new PersistentSubscriptionSettings(true, position, checkPointLowerBound: 1, checkPointUpperBound: 1); - try { await this.EventStorePersistentSubscriptionsClient.CreateToStreamAsync(streamId, consumerGroup, settings, cancellationToken: cancellationToken).ConfigureAwait(false); } - catch (RpcException ex) when (ex.StatusCode == StatusCode.AlreadyExists) { } - var checkpointedPosition = await this.GetConsumerCheckpointedPositionAsync(consumerGroup, streamId, cancellationToken).ConfigureAwait(false); - var persistentSubscription = await this.EventStorePersistentSubscriptionsClient.SubscribeToStreamAsync(qualifiedStreamId, consumerGroup, (sub, e, retry, token) => this.OnEventConsumedAsync(subject, streamId, sub, e, retry, checkpointedPosition, token), (sub, reason, ex) => this.OnSubscriptionDropped(subject, sub, reason, ex), cancellationToken: cancellationToken).ConfigureAwait(false); - return Observable.Using(() => persistentSubscription, watch => subject); - } - } - - /// - /// Subscribes to all events - /// - /// The offset starting from which to receive events. Defaults to - /// The name of the consumer group, if any, in case the subscription is persistent - /// A - /// A new used to observe events - protected virtual async Task> ObserveAllAsync(long offset = StreamPosition.EndOfStream, string? consumerGroup = null, CancellationToken cancellationToken = default) - { - ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream); - - var subject = new ReplaySubject(); - if (string.IsNullOrWhiteSpace(consumerGroup)) - { - var position = offset == StreamPosition.EndOfStream ? FromAll.End : FromAll.Start; - var filterOptions = new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents()); - var subscription = await this.EventStoreClient.SubscribeToAllAsync(position, (sub, e, token) => this.OnEventConsumedAsync(subject, sub, e, token), true, (sub, reason, ex) => this.OnSubscriptionDropped(subject, sub, reason, ex), filterOptions: filterOptions, cancellationToken: cancellationToken).ConfigureAwait(false); - var observable = Observable.Using(() => subscription, _ => subject); - var streamOffset = 0; - if (offset != StreamPosition.StartOfStream && offset != StreamPosition.EndOfStream) observable = observable.SkipWhile(e => - { - var skip = streamOffset < offset; - streamOffset++; - return skip; - }); - return observable; - } - else - { - var position = offset == StreamPosition.EndOfStream ? Position.End : Position.Start; - var filter = EventTypeFilter.ExcludeSystemEvents(); - var settings = new PersistentSubscriptionSettings(true, position, checkPointLowerBound: 1, checkPointUpperBound: 1); - try { await this.EventStorePersistentSubscriptionsClient.CreateToAllAsync(consumerGroup, filter, settings, cancellationToken: cancellationToken).ConfigureAwait(false); } - catch (RpcException ex) when (ex.StatusCode == StatusCode.AlreadyExists) { } - var checkpointedPosition = await this.GetConsumerCheckpointedPositionAsync(consumerGroup, cancellationToken: cancellationToken).ConfigureAwait(false); - var persistentSubscription = await this.EventStorePersistentSubscriptionsClient.SubscribeToAllAsync(consumerGroup, (sub, e, retry, token) => this.OnEventConsumedAsync(subject, null, sub, e, retry, checkpointedPosition, token), (sub, reason, ex) => this.OnSubscriptionDropped(subject, sub, reason, ex), cancellationToken: cancellationToken); - return Observable.Using(() => persistentSubscription, watch => subject); - } - } - - /// - public virtual async Task SetOffsetAsync(string consumerGroup, long offset, string? streamId = null, CancellationToken cancellationToken = default) - { - if (string.IsNullOrWhiteSpace(consumerGroup)) throw new ArgumentNullException(nameof(consumerGroup)); - ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream); - - IPosition position = string.IsNullOrWhiteSpace(streamId) ? offset == StreamPosition.EndOfStream ? ESStreamPosition.End : ESStreamPosition.Start : offset == StreamPosition.EndOfStream ? ESStreamPosition.End : ESStreamPosition.FromInt64(offset); - var settings = new PersistentSubscriptionSettings(true, position, checkPointLowerBound: 1, checkPointUpperBound: 1); - PersistentSubscriptionInfo subscription; - streamId = string.IsNullOrWhiteSpace(streamId) ? string.IsNullOrWhiteSpace(this.Options.DatabaseName) ? null : this.GetDatabaseStreamId()! : this.GetQualifiedStreamId(streamId); - if (string.IsNullOrWhiteSpace(streamId)) - { - try { subscription = await this.EventStorePersistentSubscriptionsClient.GetInfoToAllAsync(consumerGroup, cancellationToken: cancellationToken).ConfigureAwait(false); } - catch (RpcException ex) when (ex.StatusCode == StatusCode.NotFound) { throw new StreamNotFoundException(); } - if (subscription.Stats.LastCheckpointedEventPosition != null) await this.SetConsumerCheckpointPositionAsync(consumerGroup, streamId, subscription.Stats.LastCheckpointedEventPosition, cancellationToken).ConfigureAwait(false); - - await this.EventStorePersistentSubscriptionsClient.DeleteToAllAsync(consumerGroup, cancellationToken: cancellationToken).ConfigureAwait(false); - try { await this.EventStorePersistentSubscriptionsClient.CreateToAllAsync(consumerGroup, settings, cancellationToken: cancellationToken).ConfigureAwait(false); } //it occurred in tests that EventStore would only eventually delete the subscription, resulting in caught exception, thus the need for the try/catch block - catch (RpcException ex) when (ex.StatusCode == StatusCode.AlreadyExists) { await this.EventStorePersistentSubscriptionsClient.UpdateToAllAsync(consumerGroup, settings, cancellationToken: cancellationToken).ConfigureAwait(false); } - } - else - { - try { subscription = await this.EventStorePersistentSubscriptionsClient.GetInfoToStreamAsync(streamId, consumerGroup, cancellationToken: cancellationToken).ConfigureAwait(false); } - catch (RpcException ex) when (ex.StatusCode == StatusCode.NotFound) { throw new StreamNotFoundException(); } - if (subscription.Stats.LastCheckpointedEventPosition != null) await this.SetConsumerCheckpointPositionAsync(consumerGroup, streamId, subscription.Stats.LastCheckpointedEventPosition, cancellationToken).ConfigureAwait(false); - - await this.EventStorePersistentSubscriptionsClient.DeleteToStreamAsync(streamId, consumerGroup, cancellationToken: cancellationToken).ConfigureAwait(false); - try { await this.EventStorePersistentSubscriptionsClient.CreateToStreamAsync(streamId, consumerGroup, settings, cancellationToken: cancellationToken).ConfigureAwait(false); } //it occurred in tests that EventStore would only eventually delete the subscription, resulting in caught exception, thus the need for the try/catch block - catch (RpcException ex) when (ex.StatusCode == StatusCode.AlreadyExists) { await this.EventStorePersistentSubscriptionsClient.UpdateToStreamAsync(streamId, consumerGroup, settings, cancellationToken: cancellationToken).ConfigureAwait(false); } - } - } - - /// - public virtual async Task TruncateAsync(string streamId, ulong? beforeVersion = null, CancellationToken cancellationToken = default) - { - if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); - if (!await this.StreamExistsAsync(streamId, cancellationToken).ConfigureAwait(false)) throw new StreamNotFoundException(streamId); - - var truncateBefore = beforeVersion.HasValue ? ESStreamPosition.FromInt64((long)beforeVersion.Value) : ESStreamPosition.End; - await this.EventStoreClient.SetStreamMetadataAsync(this.GetQualifiedStreamId(streamId), StreamState.Any, new StreamMetadata(truncateBefore: truncateBefore), cancellationToken: cancellationToken).ConfigureAwait(false); - } - - /// - public virtual async Task DeleteAsync(string streamId, CancellationToken cancellationToken = default) - { - if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); - if (!await this.StreamExistsAsync(streamId, cancellationToken).ConfigureAwait(false)) throw new StreamNotFoundException(streamId); - - await this.EventStoreClient.DeleteAsync(this.GetQualifiedStreamId(streamId), StreamState.Any, cancellationToken: cancellationToken).ConfigureAwait(false); - } - - /// - /// Converts the specified stream id to a qualified stream id, which is prefixed with the current database name, if any - /// - /// The stream id to convert - /// The qualified id of the specified stream id - protected virtual string GetQualifiedStreamId(string streamId) => string.IsNullOrWhiteSpace(this.Options.DatabaseName) || streamId.StartsWith($"$ce-") ? streamId : $"{this.Options.DatabaseName}-{streamId}"; - - /// - /// Gets the id, if any, of the stream that contains references to all events in the database - /// - /// The id, if any, of the stream that contains references to all events in the database - protected virtual string? GetDatabaseStreamId() => string.IsNullOrWhiteSpace(this.Options.DatabaseName) ? null : $"$ce-{this.Options.DatabaseName}"; - - /// - /// Deserializes the specified into a new - /// - /// The to deserialize - /// The the has been produced by, if any - /// The to stream s to - /// A boolean indicating whether or not the is being replayed to its consumer. Ignore if 'subscription' is null - /// The deserialized - protected virtual IEventRecord DeserializeEventRecord(ResolvedEvent e, PersistentSubscription? subscription = null, ISubject? subject = null, bool? replayed = null) - { - var metadata = this.Serializer.Deserialize>(e.Event.Metadata.ToArray()); - var clrTypeName = metadata![EventRecordMetadata.ClrTypeName].ToString()!; - var clrType = Type.GetType(clrTypeName) ?? throw new Exception(); - var data = this.Serializer.Deserialize(e.Event.Data.ToArray(), clrType); - metadata.Remove(EventRecordMetadata.ClrTypeName); - if (!metadata.Any()) metadata = null; - if (subscription == null) return new EventRecord(e.OriginalStreamId, e.Event.EventId.ToString(), e.Event.EventNumber.ToUInt64(), e.Event.Position.CommitPosition, e.Event.Created, e.Event.EventType, data, metadata); - else return new AckableEventRecord(e.OriginalStreamId, e.Event.EventId.ToString(), e.Event.EventNumber.ToUInt64(), e.Event.Position.CommitPosition, e.Event.Created, e.Event.EventType, data, metadata, replayed, () => this.OnAckEventAsync(subject!, subscription, e), reason => this.OnNackEventAsync(subject!, subscription, e, reason)); - } - - /// - /// Gets the last checkpointed position, if any, of the specified consumer group - /// - /// The consumer group to get the highest checkpointed position for - /// The id of the stream, if any, to get the consumer group's checkpointed position for - /// A - /// A new awaitable - protected virtual async Task GetConsumerCheckpointedPositionAsync(string consumerGroup, string? streamId = null, CancellationToken cancellationToken = default) - { - try - { - return await this.ReadAsync(this.GetConsumerCheckpointStreamId(consumerGroup, streamId), StreamReadDirection.Forwards, StreamPosition.StartOfStream, cancellationToken: cancellationToken) - .Select(e => e.Data) - .OfType() - .OrderByDescending(u => u) - .FirstOrDefaultAsync(cancellationToken) - .ConfigureAwait(false); - } - catch (StreamNotFoundException) { return null; } - } - - /// - /// Sets the last checkpointed position of the specified consumer group - /// - /// The consumer group to set the last checkpointed position for - /// The id of the stream, if any, to get the consumer group's checkpointed position for - /// The last checkpointed position - /// A - /// A new awaitable - protected virtual async Task SetConsumerCheckpointPositionAsync(string consumerGroup, string? streamId, IPosition position, CancellationToken cancellationToken = default) - { - var data = position switch - { - Position pos => pos.CommitPosition, - ESStreamPosition spos => (ulong)spos.ToInt64(), - _ => throw new NotSupportedException($"The position type '{position.GetType()}' is not supported in this context") - }; - await this.AppendAsync(this.GetConsumerCheckpointStreamId(consumerGroup, streamId), new EventDescriptor[] { new("$checkpoint", data) }, cancellationToken: cancellationToken).ConfigureAwait(false); - } - - /// - /// Gets the id of the stream used to store the checkpoints of the specified consumer group, and optionally stream - /// - /// The consumer group to get the checkpoint stream id for - /// The id of the stream, if any, to get the consumer group's checkpoint stream for - /// - protected virtual string GetConsumerCheckpointStreamId(string consumerGroup, string? streamId) => $"${consumerGroup}:{streamId ?? "$all"}_checkpoints"; - - /// - /// Handles the consumption of a on a - /// - /// The to stream s to - /// The the has been received by - /// The to handle - /// A - /// A new awaitable - protected virtual Task OnEventConsumedAsync(ISubject subject, StreamSubscription subscription, ResolvedEvent e, CancellationToken cancellationToken) => Task.Run(() => subject.OnNext(this.DeserializeEventRecord(e)), cancellationToken); - - /// - /// Handles the consumption of a on a - /// - /// The to stream s to - /// The id of the stream, if any, to consume s from - /// The the has been received by - /// The to handle - /// The retry count, if any - /// The highest position ever checkpointed by the consumer group - /// A - /// A new awaitable - protected virtual Task OnEventConsumedAsync(ISubject subject, string? streamId, PersistentSubscription subscription, ResolvedEvent e, int? retryCount, ulong? checkpointedPosition, CancellationToken cancellationToken) - { - try - { - if (string.IsNullOrWhiteSpace(this.Options.DatabaseName) || !e.OriginalStreamId.StartsWith(this.GetDatabaseStreamId()!)) if (e.OriginalStreamId.StartsWith('$') || e.Event.Metadata.Length < 1) return subscription.Ack(e); - return Task.Run(() => subject.OnNext(this.DeserializeEventRecord(e, subscription, subject, checkpointedPosition > (string.IsNullOrWhiteSpace(streamId) ? e.Event.Position.CommitPosition : e.Event.EventNumber.ToUInt64()))), cancellationToken); - } - catch (Exception ex) - { - subject.OnError(ex); - return Task.CompletedTask; - } - } - - /// - /// Acks the specified - /// - /// The to stream s to - /// The the to ack has been received by - /// The to ack - /// A new awaitable - protected virtual async Task OnAckEventAsync(ISubject subject, PersistentSubscription subscription, ResolvedEvent e) - { - try { await subscription.Ack(e.OriginalEvent.EventId).ConfigureAwait(false); } - catch (ObjectDisposedException ex) { subject.OnError(ex); } - } - - /// - /// Nacks the specified - /// - /// The to stream s to - /// The the to nack has been received by - /// The to nack - /// The reason why to nack the - /// A new awaitable - protected virtual async Task OnNackEventAsync(ISubject subject, PersistentSubscription subscription, ResolvedEvent e, string? reason) - { - try { await subscription.Nack(PersistentSubscriptionNakEventAction.Retry, reason ?? "Unknown", e.OriginalEvent.EventId).ConfigureAwait(false); } - catch (ObjectDisposedException ex) { subject.OnError(ex); } - } - - /// - /// Handles the specified being dropped - /// - /// The to stream s to - /// The the has been received by - /// The reason why to drop the - /// The that occurred, if any - protected virtual void OnSubscriptionDropped(ISubject subject, StreamSubscription subscription, SubscriptionDroppedReason reason, Exception? ex) - { - switch (reason) - { - case SubscriptionDroppedReason.Disposed: - subject.OnCompleted(); - break; - case SubscriptionDroppedReason.SubscriberError: - case SubscriptionDroppedReason.ServerError: - subject.OnError(ex ?? new Exception()); - break; - } - } - - /// - /// Handles the specified being dropped - /// - /// The to stream s to - /// The the has been received by - /// The reason why to drop the - /// The that occurred, if any - protected virtual void OnSubscriptionDropped(ISubject subject, PersistentSubscription subscription, SubscriptionDroppedReason reason, Exception? ex) - { - switch (reason) - { - case SubscriptionDroppedReason.Disposed: - subject.OnCompleted(); - break; - case SubscriptionDroppedReason.SubscriberError: - case SubscriptionDroppedReason.ServerError: - subject.OnError(ex ?? new Exception()); - break; - } - } - - /// - /// Exposes constants about event related metadata used by the - /// - protected static class EventRecordMetadata - { - - /// - /// Gets the name of the event record metadata used to store the event CLR type's assembly qualified name - /// - public const string ClrTypeName = "clr-type"; - - } - -} diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStoreFactory.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStoreFactory.cs deleted file mode 100644 index 5dd09c250..000000000 --- a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStoreFactory.cs +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright © 2021-Present Neuroglia SRL. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"), -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using EventStore.Client; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.DependencyInjection; - -namespace Neuroglia.Data.Infrastructure.EventSourcing.EventStore.Services; - -/// -/// Represents the service used to create instances -/// -/// -/// Initializes a new -/// -/// The current -public class ESEventStoreFactory(IServiceProvider serviceProvider) - : IFactory -{ - - /// - /// Gets the name of the EventStore connection string - /// - public const string ConnectionStringName = "eventstore"; - - /// - /// Gets the current - /// - protected IServiceProvider ServiceProvider { get; } = serviceProvider; - - /// - public virtual ESEventStore Create() - { - var configuration = this.ServiceProvider.GetRequiredService(); - var connectionString = configuration.GetConnectionString(ConnectionStringName); - if (string.IsNullOrWhiteSpace(connectionString)) throw new Exception($"An error occurred while attempting to create an ESEventStore instance. The '{ConnectionStringName}' connection string is not provided or is invalid. Please ensure that the connection string is properly configured in the application settings."); - var settings = EventStoreClientSettings.Create(connectionString); - return ActivatorUtilities.CreateInstance(this.ServiceProvider, new EventStoreClient(settings), new EventStorePersistentSubscriptionsClient(settings)); - } - - object IFactory.Create() => this.Create(); - -} diff --git a/src/Neuroglia.Data.PatchModel/JsonPatchTypeInfo.cs b/src/Neuroglia.Data.PatchModel/JsonPatchTypeInfo.cs index 3aa4d716e..508725e08 100644 --- a/src/Neuroglia.Data.PatchModel/JsonPatchTypeInfo.cs +++ b/src/Neuroglia.Data.PatchModel/JsonPatchTypeInfo.cs @@ -14,9 +14,9 @@ using Json.Patch; using Json.Pointer; using Microsoft.Extensions.DependencyInjection; +using Neuroglia.Collections; using Neuroglia.Data.Guards; using Neuroglia.Data.Infrastructure.Services; -using Neuroglia.Data.PatchModel.Services; using Neuroglia.Serialization.Json; using System.Collections; using System.Collections.Concurrent; diff --git a/src/Neuroglia.Data.Schemas.Json/Services/JsonSchemaResolver.cs b/src/Neuroglia.Data.Schemas.Json/Services/JsonSchemaResolver.cs index 1a099a222..9af5f9084 100644 --- a/src/Neuroglia.Data.Schemas.Json/Services/JsonSchemaResolver.cs +++ b/src/Neuroglia.Data.Schemas.Json/Services/JsonSchemaResolver.cs @@ -110,7 +110,7 @@ protected virtual async Task> ResolveReferencedSchemasA { var reference = refKeyword.Value.Deserialize()!; var refSchema = await this.ResolveReferencedSchemaAsync(new Uri(reference, UriKind.RelativeOrAbsolute), rootSchema, cancellationToken).ConfigureAwait(false); - if (refSchema.HasValue) refSchemas.Add(refSchema); + if (refSchema.HasValue) refSchemas.Add(refSchema.Value); } if (allOfKeyword.Value.ValueKind == JsonValueKind.Array) diff --git a/src/Neuroglia.Mediation.AspNetCore/Extensions/ControllerBaseExtensions.cs b/src/Neuroglia.Mediation.AspNetCore/Extensions/ControllerBaseExtensions.cs index e4c95fe90..45517e4b0 100644 --- a/src/Neuroglia.Mediation.AspNetCore/Extensions/ControllerBaseExtensions.cs +++ b/src/Neuroglia.Mediation.AspNetCore/Extensions/ControllerBaseExtensions.cs @@ -42,7 +42,7 @@ public static class ControllerBaseExtensions public static ActionResult Process(this ControllerBase controller, TResult result, int successStatusCode = 200) where TResult : IOperationResult { - if (result.Status != (int)HttpStatusCode.OK) + if (!(result.Status >= 200 && result.Status < 300)) { if (result.Status == (int)HttpStatusCode.Forbidden) return controller.StatusCode((int)HttpStatusCode.Forbidden); if (result.Status == (int)HttpStatusCode.BadRequest) @@ -53,10 +53,10 @@ public static ActionResult Process(this ControllerBase controller, TRes if (result.Status == (int)HttpStatusCode.NotFound) { result.Errors?.ToList().ForEach(e => controller.ModelState.AddModelError(e.Title!, e.Detail!)); - return NotFound(controller, controller.ModelState); ; + return NotFound(controller, controller.ModelState); } if (result.Status == (int)HttpStatusCode.NotModified) return controller.StatusCode((int)HttpStatusCode.NotModified); - return controller.StatusCode((int)HttpStatusCode.InternalServerError); + return controller.StatusCode((int)HttpStatusCode.InternalServerError, result.Data); } if (result.Data != null) return new ObjectResult(result.Data) { StatusCode = successStatusCode }; else return controller.StatusCode(successStatusCode); diff --git a/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStores/ESEventStoreTests.cs b/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStores/ESEventStoreTests.cs deleted file mode 100644 index ae3e31511..000000000 --- a/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStores/ESEventStoreTests.cs +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright © 2021-Present Neuroglia SRL. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"), -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -using DotNet.Testcontainers.Containers; -using EventStore.Client; -using Microsoft.Extensions.DependencyInjection; -using Neuroglia.Data.Infrastructure.EventSourcing; -using Neuroglia.Serialization; -using Neuroglia.UnitTests.Containers; - -namespace Neuroglia.UnitTests.Cases.Data.Infrastructure.EventSourcing.EventStores; - -[TestCaseOrderer("Neuroglia.UnitTests.Services.PriorityTestCaseOrderer", "Neuroglia.UnitTests")] -public class ESEventStoreTests - : EventStoreTestsBase -{ - - public ESEventStoreTests() : base(BuildServices()) { } - - public static IServiceCollection BuildServices() - { - var services = new ServiceCollection(); - services.AddLogging(); - services.AddSerialization(); - services.AddSingleton(EventStoreContainerBuilder.Build()); - services.AddHostedService(provider => new ContainerBootstrapper(provider.GetRequiredService())); - services.AddSingleton(provider => EventStoreClientSettings.Create($"esdb://{provider.GetRequiredService().Hostname}:{provider.GetRequiredService().GetMappedPublicPort(EventStoreContainerBuilder.PublicPort2)}?tls=false")); - services.AddSingleton(provider => new EventStoreClient(provider.GetRequiredService())); - services.AddSingleton(provider => new EventStorePersistentSubscriptionsClient(provider.GetRequiredService())); - services.AddESEventStore(); - return services; - } - -} \ No newline at end of file diff --git a/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/Repositories/EventSourcingRepositoryTests.cs b/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/Repositories/EventSourcingRepositoryTests.cs index 75959cc64..84719b147 100644 --- a/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/Repositories/EventSourcingRepositoryTests.cs +++ b/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/Repositories/EventSourcingRepositoryTests.cs @@ -38,7 +38,7 @@ static IServiceCollection BuildServices() services.AddSingleton(provider => EventStoreClientSettings.Create($"esdb://{provider.GetRequiredService().Hostname}:{provider.GetRequiredService().GetMappedPublicPort(EventStoreContainerBuilder.PublicPort2)}?tls=false")); services.AddSingleton(provider => new EventStoreClient(provider.GetRequiredService())); services.AddSingleton(provider => new EventStorePersistentSubscriptionsClient(provider.GetRequiredService())); - services.AddESEventStore(); + services.AddEsdbEventStore(); services.AddEventSourcingRepository(); return services; } From 5a284ccaecf105f5e0a051bc0470f6a0a0a4146e Mon Sep 17 00:00:00 2001 From: Charles d'Avernas Date: Tue, 9 Apr 2024 14:20:36 +0200 Subject: [PATCH 2/2] feat(EventSourcing): Added the IProjectionManager and related services (IProjectionBuilder, ...) feat(EventStore): Added a new esdb implementation of the IProjectionManager interfaces and related services --- .../Extensions/IDictionaryExtensions.cs | 32 + .../Projection.cs | 35 ++ .../Services/Interfaces/IProjectionBuilder.cs | 46 ++ .../Services/Interfaces/IProjectionManager.cs | 41 ++ .../Interfaces/IProjectionSourceBuilder.cs | 30 + .../EsdbJavaScriptConversion.cs | 311 ++++++++++ .../EsdbServiceCollectionExtensions.cs | 50 ++ .../Services/EsdbEventStore.cs | 557 ++++++++++++++++++ .../Services/EsdbEventStoreFactory.cs | 53 ++ .../Services/EsdbProjectionBuilder.cs | 129 ++++ .../Services/EsdbProjectionManager.cs | 49 ++ .../EventStores/EsdbEventStoreTests.cs | 44 ++ .../EventStores/EsdbProjectionManagerTests.cs | 126 ++++ 13 files changed, 1503 insertions(+) create mode 100644 src/Neuroglia.Core/Extensions/IDictionaryExtensions.cs create mode 100644 src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Projection.cs create mode 100644 src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Services/Interfaces/IProjectionBuilder.cs create mode 100644 src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Services/Interfaces/IProjectionManager.cs create mode 100644 src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Services/Interfaces/IProjectionSourceBuilder.cs create mode 100644 src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/EsdbJavaScriptConversion.cs create mode 100644 src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Extensions/EsdbServiceCollectionExtensions.cs create mode 100644 src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/EsdbEventStore.cs create mode 100644 src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/EsdbEventStoreFactory.cs create mode 100644 src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/EsdbProjectionBuilder.cs create mode 100644 src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/EsdbProjectionManager.cs create mode 100644 test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStores/EsdbEventStoreTests.cs create mode 100644 test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStores/EsdbProjectionManagerTests.cs diff --git a/src/Neuroglia.Core/Extensions/IDictionaryExtensions.cs b/src/Neuroglia.Core/Extensions/IDictionaryExtensions.cs new file mode 100644 index 000000000..ce48da59d --- /dev/null +++ b/src/Neuroglia.Core/Extensions/IDictionaryExtensions.cs @@ -0,0 +1,32 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Neuroglia; + +/// +/// Defines extensions for instances +/// +public static class IDictionaryExtensions +{ + + /// + /// Gets the value at the specified key + /// + /// The type of the keys + /// The type of the values + /// The extended + /// The key of the value to get + /// The value with the specified key + public static TValue Get(this IDictionary dictionary, TKey key) => dictionary[key]; + +} diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Projection.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Projection.cs new file mode 100644 index 000000000..998f112cc --- /dev/null +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Projection.cs @@ -0,0 +1,35 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Neuroglia.Data.Infrastructure.EventSourcing.Services; + +namespace Neuroglia.Data.Infrastructure.EventSourcing; + +/// +/// Defines constants and statics used to help expressing event-driven projections +/// +public static class Projection +{ + + /// + /// Placeholder method used by implementations to link a processed event to a specific stream + /// + /// The stream to link the processed event to + /// The processed event + public static void LinkEventTo(string stream, IEventRecord e) + { + ArgumentException.ThrowIfNullOrWhiteSpace(stream); + ArgumentNullException.ThrowIfNull(e); + } + +} diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Services/Interfaces/IProjectionBuilder.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Services/Interfaces/IProjectionBuilder.cs new file mode 100644 index 000000000..ce43d0952 --- /dev/null +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Services/Interfaces/IProjectionBuilder.cs @@ -0,0 +1,46 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Linq.Expressions; + +namespace Neuroglia.Data.Infrastructure.EventSourcing.Services; + +/// +/// Defines the fundamentals of a service used to build event-driven projections +/// +/// The type of the state of the event-driven projection to create +public interface IProjectionBuilder +{ + + /// + /// Configures the used to create the projection's initial state + /// + /// The used to create the projection's initial state + /// The configured + IProjectionBuilder Given(Expression> factory); + + /// + /// Configures the predicate used to to filter incoming event records based on the current projection state + /// + /// A used to to filter incoming event records based on the current projection state + /// The configured + IProjectionBuilder When(Expression> predicate); + + /// + /// Configures an to be performed on the projection state when a matching event record is processed + /// + /// An to be performed on the projection state when a matching event record is processed + /// The configured + IProjectionBuilder Then(Expression> action); + +} \ No newline at end of file diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Services/Interfaces/IProjectionManager.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Services/Interfaces/IProjectionManager.cs new file mode 100644 index 000000000..c1cf210d3 --- /dev/null +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Services/Interfaces/IProjectionManager.cs @@ -0,0 +1,41 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Neuroglia.Data.Infrastructure.EventSourcing.Services; + +/// +/// Defines the fundamentals of a service used to create and retrieve event-driven projections +/// +public interface IProjectionManager +{ + + /// + /// Creates a new event-driven projection + /// + /// The type of the state of the projection to create + /// The name of the projection to create + /// An used to build and configure the event-driven projection to create + /// A + /// A new awaitable + Task CreateAsync(string name, Action> setup, CancellationToken cancellationToken = default); + + /// + /// Retrieves the current state of the event-driven projection with the specified name + /// + /// The type of the projection's state + /// The name of the event-driven projection to get the current state of + /// A + /// The current state of the specified event-driven projection + Task GetStateAsync(string name, CancellationToken cancellationToken = default); + +} diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Services/Interfaces/IProjectionSourceBuilder.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Services/Interfaces/IProjectionSourceBuilder.cs new file mode 100644 index 000000000..973466f86 --- /dev/null +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Services/Interfaces/IProjectionSourceBuilder.cs @@ -0,0 +1,30 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Neuroglia.Data.Infrastructure.EventSourcing.Services; + +/// +/// Defines the fundamentals of a service used to build a new event-driven projection source +/// +/// The type of the state of the projection to build the source for +public interface IProjectionSourceBuilder +{ + + /// + /// Builds a new event-driven projection that processes events from the specified stream + /// + /// The name of the stream from which events will be processed by the projection + /// A new implementation used to configure the projection to build + IProjectionBuilder FromStream(string name); + +} diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/EsdbJavaScriptConversion.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/EsdbJavaScriptConversion.cs new file mode 100644 index 000000000..1fa8d11ae --- /dev/null +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/EsdbJavaScriptConversion.cs @@ -0,0 +1,311 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Lambda2Js; +using Neuroglia; +using Neuroglia.Collections; +using System.Collections; +using System.Linq.Expressions; + +namespace Neuroglia.Data.Infrastructure.EventSourcing; + +/// +/// Defines custom s used to translate projections to EventStore +/// +public static class EsdbJavaScriptConversion +{ + + /// + /// Gets an array of the s used to translate projections to EventStore + /// + public static readonly JavascriptConversionExtension[] Extensions = [new NullCheckConversionExtension(), new BinaryExpressionConversionExtension(), new ObjectConversionExtension(), new ArrayInitializerConversionExtension(), new ListConversionExtension(), new CollectionConversionExtension(), new DictionaryConversionExtension(), new EventRecordConversionExtension(), new ProjectionConversionExtension()]; + + class NullCheckConversionExtension + : JavascriptConversionExtension + { + + /// + public override void ConvertToJavascript(JavascriptConversionContext context) + { + if (context.Node is not BinaryExpression binary || (binary.NodeType != ExpressionType.Equal && binary.NodeType != ExpressionType.NotEqual)) return; + var leftNull = binary.Left is ConstantExpression left && left.Value == null; + var rightNull = binary.Right is ConstantExpression right && right.Value == null; + if ((!leftNull && !rightNull) || (leftNull && rightNull)) return; + context.PreventDefault(); + var compareTo = leftNull ? binary.Right : binary.Left; + var op = binary.NodeType switch + { + ExpressionType.Equal => "===", + ExpressionType.NotEqual => "!==", + _ => throw new NotSupportedException() + }; + context.Write(compareTo); + context.Write($" {op} undefined && "); + context.Write(compareTo); + context.Write($" {op} null"); + } + + } + + class BinaryExpressionConversionExtension + : JavascriptConversionExtension + { + + /// + public override void ConvertToJavascript(JavascriptConversionContext context) + { + if (context.Node is not BinaryExpression binary) return; + switch (binary.NodeType) + { + case ExpressionType.Add: + context.PreventDefault(); + context.Write(binary.Left); + context.Write(" + "); + context.Write(binary.Right); + break; + case ExpressionType.Subtract: + context.PreventDefault(); + context.Write(binary.Left); + context.Write(" - "); + context.Write(binary.Right); + break; + case ExpressionType.Multiply: + context.PreventDefault(); + context.Write(binary.Left); + context.Write(" * "); + context.Write(binary.Right); + break; + case ExpressionType.Divide: + context.PreventDefault(); + context.Write(binary.Left); + context.Write(" / "); + context.Write(binary.Right); + break; + case ExpressionType.And: + context.PreventDefault(); + context.Write(binary.Left); + context.Write(" & "); + context.Write(binary.Right); + break; + case ExpressionType.AndAlso: + context.PreventDefault(); + context.Write(binary.Left); + context.Write(" && "); + context.Write(binary.Right); + break; + case ExpressionType.Or: + context.PreventDefault(); + context.Write(binary.Left); + context.Write(" | "); + context.Write(binary.Right); + break; + case ExpressionType.OrElse: + context.PreventDefault(); + context.Write(binary.Left); + context.Write(" || "); + context.Write(binary.Right); + break; + } + } + + } + + class ObjectConversionExtension + : JavascriptConversionExtension + { + + /// + public override void ConvertToJavascript(JavascriptConversionContext context) + { + if (context.Node is not MethodCallExpression call || call.Method.DeclaringType != typeof(ObjectExtensions)) return; + context.PreventDefault(); + switch (call.Method.Name) + { + case nameof(ObjectExtensions.GetProperty): + { + using (context.Operation(JavascriptOperationTypes.Call)) + { + using (context.Operation(JavascriptOperationTypes.IndexerProperty)) + context.WriteNode(call.Arguments.First()); + context.Write($".{call.Arguments.Last().ToString()[1..^1]}"); + } + break; + } + } + } + + } + + class ArrayInitializerConversionExtension + : JavascriptConversionExtension + { + + /// + public override void ConvertToJavascript(JavascriptConversionContext context) + { + if (context.Node is not NewExpression constructor || !constructor.Type.IsArray && !typeof(IEnumerable).IsAssignableFrom(constructor.Type)) return; + context.PreventDefault(); + context.Write("[]"); + } + + } + + class ListConversionExtension + : JavascriptConversionExtension + { + + /// + public override void ConvertToJavascript(JavascriptConversionContext context) + { + if (context.Node is not MethodCallExpression call || !typeof(ICollection).IsAssignableFrom(call.Method.DeclaringType)) return; + context.PreventDefault(); + switch (call.Method.Name) + { + case nameof(ICollection.Contains): + { + using (context.Operation(JavascriptOperationTypes.Call)) + { + using (context.Operation(JavascriptOperationTypes.IndexerProperty)) + context.Write($"{call.Object}.includes"); + context.WriteManyIsolated('(', ')', ',', call.Arguments); + } + break; + } + case nameof(ICollection.Add): + { + using (context.Operation(JavascriptOperationTypes.Call)) + { + using (context.Operation(JavascriptOperationTypes.IndexerProperty)) + context.Write($"{call.Object}.push"); + context.WriteManyIsolated('(', ')', ',', call.Arguments); + } + break; + } + + } + } + + } + + class CollectionConversionExtension + : JavascriptConversionExtension + { + + /// + public override void ConvertToJavascript(JavascriptConversionContext context) + { + if (context.Node is not MethodCallExpression call || call.Method.DeclaringType != typeof(ICollectionExtensions)) return; + context.PreventDefault(); + switch (call.Method.Name) + { + case nameof(ICollectionExtensions.Contains): + { + using (context.Operation(JavascriptOperationTypes.Call)) + { + using (context.Operation(JavascriptOperationTypes.IndexerProperty))context.WriteNode(call.Arguments.First()); + context.Write(".includes("); + context.WriteNode(call.Arguments.Last()); + context.Write(")"); + } + break; + } + case nameof(ICollectionExtensions.Add): + { + using (context.Operation(JavascriptOperationTypes.Call)) + { + using (context.Operation(JavascriptOperationTypes.IndexerProperty)) context.WriteNode(call.Arguments.First()); + context.Write(".push("); + context.WriteNode(call.Arguments.Last()); + context.Write(")"); + } + break; + } + + } + } + + } + + class DictionaryConversionExtension + : JavascriptConversionExtension + { + + /// + public override void ConvertToJavascript(JavascriptConversionContext context) + { + if (context.Node is not MethodCallExpression call || call.Method.DeclaringType != typeof(IDictionaryExtensions)) return; + context.PreventDefault(); + switch (call.Method.Name) + { + case nameof(IDictionaryExtensions.Get): + { + using (context.Operation(JavascriptOperationTypes.Call)) + { + using (context.Operation(JavascriptOperationTypes.IndexerProperty)) + context.WriteNode(call.Arguments.First()); + context.Write($".{call.Arguments.Last().ToString()[1..^1]}"); + } + return; + } + } + } + + } + + class EventRecordConversionExtension + : JavascriptConversionExtension + { + + /// + public override void ConvertToJavascript(JavascriptConversionContext context) + { + if (context.Node is not MemberExpression member || member.Member.DeclaringType != typeof(IEventRecord)) return; + context.PreventDefault(); + using (context.Operation(JavascriptOperationTypes.IndexerProperty)) + { + context.WriteNode(member.Expression); + context.Write($".{member.Member.Name.ToCamelCase()}"); + } + } + + } + + class ProjectionConversionExtension + : JavascriptConversionExtension + { + + /// + public override void ConvertToJavascript(JavascriptConversionContext context) + { + if (context.Node is not MethodCallExpression call || call.Method.DeclaringType != typeof(Projection)) return; + context.PreventDefault(); + switch (call.Method.Name) + { + case nameof(Projection.LinkEventTo): + { + using (context.Operation(JavascriptOperationTypes.Call)) + { + using (context.Operation(JavascriptOperationTypes.Call)) + { + using (context.Operation(JavascriptOperationTypes.IndexerProperty))context.Write("linkTo"); + context.WriteManyIsolated('(', ')', ',', call.Arguments); + } + } + break; + } + } + } + + } + +} diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Extensions/EsdbServiceCollectionExtensions.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Extensions/EsdbServiceCollectionExtensions.cs new file mode 100644 index 000000000..3fd753f0a --- /dev/null +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Extensions/EsdbServiceCollectionExtensions.cs @@ -0,0 +1,50 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Neuroglia.Data.Infrastructure.EventSourcing.Services; + +namespace Neuroglia.Data.Infrastructure.EventSourcing; + +/// +/// Defines extensions for s +/// +public static class EsdbServiceCollectionExtensions +{ + + /// + /// Adds and configures a + /// + /// The to configure + /// An used to configure the + /// The configured + public static IServiceCollection AddEsdbEventStore(this IServiceCollection services, Action? setup = null) + { + services.AddEventStore(setup); + return services; + } + + /// + /// Adds and configures a + /// + /// The to configure + /// The configured + public static IServiceCollection AddEsdbProjectionManager(this IServiceCollection services) + { + services.TryAddSingleton(); + services.TryAddSingleton(provider => provider.GetRequiredService()); + return services; + } + +} diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/EsdbEventStore.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/EsdbEventStore.cs new file mode 100644 index 000000000..ddb8e4cff --- /dev/null +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/EsdbEventStore.cs @@ -0,0 +1,557 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using EventStore.Client; +using Grpc.Core; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Neuroglia.Data.Infrastructure.EventSourcing.Configuration; +using Neuroglia.Data.Infrastructure.EventSourcing.EventStore.Services; +using Neuroglia.Data.Infrastructure.EventSourcing.Services; +using Neuroglia.Plugins; +using Neuroglia.Serialization; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Runtime.CompilerServices; +using ESStreamPosition = EventStore.Client.StreamPosition; + +namespace Neuroglia.Data.Infrastructure.EventSourcing; + +/// +/// Represents the default Event Store implementation of the interface +/// +[Plugin(Tags = ["event-store"]), Factory(typeof(EsdbEventStoreFactory))] +public class EsdbEventStore + : IEventStore +{ + + /// + /// Initializes a new + /// + /// The service used to perform logging + /// The options used to configure the + /// The service used to provide s + /// The service used to interact with the remove Event Store service + /// The service used to interact with the remove Event Store service, exclusively for persistent subscriptions + public EsdbEventStore(ILogger logger, IOptions options, ISerializerProvider serializerProvider, EventStoreClient eventStoreClient, EventStorePersistentSubscriptionsClient eventStorePersistentSubscriptionsClient) + { + this.Logger = logger; + this.Options = options.Value; + this.Serializer = serializerProvider.GetSerializers().First(s => this.Options.SerializerType == null || s.GetType() == this.Options.SerializerType); + this.EventStoreClient = eventStoreClient; + this.EventStorePersistentSubscriptionsClient = eventStorePersistentSubscriptionsClient; + } + + /// + /// Gets the service used to perform logging + /// + protected virtual ILogger Logger { get; } + + /// + /// Gets the options used to configure the + /// + protected virtual EventStoreOptions Options { get; } + + /// + /// Gets the service used to interact with the remove Event Store service + /// + protected virtual EventStoreClient EventStoreClient { get; } + + /// + /// Gets the service used to interact with the remove Event Store service, exclusively for persistent subscriptions + /// + protected virtual EventStorePersistentSubscriptionsClient EventStorePersistentSubscriptionsClient { get; } + + /// + /// Gets the service used to serialize and deserialize s + /// + protected virtual ISerializer Serializer { get; } + + /// + public virtual async Task StreamExistsAsync(string streamId, CancellationToken cancellationToken = default) + { + if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); + return (await this.GetAsync(streamId, cancellationToken).ConfigureAwait(false)) != null; + } + + /// + public virtual async Task GetAsync(string streamId, CancellationToken cancellationToken = default) + { + if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); + var qualifiedStreamId = this.GetQualifiedStreamId(streamId); + + var streamMetadataResult = await this.EventStoreClient.GetStreamMetadataAsync(qualifiedStreamId, cancellationToken: cancellationToken).ConfigureAwait(false); + if (streamMetadataResult.StreamDeleted) throw new StreamNotFoundException(streamId); + var offset = streamMetadataResult.Metadata.TruncateBefore ?? StreamPosition.StartOfStream; + + var readResult = this.EventStoreClient.ReadStreamAsync(Direction.Forwards, qualifiedStreamId, offset, 1, cancellationToken: cancellationToken); + ReadState? readState; + + try { readState = await readResult.ReadState.ConfigureAwait(false); } + catch { throw new StreamNotFoundException(streamId); } + if (readState == ReadState.StreamNotFound) + { + if (streamId.StartsWith("$ce-")) return new EventStreamDescriptor(streamId, 0, null, null); + else throw new StreamNotFoundException(streamId); + } + var firstEvent = await readResult.FirstAsync(cancellationToken).ConfigureAwait(false); + readResult = this.EventStoreClient.ReadStreamAsync(Direction.Backwards, qualifiedStreamId, ESStreamPosition.End, 1, cancellationToken: cancellationToken); + var lastEvent = await readResult.FirstAsync(cancellationToken).ConfigureAwait(false); + + return new EventStreamDescriptor(streamId, lastEvent.Event.EventNumber.ToInt64() + 1 - offset.ToInt64(), firstEvent.Event.Created, lastEvent.Event.Created); + } + + /// + public virtual async Task AppendAsync(string streamId, IEnumerable events, long? expectedVersion = null, CancellationToken cancellationToken = default) + { + if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); + if (events == null || !events.Any()) throw new ArgumentNullException(nameof(events)); + if (expectedVersion < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(expectedVersion)); + streamId = this.GetQualifiedStreamId(streamId); + + var readResult = this.EventStoreClient.ReadStreamAsync(Direction.Backwards, streamId, ESStreamPosition.End, 1, cancellationToken: cancellationToken); + var shouldThrowIfNotExists = expectedVersion.HasValue && expectedVersion != StreamPosition.StartOfStream && expectedVersion != StreamPosition.EndOfStream; + try { if (await readResult.ReadState.ConfigureAwait(false) == ReadState.StreamNotFound && shouldThrowIfNotExists) throw new OptimisticConcurrencyException(expectedVersion, null); } + catch (StreamDeletedException) { if(shouldThrowIfNotExists) throw new OptimisticConcurrencyException(expectedVersion, null); } + + var eventsData = events.Select(e => + { + var metadata = e.Metadata ?? new Dictionary(); + metadata[EventRecordMetadata.ClrTypeName] = e.Data?.GetType().AssemblyQualifiedName!; + return new EventData(Uuid.NewUuid(), e.Type, this.Serializer.SerializeToByteArray(e.Data), this.Serializer.SerializeToByteArray(metadata)); + }); + var writeResult = expectedVersion.HasValue + ? await this.EventStoreClient.AppendToStreamAsync(streamId, StreamRevision.FromInt64(expectedVersion.Value), eventsData, cancellationToken: cancellationToken).ConfigureAwait(false) + : await this.EventStoreClient.AppendToStreamAsync(streamId, StreamState.Any, eventsData, cancellationToken: cancellationToken).ConfigureAwait(false); + return writeResult.NextExpectedStreamRevision.ToUInt64(); + } + + /// + public virtual IAsyncEnumerable ReadAsync(string? streamId, StreamReadDirection readDirection, long offset, ulong? length = null, CancellationToken cancellationToken = default) + { + if (string.IsNullOrWhiteSpace(streamId)) + { + if(string.IsNullOrWhiteSpace(this.Options.DatabaseName)) return this.ReadFromAllAsync(readDirection, offset, length, cancellationToken); + else return this.ReadFromStreamAsync(this.GetDatabaseStreamId()!, readDirection, offset, length, cancellationToken); + } + else return this.ReadFromStreamAsync(this.GetQualifiedStreamId(streamId), readDirection, offset, length, cancellationToken); + } + + /// + /// Reads events recorded on the specified stream + /// + /// The id of the stream to read events from + /// The direction in which to read the stream + /// The offset starting from which to read events + /// The amount of events to read + /// A + /// A new containing the events read from the store + protected virtual async IAsyncEnumerable ReadFromStreamAsync(string streamId, StreamReadDirection readDirection, long offset, ulong? length = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); + + ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream); + + var direction = readDirection switch + { + StreamReadDirection.Backwards => Direction.Backwards, + StreamReadDirection.Forwards => Direction.Forwards, + _ => throw new NotSupportedException($"The specified {nameof(StreamReadDirection)} '{readDirection}' is not supported") + }; + + var streamMetadataResult = await this.EventStoreClient.GetStreamMetadataAsync(streamId, cancellationToken: cancellationToken).ConfigureAwait(false); + if (streamMetadataResult.StreamDeleted) throw new StreamNotFoundException(streamId); + if (streamMetadataResult.Metadata.TruncateBefore.HasValue && offset != StreamPosition.EndOfStream && offset < streamMetadataResult.Metadata.TruncateBefore.Value.ToInt64()) offset = streamMetadataResult.Metadata.TruncateBefore.Value.ToInt64(); + + if (readDirection == StreamReadDirection.Forwards && offset == StreamPosition.EndOfStream) yield break; + else if (readDirection == StreamReadDirection.Backwards && offset == StreamPosition.StartOfStream) yield break; + + var readResult = this.EventStoreClient.ReadStreamAsync(direction, streamId, ESStreamPosition.FromInt64(offset), length.HasValue ? (long)length.Value : long.MaxValue, true, cancellationToken: cancellationToken); + try { if (await readResult.ReadState.ConfigureAwait(false) == ReadState.StreamNotFound) throw new StreamNotFoundException(streamId); } + catch (StreamDeletedException) { throw new StreamNotFoundException(streamId); } + + await foreach (var e in readResult) yield return this.DeserializeEventRecord(e); + } + + /// + /// Reads recorded events accross all streams + /// + /// The direction in which to read events + /// The offset starting from which to read events + /// The amount of events to read + /// A + /// A new containing the events read from the store + protected virtual async IAsyncEnumerable ReadFromAllAsync(StreamReadDirection readDirection, long offset, ulong? length = null, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var direction = readDirection switch + { + StreamReadDirection.Backwards => Direction.Backwards, + StreamReadDirection.Forwards => Direction.Forwards, + _ => throw new NotSupportedException($"The specified {nameof(StreamReadDirection)} '{readDirection}' is not supported") + }; + + if (readDirection == StreamReadDirection.Forwards && offset == StreamPosition.EndOfStream) yield break; + else if (readDirection == StreamReadDirection.Backwards && offset == StreamPosition.StartOfStream) yield break; + + var position = offset switch + { + StreamPosition.StartOfStream => Position.Start, + StreamPosition.EndOfStream => Position.End, + _ => readDirection == StreamReadDirection.Backwards ? Position.End : Position.Start + }; + var events = this.EventStoreClient.ReadAllAsync(direction, position, length.HasValue ? (long)length.Value : long.MaxValue, cancellationToken: cancellationToken); + var streamOffset = 0; + await foreach (var e in events.Where(e => !e.Event.EventType.StartsWith('$'))) + { + if (readDirection == StreamReadDirection.Forwards ? streamOffset >= offset : streamOffset < (offset == StreamPosition.EndOfStream ? int.MaxValue : offset + 1)) yield return this.DeserializeEventRecord(e); + streamOffset++; + } + } + + /// + public virtual Task> ObserveAsync(string? streamId, long offset = StreamPosition.EndOfStream, string? consumerGroup = null, CancellationToken cancellationToken = default) + { + if (string.IsNullOrWhiteSpace(streamId)) + { + if (string.IsNullOrWhiteSpace(this.Options.DatabaseName)) return this.ObserveAllAsync(offset, consumerGroup, cancellationToken); + else return this.ObserveStreamAsync(this.GetDatabaseStreamId()!, offset, consumerGroup, cancellationToken); + } + else return this.ObserveStreamAsync(streamId, offset, consumerGroup, cancellationToken); + } + + /// + /// Subscribes to events of the specified stream + /// + /// The id of the stream, if any, to subscribe to. If not set, subscribes to all events + /// The offset starting from which to receive events. Defaults to + /// The name of the consumer group, if any, in case the subscription is persistent + /// A + /// A new used to observe events + protected virtual async Task> ObserveStreamAsync(string streamId, long offset = StreamPosition.EndOfStream, string? consumerGroup = null, CancellationToken cancellationToken = default) + { + if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); + ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream); + if (!await this.StreamExistsAsync(streamId, cancellationToken).ConfigureAwait(false)) throw new StreamNotFoundException(streamId); + var qualifiedStreamId = this.GetQualifiedStreamId(streamId); + + var subject = new Subject(); + if (string.IsNullOrWhiteSpace(consumerGroup)) + { + var position = offset == StreamPosition.EndOfStream ? FromStream.End : FromStream.After(ESStreamPosition.FromInt64(offset)); + var records = new List(); + if (position != FromStream.End) records = await this.ReadAsync(streamId, StreamReadDirection.Forwards, offset, cancellationToken: cancellationToken).ToListAsync(cancellationToken).ConfigureAwait(false); + var subscription = await this.EventStoreClient.SubscribeToStreamAsync(qualifiedStreamId, FromStream.End, (sub, e, token) => this.OnEventConsumedAsync(subject, sub, e, token), true, (sub, reason, ex) => this.OnSubscriptionDropped(subject, sub, reason, ex), cancellationToken: cancellationToken).ConfigureAwait(false); + return Observable.StartWith(Observable.Using(() => subscription, watch => subject), records); + } + else + { + var position = offset == StreamPosition.EndOfStream ? ESStreamPosition.End : ESStreamPosition.FromInt64(offset); + var settings = new PersistentSubscriptionSettings(true, position, checkPointLowerBound: 1, checkPointUpperBound: 1); + try { await this.EventStorePersistentSubscriptionsClient.CreateToStreamAsync(streamId, consumerGroup, settings, cancellationToken: cancellationToken).ConfigureAwait(false); } + catch (RpcException ex) when (ex.StatusCode == StatusCode.AlreadyExists) { } + var checkpointedPosition = await this.GetConsumerCheckpointedPositionAsync(consumerGroup, streamId, cancellationToken).ConfigureAwait(false); + var persistentSubscription = await this.EventStorePersistentSubscriptionsClient.SubscribeToStreamAsync(qualifiedStreamId, consumerGroup, (sub, e, retry, token) => this.OnEventConsumedAsync(subject, streamId, sub, e, retry, checkpointedPosition, token), (sub, reason, ex) => this.OnSubscriptionDropped(subject, sub, reason, ex), cancellationToken: cancellationToken).ConfigureAwait(false); + return Observable.Using(() => persistentSubscription, watch => subject); + } + } + + /// + /// Subscribes to all events + /// + /// The offset starting from which to receive events. Defaults to + /// The name of the consumer group, if any, in case the subscription is persistent + /// A + /// A new used to observe events + protected virtual async Task> ObserveAllAsync(long offset = StreamPosition.EndOfStream, string? consumerGroup = null, CancellationToken cancellationToken = default) + { + ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream); + + var subject = new ReplaySubject(); + if (string.IsNullOrWhiteSpace(consumerGroup)) + { + var position = offset == StreamPosition.EndOfStream ? FromAll.End : FromAll.Start; + var filterOptions = new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents()); + var subscription = await this.EventStoreClient.SubscribeToAllAsync(position, (sub, e, token) => this.OnEventConsumedAsync(subject, sub, e, token), true, (sub, reason, ex) => this.OnSubscriptionDropped(subject, sub, reason, ex), filterOptions: filterOptions, cancellationToken: cancellationToken).ConfigureAwait(false); + var observable = Observable.Using(() => subscription, _ => subject); + var streamOffset = 0; + if (offset != StreamPosition.StartOfStream && offset != StreamPosition.EndOfStream) observable = observable.SkipWhile(e => + { + var skip = streamOffset < offset; + streamOffset++; + return skip; + }); + return observable; + } + else + { + var position = offset == StreamPosition.EndOfStream ? Position.End : Position.Start; + var filter = EventTypeFilter.ExcludeSystemEvents(); + var settings = new PersistentSubscriptionSettings(true, position, checkPointLowerBound: 1, checkPointUpperBound: 1); + try { await this.EventStorePersistentSubscriptionsClient.CreateToAllAsync(consumerGroup, filter, settings, cancellationToken: cancellationToken).ConfigureAwait(false); } + catch (RpcException ex) when (ex.StatusCode == StatusCode.AlreadyExists) { } + var checkpointedPosition = await this.GetConsumerCheckpointedPositionAsync(consumerGroup, cancellationToken: cancellationToken).ConfigureAwait(false); + var persistentSubscription = await this.EventStorePersistentSubscriptionsClient.SubscribeToAllAsync(consumerGroup, (sub, e, retry, token) => this.OnEventConsumedAsync(subject, null, sub, e, retry, checkpointedPosition, token), (sub, reason, ex) => this.OnSubscriptionDropped(subject, sub, reason, ex), cancellationToken: cancellationToken); + return Observable.Using(() => persistentSubscription, watch => subject); + } + } + + /// + public virtual async Task SetOffsetAsync(string consumerGroup, long offset, string? streamId = null, CancellationToken cancellationToken = default) + { + if (string.IsNullOrWhiteSpace(consumerGroup)) throw new ArgumentNullException(nameof(consumerGroup)); + ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream); + + IPosition position = string.IsNullOrWhiteSpace(streamId) ? offset == StreamPosition.EndOfStream ? ESStreamPosition.End : ESStreamPosition.Start : offset == StreamPosition.EndOfStream ? ESStreamPosition.End : ESStreamPosition.FromInt64(offset); + var settings = new PersistentSubscriptionSettings(true, position, checkPointLowerBound: 1, checkPointUpperBound: 1); + PersistentSubscriptionInfo subscription; + streamId = string.IsNullOrWhiteSpace(streamId) ? string.IsNullOrWhiteSpace(this.Options.DatabaseName) ? null : this.GetDatabaseStreamId()! : this.GetQualifiedStreamId(streamId); + if (string.IsNullOrWhiteSpace(streamId)) + { + try { subscription = await this.EventStorePersistentSubscriptionsClient.GetInfoToAllAsync(consumerGroup, cancellationToken: cancellationToken).ConfigureAwait(false); } + catch (RpcException ex) when (ex.StatusCode == StatusCode.NotFound) { throw new StreamNotFoundException(); } + if (subscription.Stats.LastCheckpointedEventPosition != null) await this.SetConsumerCheckpointPositionAsync(consumerGroup, streamId, subscription.Stats.LastCheckpointedEventPosition, cancellationToken).ConfigureAwait(false); + + await this.EventStorePersistentSubscriptionsClient.DeleteToAllAsync(consumerGroup, cancellationToken: cancellationToken).ConfigureAwait(false); + try { await this.EventStorePersistentSubscriptionsClient.CreateToAllAsync(consumerGroup, settings, cancellationToken: cancellationToken).ConfigureAwait(false); } //it occurred in tests that EventStore would only eventually delete the subscription, resulting in caught exception, thus the need for the try/catch block + catch (RpcException ex) when (ex.StatusCode == StatusCode.AlreadyExists) { await this.EventStorePersistentSubscriptionsClient.UpdateToAllAsync(consumerGroup, settings, cancellationToken: cancellationToken).ConfigureAwait(false); } + } + else + { + try { subscription = await this.EventStorePersistentSubscriptionsClient.GetInfoToStreamAsync(streamId, consumerGroup, cancellationToken: cancellationToken).ConfigureAwait(false); } + catch (RpcException ex) when (ex.StatusCode == StatusCode.NotFound) { throw new StreamNotFoundException(); } + if (subscription.Stats.LastCheckpointedEventPosition != null) await this.SetConsumerCheckpointPositionAsync(consumerGroup, streamId, subscription.Stats.LastCheckpointedEventPosition, cancellationToken).ConfigureAwait(false); + + await this.EventStorePersistentSubscriptionsClient.DeleteToStreamAsync(streamId, consumerGroup, cancellationToken: cancellationToken).ConfigureAwait(false); + try { await this.EventStorePersistentSubscriptionsClient.CreateToStreamAsync(streamId, consumerGroup, settings, cancellationToken: cancellationToken).ConfigureAwait(false); } //it occurred in tests that EventStore would only eventually delete the subscription, resulting in caught exception, thus the need for the try/catch block + catch (RpcException ex) when (ex.StatusCode == StatusCode.AlreadyExists) { await this.EventStorePersistentSubscriptionsClient.UpdateToStreamAsync(streamId, consumerGroup, settings, cancellationToken: cancellationToken).ConfigureAwait(false); } + } + } + + /// + public virtual async Task TruncateAsync(string streamId, ulong? beforeVersion = null, CancellationToken cancellationToken = default) + { + if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); + if (!await this.StreamExistsAsync(streamId, cancellationToken).ConfigureAwait(false)) throw new StreamNotFoundException(streamId); + + var truncateBefore = beforeVersion.HasValue ? ESStreamPosition.FromInt64((long)beforeVersion.Value) : ESStreamPosition.End; + await this.EventStoreClient.SetStreamMetadataAsync(this.GetQualifiedStreamId(streamId), StreamState.Any, new StreamMetadata(truncateBefore: truncateBefore), cancellationToken: cancellationToken).ConfigureAwait(false); + } + + /// + public virtual async Task DeleteAsync(string streamId, CancellationToken cancellationToken = default) + { + if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); + if (!await this.StreamExistsAsync(streamId, cancellationToken).ConfigureAwait(false)) throw new StreamNotFoundException(streamId); + + await this.EventStoreClient.DeleteAsync(this.GetQualifiedStreamId(streamId), StreamState.Any, cancellationToken: cancellationToken).ConfigureAwait(false); + } + + /// + /// Converts the specified stream id to a qualified stream id, which is prefixed with the current database name, if any + /// + /// The stream id to convert + /// The qualified id of the specified stream id + protected virtual string GetQualifiedStreamId(string streamId) => string.IsNullOrWhiteSpace(this.Options.DatabaseName) || streamId.StartsWith($"$ce-") ? streamId : $"{this.Options.DatabaseName}-{streamId}"; + + /// + /// Gets the id, if any, of the stream that contains references to all events in the database + /// + /// The id, if any, of the stream that contains references to all events in the database + protected virtual string? GetDatabaseStreamId() => string.IsNullOrWhiteSpace(this.Options.DatabaseName) ? null : $"$ce-{this.Options.DatabaseName}"; + + /// + /// Deserializes the specified into a new + /// + /// The to deserialize + /// The the has been produced by, if any + /// The to stream s to + /// A boolean indicating whether or not the is being replayed to its consumer. Ignore if 'subscription' is null + /// The deserialized + protected virtual IEventRecord DeserializeEventRecord(ResolvedEvent e, PersistentSubscription? subscription = null, ISubject? subject = null, bool? replayed = null) + { + var metadata = this.Serializer.Deserialize>(e.Event.Metadata.ToArray()); + var clrTypeName = metadata![EventRecordMetadata.ClrTypeName].ToString()!; + var clrType = Type.GetType(clrTypeName) ?? throw new Exception(); + var data = this.Serializer.Deserialize(e.Event.Data.ToArray(), clrType); + metadata.Remove(EventRecordMetadata.ClrTypeName); + if (!metadata.Any()) metadata = null; + if (subscription == null) return new EventRecord(e.OriginalStreamId, e.Event.EventId.ToString(), e.Event.EventNumber.ToUInt64(), e.Event.Position.CommitPosition, e.Event.Created, e.Event.EventType, data, metadata); + else return new AckableEventRecord(e.OriginalStreamId, e.Event.EventId.ToString(), e.Event.EventNumber.ToUInt64(), e.Event.Position.CommitPosition, e.Event.Created, e.Event.EventType, data, metadata, replayed, () => this.OnAckEventAsync(subject!, subscription, e), reason => this.OnNackEventAsync(subject!, subscription, e, reason)); + } + + /// + /// Gets the last checkpointed position, if any, of the specified consumer group + /// + /// The consumer group to get the highest checkpointed position for + /// The id of the stream, if any, to get the consumer group's checkpointed position for + /// A + /// A new awaitable + protected virtual async Task GetConsumerCheckpointedPositionAsync(string consumerGroup, string? streamId = null, CancellationToken cancellationToken = default) + { + try + { + return await this.ReadAsync(this.GetConsumerCheckpointStreamId(consumerGroup, streamId), StreamReadDirection.Forwards, StreamPosition.StartOfStream, cancellationToken: cancellationToken) + .Select(e => e.Data) + .OfType() + .OrderByDescending(u => u) + .FirstOrDefaultAsync(cancellationToken) + .ConfigureAwait(false); + } + catch (StreamNotFoundException) { return null; } + } + + /// + /// Sets the last checkpointed position of the specified consumer group + /// + /// The consumer group to set the last checkpointed position for + /// The id of the stream, if any, to get the consumer group's checkpointed position for + /// The last checkpointed position + /// A + /// A new awaitable + protected virtual async Task SetConsumerCheckpointPositionAsync(string consumerGroup, string? streamId, IPosition position, CancellationToken cancellationToken = default) + { + var data = position switch + { + Position pos => pos.CommitPosition, + ESStreamPosition spos => (ulong)spos.ToInt64(), + _ => throw new NotSupportedException($"The position type '{position.GetType()}' is not supported in this context") + }; + await this.AppendAsync(this.GetConsumerCheckpointStreamId(consumerGroup, streamId), new EventDescriptor[] { new("$checkpoint", data) }, cancellationToken: cancellationToken).ConfigureAwait(false); + } + + /// + /// Gets the id of the stream used to store the checkpoints of the specified consumer group, and optionally stream + /// + /// The consumer group to get the checkpoint stream id for + /// The id of the stream, if any, to get the consumer group's checkpoint stream for + /// + protected virtual string GetConsumerCheckpointStreamId(string consumerGroup, string? streamId) => $"${consumerGroup}:{streamId ?? "$all"}_checkpoints"; + + /// + /// Handles the consumption of a on a + /// + /// The to stream s to + /// The the has been received by + /// The to handle + /// A + /// A new awaitable + protected virtual Task OnEventConsumedAsync(ISubject subject, StreamSubscription subscription, ResolvedEvent e, CancellationToken cancellationToken) => Task.Run(() => subject.OnNext(this.DeserializeEventRecord(e)), cancellationToken); + + /// + /// Handles the consumption of a on a + /// + /// The to stream s to + /// The id of the stream, if any, to consume s from + /// The the has been received by + /// The to handle + /// The retry count, if any + /// The highest position ever checkpointed by the consumer group + /// A + /// A new awaitable + protected virtual Task OnEventConsumedAsync(ISubject subject, string? streamId, PersistentSubscription subscription, ResolvedEvent e, int? retryCount, ulong? checkpointedPosition, CancellationToken cancellationToken) + { + try + { + if (string.IsNullOrWhiteSpace(this.Options.DatabaseName) || !e.OriginalStreamId.StartsWith(this.GetDatabaseStreamId()!)) if (e.OriginalStreamId.StartsWith('$') || e.Event.Metadata.Length < 1) return subscription.Ack(e); + return Task.Run(() => subject.OnNext(this.DeserializeEventRecord(e, subscription, subject, checkpointedPosition > (string.IsNullOrWhiteSpace(streamId) ? e.Event.Position.CommitPosition : e.Event.EventNumber.ToUInt64()))), cancellationToken); + } + catch (Exception ex) + { + subject.OnError(ex); + return Task.CompletedTask; + } + } + + /// + /// Acks the specified + /// + /// The to stream s to + /// The the to ack has been received by + /// The to ack + /// A new awaitable + protected virtual async Task OnAckEventAsync(ISubject subject, PersistentSubscription subscription, ResolvedEvent e) + { + try { await subscription.Ack(e.OriginalEvent.EventId).ConfigureAwait(false); } + catch (ObjectDisposedException ex) { subject.OnError(ex); } + } + + /// + /// Nacks the specified + /// + /// The to stream s to + /// The the to nack has been received by + /// The to nack + /// The reason why to nack the + /// A new awaitable + protected virtual async Task OnNackEventAsync(ISubject subject, PersistentSubscription subscription, ResolvedEvent e, string? reason) + { + try { await subscription.Nack(PersistentSubscriptionNakEventAction.Retry, reason ?? "Unknown", e.OriginalEvent.EventId).ConfigureAwait(false); } + catch (ObjectDisposedException ex) { subject.OnError(ex); } + } + + /// + /// Handles the specified being dropped + /// + /// The to stream s to + /// The the has been received by + /// The reason why to drop the + /// The that occurred, if any + protected virtual void OnSubscriptionDropped(ISubject subject, StreamSubscription subscription, SubscriptionDroppedReason reason, Exception? ex) + { + switch (reason) + { + case SubscriptionDroppedReason.Disposed: + subject.OnCompleted(); + break; + case SubscriptionDroppedReason.SubscriberError: + case SubscriptionDroppedReason.ServerError: + subject.OnError(ex ?? new Exception()); + break; + } + } + + /// + /// Handles the specified being dropped + /// + /// The to stream s to + /// The the has been received by + /// The reason why to drop the + /// The that occurred, if any + protected virtual void OnSubscriptionDropped(ISubject subject, PersistentSubscription subscription, SubscriptionDroppedReason reason, Exception? ex) + { + switch (reason) + { + case SubscriptionDroppedReason.Disposed: + subject.OnCompleted(); + break; + case SubscriptionDroppedReason.SubscriberError: + case SubscriptionDroppedReason.ServerError: + subject.OnError(ex ?? new Exception()); + break; + } + } + + /// + /// Exposes constants about event related metadata used by the + /// + protected static class EventRecordMetadata + { + + /// + /// Gets the name of the event record metadata used to store the event CLR type's assembly qualified name + /// + public const string ClrTypeName = "clr-type"; + + } + +} diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/EsdbEventStoreFactory.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/EsdbEventStoreFactory.cs new file mode 100644 index 000000000..443227cd4 --- /dev/null +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/EsdbEventStoreFactory.cs @@ -0,0 +1,53 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using EventStore.Client; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace Neuroglia.Data.Infrastructure.EventSourcing.EventStore.Services; + +/// +/// Represents the service used to create instances +/// +/// +/// Initializes a new +/// +/// The current +public class EsdbEventStoreFactory(IServiceProvider serviceProvider) + : IFactory +{ + + /// + /// Gets the name of the EventStore connection string + /// + public const string ConnectionStringName = "eventstore"; + + /// + /// Gets the current + /// + protected IServiceProvider ServiceProvider { get; } = serviceProvider; + + /// + public virtual EsdbEventStore Create() + { + var configuration = this.ServiceProvider.GetRequiredService(); + var connectionString = configuration.GetConnectionString(ConnectionStringName); + if (string.IsNullOrWhiteSpace(connectionString)) throw new Exception($"An error occurred while attempting to create an ESEventStore instance. The '{ConnectionStringName}' connection string is not provided or is invalid. Please ensure that the connection string is properly configured in the application settings."); + var settings = EventStoreClientSettings.Create(connectionString); + return ActivatorUtilities.CreateInstance(this.ServiceProvider, new EventStoreClient(settings), new EventStorePersistentSubscriptionsClient(settings)); + } + + object IFactory.Create() => this.Create(); + +} diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/EsdbProjectionBuilder.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/EsdbProjectionBuilder.cs new file mode 100644 index 000000000..2c13286ff --- /dev/null +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/EsdbProjectionBuilder.cs @@ -0,0 +1,129 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using EventStore.Client; +using Lambda2Js; +using System.Linq.Expressions; +using System.Text; + +namespace Neuroglia.Data.Infrastructure.EventSourcing.Services; + +/// +/// Represents the EventStore implementation of the interface +/// +/// The name of the projection to build +/// The underlying EventStore projection management API client +public class EsdbProjectionBuilder(string name, EventStoreProjectionManagementClient projections) + : IProjectionSourceBuilder, IProjectionBuilder +{ + + /// + /// Gets the name of the projection to build + /// + protected string Name { get; } = name; + + /// + /// Gets the name of the stream, if any, from which to process events + /// + protected string? StreamName { get; set; } + + /// + /// Gets the underlying EventStore projection management API client + /// + protected EventStoreProjectionManagementClient Projections { get; } = projections; + + /// + /// Gets the of the , if any, used to create the projection's initial state + /// + protected Expression>? GivenFactory { get; set; } + + /// + /// Gets the of the predicate , if any, used to filter incoming events + /// + protected Expression>? WhenPredicate { get; set; } + + /// + /// Gets the of an to perform on the projection when a matching event is processed + /// + protected List>>? ThenActions { get; set; } + + /// + public virtual IProjectionBuilder FromStream(string name) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + this.StreamName = name; + return this; + } + + /// + public virtual IProjectionBuilder Given(Expression> factory) + { + ArgumentNullException.ThrowIfNull(factory); + this.GivenFactory = factory; + return this; + } + + /// + public virtual IProjectionBuilder When(Expression> predicate) + { + ArgumentNullException.ThrowIfNull(predicate); + this.WhenPredicate = this.WhenPredicate == null + ? predicate + : Expression.Lambda>( + Expression.AndAlso( + this.WhenPredicate.Body, + predicate.Body), + this.WhenPredicate.Parameters); + return this; + } + + /// + public virtual IProjectionBuilder Then(Expression> action) + { + ArgumentNullException.ThrowIfNull(action); + this.ThenActions ??= []; + this.ThenActions.Add(action); + return this; + } + + /// + /// Builds the projection on EventStore + /// + /// A + /// A new awaitable + public virtual async Task BuildAsync(CancellationToken cancellationToken = default) + { + var query = this.BuildQuery(); + await this.Projections.CreateContinuousAsync(this.Name, query, true, cancellationToken: cancellationToken).ConfigureAwait(false); + } + + /// + /// Builds the EventStore JavaScript query expression of the projection to build + /// + /// The compiled EventStore JavaScript query expression of the projection to build + protected virtual string BuildQuery() + { + var compilationOptions = new JavascriptCompilationOptions(JsCompilationFlags.BodyOnly, EsdbJavaScriptConversion.Extensions); + var builder = new StringBuilder(); + builder.AppendLine($"fromStream('{this.StreamName}')"); + builder.AppendLine(@" .when({"); + if (this.GivenFactory != null) builder.AppendLine(@$" $init: () => {this.GivenFactory.CompileToJavascript(compilationOptions)},"); + builder.AppendLine(@" $any: (state, e) => {"); + if (this.WhenPredicate != null) builder.AppendLine(@$" if(!({this.WhenPredicate.CompileToJavascript(compilationOptions)})) return;"); + if (this.ThenActions?.Count > 0) this.ThenActions.ForEach(a => builder.AppendLine(@$" {a.CompileToJavascript(compilationOptions)};")); + builder.AppendLine(@" }"); + builder.AppendLine(@" });"); + return builder.ToString(); + } + +} \ No newline at end of file diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/EsdbProjectionManager.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/EsdbProjectionManager.cs new file mode 100644 index 000000000..ab540c670 --- /dev/null +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/EsdbProjectionManager.cs @@ -0,0 +1,49 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using EventStore.Client; +using Neuroglia.Serialization.Json; + +namespace Neuroglia.Data.Infrastructure.EventSourcing.Services; + +/// +/// Represents the EventStore implementation of the interface +/// +/// The underlying EventStore projection management API client +public class EsdbProjectionManager(EventStoreProjectionManagementClient projections) + : IProjectionManager +{ + + /// + /// Gets the underlying EventStore projection management API client + /// + protected EventStoreProjectionManagementClient Projections { get; } = projections; + + /// + public virtual async Task CreateAsync(string name, Action> setup, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + ArgumentNullException.ThrowIfNull(setup); + var builder = new EsdbProjectionBuilder(name, this.Projections); + setup(builder); + await builder.BuildAsync(cancellationToken).ConfigureAwait(false); + } + + /// + public virtual async Task GetStateAsync(string name, CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrWhiteSpace(name); + return await this.Projections.GetResultAsync(name, serializerOptions: JsonSerializer.DefaultOptions, cancellationToken: cancellationToken).ConfigureAwait(false); + } + +} diff --git a/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStores/EsdbEventStoreTests.cs b/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStores/EsdbEventStoreTests.cs new file mode 100644 index 000000000..0f57f6b29 --- /dev/null +++ b/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStores/EsdbEventStoreTests.cs @@ -0,0 +1,44 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using DotNet.Testcontainers.Containers; +using EventStore.Client; +using Microsoft.Extensions.DependencyInjection; +using Neuroglia.Data.Infrastructure.EventSourcing; +using Neuroglia.Serialization; +using Neuroglia.UnitTests.Containers; + +namespace Neuroglia.UnitTests.Cases.Data.Infrastructure.EventSourcing.EventStores; + +[TestCaseOrderer("Neuroglia.UnitTests.Services.PriorityTestCaseOrderer", "Neuroglia.UnitTests")] +public class EsdbEventStoreTests + : EventStoreTestsBase +{ + + public EsdbEventStoreTests() : base(BuildServices()) { } + + public static IServiceCollection BuildServices() + { + var services = new ServiceCollection(); + services.AddLogging(); + services.AddSerialization(); + services.AddSingleton(EventStoreContainerBuilder.Build()); + services.AddHostedService(provider => new ContainerBootstrapper(provider.GetRequiredService())); + services.AddSingleton(provider => EventStoreClientSettings.Create($"esdb://{provider.GetRequiredService().Hostname}:{provider.GetRequiredService().GetMappedPublicPort(EventStoreContainerBuilder.PublicPort2)}?tls=false")); + services.AddSingleton(provider => new EventStoreClient(provider.GetRequiredService())); + services.AddSingleton(provider => new EventStorePersistentSubscriptionsClient(provider.GetRequiredService())); + services.AddEsdbEventStore(); + return services; + } + +} diff --git a/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStores/EsdbProjectionManagerTests.cs b/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStores/EsdbProjectionManagerTests.cs new file mode 100644 index 000000000..ec57609d6 --- /dev/null +++ b/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStores/EsdbProjectionManagerTests.cs @@ -0,0 +1,126 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using DotNet.Testcontainers.Containers; +using EventStore.Client; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Neuroglia.Collections; +using Neuroglia.Data.Infrastructure.EventSourcing; +using Neuroglia.Data.Infrastructure.EventSourcing.Services; +using Neuroglia.Serialization; +using Neuroglia.UnitTests.Containers; + +namespace Neuroglia.UnitTests.Cases.Data.Infrastructure.EventSourcing.EventStores; + +[TestCaseOrderer("Neuroglia.UnitTests.Services.PriorityTestCaseOrderer", "Neuroglia.UnitTests")] +public class EsdbProjectionManagerTests + : IAsyncLifetime +{ + + public EsdbProjectionManagerTests() + { + var services = BuildServices(); + this.ServiceProvider = services.BuildServiceProvider(); + } + + protected CancellationTokenSource CancellationTokenSource { get; } = new(); + + protected ServiceProvider ServiceProvider { get; } + + protected IEventStore EventStore => this.ServiceProvider.GetRequiredService(); + + protected IProjectionManager ProjectionManager => this.ServiceProvider.GetRequiredService(); + + public virtual async Task InitializeAsync() + { + foreach (var hostedService in this.ServiceProvider.GetServices()) + { + await hostedService.StartAsync(this.CancellationTokenSource.Token).ConfigureAwait(false); + } + } + + public virtual async Task DisposeAsync() => await this.ServiceProvider.DisposeAsync().ConfigureAwait(false); + + public static IServiceCollection BuildServices() + { + var services = new ServiceCollection(); + services.AddLogging(); + services.AddSerialization(); + services.AddSingleton(EventStoreContainerBuilder.Build()); + services.AddHostedService(provider => new ContainerBootstrapper(provider.GetRequiredService())); + services.AddSingleton(provider => EventStoreClientSettings.Create($"esdb://{provider.GetRequiredService().Hostname}:{provider.GetRequiredService().GetMappedPublicPort(EventStoreContainerBuilder.PublicPort2)}?tls=false")); + services.AddSingleton(provider => new EventStoreClient(provider.GetRequiredService())); + services.AddSingleton(provider => new EventStorePersistentSubscriptionsClient(provider.GetRequiredService())); + services.AddSingleton(provider => new EventStoreProjectionManagementClient(provider.GetRequiredService())); + services.AddEsdbEventStore(); + services.AddEsdbProjectionManager(); + return services; + } + + [Fact, Priority(1)] + public async Task Create_Projection_FromStream_Should_Work() + { + //arrange + var name = "fake-name"; + var streamName = "fake-stream"; + var initialState = new List() { 1, 2, 3 }; + var index = 4; + Action>> setup = projection => projection + .FromStream(streamName) + .Given(() => new List() { 1, 2, 3 }) + .When((state, e) => e.Data != null && e.Data!.GetProperty("index") != null && !state.Contains(e.Data!.GetProperty("index"))) + .Then((state, e) => state.Add(e.Data!.GetProperty("index"))); + initialState.Add(index); + + //act + await this.ProjectionManager.CreateAsync(name, setup); + await this.EventStore.AppendAsync(streamName, [new EventDescriptor("fake-type", new { index = index })]); + await Task.Delay(250); + var state = await this.ProjectionManager.GetStateAsync>(name); + + //assert + state.Should().NotBeNull(); + state.Should().BeEquivalentTo(initialState); + } + + [Fact, Priority(2)] + public async Task Create_Projection_FromStream_LinkTo_Should_Work() + { + //arrange + var name = "fake-name"; + var streamName = "fake-stream"; + var linkToStreamName = "fake-link-to-stream"; + var initialState = new List() { 1, 2, 3 }; + var index = 4; + Action>> setup = projection => projection + .FromStream(streamName) + .Given(() => new List() { 1, 2, 3 }) + .When((state, e) => e.Data != null && e.Data!.GetProperty("index") != null && !state.Contains(e.Data!.GetProperty("index"))) + .Then((state, e) => Projection.LinkEventTo(linkToStreamName, e)); + initialState.Add(index); + + //act + await this.ProjectionManager.CreateAsync(name, setup); + await this.EventStore.AppendAsync(streamName, [new EventDescriptor("fake-type", new { index })]); + await Task.Delay(250); + + //assert + IEventStreamDescriptor result = null!; + var action = async () => result = await this.EventStore.GetAsync(linkToStreamName); + await action.Should().NotThrowAsync(); + result.Should().NotBeNull(); + result.Length.Should().Be(1); + } + +} \ No newline at end of file