diff --git a/CHANGELOG.md b/CHANGELOG.md index 57f2e80aa..5c2161e4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added check if commit order is bad in sync mode + ## v3.49.1 * Added `table.options.WithIgnoreTruncated` option for `session.Execute` method * Added `table.result.ErrTruncated` error for check it with `errors.Is()` outside of `ydb-go-sdk` diff --git a/internal/topic/topicreaderinternal/committer.go b/internal/topic/topicreaderinternal/committer.go index 5b432e52b..01dcb8249 100644 --- a/internal/topic/topicreaderinternal/committer.go +++ b/internal/topic/topicreaderinternal/committer.go @@ -16,7 +16,10 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) -var ErrCommitDisabled = xerrors.Wrap(errors.New("ydb: commits disabled")) +var ( + ErrCommitDisabled = xerrors.Wrap(errors.New("ydb: commits disabled")) + ErrWrongCommitOrderInSyncMode = xerrors.Wrap(errors.New("ydb: wrong commit order in sync mode")) +) type sendMessageToServerFunc func(msg rawtopicreader.ClientMessage) error diff --git a/internal/topic/topicreaderinternal/stream_reader_impl.go b/internal/topic/topicreaderinternal/stream_reader_impl.go index c7058ce3a..0412124cc 100644 --- a/internal/topic/topicreaderinternal/stream_reader_impl.go +++ b/internal/topic/topicreaderinternal/stream_reader_impl.go @@ -380,6 +380,9 @@ func (r *topicStreamReaderImpl) checkCommitRange(commitRange commitRange) error if err != nil || session != ownSession { return xerrors.WithStackTrace(PublicErrCommitSessionToExpiredSession) } + if session.committedOffset() != commitRange.commitOffsetStart && r.cfg.CommitMode == CommitModeSync { + return ErrWrongCommitOrderInSyncMode + } return nil } diff --git a/internal/topic/topicreaderinternal/stream_reader_impl_test.go b/internal/topic/topicreaderinternal/stream_reader_impl_test.go index 8f74e5d7d..20105b4c1 100644 --- a/internal/topic/topicreaderinternal/stream_reader_impl_test.go +++ b/internal/topic/topicreaderinternal/stream_reader_impl_test.go @@ -108,6 +108,66 @@ func TestTopicStreamReaderImpl_CommitStolen(t *testing.T) { xtest.WaitChannelClosed(t, commitReceived) xtest.WaitChannelClosed(t, readRequestReceived) }) + xtest.TestManyTimesWithName(t, "WrongOrderCommitWithSyncMode", func(t testing.TB) { + e := newTopicReaderTestEnv(t) + e.reader.cfg.CommitMode = CommitModeSync + e.Start() + + lastOffset := e.partitionSession.lastReceivedMessageOffset() + const dataSize = 4 + // request new data portion + readRequestReceived := make(empty.Chan) + e.stream.EXPECT().Send(&rawtopicreader.ReadRequest{BytesSize: dataSize * 2}).Do(func(_ interface{}) { + close(readRequestReceived) + }) + + e.SendFromServer(&rawtopicreader.ReadResponse{ + BytesSize: dataSize, + PartitionData: []rawtopicreader.PartitionData{ + { + PartitionSessionID: e.partitionSessionID, + Batches: []rawtopicreader.Batch{ + { + Codec: rawtopiccommon.CodecRaw, + ProducerID: "1", + MessageData: []rawtopicreader.MessageData{ + { + Offset: lastOffset + 1, + }, + }, + }, + }, + }, + }, + }) + + e.SendFromServer(&rawtopicreader.ReadResponse{ + BytesSize: dataSize, + PartitionData: []rawtopicreader.PartitionData{ + { + PartitionSessionID: e.partitionSessionID, + Batches: []rawtopicreader.Batch{ + { + Codec: rawtopiccommon.CodecRaw, + ProducerID: "1", + MessageData: []rawtopicreader.MessageData{ + { + Offset: lastOffset + 2, + }, + }, + }, + }, + }, + }, + }) + + opts := newReadMessageBatchOptions() + opts.MinCount = 2 + batch, err := e.reader.ReadMessageBatch(e.ctx, opts) + require.NoError(t, err) + require.ErrorIs(t, e.reader.Commit(e.ctx, batch.Messages[1].getCommitRange().priv), ErrWrongCommitOrderInSyncMode) + xtest.WaitChannelClosed(t, readRequestReceived) + }) xtest.TestManyTimesWithName(t, "CommitAfterGracefulStopPartition", func(t testing.TB) { e := newTopicReaderTestEnv(t)