Skip to content

Commit

Permalink
Merge pull request #60 from neuroglia-io/fix-repositories
Browse files Browse the repository at this point in the history
Fixed all Repository implementations to ensure not throwing when attempting to retrieve a non-existing entity
  • Loading branch information
cdavernas authored Nov 22, 2023
2 parents ac44b2f + 51e4198 commit 23eaba8
Show file tree
Hide file tree
Showing 13 changed files with 159 additions and 46 deletions.
2 changes: 1 addition & 1 deletion Neuroglia Framework.sln
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Neuroglia.Data.Guards", "sr
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Neuroglia.UnitTests", "test\Neuroglia.UnitTests\Neuroglia.UnitTests.csproj", "{63922ABE-916D-4A4D-A3E1-8162BEED060E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Neuroglia.Mediation.Abstractions", "src\Neuroglia.Mediation.Abstractions\Neuroglia.Mediation.Abstractions.csproj", "{23AEEAC4-03C7-4E6F-AB74-26588908AAB9}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Neuroglia.Mediation.Abstractions", "src\Neuroglia.Mediation.Abstractions\Neuroglia.Mediation.Abstractions.csproj", "{23AEEAC4-03C7-4E6F-AB74-26588908AAB9}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ namespace Neuroglia.Data.Infrastructure.EventSourcing.Configuration;
public class EventStoreOptions
{

/// <summary>
/// Gets/sets the name of the database to use, if any
/// </summary>
public string? DatabaseName { get; set; }

/// <summary>
/// Gets/sets the type of <see cref="ISerializer"/> to use to serialize and deserialize events. If not set, will use the first registered <see cref="ISerializer"/>
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ public virtual IEventStoreOptionsBuilder UseSerializer<TSerializer>()
return this;
}

/// <inheritdoc/>
public virtual IEventStoreOptionsBuilder UseDatabase(string databaseName)
{
this.Options.DatabaseName = databaseName;
return this;
}

/// <inheritdoc/>
public virtual EventStoreOptions Build() => this.Options;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ public interface IEventStoreOptionsBuilder
IEventStoreOptionsBuilder UseSerializer<TSerializer>()
where TSerializer : class, ISerializer;

/// <summary>
/// Uses the specified database
/// </summary>
/// <param name="databaseName">The name of the database to use</param>
/// <returns>The configured <see cref="IEventStoreOptionsBuilder"/></returns>
IEventStoreOptionsBuilder UseDatabase(string databaseName);

/// <summary>
/// Builds the <see cref="EventStoreOptions"/>
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,20 +93,24 @@ public virtual async Task<bool> StreamExistsAsync(string streamId, CancellationT
public virtual async Task<IEventStreamDescriptor> GetAsync(string streamId, CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId));
var qualifiedStreamId = this.GetQualifiedStreamId(streamId);

var streamMetadataResult = await this.EventStoreClient.GetStreamMetadataAsync(streamId, cancellationToken: cancellationToken).ConfigureAwait(false);
var streamMetadataResult = await this.EventStoreClient.GetStreamMetadataAsync(qualifiedStreamId, cancellationToken: cancellationToken).ConfigureAwait(false);
if (streamMetadataResult.StreamDeleted) throw new StreamNotFoundException(streamId);
var offset = streamMetadataResult.Metadata.TruncateBefore ?? StreamPosition.StartOfStream;

var readResult = this.EventStoreClient.ReadStreamAsync(Direction.Forwards, streamId, offset, 1, cancellationToken: cancellationToken);
var readResult = this.EventStoreClient.ReadStreamAsync(Direction.Forwards, qualifiedStreamId, offset, 1, cancellationToken: cancellationToken);
ReadState? readState;

try { readState = await readResult.ReadState.ConfigureAwait(false); }
catch { throw new StreamNotFoundException(streamId); }
if (readState == ReadState.StreamNotFound) throw new StreamNotFoundException(streamId);

if (readState == ReadState.StreamNotFound)
{
if (streamId.StartsWith("$ce-")) return new EventStreamDescriptor(streamId, 0, null, null);
else throw new StreamNotFoundException(streamId);
}
var firstEvent = await readResult.FirstAsync(cancellationToken).ConfigureAwait(false);
readResult = this.EventStoreClient.ReadStreamAsync(Direction.Backwards, streamId, ESStreamPosition.End, 1, cancellationToken: cancellationToken);
readResult = this.EventStoreClient.ReadStreamAsync(Direction.Backwards, qualifiedStreamId, ESStreamPosition.End, 1, cancellationToken: cancellationToken);
var lastEvent = await readResult.FirstAsync(cancellationToken).ConfigureAwait(false);

return new EventStreamDescriptor(streamId, lastEvent.Event.EventNumber.ToInt64() + 1 - offset.ToInt64(), firstEvent.Event.Created, lastEvent.Event.Created);
Expand All @@ -118,6 +122,7 @@ public virtual async Task AppendAsync(string streamId, IEnumerable<IEventDescrip
if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId));
if (events == null || !events.Any()) throw new ArgumentNullException(nameof(events));
if (expectedVersion < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(expectedVersion));
streamId = this.GetQualifiedStreamId(streamId);

