Skip to content

Commit

Permalink
Merge pull request #50 from neuroglia-io/fix-subscription-handler
Browse files Browse the repository at this point in the history
Fixed critical issues with consumer cloud event partitioning, sequencing and offsetting
  • Loading branch information
cdavernas authored Aug 1, 2023
2 parents 26707ed + 2adcca5 commit 17683bb
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using Json.Patch;
using Polly;
using Polly.CircuitBreaker;
using System.Net;
using System.Net.Mime;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
Expand Down Expand Up @@ -220,13 +221,30 @@ protected virtual async Task InitializeCloudEventStreamAsync()
this.Logger.LogDebug("Initializing the cloud event stream of subscription '{subscription}' at offset '{offset}'", this.Subscription, offset);
if (this.Subscription.Spec.Partition == null)
{
try
{
this.StreamOffset = (await this.EventStoreProvider.GetEventStore().GetStreamMetadataAsync(this.StreamInitializationCancellationTokenSource.Token).ConfigureAwait(false)).Length;
}
catch (HyloException ex) when (ex.Problem.Status == (int)HttpStatusCode.NotFound)
{
this.StreamOffset = 0;
}
if (offset >= 0 && (ulong)offset == this.StreamOffset) offset = -1;
this.CloudEventStream = await this.EventStoreProvider.GetEventStore().SubscribeAsync(offset, this.StreamInitializationCancellationTokenSource.Token).ConfigureAwait(false);
this.StreamOffset = (await this.EventStoreProvider.GetEventStore().GetStreamMetadataAsync(this.StreamInitializationCancellationTokenSource.Token).ConfigureAwait(false)).Length;
}
else
{
try
{
this.StreamOffset = (await this.EventStoreProvider.GetEventStore().GetPartitionMetadataAsync(this.Subscription.Spec.Partition, this.StreamInitializationCancellationTokenSource.Token).ConfigureAwait(false)).Length;
}
catch (HyloException ex) when (ex.Problem.Status == (int)HttpStatusCode.NotFound)
{
this.StreamOffset = 0;
}
if (offset >= 0 && (ulong)offset == this.StreamOffset) offset = -1;
this.CloudEventStream = await this.EventStoreProvider.GetEventStore().SubscribeToPartitionAsync(this.Subscription.Spec.Partition, offset, this.StreamInitializationCancellationTokenSource.Token).ConfigureAwait(false);
this.StreamOffset = (await this.EventStoreProvider.GetEventStore().GetPartitionMetadataAsync(this.Subscription.Spec.Partition, this.StreamInitializationCancellationTokenSource.Token).ConfigureAwait(false)).Length;

}
this._Subscription = this.CloudEventStream.Where(this.Filters).SubscribeAsync(this.OnCloudEventAsync, onErrorAsync: this.OnSubscriptionErrorAsync, null);
if (offset != StreamPosition.EndOfStream && (ulong)offset < this.StreamOffset) _ = this.CatchUpAsync().ConfigureAwait(false);
Expand Down Expand Up @@ -452,10 +470,15 @@ protected virtual async Task CatchUpAsync()
this.SubscriptionOutOfSync = true;
this.StreamInitializationTaskCompletionSource ??= new();
var currentOffset = this.Subscription.GetOffset();
if (currentOffset == StreamPosition.EndOfStream) currentOffset = (long)(await this.EventStoreProvider.GetEventStore().ReadOneAsync(StreamReadDirection.Backwards, StreamPosition.EndOfStream, this.StreamInitializationCancellationTokenSource!.Token).ConfigureAwait(false))!.Sequence;
var eventStore = this.EventStoreProvider.GetEventStore();
if (currentOffset == StreamPosition.EndOfStream) currentOffset = this.Subscription.Spec.Partition == null ?
(long)(await eventStore.ReadOneAsync(StreamReadDirection.Backwards, StreamPosition.EndOfStream, this.StreamInitializationCancellationTokenSource!.Token).ConfigureAwait(false))!.Sequence
: (long)(await eventStore.ReadPartitionAsync(this.Subscription.Spec.Partition, StreamReadDirection.Backwards, StreamPosition.EndOfStream, 1, this.StreamInitializationCancellationTokenSource!.Token).SingleAsync(this.StreamInitializationCancellationTokenSource!.Token).ConfigureAwait(false))!.Sequence;
do
{
var record = await this.EventStoreProvider.GetEventStore().ReadOneAsync(StreamReadDirection.Forwards, currentOffset!, this.StreamInitializationCancellationTokenSource!.Token).ConfigureAwait(false);
var record = this.Subscription.Spec.Partition == null ?
await eventStore.ReadOneAsync(StreamReadDirection.Forwards, currentOffset!, this.StreamInitializationCancellationTokenSource!.Token).ConfigureAwait(false)
: await eventStore.ReadPartitionAsync(this.Subscription.Spec.Partition, StreamReadDirection.Forwards, currentOffset, 1, this.StreamInitializationCancellationTokenSource!.Token).SingleOrDefaultAsync(this.StreamInitializationCancellationTokenSource!.Token).ConfigureAwait(false);
if (record == null)
{
await Task.Delay(50);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@

using CloudStreams.Core.Data;
using Hylo.Properties;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Mime;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.CompilerServices;
using static EventStore.Client.StreamMessage;

namespace CloudStreams.Core.Infrastructure.Services;

Expand Down Expand Up @@ -202,6 +200,7 @@ public virtual async Task<IObservable<CloudEventRecord>> SubscribeToPartitionAsy
partition.GetStreamName(),
offset.ToSubscriptionPosition(),
async (sub, e, cancellation) => subject.OnNext(await this.DeserializeResolvedEventAsync(e, cancellation).ConfigureAwait(false)),
true,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
return Observable.Using
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public record CloudEventSequencingConfiguration
/// <summary>
/// Gets/sets the way to handle conflicts with existing attributes
/// </summary>
/// <remarks>See <see cref="CloudEventAttributeConflictResolution"/></remarks>
[DataMember(Order = 3, Name = "attributeConflictResolution"), JsonPropertyOrder(3), JsonPropertyName("attributeConflictResolution"), YamlMember(Order = 3, Alias = "attributeConflictResolution")]
public virtual string? AttributeConflictResolution { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public static class CloudEventDescriptorExtensions
public static CloudEvent ToCloudEvent(this CloudEventDescriptor descriptor)
{
if (descriptor == null) throw new ArgumentNullException(nameof(descriptor));
if(descriptor.Data is byte[] byteArray)
{
//lets assume the data is actually JSON

}
var e = (JsonObject)Hylo.Serializer.Json.SerializeToNode(descriptor.Metadata.ContextAttributes)!;
var data = Hylo.Serializer.Json.SerializeToNode(descriptor.Data);
e[CloudEventAttributes.Data] = data;
Expand Down

0 comments on commit 17683bb

Please sign in to comment.