Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated the IEventStore interface to return the next expected version of the stream after appending events #86

Merged
merged 1 commit into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ public interface IEventStore
/// <param name="events">The events to append to the specified stream</param>
/// <param name="expectedVersion">The expected version of the stream to append the events to. Used for optimistic concurrency</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
Task AppendAsync(string streamId, IEnumerable<IEventDescriptor> events, long? expectedVersion = null, CancellationToken cancellationToken = default);
/// <returns>The version of the stream that is next expected</returns>
Task<ulong> AppendAsync(string streamId, IEnumerable<IEventDescriptor> events, long? expectedVersion = null, CancellationToken cancellationToken = default);

/// <summary>
/// Gets information about the specified stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@
}

/// <inheritdoc/>
public virtual async Task AppendAsync(string streamId, IEnumerable<IEventDescriptor> events, long? expectedVersion = null, CancellationToken cancellationToken = default)
public virtual async Task<ulong> AppendAsync(string streamId, IEnumerable<IEventDescriptor> events, long? expectedVersion = null, CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId));
if (events == null || !events.Any()) throw new ArgumentNullException(nameof(events));
Expand All @@ -130,8 +130,10 @@
metadata[EventRecordMetadata.ClrTypeName] = e.Data?.GetType().AssemblyQualifiedName!;
return new EventData(Uuid.NewUuid(), e.Type, this.Serializer.SerializeToByteArray(e.Data), this.Serializer.SerializeToByteArray(metadata));
});
if (expectedVersion.HasValue) await this.EventStoreClient.AppendToStreamAsync(streamId, StreamRevision.FromInt64(expectedVersion.Value), eventsData, cancellationToken: cancellationToken).ConfigureAwait(false);
else await this.EventStoreClient.AppendToStreamAsync(streamId, StreamState.Any, eventsData, cancellationToken: cancellationToken).ConfigureAwait(false);
var writeResult = expectedVersion.HasValue
? await this.EventStoreClient.AppendToStreamAsync(streamId, StreamRevision.FromInt64(expectedVersion.Value), eventsData, cancellationToken: cancellationToken).ConfigureAwait(false)
: await this.EventStoreClient.AppendToStreamAsync(streamId, StreamState.Any, eventsData, cancellationToken: cancellationToken).ConfigureAwait(false);
return writeResult.NextExpectedStreamRevision.ToUInt64();
}

/// <inheritdoc/>
Expand Down Expand Up @@ -248,7 +250,7 @@
var position = offset == StreamPosition.EndOfStream ? FromStream.End : FromStream.After(ESStreamPosition.FromInt64(offset));
var records = new List<IEventRecord>();
if (position != FromStream.End) records = await this.ReadAsync(streamId, StreamReadDirection.Forwards, offset, cancellationToken: cancellationToken).ToListAsync(cancellationToken).ConfigureAwait(false);
var subscription = await this.EventStoreClient.SubscribeToStreamAsync(qualifiedStreamId, FromStream.End, (sub, e, token) => this.OnEventConsumedAsync(subject, sub, e, token), true, (sub, reason, ex) => this.OnSubscriptionDropped(subject, sub, reason, ex), cancellationToken: cancellationToken).ConfigureAwait(false);

Check warning on line 253 in src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs

View workflow job for this annotation

GitHub Actions / build (8.0.x)

'EventStoreClient.SubscribeToStreamAsync(string, FromStream, Func<StreamSubscription, ResolvedEvent, CancellationToken, Task>, bool, Action<StreamSubscription, SubscriptionDroppedReason, Exception?>?, UserCredentials?, CancellationToken)' is obsolete: 'SubscribeToStreamAsync is no longer supported. Use SubscribeToStream instead.'
return Observable.StartWith(Observable.Using(() => subscription, watch => subject), records);
}
else
Expand All @@ -258,7 +260,7 @@
try { await this.EventStorePersistentSubscriptionsClient.CreateToStreamAsync(streamId, consumerGroup, settings, cancellationToken: cancellationToken).ConfigureAwait(false); }
catch (RpcException ex) when (ex.StatusCode == StatusCode.AlreadyExists) { }
var checkpointedPosition = await this.GetConsumerCheckpointedPositionAsync(consumerGroup, streamId, cancellationToken).ConfigureAwait(false);
var persistentSubscription = await this.EventStorePersistentSubscriptionsClient.SubscribeToStreamAsync(qualifiedStreamId, consumerGroup, (sub, e, retry, token) => this.OnEventConsumedAsync(subject, streamId, sub, e, retry, checkpointedPosition, token), (sub, reason, ex) => this.OnSubscriptionDropped(subject, sub, reason, ex), cancellationToken: cancellationToken).ConfigureAwait(false);