var readResult = this.EventStoreClient.ReadStreamAsync(Direction.Backwards, streamId, ESStreamPosition.End, 1, cancellationToken: cancellationToken);
var shouldThrowIfNotExists = expectedVersion.HasValue && expectedVersion != StreamPosition.StartOfStream && expectedVersion != StreamPosition.EndOfStream;
Expand All @@ -137,8 +142,12 @@ public virtual async Task AppendAsync(string streamId, IEnumerable<IEventDescrip
/// <inheritdoc/>
public virtual IAsyncEnumerable<IEventRecord> ReadAsync(string? streamId, StreamReadDirection readDirection, long offset, ulong? length = null, CancellationToken cancellationToken = default)
{
if(string.IsNullOrWhiteSpace(streamId)) return this.ReadFromAllAsync(readDirection, offset, length, cancellationToken);
else return this.ReadFromStreamAsync(streamId, readDirection, offset, length, cancellationToken);
if (string.IsNullOrWhiteSpace(streamId))
{
if(string.IsNullOrWhiteSpace(this.Options.DatabaseName)) return this.ReadFromAllAsync(readDirection, offset, length, cancellationToken);
else return this.ReadFromStreamAsync($"$ce-{this.Options.DatabaseName}", readDirection, offset, length, cancellationToken);
}
else return this.ReadFromStreamAsync(this.GetQualifiedStreamId(streamId), readDirection, offset, length, cancellationToken);
}

/// <summary>
Expand All @@ -153,7 +162,8 @@ public virtual IAsyncEnumerable<IEventRecord> ReadAsync(string? streamId, Stream
protected virtual async IAsyncEnumerable<IEventRecord> ReadFromStreamAsync(string streamId, StreamReadDirection readDirection, long offset, ulong? length = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId));
if (offset < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(offset));

ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream);

