Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UF-2022 Add back fill acceptance tests #28

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Data;
using AutoFixture.Xunit2;
using Dapper;
using Microsoft.Data.SqlClient;
Expand Down Expand Up @@ -76,7 +77,7 @@ public async Task Events_Are_Ordered_By_Insert_Order_And_Ignoring_Timestamp()

timestamp = DateTimeOffset.Now;
await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "3", Timestamp = timestamp}, _connection);
await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "2", Timestamp = timestamp.AddSeconds(-1)}, _connection);
await InsertIntoDatabaseInTransaction(new EventSourceDto{AggregateId = Guid.NewGuid(), Sequence = 0, Data = "2", Timestamp = timestamp}, _connection);
var secondPageResult = await ReadFeed(firstPageResult.Last().Ulid, 100, _connection);
combinedReturnedEntries.AddRange(secondPageResult);

Expand All @@ -90,6 +91,103 @@ public async Task Events_Are_Ordered_By_Insert_Order_And_Ignoring_Timestamp()
Assert.Empty(thirdPageResult);
}

[Fact]
public async Task Back_Fill_Feed()
{
// Existing events - before back fill and changefeed is started
var oldEvent0 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "0", Timestamp = DateTimeOffset.Now.AddDays(-4) };
var oldEvent1 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 1, Data = "1", Timestamp = DateTimeOffset.Now.AddDays(-3) };
var oldEvent2 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 2, Data = "2", Timestamp = DateTimeOffset.Now.AddDays(-2) };
var oldEvent3 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 3, Data = "3", Timestamp = DateTimeOffset.Now.AddDays(-1) };
await using var oldEventsTransaction1 = _connection.BeginTransaction();
await InsertIntoEventSource(oldEvent0, _connection, oldEventsTransaction1);
await InsertIntoEventSource(oldEvent1, _connection, oldEventsTransaction1);
await InsertIntoEventSource(oldEvent2, _connection, oldEventsTransaction1);
await InsertIntoEventSource(oldEvent3, _connection, oldEventsTransaction1);
await oldEventsTransaction1.CommitAsync();

// Starting to back fill feed
await using var backFillTransaction1 = _connection.BeginTransaction();
await InsertIntoOutbox(oldEvent0, _connection, backFillTransaction1);
await InsertIntoOutbox(oldEvent1, _connection, backFillTransaction1);
await backFillTransaction1.CommitAsync();

// Continue to back fill events - before changefeed is started
await using var backFillTransaction2 = _connection.BeginTransaction();
await InsertIntoOutbox(oldEvent2, _connection, backFillTransaction2);
await InsertIntoOutbox(oldEvent3, _connection, backFillTransaction2);
await backFillTransaction2.CommitAsync();

// Starting changefeed - Simulate live events at the same time as back filling is running
var timestamp = DateTimeOffset.Now;
var liveEvent1 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 4, Data = "4", Timestamp = timestamp };
var liveEvent2 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 5, Data = "5", Timestamp = timestamp };
await InsertIntoDatabaseInTransaction(liveEvent1, _connection);
await InsertIntoDatabaseInTransaction(liveEvent2, _connection);

// Continue to back fill outbox with events already processed by changefeed is started
await using var continueBackFillTransaction = _connection.BeginTransaction();
await InsertIntoOutbox(liveEvent1, _connection, continueBackFillTransaction);
await InsertIntoOutbox(liveEvent2, _connection, continueBackFillTransaction);
await continueBackFillTransaction.CommitAsync();

// Running changefeed - Back filling is stopped
timestamp = DateTimeOffset.Now;
var liveEvent3 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 6, Data = "6", Timestamp = timestamp };
var liveEvent4 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 7, Data = "7", Timestamp = timestamp };
await InsertIntoDatabaseInTransaction(liveEvent3, _connection);
await InsertIntoDatabaseInTransaction(liveEvent4, _connection);

// First read determines the order, so wait until back filling is done and stopped
// There should be a small overlap on back filling and live events processing to ensure all
// events are added to the outbox,