Check warning on line 263 in src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs

View workflow job for this annotation

GitHub Actions / build (8.0.x)

'EventStorePersistentSubscriptionsClient.SubscribeToStreamAsync(string, string, Func<PersistentSubscription, ResolvedEvent, int?, CancellationToken, Task>, Action<PersistentSubscription, SubscriptionDroppedReason, Exception?>?, UserCredentials?, int, CancellationToken)' is obsolete: 'SubscribeToStreamAsync is no longer supported. Use SubscribeToStream with manual acks instead.'
return Observable.Using(() => persistentSubscription, watch => subject);
}
}
Expand Down Expand Up @@ -375,7 +377,7 @@
/// <param name="subject">The <see cref="ISubject{T}"/> to stream <see cref="IEventRecord"/>s to</param>
/// <param name="replayed">A boolean indicating whether or not the <see cref="ResolvedEvent"/> is being replayed to its consumer. Ignore if 'subscription' is null</param>
/// <returns>The deserialized <see cref="IEventRecord"/></returns>
protected virtual IEventRecord DeserializeEventRecord(ResolvedEvent e, PersistentSubscription? subscription = null, ISubject<IEventRecord>? subject = null, bool? replayed = null)

Check warning on line 380 in src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs

View workflow job for this annotation

GitHub Actions / build (8.0.x)

'PersistentSubscription' is obsolete

Check warning on line 380 in src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs

View workflow job for this annotation

GitHub Actions / build (8.0.x)

