diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs index 9c913a4cb..fe587d554 100644 --- a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs @@ -145,7 +145,7 @@ public virtual IAsyncEnumerable ReadAsync(string? streamId, Stream if (string.IsNullOrWhiteSpace(streamId)) { if(string.IsNullOrWhiteSpace(this.Options.DatabaseName)) return this.ReadFromAllAsync(readDirection, offset, length, cancellationToken); - else return this.ReadFromStreamAsync($"$ce-{this.Options.DatabaseName}", readDirection, offset, length, cancellationToken); + else return this.ReadFromStreamAsync(this.GetDatabaseStreamId()!, readDirection, offset, length, cancellationToken); } else return this.ReadFromStreamAsync(this.GetQualifiedStreamId(streamId), readDirection, offset, length, cancellationToken); } @@ -227,7 +227,7 @@ public virtual Task> ObserveAsync(string? streamId, lo if (string.IsNullOrWhiteSpace(streamId)) { if (string.IsNullOrWhiteSpace(this.Options.DatabaseName)) return this.ObserveAllAsync(offset, consumerGroup, cancellationToken); - else return this.ObserveStreamAsync($"$ce-{this.Options.DatabaseName}", offset, consumerGroup, cancellationToken); + else return this.ObserveStreamAsync(this.GetDatabaseStreamId()!, offset, consumerGroup, cancellationToken); } else return this.ObserveStreamAsync(streamId, offset, consumerGroup, cancellationToken); } @@ -314,9 +314,10 @@ public virtual async Task SetOffsetAsync(string consumerGroup, long offset, stri if (string.IsNullOrWhiteSpace(consumerGroup)) throw new ArgumentNullException(nameof(consumerGroup)); ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream); - IPosition position = string.IsNullOrWhiteSpace(streamId) ? offset == StreamPosition.EndOfStream ? Position.End : Position.Start : offset == StreamPosition.EndOfStream ? ESStreamPosition.End : ESStreamPosition.FromInt64(offset); + IPosition position = string.IsNullOrWhiteSpace(streamId) ? offset == StreamPosition.EndOfStream ? ESStreamPosition.End : ESStreamPosition.Start : offset == StreamPosition.EndOfStream ? ESStreamPosition.End : ESStreamPosition.FromInt64(offset); var settings = new PersistentSubscriptionSettings(true, position, checkPointLowerBound: 1, checkPointUpperBound: 1); PersistentSubscriptionInfo subscription; + streamId = string.IsNullOrWhiteSpace(streamId) ? string.IsNullOrWhiteSpace(this.Options.DatabaseName) ? null : this.GetDatabaseStreamId()! : this.GetQualifiedStreamId(streamId); if (string.IsNullOrWhiteSpace(streamId)) { try { subscription = await this.EventStorePersistentSubscriptionsClient.GetInfoToAllAsync(consumerGroup, cancellationToken: cancellationToken).ConfigureAwait(false); } @@ -329,8 +330,7 @@ public virtual async Task SetOffsetAsync(string consumerGroup, long offset, stri } else { - streamId = this.GetQualifiedStreamId(streamId); - try { subscription = await this.EventStorePersistentSubscriptionsClient.GetInfoToStreamAsync(consumerGroup, streamId, cancellationToken: cancellationToken).ConfigureAwait(false); } + try { subscription = await this.EventStorePersistentSubscriptionsClient.GetInfoToStreamAsync(streamId, consumerGroup, cancellationToken: cancellationToken).ConfigureAwait(false); } catch (RpcException ex) when (ex.StatusCode == StatusCode.NotFound) { throw new StreamNotFoundException(); } if (subscription.Stats.LastCheckpointedEventPosition != null) await this.SetConsumerCheckpointPositionAsync(consumerGroup, streamId, subscription.Stats.LastCheckpointedEventPosition, cancellationToken).ConfigureAwait(false); @@ -366,6 +366,12 @@ public virtual async Task DeleteAsync(string streamId, CancellationToken cancell /// The qualified id of the specified stream id protected virtual string GetQualifiedStreamId(string streamId) => string.IsNullOrWhiteSpace(this.Options.DatabaseName) || streamId.StartsWith($"$ce-") ? streamId : $"{this.Options.DatabaseName}-{streamId}"; + /// + /// Gets the id, if any, of the stream that contains references to all events in the database + /// + /// The id, if any, of the stream that contains references to all events in the database + protected virtual string? GetDatabaseStreamId() => string.IsNullOrWhiteSpace(this.Options.DatabaseName) ? null : $"$ce-{this.Options.DatabaseName}"; + /// /// Deserializes the specified into a new /// @@ -415,7 +421,16 @@ protected virtual IEventRecord DeserializeEventRecord(ResolvedEvent e, Persisten /// The last checkpointed position /// A /// A new awaitable - protected virtual Task SetConsumerCheckpointPositionAsync(string consumerGroup, string? streamId, IPosition position, CancellationToken cancellationToken = default) => this.AppendAsync(this.GetConsumerCheckpointStreamId(consumerGroup, streamId), new EventDescriptor[] { new("$checkpoint", ((Position)position).CommitPosition) }, cancellationToken: cancellationToken); + protected virtual async Task SetConsumerCheckpointPositionAsync(string consumerGroup, string? streamId, IPosition position, CancellationToken cancellationToken = default) + { + var data = position switch + { + Position pos => pos.CommitPosition, + ESStreamPosition spos => (ulong)spos.ToInt64(), + _ => throw new NotSupportedException($"The position type '{position.GetType()}' is not supported in this context") + }; + await this.AppendAsync(this.GetConsumerCheckpointStreamId(consumerGroup, streamId), new EventDescriptor[] { new("$checkpoint", data) }, cancellationToken: cancellationToken).ConfigureAwait(false); + } /// /// Gets the id of the stream used to store the checkpoints of the specified consumer group, and optionally stream @@ -450,7 +465,7 @@ protected virtual Task OnEventConsumedAsync(ISubject subject, stri { try { - if (string.IsNullOrWhiteSpace(this.Options.DatabaseName) || !e.OriginalStreamId.StartsWith($"$ce-{this.Options.DatabaseName}")) if (e.OriginalStreamId.StartsWith("$") || e.Event.Metadata.Length < 1) return subscription.Ack(e); + if (string.IsNullOrWhiteSpace(this.Options.DatabaseName) || !e.OriginalStreamId.StartsWith(this.GetDatabaseStreamId()!)) if (e.OriginalStreamId.StartsWith("$") || e.Event.Metadata.Length < 1) return subscription.Ack(e); return Task.Run(() => subject.OnNext(this.DeserializeEventRecord(e, subscription, subject, checkpointedPosition > (string.IsNullOrWhiteSpace(streamId) ? e.Event.Position.CommitPosition : e.Event.EventNumber.ToUInt64()))), cancellationToken); } catch (Exception ex) diff --git a/src/Neuroglia.Eventing.CloudEvents.AspNetCore/Extensions/IApplicationBuilderExtensions.cs b/src/Neuroglia.Eventing.CloudEvents.AspNetCore/Extensions/IApplicationBuilderExtensions.cs new file mode 100644 index 000000000..d3f4887b7 --- /dev/null +++ b/src/Neuroglia.Eventing.CloudEvents.AspNetCore/Extensions/IApplicationBuilderExtensions.cs @@ -0,0 +1,31 @@ +// Copyright © 2021-Present Neuroglia SRL. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"), +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Microsoft.AspNetCore.Builder; + +namespace Neuroglia.Eventing.CloudEvents.AspNetCore; + +/// +/// Defines extensions for s +/// +public static class CloudEventIApplicationBuilderExtensions +{ + + /// + /// Configures the to use the + /// + /// The to configure + /// The configured + public static IApplicationBuilder UseCloudEvents(this IApplicationBuilder app) => app.UseMiddleware(); + +}