Skip to content

Commit

Permalink
Merge pull request #76 from neuroglia-io/fix-misc-1
Browse files Browse the repository at this point in the history
Minor fixes and improvements
  • Loading branch information
cdavernas authored Dec 8, 2023
2 parents f7734da + 86ea656 commit 64c68ad
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public virtual IAsyncEnumerable<IEventRecord> ReadAsync(string? streamId, Stream
if (string.IsNullOrWhiteSpace(streamId))
{
if(string.IsNullOrWhiteSpace(this.Options.DatabaseName)) return this.ReadFromAllAsync(readDirection, offset, length, cancellationToken);
else return this.ReadFromStreamAsync($"$ce-{this.Options.DatabaseName}", readDirection, offset, length, cancellationToken);
else return this.ReadFromStreamAsync(this.GetDatabaseStreamId()!, readDirection, offset, length, cancellationToken);
}
else return this.ReadFromStreamAsync(this.GetQualifiedStreamId(streamId), readDirection, offset, length, cancellationToken);
}
Expand Down Expand Up @@ -227,7 +227,7 @@ public virtual Task<IObservable<IEventRecord>> ObserveAsync(string? streamId, lo
if (string.IsNullOrWhiteSpace(streamId))
{
if (string.IsNullOrWhiteSpace(this.Options.DatabaseName)) return this.ObserveAllAsync(offset, consumerGroup, cancellationToken);
else return this.ObserveStreamAsync($"$ce-{this.Options.DatabaseName}", offset, consumerGroup, cancellationToken);
else return this.ObserveStreamAsync(this.GetDatabaseStreamId()!, offset, consumerGroup, cancellationToken);
}
else return this.ObserveStreamAsync(streamId, offset, consumerGroup, cancellationToken);
}
Expand Down Expand Up @@ -314,9 +314,10 @@ public virtual async Task SetOffsetAsync(string consumerGroup, long offset, stri
if (string.IsNullOrWhiteSpace(consumerGroup)) throw new ArgumentNullException(nameof(consumerGroup));
ArgumentOutOfRangeException.ThrowIfLessThan(offset, StreamPosition.EndOfStream);

IPosition position = string.IsNullOrWhiteSpace(streamId) ? offset == StreamPosition.EndOfStream ? Position.End : Position.Start : offset == StreamPosition.EndOfStream ? ESStreamPosition.End : ESStreamPosition.FromInt64(offset);
IPosition position = string.IsNullOrWhiteSpace(streamId) ? offset == StreamPosition.EndOfStream ? ESStreamPosition.End : ESStreamPosition.Start : offset == StreamPosition.EndOfStream ? ESStreamPosition.End : ESStreamPosition.FromInt64(offset);
var settings = new PersistentSubscriptionSettings(true, position, checkPointLowerBound: 1, checkPointUpperBound: 1);
PersistentSubscriptionInfo subscription;
streamId = string.IsNullOrWhiteSpace(streamId) ? string.IsNullOrWhiteSpace(this.Options.DatabaseName) ? null : this.GetDatabaseStreamId()! : this.GetQualifiedStreamId(streamId);
if (string.IsNullOrWhiteSpace(streamId))
{
try { subscription = await this.EventStorePersistentSubscriptionsClient.GetInfoToAllAsync(consumerGroup, cancellationToken: cancellationToken).ConfigureAwait(false); }
Expand All @@ -329,8 +330,7 @@ public virtual async Task SetOffsetAsync(string consumerGroup, long offset, stri
}
else
{
streamId = this.GetQualifiedStreamId(streamId);
try { subscription = await this.EventStorePersistentSubscriptionsClient.GetInfoToStreamAsync(consumerGroup, streamId, cancellationToken: cancellationToken).ConfigureAwait(false); }
try { subscription = await this.EventStorePersistentSubscriptionsClient.GetInfoToStreamAsync(streamId, consumerGroup, cancellationToken: cancellationToken).ConfigureAwait(false); }
catch (RpcException ex) when (ex.StatusCode == StatusCode.NotFound) { throw new StreamNotFoundException(); }
if (subscription.Stats.LastCheckpointedEventPosition != null) await this.SetConsumerCheckpointPositionAsync(consumerGroup, streamId, subscription.Stats.LastCheckpointedEventPosition, cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -366,6 +366,12 @@ public virtual async Task DeleteAsync(string streamId, CancellationToken cancell
/// <returns>The qualified id of the specified stream id</returns>
protected virtual string GetQualifiedStreamId(string streamId) => string.IsNullOrWhiteSpace(this.Options.DatabaseName) || streamId.StartsWith($"$ce-") ? streamId : $"{this.Options.DatabaseName}-{streamId}";

/// <summary>
/// Gets the id, if any, of the stream that contains references to all events in the database
/// </summary>
/// <returns>The id, if any, of the stream that contains references to all events in the database</returns>
protected virtual string? GetDatabaseStreamId() => string.IsNullOrWhiteSpace(this.Options.DatabaseName) ? null : $"$ce-{this.Options.DatabaseName}";

/// <summary>
/// Deserializes the specified <see cref="ResolvedEvent"/> into a new <see cref="IEventRecord"/>
/// </summary>
Expand Down Expand Up @@ -415,7 +421,16 @@ protected virtual IEventRecord DeserializeEventRecord(ResolvedEvent e, Persisten
/// <param name="position">The last checkpointed position</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual Task SetConsumerCheckpointPositionAsync(string consumerGroup, string? streamId, IPosition position, CancellationToken cancellationToken = default) => this.AppendAsync(this.GetConsumerCheckpointStreamId(consumerGroup, streamId), new EventDescriptor[] { new("$checkpoint", ((Position)position).CommitPosition) }, cancellationToken: cancellationToken);
protected virtual async Task SetConsumerCheckpointPositionAsync(string consumerGroup, string? streamId, IPosition position, CancellationToken cancellationToken = default)
{
var data = position switch
{
Position pos => pos.CommitPosition,
ESStreamPosition spos => (ulong)spos.ToInt64(),
_ => throw new NotSupportedException($"The position type '{position.GetType()}' is not supported in this context")
};
await this.AppendAsync(this.GetConsumerCheckpointStreamId(consumerGroup, streamId), new EventDescriptor[] { new("$checkpoint", data) }, cancellationToken: cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Gets the id of the stream used to store the checkpoints of the specified consumer group, and optionally stream
Expand Down Expand Up @@ -450,7 +465,7 @@ protected virtual Task OnEventConsumedAsync(ISubject<IEventRecord> subject, stri
{
try
{
if (string.IsNullOrWhiteSpace(this.Options.DatabaseName) || !e.OriginalStreamId.StartsWith($"$ce-{this.Options.DatabaseName}")) if (e.OriginalStreamId.StartsWith("$") || e.Event.Metadata.Length < 1) return subscription.Ack(e);
if (string.IsNullOrWhiteSpace(this.Options.DatabaseName) || !e.OriginalStreamId.StartsWith(this.GetDatabaseStreamId()!)) if (e.OriginalStreamId.StartsWith("$") || e.Event.Metadata.Length < 1) return subscription.Ack(e);
return Task.Run(() => subject.OnNext(this.DeserializeEventRecord(e, subscription, subject, checkpointedPosition > (string.IsNullOrWhiteSpace(streamId) ? e.Event.Position.CommitPosition : e.Event.EventNumber.ToUInt64()))), cancellationToken);
}
catch (Exception ex)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright © 2021-Present Neuroglia SRL. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Microsoft.AspNetCore.Builder;

namespace Neuroglia.Eventing.CloudEvents.AspNetCore;

/// <summary>
/// Defines extensions for <see cref="IApplicationBuilder"/>s
/// </summary>
public static class CloudEventIApplicationBuilderExtensions
{

/// <summary>
/// Configures the <see cref="IApplicationBuilder"/> to use the <see cref="CloudEventMiddleware"/>
/// </summary>
/// <param name="app">The <see cref="IApplicationBuilder"/> to configure</param>
/// <returns>The configured <see cref="IApplicationBuilder"/></returns>
public static IApplicationBuilder UseCloudEvents(this IApplicationBuilder app) => app.UseMiddleware<CloudEventMiddleware>();

}

0 comments on commit 64c68ad

Please sign in to comment.