Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🚧Prevent unsubscription in OnRecieve #194

Merged
merged 9 commits into from
Jan 23, 2024
32 changes: 25 additions & 7 deletions Carbonate/Core/SubscriptionUnsubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,35 @@

namespace Carbonate.Core;

using Exceptions;

/// <summary>
/// A subscription unsubscriber for unsubscribing from a <see cref="IReactable{TSubscription}"/>.
/// </summary>
internal sealed class SubscriptionUnsubscriber : IDisposable
/// <typeparam name="TSubscription">The type of subscription to use.</typeparam>
internal sealed class SubscriptionUnsubscriber<TSubscription> : IDisposable
where TSubscription : class, ISubscription
{
private readonly List<ISubscription> subscriptions;
private readonly ISubscription subscription;
private readonly List<TSubscription> subscriptions;
private readonly TSubscription subscription;
private readonly Func<bool> isProcessing;
private bool isDisposed;

/// <summary>
/// Initializes a new instance of the <see cref="SubscriptionUnsubscriber"/> class.
/// Initializes a new instance of the <see cref="SubscriptionUnsubscriber{TSubscription}"/> class.
/// </summary>
/// <param name="subscriptions">The list of subscriptions.</param>
/// <param name="subscription">The subscription that has been subscribed.</param>
internal SubscriptionUnsubscriber(List<ISubscription> subscriptions, ISubscription subscription)
/// <param name="isProcessing">Returns the in-processing state.</param>
internal SubscriptionUnsubscriber(List<TSubscription> subscriptions, TSubscription subscription, Func<bool> isProcessing)
{
this.subscriptions = subscriptions ?? throw new ArgumentNullException(nameof(subscriptions), "The parameter must not be null.");
this.subscription = subscription ?? throw new ArgumentNullException(nameof(subscription), "The parameter must not be null.");
ArgumentNullException.ThrowIfNull(subscriptions);
ArgumentNullException.ThrowIfNull(subscription);
ArgumentNullException.ThrowIfNull(isProcessing);

this.subscriptions = subscriptions;
this.subscription = subscription;
this.isProcessing = isProcessing;
}

/// <summary>
Expand All @@ -45,6 +56,13 @@ private void Dispose(bool disposing)

if (disposing)
{
if (this.isProcessing())
{
var exMsg = "The send notification process is currently in progress.";
exMsg += $"\nThe subscription '{this.subscription.Name}' with id '{this.subscription.Id}' could not be unsubscribed.";
throw new NotificationException(exMsg);
}

this.subscriptions.Remove(this.subscription);
}

Expand Down
40 changes: 40 additions & 0 deletions Carbonate/Exceptions/NotificationException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// <copyright file="NotificationException.cs" company="KinsonDigital">
// Copyright (c) KinsonDigital. All rights reserved.
// </copyright>

namespace Carbonate.Exceptions;

/// <summary>
/// Thrown when something goes wrong with the notification process.
/// </summary>
public sealed class NotificationException : Exception
{
/// <summary>
/// Initializes a new instance of the <see cref="NotificationException"/> class.
/// </summary>
public NotificationException()
: base("The send notification process is currently in progress.")
{
}

/// <summary>
/// Initializes a new instance of the <see cref="NotificationException"/> class.
/// </summary>
/// <param name="message">The message that describes the error.</param>
public NotificationException(string message)
: base(message)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="NotificationException"/> class.
/// </summary>
/// <param name="message">The message that describes the error.</param>
/// <param name="innerException">
/// The <see cref="Exception"/> instance that caused the current exception.
/// </param>
public NotificationException(string message, Exception innerException)
: base(message, innerException)
{
}
}
7 changes: 7 additions & 0 deletions Carbonate/NonDirectional/PushReactable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Carbonate.NonDirectional;

using System.Runtime.InteropServices;
using Core.NonDirectional;
using Exceptions;

/// <inheritdoc cref="IPushReactable"/>
public class PushReactable : ReactableBase<IReceiveSubscription>, IPushReactable
Expand All @@ -27,9 +28,15 @@ public void Push(Guid id)
continue;
}

