diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Services/Interfaces/IEventStore.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Services/Interfaces/IEventStore.cs index 835f6ddc6..6d2a0aa55 100644 --- a/src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Services/Interfaces/IEventStore.cs +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.Abstractions/Services/Interfaces/IEventStore.cs @@ -26,8 +26,8 @@ public interface IEventStore /// The events to append to the specified stream /// The expected version of the stream to append the events to. Used for optimistic concurrency /// A - /// A new awaitable - Task AppendAsync(string streamId, IEnumerable events, long? expectedVersion = null, CancellationToken cancellationToken = default); + /// The version of the stream that is next expected + Task AppendAsync(string streamId, IEnumerable events, long? expectedVersion = null, CancellationToken cancellationToken = default); /// /// Gets information about the specified stream diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs index 762b8d039..8bdd32723 100644 --- a/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs @@ -112,7 +112,7 @@ public virtual async Task GetAsync(string streamId, Canc } /// - public virtual async Task AppendAsync(string streamId, IEnumerable events, long? expectedVersion = null, CancellationToken cancellationToken = default) + public virtual async Task AppendAsync(string streamId, IEnumerable events, long? expectedVersion = null, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); if (events == null || !events.Any()) throw new ArgumentNullException(nameof(events)); @@ -130,8 +130,10 @@ public virtual async Task AppendAsync(string streamId, IEnumerable diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.Memory/Services/MemoryEventStore.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.Memory/Services/MemoryEventStore.cs index 816894000..40950c1c7 100644 --- a/src/Neuroglia.Data.Infrastructure.EventSourcing.Memory/Services/MemoryEventStore.cs +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.Memory/Services/MemoryEventStore.cs @@ -54,7 +54,7 @@ public class MemoryEventStore(IMemoryCache cache) protected Subject Subject { get; } = new(); /// - public virtual Task AppendAsync(string streamId, IEnumerable events, long? expectedVersion = null, CancellationToken cancellationToken = default) + public virtual Task AppendAsync(string streamId, IEnumerable events, long? expectedVersion = null, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); if (events == null || !events.Any()) throw new ArgumentNullException(nameof(events)); @@ -78,14 +78,13 @@ public virtual Task AppendAsync(string streamId, IEnumerable e { var record = new EventRecord(streamId, Guid.NewGuid().ToString(), offset, (ulong)this.Stream.Count, DateTimeOffset.Now, e.Type, e.Data, e.Metadata); stream.Add(record); - offset++; this.Stream.AddOrUpdate((ulong)this.Stream.Count + 1, record, (key, value) => record); this.Subject.OnNext(record); + offset++; } - + offset--; this.Cache.Set(streamId, stream); - - return Task.CompletedTask; + return Task.FromResult(offset); } /// diff --git a/src/Neuroglia.Data.Infrastructure.EventSourcing.Redis/Services/RedisEventStore.cs b/src/Neuroglia.Data.Infrastructure.EventSourcing.Redis/Services/RedisEventStore.cs index 44e783808..046fcc73e 100644 --- a/src/Neuroglia.Data.Infrastructure.EventSourcing.Redis/Services/RedisEventStore.cs +++ b/src/Neuroglia.Data.Infrastructure.EventSourcing.Redis/Services/RedisEventStore.cs @@ -77,7 +77,7 @@ public RedisEventStore(IOptions options, IConnectionMultiplex protected ISubscriber Subscriber { get; } /// - public virtual async Task AppendAsync(string streamId, IEnumerable events, long? expectedVersion = null, CancellationToken cancellationToken = default) + public virtual async Task AppendAsync(string streamId, IEnumerable events, long? expectedVersion = null, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId)); if (events == null || !events.Any()) throw new ArgumentNullException(nameof(events)); @@ -87,7 +87,7 @@ public virtual async Task AppendAsync(string streamId, IEnumerable diff --git a/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStoreTestsBase.cs b/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStoreTestsBase.cs index 5c71c0628..6d876a08f 100644 --- a/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStoreTestsBase.cs +++ b/test/Neuroglia.UnitTests/Cases/Data/Infrastructure/EventSourcing/EventStoreTestsBase.cs @@ -49,10 +49,11 @@ public async Task Append_Should_Work() var events = EventStreamFactory.Create().ToList(); //act - await EventStore.AppendAsync(streamId, events); + var offset = await EventStore.AppendAsync(streamId, events); var storedEvents = await EventStore.ReadAsync(streamId, StreamReadDirection.Forwards, 0).ToListAsync(); //assert + offset.Should().Be((ulong)storedEvents.Count - 1); storedEvents.Should().HaveSameCount(events); for (var i = 0; i < storedEvents.Count; i++) {