Skip to content

Commit

Permalink
Add and test ThrowIfCancellationRequested calls in FlowData and `…
Browse files Browse the repository at this point in the history
…Pipeline`.
  • Loading branch information
drasmart committed Oct 11, 2024
1 parent 76431de commit 00f0d80
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 2 deletions.
5 changes: 5 additions & 0 deletions FiftyOne.Pipeline.Core/Data/FlowData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -307,13 +307,18 @@ public IEnumerable<KeyValuePair<string, object>> GetWhere(
/// <exception cref="ObjectDisposedException">
/// Thrown if the Pipeline has already been disposed.
/// </exception>
/// <exception cref="OperationCanceledException">
/// Thrown if <paramref name="cancellationToken"/> tripped
/// before execution started.
/// </exception>
/// <param name="cancellationToken">
/// Token to cancel processing.
/// Can be retrieved by flow elements from
/// <see cref="ProcessingCancellationToken"/>.
/// </param>
public void Process(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
if (_processed)
{
throw new PipelineException(Messages.ExceptionFlowDataAlreadyProcessed);
Expand Down
7 changes: 7 additions & 0 deletions FiftyOne.Pipeline.Core/FlowElements/Pipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
using System.Collections.ObjectModel;
using System.Globalization;
using System.Linq;
using System.Threading;

namespace FiftyOne.Pipeline.Core.FlowElements
{
Expand Down Expand Up @@ -301,6 +302,11 @@ public IFlowData CreateFlowData()
/// <exception cref="AggregateException">
/// Thrown if an error occurred during processing,
/// unless <see ref="SuppressProcessExceptions"/> is true.
/// </exception>
/// <exception cref="OperationCanceledException">
/// Thrown if
/// <see cref="IFlowData.ProcessingCancellationToken"/>
/// tripped during execution.
/// </exception>
public void Process(IFlowData data)
{
Expand All @@ -316,6 +322,7 @@ public void Process(IFlowData data)

foreach (var element in _flowElements)
{
data.ProcessingCancellationToken.ThrowIfCancellationRequested();
try
{
element.Process(data);
Expand Down
27 changes: 26 additions & 1 deletion Tests/FiftyOne.Pipeline.Core.Tests/Data/FlowDataTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
using Moq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq;
using System.Threading;

namespace FiftyOne.Pipeline.Core.Tests.Data
{
Expand Down Expand Up @@ -87,6 +88,30 @@ public void FlowData_Process()
_pipeline.Verify(p => p.Process(_flowData));
}

[TestMethod]
public void FlowData_Process_WithToken()
{
var token = new CancellationToken(canceled: false);
_flowData.Process(token);
_pipeline.Verify(p => p.Process(_flowData));
}

[TestMethod]
public void FlowData_Process_WithCancelledToken()
{
var token = new CancellationToken(canceled: true);
try
{
_flowData.Process(token);
Assert.Fail($"{nameof(_flowData.Process)} didn't throw.");
}
catch (OperationCanceledException)
{
// nop
}
_pipeline.Verify(p => p.Process(_flowData), Times.Never);
}

/// <summary>
/// Check that the Process method will not allow multiple calls
/// </summary>
Expand Down
59 changes: 58 additions & 1 deletion Tests/FiftyOne.Pipeline.Core.Tests/FlowElements/PipelineTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
using Moq;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace FiftyOne.Pipeline.Core.Tests.FlowElements
Expand Down Expand Up @@ -81,7 +82,7 @@ public void Pipeline_Process_SequenceOfTwo()
element2.Object);
// Don't create the flow data via the pipeline as we just want
// to test Process.
IFlowData data = StaticFactories.CreateFlowData(pipeline);
using IFlowData data = StaticFactories.CreateFlowData(pipeline);
data.Process();

// Act
Expand All @@ -97,6 +98,62 @@ public void Pipeline_Process_SequenceOfTwo()
// Check that element 1 was called before element 2.
element2.Verify(e => e.Process(It.Is<IFlowData>(d => data.GetDataKeys().Contains("element1"))),
"element 1 should have been called before element 2.");
}

[TestMethod]
public void Pipeline_Process_SequenceOfTwo_WithCancellation()
{
// Arrange
var element1 = GetMockFlowElement();
var element2 = GetMockFlowElement();
var tokenSource = new CancellationTokenSource();

// Configure the elements
element1.Setup(e => e.Process(It.IsAny<IFlowData>())).Callback((IFlowData d) =>
{
var tempdata = d.GetOrAdd("element1", (p) => new TestElementData(p));
tempdata["key"] = "done";
tokenSource.Cancel();
});
element2.Setup(e => e.Process(It.IsAny<IFlowData>())).Callback((IFlowData d) =>
{
var tempdata = d.GetOrAdd("element2", (p) => new TestElementData(p));
tempdata["key"] = "done";
});

// Create the pipeline
var pipeline = CreatePipeline(
false,
false,
element1.Object,
element2.Object);
// Don't create the flow data via the pipeline as we just want
// to test Process.
using IFlowData data = StaticFactories.CreateFlowData(pipeline);

// Act
try
{
data.Process(tokenSource.Token);
Assert.Fail($"{nameof(pipeline.Process)} didn't throw.");
}
catch (OperationCanceledException)
{
// nop
}

// Assert
Assert.IsTrue(data.Errors == null || data.Errors.Count == 0, "Expected no errors");
// Check that the resulting data has the expected values
Assert.IsTrue(data.GetDataKeys().Contains("element1"),
"data from element 1 is missing in the result");
Assert.IsFalse(data.GetDataKeys().Contains("element2"),
"data from element 2 is present in the result");
Assert.AreEqual("done", data.Get("element1")["key"].ToString(),
"element 1 result mismatch.");
// Check that element 1 was called before element 2.
element2.Verify(e => e.Process(It.Is<IFlowData>(d => data.GetDataKeys().Contains("element1"))),
Times.Never(), "element 2. should never be called.");
}

/// <summary>
Expand Down

0 comments on commit 00f0d80

Please sign in to comment.