IsProcessing = true;
subscription.OnReceive();
IsProcessing = false;
}
}
catch (Exception e) when (e is NotificationException)
{
throw;
}
catch (Exception e)
{
SendError(e, id);
Expand Down
11 changes: 10 additions & 1 deletion Carbonate/OneWay/PullReactable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Carbonate.OneWay;

using System.Runtime.InteropServices;
using Core.OneWay;
using Exceptions;

/// <inheritdoc cref="IPullReactable{TOut}"/>
public class PullReactable<TOut>
Expand All @@ -28,9 +29,17 @@ public class PullReactable<TOut>
continue;
}

return subscription.OnRespond() ?? default(TOut);
IsProcessing = true;
var value = subscription.OnRespond() ?? default(TOut);
IsProcessing = false;

return value;
}
}
catch (Exception e) when (e is NotificationException)
{
throw;
}
catch (Exception e)
{
SendError(e, id);
Expand Down
7 changes: 7 additions & 0 deletions Carbonate/OneWay/PushReactable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Carbonate.OneWay;

using System.Runtime.InteropServices;
using Core.OneWay;
using Exceptions;

/// <inheritdoc cref="IPushReactable{T}"/>
public class PushReactable<TIn>
Expand Down Expand Up @@ -34,9 +35,15 @@ public void Push(Guid id, in TIn data)
continue;
}

IsProcessing = true;
subscription.OnReceive(data);
IsProcessing = false;
}
}
catch (Exception e) when (e is NotificationException)
{
throw;
}
catch (Exception e)
{
SendError(e, id);
Expand Down
14 changes: 13 additions & 1 deletion Carbonate/ReactableBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ public abstract class ReactableBase<TSubscription> : IReactable<TSubscription>
/// </summary>
internal List<TSubscription> InternalSubscriptions { get; } = [];

/// <summary>
/// Gets or sets a value indicating whether or not the <see cref="IReactable{TSubscription}"/> is
/// busy processing notifications.
/// </summary>
protected bool IsProcessing { get; set; }

/// <summary>
/// Gets a value indicating whether or not if the <see cref="ReactableBase{T}"/> has been disposed.
/// </summary>
Expand All @@ -56,7 +62,7 @@ public virtual IDisposable Subscribe(TSubscription subscription)

InternalSubscriptions.Add(subscription);

return new SubscriptionUnsubscriber(InternalSubscriptions.Cast<ISubscription>().ToList(), subscription);
return new SubscriptionUnsubscriber<TSubscription>(InternalSubscriptions, subscription, IsProcessingNotifications);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -181,4 +187,10 @@ protected void SendError(Exception exception, Guid id)
subscription.OnError(exception);
}
}

/// <summary>
/// Returns a value indicating whether or not the <see cref="IReactable{TSubscription}"/> is busy processing notifications.
/// </summary>
/// <returns>True if busy.</returns>
private bool IsProcessingNotifications() => IsProcessing;
}
11 changes: 10 additions & 1 deletion Carbonate/TwoWay/PushPullReactable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Carbonate.TwoWay;

using System.Runtime.InteropServices;
using Core.TwoWay;
using Exceptions;

/// <inheritdoc cref="IPushPullReactable{TIn,TOut}"/>
public class PushPullReactable<TIn, TOut> : ReactableBase<IReceiveRespondSubscription<TIn, TOut>>, IPushPullReactable<TIn, TOut>
Expand All @@ -30,9 +31,17 @@ public class PushPullReactable<TIn, TOut> : ReactableBase<IReceiveRespondSubscri
continue;
}

return subscription.OnRespond(data) ?? default(TOut);
IsProcessing = true;
var value = subscription.OnRespond(data) ?? default(TOut);
IsProcessing = false;