'PersistentSubscription' is obsolete
{
var metadata = this.Serializer.Deserialize<IDictionary<string, object>>(e.Event.Metadata.ToArray());
var clrTypeName = metadata![EventRecordMetadata.ClrTypeName].ToString()!;
Expand Down Expand Up @@ -443,7 +445,7 @@
/// <param name="e">The <see cref="ResolvedEvent"/> to handle</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual Task OnEventConsumedAsync(ISubject<IEventRecord> subject, StreamSubscription subscription, ResolvedEvent e, CancellationToken cancellationToken) => Task.Run(() => subject.OnNext(this.DeserializeEventRecord(e)), cancellationToken);

Check warning on line 448 in src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs

View workflow job for this annotation

GitHub Actions / build (8.0.x)

'StreamSubscription' is obsolete

/// <summary>
/// Handles the consumption of a <see cref="ResolvedEvent"/> on a <see cref="PersistentSubscription"/>
Expand All @@ -456,7 +458,7 @@
/// <param name="checkpointedPosition">The highest position ever checkpointed by the consumer group</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual Task OnEventConsumedAsync(ISubject<IEventRecord> subject, string? streamId, PersistentSubscription subscription, ResolvedEvent e, int? retryCount, ulong? checkpointedPosition, CancellationToken cancellationToken)

Check warning on line 461 in src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs

View workflow job for this annotation

GitHub Actions / build (8.0.x)

'PersistentSubscription' is obsolete
{
try
{
Expand All @@ -477,7 +479,7 @@
/// <param name="subscription">The <see cref="PersistentSubscription"/> the <see cref="ResolvedEvent"/> to ack has been received by</param>
/// <param name="e">The <see cref="ResolvedEvent"/> to ack</param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task OnAckEventAsync(ISubject<IEventRecord> subject, PersistentSubscription subscription, ResolvedEvent e)

Check warning on line 482 in src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs

View workflow job for this annotation

GitHub Actions / build (8.0.x)

'PersistentSubscription' is obsolete
{
try { await subscription.Ack(e.OriginalEvent.EventId).ConfigureAwait(false); }
catch (ObjectDisposedException ex) { subject.OnError(ex); }
Expand All @@ -491,7 +493,7 @@
/// <param name="e">The <see cref="ResolvedEvent"/> to nack</param>
/// <param name="reason">The reason why to nack the <see cref="ResolvedEvent"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task OnNackEventAsync(ISubject<IEventRecord> subject, PersistentSubscription subscription, ResolvedEvent e, string? reason)

Check warning on line 496 in src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs

View workflow job for this annotation

GitHub Actions / build (8.0.x)

'PersistentSubscription' is obsolete
{
try { await subscription.Nack(PersistentSubscriptionNakEventAction.Retry, reason ?? "Unknown", e.OriginalEvent.EventId).ConfigureAwait(false); }
catch (ObjectDisposedException ex) { subject.OnError(ex); }
Expand All @@ -504,7 +506,7 @@
/// <param name="subscription">The <see cref="StreamSubscription"/> the <see cref="ResolvedEvent"/> has been received by</param>
/// <param name="reason">The reason why to drop the <see cref="StreamSubscription"/></param>
/// <param name="ex">The <see cref="Exception"/> that occurred, if any</param>
protected virtual void OnSubscriptionDropped(ISubject<IEventRecord> subject, StreamSubscription subscription, SubscriptionDroppedReason reason, Exception? ex)

Check warning on line 509 in src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs

View workflow job for this annotation

GitHub Actions / build (8.0.x)

'StreamSubscription' is obsolete
{
switch (reason)
{
Expand All @@ -525,7 +527,7 @@
/// <param name="subscription">The <see cref="PersistentSubscription"/> the <see cref="ResolvedEvent"/> has been received by</param>
/// <param name="reason">The reason why to drop the <see cref="PersistentSubscription"/></param>
/// <param name="ex">The <see cref="Exception"/> that occurred, if any</param>
protected virtual void OnSubscriptionDropped(ISubject<IEventRecord> subject, PersistentSubscription subscription, SubscriptionDroppedReason reason, Exception? ex)

Check warning on line 530 in src/Neuroglia.Data.Infrastructure.EventSourcing.EventStore/Services/ESEventStore.cs

View workflow job for this annotation

GitHub Actions / build (8.0.x)

'PersistentSubscription' is obsolete
{
switch (reason)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class MemoryEventStore(IMemoryCache cache)
protected Subject<IEventRecord> Subject { get; } = new();

/// <inheritdoc/>
public virtual Task AppendAsync(string streamId, IEnumerable<IEventDescriptor> events, long? expectedVersion = null, CancellationToken cancellationToken = default)
public virtual Task<ulong> AppendAsync(string streamId, IEnumerable<IEventDescriptor> events, long? expectedVersion = null, CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId));
if (events == null || !events.Any()) throw new ArgumentNullException(nameof(events));
Expand All @@ -78,14 +78,13 @@ public virtual Task AppendAsync(string streamId, IEnumerable<IEventDescriptor> e
{
var record = new EventRecord(streamId, Guid.NewGuid().ToString(), offset, (ulong)this.Stream.Count, DateTimeOffset.Now, e.Type, e.Data, e.Metadata);
stream.Add(record);
offset++;
this.Stream.AddOrUpdate((ulong)this.Stream.Count + 1, record, (key, value) => record);
this.Subject.OnNext(record);
offset++;
}

offset--;
this.Cache.Set(streamId, stream);

return Task.CompletedTask;
return Task.FromResult(offset);
}

/// <inheritdoc/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public RedisEventStore(IOptions<EventStoreOptions> options, IConnectionMultiplex
protected ISubscriber Subscriber { get; }

/// <inheritdoc/>
public virtual async Task AppendAsync(string streamId, IEnumerable<IEventDescriptor> events, long? expectedVersion = null, CancellationToken cancellationToken = default)
public virtual async Task<ulong> AppendAsync(string streamId, IEnumerable<IEventDescriptor> events, long? expectedVersion = null, CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId));
if (events == null || !events.Any()) throw new ArgumentNullException(nameof(events));
Expand All @@ -87,7 +87,7 @@ public virtual async Task AppendAsync(string streamId, IEnumerable<IEventDescrip

if (expectedVersion.HasValue)
{
if (expectedVersion.Value == Infrastructure.EventSourcing.StreamPosition.EndOfStream)
if (expectedVersion.Value == StreamPosition.EndOfStream)
{
if (actualVersion != null) throw new OptimisticConcurrencyException(expectedVersion, actualVersion);
}
Expand All @@ -107,7 +107,8 @@ public virtual async Task AppendAsync(string streamId, IEnumerable<IEventDescrip
await this.Database.PublishAsync(this.GetRedisChannel(), entryValue).ConfigureAwait(false);
offset++;
}

offset--;
return offset;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ public async Task Append_Should_Work()
var events = EventStreamFactory.Create().ToList();

//act
await EventStore.AppendAsync(streamId, events);
var offset = await EventStore.AppendAsync(streamId, events);
var storedEvents = await EventStore.ReadAsync(streamId, StreamReadDirection.Forwards, 0).ToListAsync();

//assert
offset.Should().Be((ulong)storedEvents.Count - 1);
storedEvents.Should().HaveSameCount(events);
for (var i = 0; i < storedEvents.Count; i++)
{
Expand Down
Loading