Skip to content

Commit

Permalink
Merge pull request #802 added check if commit order is bad in sync mo…
Browse files Browse the repository at this point in the history
…de from gingersamurai
  • Loading branch information
rekby authored Aug 4, 2023
2 parents 343fc98 + 33f4901 commit 2b953aa
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added check if commit order is bad in sync mode

## v3.49.1
* Added `table.options.WithIgnoreTruncated` option for `session.Execute` method
* Added `table.result.ErrTruncated` error for check it with `errors.Is()` outside of `ydb-go-sdk`
Expand Down
5 changes: 4 additions & 1 deletion internal/topic/topicreaderinternal/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

var ErrCommitDisabled = xerrors.Wrap(errors.New("ydb: commits disabled"))
var (
ErrCommitDisabled = xerrors.Wrap(errors.New("ydb: commits disabled"))
ErrWrongCommitOrderInSyncMode = xerrors.Wrap(errors.New("ydb: wrong commit order in sync mode"))
)

type sendMessageToServerFunc func(msg rawtopicreader.ClientMessage) error

Expand Down
3 changes: 3 additions & 0 deletions internal/topic/topicreaderinternal/stream_reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ func (r *topicStreamReaderImpl) checkCommitRange(commitRange commitRange) error
if err != nil || session != ownSession {
return xerrors.WithStackTrace(PublicErrCommitSessionToExpiredSession)
}
if session.committedOffset() != commitRange.commitOffsetStart && r.cfg.CommitMode == CommitModeSync {
return ErrWrongCommitOrderInSyncMode
}

return nil
}
Expand Down
60 changes: 60 additions & 0 deletions internal/topic/topicreaderinternal/stream_reader_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,66 @@ func TestTopicStreamReaderImpl_CommitStolen(t *testing.T) {
xtest.WaitChannelClosed(t, commitReceived)
xtest.WaitChannelClosed(t, readRequestReceived)
})
xtest.TestManyTimesWithName(t, "WrongOrderCommitWithSyncMode", func(t testing.TB) {
e := newTopicReaderTestEnv(t)
e.reader.cfg.CommitMode = CommitModeSync
e.Start()

lastOffset := e.partitionSession.lastReceivedMessageOffset()
const dataSize = 4
// request new data portion
readRequestReceived := make(empty.Chan)
e.stream.EXPECT().Send(&rawtopicreader.ReadRequest{BytesSize: dataSize * 2}).Do(func(_ interface{}) {
close(readRequestReceived)
})

e.SendFromServer(&rawtopicreader.ReadResponse{
BytesSize: dataSize,
PartitionData: []rawtopicreader.PartitionData{
{
PartitionSessionID: e.partitionSessionID,
Batches: []rawtopicreader.Batch{
{
Codec: rawtopiccommon.CodecRaw,
ProducerID: "1",
MessageData: []rawtopicreader.MessageData{
{
Offset: lastOffset + 1,
},
},
},
},
},
},
})

e.SendFromServer(&rawtopicreader.ReadResponse{
BytesSize: dataSize,
PartitionData: []rawtopicreader.PartitionData{
{
PartitionSessionID: e.partitionSessionID,
Batches: []rawtopicreader.Batch{
{
Codec: rawtopiccommon.CodecRaw,
ProducerID: "1",
MessageData: []rawtopicreader.MessageData{
{
Offset: lastOffset + 2,
},
},
},
},
},
},
})

opts := newReadMessageBatchOptions()
opts.MinCount = 2
batch, err := e.reader.ReadMessageBatch(e.ctx, opts)
require.NoError(t, err)
require.ErrorIs(t, e.reader.Commit(e.ctx, batch.Messages[1].getCommitRange().priv), ErrWrongCommitOrderInSyncMode)
xtest.WaitChannelClosed(t, readRequestReceived)
})

xtest.TestManyTimesWithName(t, "CommitAfterGracefulStopPartition", func(t testing.TB) {
e := newTopicReaderTestEnv(t)
Expand Down

0 comments on commit 2b953aa

Please sign in to comment.