return value;
}
}
catch (Exception e) when (e is NotificationException)
{
throw;
}
catch (Exception e)
{
SendError(e, id);
Expand Down
41 changes: 34 additions & 7 deletions Testing/CarbonateTests/Core/SubscriptionUnsubscriberTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace CarbonateTests.Core;
using Xunit;

/// <summary>
/// Tests the <see cref="SubscriptionUnsubscriber"/> class.
/// Tests the <see cref="SubscriptionUnsubscriber{TSubscription}"/> class.
/// </summary>
public class SubscriptionUnsubscriberTests
{
Expand All @@ -22,13 +22,16 @@ public void Ctor_WithNullSubscriptionsParam_ThrowsException()
// Arrange & Act
var act = () =>
{
_ = new SubscriptionUnsubscriber(null, null);
_ = new SubscriptionUnsubscriber<ISubscription>(
null,
Substitute.For<ISubscription>(),
() => true);
};

// Assert
act.Should()
.Throw<ArgumentNullException>()
.WithMessage("The parameter must not be null. (Parameter 'subscriptions')");
.WithMessage("Value cannot be null. (Parameter 'subscriptions')");
}

[Fact]
Expand All @@ -37,13 +40,34 @@ public void Ctor_WithNullSubscriptionParam_ThrowsException()
// Arrange & Act
var act = () =>
{
_ = new SubscriptionUnsubscriber(Array.Empty<ISubscription>().ToList(), null);
_ = new SubscriptionUnsubscriber<ISubscription>(
Array.Empty<ISubscription>().ToList(),
null,
() => true);
};

// Assert
act.Should()
.Throw<ArgumentNullException>()
.WithMessage("The parameter must not be null. (Parameter 'subscription')");
.WithMessage("Value cannot be null. (Parameter 'subscription')");
}

[Fact]
public void Ctor_WithNullIsProcessingParam_ThrowsException()
{
// Arrange & Act
var act = () =>
{
_ = new SubscriptionUnsubscriber<ISubscription>(
Array.Empty<ISubscription>().ToList(),
Substitute.For<ISubscription>(),
null);
};

// Assert
act.Should()
.Throw<ArgumentNullException>()
.WithMessage("Value cannot be null. (Parameter 'isProcessing')");
}
#endregion

Expand All @@ -54,7 +78,10 @@ public void TotalSubscriptions_WhenInvoked_ReturnsCorrectResult()
// Arrange
var subscriptions = new[] { Substitute.For<ISubscription>(), Substitute.For<ISubscription>() };

var sut = new SubscriptionUnsubscriber(subscriptions.ToList(), Substitute.For<ISubscription>());
var sut = new SubscriptionUnsubscriber<ISubscription>(
subscriptions.ToList(),
Substitute.For<ISubscription>(),
() => false);

// Act
var actual = sut.TotalSubscriptions;
Expand All @@ -74,7 +101,7 @@ public void Dispose_WhenInvoked_RemovesFromSubscriptionsList()

var subscriptions = new[] { subA, subB, subC };

var sut = new SubscriptionUnsubscriber(subscriptions.ToList(), subB);
var sut = new SubscriptionUnsubscriber<ISubscription>(subscriptions.ToList(), subB, () => false);

// Act
sut.Dispose();
Expand Down
51 changes: 51 additions & 0 deletions Testing/CarbonateTests/Exceptions/NotificationExceptionTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// <copyright file="NotificationExceptionTests.cs" company="KinsonDigital">
// Copyright (c) KinsonDigital. All rights reserved.
// </copyright>

namespace CarbonateTests.Exceptions;

using Carbonate.Exceptions;
using FluentAssertions;
using Xunit;

/// <summary>
/// Tests the <see cref="NotificationException"/> class.
/// </summary>
public class NotificationExceptionTests
{
#region Constructor Tests
[Fact]
public void Ctor_WithNoParam_CorrectlySetsExceptionMessage()
{
// Act
var exception = new NotificationException();

// Assert
exception.Message.Should().Be("The send notification process is currently in progress.");
}

[Fact]
public void Ctor_WhenInvokedWithSingleMessageParam_CorrectlySetsMessage()
{
// Act
var exception = new NotificationException("test-message");

// Assert
exception.Message.Should().Be("test-message");
}

[Fact]
public void Ctor_WhenInvokedWithMessageAndInnerException_ThrowsException()
{
// Arrange
var innerException = new Exception("inner-exception");

// Act
var deviceException = new NotificationException("test-exception", innerException);

// Assert
deviceException.InnerException.Message.Should().Be("inner-exception");
deviceException.Message.Should().Be("test-exception");
}
#endregion
}
Loading
Loading