// Note that the first page only contains rows from the feed table. Only when the cursor is up to date
// with all rows in the feed table it will start to look into the outbox table. The feed and outbox output
// will not be combined in one read, but requires an additional read to get the events from the outbox.
// In this case it does not matter as all events are in the outbox table and the secondPageResult will
// therefore be empty.
var combinedReturnedEntries = new List<FeedResult>();
var firstPageResult = await ReadFeed(_startCursor, 100, _connection);
combinedReturnedEntries.AddRange(firstPageResult);

var secondPageResult = await ReadFeed(firstPageResult.Last().Ulid, 100, _connection);
combinedReturnedEntries.AddRange(secondPageResult);

var feedResult = combinedReturnedEntries.ToList();
Assert.Equal(8, feedResult.Count);
Assert.Equal("0", feedResult[0].Data);
Assert.Equal("1", feedResult[1].Data);
Assert.Equal("2", feedResult[2].Data);
Assert.Equal("3", feedResult[3].Data);
Assert.Equal("4", feedResult[4].Data);
Assert.Equal("5", feedResult[5].Data);
Assert.Equal("6", feedResult[6].Data);
Assert.Equal("7", feedResult[7].Data);
}

[Fact]
public async Task Ignore_Inserting_Into_Outbox_If_It_Already_Exists()
{
var eventSourceDto = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "0", Timestamp = DateTimeOffset.Now };
await InsertIntoDatabaseInTransaction(eventSourceDto, _connection);

await using var transaction = _connection.BeginTransaction();
await InsertIntoOutbox(eventSourceDto, _connection, transaction);
await transaction.CommitAsync();

var result = await ReadFeed(_startCursor, 10, _connection);

Assert.NotNull(result);
var feedResult = result.ToList();
Assert.Single(feedResult);
Assert.Equal(eventSourceDto.AggregateId, feedResult.Single().AggregateId);
Assert.Equal(eventSourceDto.Sequence, feedResult.Single().Sequence);
Assert.Equal(eventSourceDto.Data, feedResult.Single().Data);
Assert.Equal(eventSourceDto.Timestamp, feedResult.Single().Timestamp);
Assert.NotEqual(_startCursor, feedResult.Single().Ulid);
}

private static async Task InsertIntoDatabaseInTransaction(
EventSourceDto eventSourceDto,
SqlConnection connection)
Expand All @@ -115,8 +213,16 @@ private static async Task InsertIntoOutbox(
{
const string insertIntoOutboxStatement =
"""
INSERT INTO [changefeed].[outbox:dbo.EventSource] (shard_id, time_hint, AggregateId, Sequence)
VALUES (0, @TimeHint, @AggregateId, @Sequence);
IF NOT EXISTS (
SELECT 1
FROM [changefeed].[outbox:dbo.EventSource]
WHERE
AggregateId = @AggregateId
AND Sequence = @Sequence)
BEGIN
INSERT INTO [changefeed].[outbox:dbo.EventSource] (shard_id, time_hint, AggregateId, Sequence)
VALUES (0, @TimeHint, @AggregateId, @Sequence);
END;
""";

var results = await connection.ExecuteAsync(
Expand Down Expand Up @@ -179,7 +285,9 @@ [ulid] BINARY(16) NOT NULL,
[ChangefeedAcceptanceTests].[dbo].[EventSource]
INNER JOIN #read AS R ON
R.AggregateId = [ChangefeedAcceptanceTests].[dbo].[EventSource].AggregateId AND
R.Sequence = [ChangefeedAcceptanceTests].[dbo].[EventSource].Sequence;
R.Sequence = [ChangefeedAcceptanceTests].[dbo].[EventSource].Sequence
ORDER BY
ulid
""";


Expand All @@ -192,5 +300,5 @@ INNER JOIN #read AS R ON
});

return results;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<PackageReference Include="NSubstitute" Version="5.1.0" />
<PackageReference Include="Polly" Version="8.4.2" />
<PackageReference Include="Testcontainers" Version="3.10.0" />
<PackageReference Include="Ulid" Version="1.3.4" />
<PackageReference Include="xunit" Version="2.9.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.2">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Loading