var direction = readDirection switch
{
Expand All @@ -169,7 +179,7 @@ protected virtual async IAsyncEnumerable<IEventRecord> ReadFromStreamAsync(strin
if (readDirection == StreamReadDirection.Forwards && offset == StreamPosition.EndOfStream) yield break;
else if (readDirection == StreamReadDirection.Backwards && offset == StreamPosition.StartOfStream) yield break;

var readResult = this.EventStoreClient.ReadStreamAsync(direction, streamId, ESStreamPosition.FromInt64(offset), length.HasValue ? (long)length.Value : long.MaxValue, cancellationToken: cancellationToken);
var readResult = this.EventStoreClient.ReadStreamAsync(direction, streamId, ESStreamPosition.FromInt64(offset), length.HasValue ? (long)length.Value : long.MaxValue, true, cancellationToken: cancellationToken);
try { if (await readResult.ReadState.ConfigureAwait(false) == ReadState.StreamNotFound) throw new StreamNotFoundException(streamId); }
catch (StreamDeletedException) { throw new StreamNotFoundException(streamId); }

Expand Down Expand Up @@ -204,7 +214,7 @@ protected virtual async IAsyncEnumerable<IEventRecord> ReadFromAllAsync(StreamRe
};
var events = this.EventStoreClient.ReadAllAsync(direction, position, length.HasValue ? (long)length.Value : long.MaxValue, cancellationToken: cancellationToken);
var streamOffset = 0;
await foreach (var e in events.Where(e => !e.Event.EventType.StartsWith("$")))
await foreach (var e in events.Where(e => !e.Event.EventType.StartsWith('$')))
{
if (readDirection == StreamReadDirection.Forwards ? streamOffset >= offset : streamOffset < (offset == StreamPosition.EndOfStream ? int.MaxValue : offset + 1)) yield return this.DeserializeEventRecord(e);
streamOffset++;
Expand All @@ -214,7 +224,11 @@ protected virtual async IAsyncEnumerable<IEventRecord> ReadFromAllAsync(StreamRe
/// <inheritdoc/>
public virtual Task<IObservable<IEventRecord>> ObserveAsync(string? streamId, long offset = StreamPosition.EndOfStream, string? consumerGroup = null, CancellationToken cancellationToken = default)
{
if(string.IsNullOrWhiteSpace(streamId)) return this.ObserveAllAsync(offset, consumerGroup, cancellationToken);
if (string.IsNullOrWhiteSpace(streamId))
{
if (string.IsNullOrWhiteSpace(this.Options.DatabaseName)) return this.ObserveAllAsync(offset, consumerGroup, cancellationToken);
else return this.ObserveStreamAsync($"$ce-{this.Options.DatabaseName}", offset, consumerGroup, cancellationToken);
}
else return this.ObserveStreamAsync(streamId, offset, consumerGroup, cancellationToken);
}

Expand All @@ -231,14 +245,15 @@ protected virtual async Task<IObservable<IEventRecord>> ObserveStreamAsync(strin
if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId));
if (offset < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(offset));
if (!await this.StreamExistsAsync(streamId, cancellationToken).ConfigureAwait(false)) throw new StreamNotFoundException(streamId);
var qualifiedStreamId = this.GetQualifiedStreamId(streamId);

var subject = new Subject<IEventRecord>();
if (string.IsNullOrWhiteSpace(consumerGroup))
{
var position = offset == StreamPosition.EndOfStream ? FromStream.End : FromStream.After(ESStreamPosition.FromInt64(offset));
var records = new List<IEventRecord>();
if (position != FromStream.End) records = await this.ReadAsync(streamId, StreamReadDirection.Forwards, offset, cancellationToken: cancellationToken).ToListAsync(cancellationToken).ConfigureAwait(false);
var subscription = await this.EventStoreClient.SubscribeToStreamAsync(streamId, FromStream.End, (sub, e, token) => this.OnEventConsumedAsync(subject, sub, e, token), true, (sub, reason, ex) => this.OnSubscriptionDropped(subject, sub, reason, ex), cancellationToken: cancellationToken).ConfigureAwait(false);
var subscription = await this.EventStoreClient.SubscribeToStreamAsync(qualifiedStreamId, FromStream.End, (sub, e, token) => this.OnEventConsumedAsync(subject, sub, e, token), true, (sub, reason, ex) => this.OnSubscriptionDropped(subject, sub, reason, ex), cancellationToken: cancellationToken).ConfigureAwait(false);
return Observable.StartWith(Observable.Using(() => subscription, watch => subject), records);
}
else
Expand All @@ -248,7 +263,7 @@ protected virtual async Task<IObservable<IEventRecord>> ObserveStreamAsync(strin
try { await this.EventStorePersistentSubscriptionsClient.CreateToStreamAsync(streamId, consumerGroup, settings, cancellationToken: cancellationToken).ConfigureAwait(false); }
catch (RpcException ex) when (ex.StatusCode == StatusCode.AlreadyExists) { }
var checkpointedPosition = await this.GetConsumerCheckpointedPositionAsync(consumerGroup, streamId, cancellationToken).ConfigureAwait(false);
var persistentSubscription = await this.EventStorePersistentSubscriptionsClient.SubscribeToStreamAsync(streamId, consumerGroup, (sub, e, retry, token) => this.OnEventConsumedAsync(subject, streamId, sub, e, retry, checkpointedPosition, token), (sub, reason, ex) => this.OnSubscriptionDropped(subject, sub, reason, ex), cancellationToken: cancellationToken).ConfigureAwait(false);
var persistentSubscription = await this.EventStorePersistentSubscriptionsClient.SubscribeToStreamAsync(qualifiedStreamId, consumerGroup, (sub, e, retry, token) => this.OnEventConsumedAsync(subject, streamId, sub, e, retry, checkpointedPosition, token), (sub, reason, ex) => this.OnSubscriptionDropped(subject, sub, reason, ex), cancellationToken: cancellationToken).ConfigureAwait(false);
return Observable.Using(() => persistentSubscription, watch => subject);
}
}
Expand Down Expand Up @@ -297,7 +312,7 @@ protected virtual async Task<IObservable<IEventRecord>> ObserveAllAsync(long off
public virtual async Task SetOffsetAsync(string consumerGroup, long offset, string? streamId = null, CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(consumerGroup)) throw new ArgumentNullException(nameof(consumerGroup));
if (offset < StreamPosition.EndOfStream) throw new ArgumentOutOfRangeException(nameof(offset));
ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream);

IPosition position = string.IsNullOrWhiteSpace(streamId) ? offset == StreamPosition.EndOfStream ? Position.End : Position.Start : offset == StreamPosition.EndOfStream ? ESStreamPosition.End : ESStreamPosition.FromInt64(offset);
var settings = new PersistentSubscriptionSettings(true, position, checkPointLowerBound: 1, checkPointUpperBound: 1);
Expand All @@ -314,6 +329,7 @@ public virtual async Task SetOffsetAsync(string consumerGroup, long offset, stri
}
else
{
streamId = this.GetQualifiedStreamId(streamId);
try { subscription = await this.EventStorePersistentSubscriptionsClient.GetInfoToStreamAsync(consumerGroup, streamId, cancellationToken: cancellationToken).ConfigureAwait(false); }
catch (RpcException ex) when (ex.StatusCode == StatusCode.NotFound) { throw new StreamNotFoundException(); }
if (subscription.Stats.LastCheckpointedEventPosition != null) await this.SetConsumerCheckpointPositionAsync(consumerGroup, streamId, subscription.Stats.LastCheckpointedEventPosition, cancellationToken).ConfigureAwait(false);
Expand All @@ -331,7 +347,7 @@ public virtual async Task TruncateAsync(string streamId, ulong? beforeVersion =
if (!await this.StreamExistsAsync(streamId, cancellationToken).ConfigureAwait(false)) throw new StreamNotFoundException(streamId);

var truncateBefore = beforeVersion.HasValue ? ESStreamPosition.FromInt64((long)beforeVersion.Value) : ESStreamPosition.End;
await this.EventStoreClient.SetStreamMetadataAsync(streamId, StreamState.Any, new StreamMetadata(truncateBefore: truncateBefore), cancellationToken: cancellationToken).ConfigureAwait(false);
await this.EventStoreClient.SetStreamMetadataAsync(this.GetQualifiedStreamId(streamId), StreamState.Any, new StreamMetadata(truncateBefore: truncateBefore), cancellationToken: cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand All @@ -340,9 +356,16 @@ public virtual async Task DeleteAsync(string streamId, CancellationToken cancell
if (string.IsNullOrWhiteSpace(streamId)) throw new ArgumentNullException(nameof(streamId));
if (!await this.StreamExistsAsync(streamId, cancellationToken).ConfigureAwait(false)) throw new StreamNotFoundException(streamId);

await this.EventStoreClient.DeleteAsync(streamId, StreamState.Any, cancellationToken: cancellationToken).ConfigureAwait(false);
await this.EventStoreClient.DeleteAsync(this.GetQualifiedStreamId(streamId), StreamState.Any, cancellationToken: cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Converts the specified stream id to a qualified stream id, which is prefixed with the current database name, if any
/// </summary>
/// <param name="streamId">The stream id to convert</param>
/// <returns>The qualified id of the specified stream id</returns>
protected virtual string GetQualifiedStreamId(string streamId) => string.IsNullOrWhiteSpace(this.Options.DatabaseName) || streamId.StartsWith($"$ce-") ? streamId : $"{this.Options.DatabaseName}-{streamId}";

/// <summary>
/// Deserializes the specified <see cref="ResolvedEvent"/> into a new <see cref="IEventRecord"/>
/// </summary>
Expand Down Expand Up @@ -427,10 +450,10 @@ protected virtual Task OnEventConsumedAsync(ISubject<IEventRecord> subject, stri
{
try
{
if (e.OriginalStreamId.StartsWith("$") || e.Event.Metadata.Length < 1) return subscription.Ack(e);
if (string.IsNullOrWhiteSpace(this.Options.DatabaseName) || !e.OriginalStreamId.StartsWith($"$ce-{this.Options.DatabaseName}")) if (e.OriginalStreamId.StartsWith("$") || e.Event.Metadata.Length < 1) return subscription.Ack(e);
return Task.Run(() => subject.OnNext(this.DeserializeEventRecord(e, subscription, subject, checkpointedPosition > (string.IsNullOrWhiteSpace(streamId) ? e.Event.Position.CommitPosition : e.Event.EventNumber.ToUInt64()))), cancellationToken);
}
catch(Exception ex)
catch (Exception ex)
{
subject.OnError(ex);
return Task.CompletedTask;
Expand Down
Loading

0 comments on commit 23eaba8

Please sign in to comment.