Skip to content

Commit

Permalink
Merge pull request #86 from neuroglia-io/fix-event-sourcing
Browse files Browse the repository at this point in the history
Updated the IEventStore interface to return the next expected version of the stream after appending events
  • Loading branch information
cdavernas authored Apr 4, 2024
2 parents 10e1aaf + b2a696e commit b09e33d
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ public interface IEventStore
/// <param name="events">The events to append to the specified stream</param>
/// <param name="expectedVersion">The expected version of the stream to append the events to. Used for optimistic concurrency</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
Task AppendAsync(string streamId, IEnumerable<IEventDescriptor> events, long? expectedVersion = null, CancellationToken cancellationToken = default);
/// <returns>The version of the stream that is next expected</returns>
Task<ulong> AppendAsync(string streamId, IEnumerable<IEventDescriptor> events, long? expectedVersion = null, CancellationToken cancellationToken = default);

/// <summary>
/// Gets information about the specified stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public virtual async Task<IEventStreamDescriptor> GetAsync(string streamId, Canc
}

/// <inheritdoc/>
public virtual async Task AppendAsync(string streamId, IEnumerable<IEventDescriptor> events, long? expectedVersion = null, CancellationToken cancellationToken = default)
public virtual async Task<ulong> AppendAsync(string streamId, IEnumerable<IEventDescriptor> events, long? expectedVersion = null, CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId));
if (events == null || !events.Any()) throw new ArgumentNullException(nameof(events));
Expand All @@ -130,8 +130,10 @@ public virtual async Task AppendAsync(string streamId, IEnumerable<IEventDescrip
metadata[EventRecordMetadata.ClrTypeName] = e.Data?.GetType().AssemblyQualifiedName!;
return new EventData(Uuid.NewUuid(), e.Type, this.Serializer.SerializeToByteArray(e.Data), this.Serializer.SerializeToByteArray(metadata));
});
if (expectedVersion.HasValue) await this.EventStoreClient.AppendToStreamAsync(streamId, StreamRevision.FromInt64(expectedVersion.Value), eventsData, cancellationToken: cancellationToken).ConfigureAwait(false);
else await this.EventStoreClient.AppendToStreamAsync(streamId, StreamState.Any, eventsData, cancellationToken: cancellationToken).ConfigureAwait(false);
var writeResult = expectedVersion.HasValue
? await this.EventStoreClient.AppendToStreamAsync(streamId, StreamRevision.FromInt64(expectedVersion.Value), eventsData, cancellationToken: cancellationToken).ConfigureAwait(false)
: await this.EventStoreClient.AppendToStreamAsync(streamId, StreamState.Any, eventsData, cancellationToken: cancellationToken).ConfigureAwait(false);
return writeResult.NextExpectedStreamRevision.ToUInt64();
}

/// <inheritdoc/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class MemoryEventStore(IMemoryCache cache)
protected Subject<IEventRecord> Subject { get; } = new();

/// <inheritdoc/>
public virtual Task AppendAsync(string streamId, IEnumerable<IEventDescriptor> events, long? expectedVersion = null, CancellationToken cancellationToken = default)
public virtual Task<ulong> AppendAsync(string streamId, IEnumerable<IEventDescriptor> events, long? expectedVersion = null, CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId));
if (events == null || !events.Any()) throw new ArgumentNullException(nameof(events));
Expand All @@ -78,14 +78,13 @@ public virtual Task AppendAsync(string streamId, IEnumerable<IEventDescriptor> e
{
var record = new EventRecord(streamId, Guid.NewGuid().ToString(), offset, (ulong)this.Stream.Count, DateTimeOffset.Now, e.Type, e.Data, e.Metadata);
stream.Add(record);
offset++;
this.Stream.AddOrUpdate((ulong)this.Stream.Count + 1, record, (key, value) => record);
this.Subject.OnNext(record);
offset++;
}

offset--;
this.Cache.Set(streamId, stream);

return Task.CompletedTask;
return Task.FromResult(offset);
}

/// <inheritdoc/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public RedisEventStore(IOptions<EventStoreOptions> options, IConnectionMultiplex
protected ISubscriber Subscriber { get; }

/// <inheritdoc/>
public virtual async Task AppendAsync(string streamId, IEnumerable<IEventDescriptor> events, long? expectedVersion = null, CancellationToken cancellationToken = default)
public virtual async Task<ulong> AppendAsync(string streamId, IEnumerable<IEventDescriptor> events, long? expectedVersion = null, CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId));
if (events == null || !events.Any()) throw new ArgumentNullException(nameof(events));
Expand All @@ -87,7 +87,7 @@ public virtual async Task AppendAsync(string streamId, IEnumerable<IEventDescrip

if (expectedVersion.HasValue)
{
if (expectedVersion.Value == Infrastructure.EventSourcing.StreamPosition.EndOfStream)
if (expectedVersion.Value == StreamPosition.EndOfStream)
{
if (actualVersion != null) throw new OptimisticConcurrencyException(expectedVersion, actualVersion);
}
Expand All @@ -107,7 +107,8 @@ public virtual async Task AppendAsync(string streamId, IEnumerable<IEventDescrip
await this.Database.PublishAsync(this.GetRedisChannel(), entryValue).ConfigureAwait(false);
offset++;
}

offset--;
return offset;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ public async Task Append_Should_Work()
var events = EventStreamFactory.Create().ToList();

//act
await EventStore.AppendAsync(streamId, events);
var offset = await EventStore.AppendAsync(streamId, events);
var storedEvents = await EventStore.ReadAsync(streamId, StreamReadDirection.Forwards, 0).ToListAsync();

//assert
offset.Should().Be((ulong)storedEvents.Count - 1);
storedEvents.Should().HaveSameCount(events);
for (var i = 0; i < storedEvents.Count; i++)
{
Expand Down

0 comments on commit b09e33d

Please sign in to comment.