Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
gingersamurai committed Aug 4, 2023
1 parent ba719e6 commit 567432a
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 66 deletions.
4 changes: 2 additions & 2 deletions internal/topic/topicreaderinternal/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
)

var (

Check failure on line 19 in internal/topic/topicreaderinternal/committer.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gofumpt`-ed (gofumpt)
ErrCommitDisabled = xerrors.Wrap(errors.New("ydb: commits disabled"))
ErrWrongCommitOrderInSyncMode = xerrors.Wrap(errors.New("ydb: wrong commit order in sync mode"))
ErrCommitDisabled = xerrors.Wrap(errors.New("ydb: commits disabled"))
//ErrWrongCommitOrderInSyncMode = xerrors.Wrap(errors.New("ydb: wrong commit order in sync mode"))

Check failure on line 21 in internal/topic/topicreaderinternal/committer.go

View workflow job for this annotation

GitHub Actions / golangci-lint

commentFormatting: put a space between `//` and comment text (gocritic)
)

type sendMessageToServerFunc func(msg rawtopicreader.ClientMessage) error
Expand Down
6 changes: 3 additions & 3 deletions internal/topic/topicreaderinternal/stream_reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,9 @@ func (r *topicStreamReaderImpl) checkCommitRange(commitRange commitRange) error
if err != nil || session != ownSession {
return xerrors.WithStackTrace(PublicErrCommitSessionToExpiredSession)
}
if session.committedOffset() != commitRange.commitOffsetStart && r.cfg.CommitMode == CommitModeSync {
return ErrWrongCommitOrderInSyncMode
}
//if session.committedOffset() != commitRange.commitOffsetStart && r.cfg.CommitMode == CommitModeSync {

Check failure on line 383 in internal/topic/topicreaderinternal/stream_reader_impl.go

View workflow job for this annotation

GitHub Actions / golangci-lint

commentFormatting: put a space between `//` and comment text (gocritic)
// return ErrWrongCommitOrderInSyncMode
//}

return nil
}
Expand Down
122 changes: 61 additions & 61 deletions internal/topic/topicreaderinternal/stream_reader_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,67 +108,67 @@ func TestTopicStreamReaderImpl_CommitStolen(t *testing.T) {
xtest.WaitChannelClosed(t, commitReceived)
xtest.WaitChannelClosed(t, readRequestReceived)
})
xtest.TestManyTimesWithName(t, "wrong order commit with sync commit mode", func(t testing.TB) {
e := newTopicReaderTestEnv(t)
e.reader.cfg.CommitMode = CommitModeSync
e.Start()

lastOffset := e.partitionSession.lastReceivedMessageOffset()
const dataSize = 4

// request new data portion
readRequestReceived := make(empty.Chan)
e.stream.EXPECT().Send(&rawtopicreader.ReadRequest{BytesSize: dataSize * 2}).Do(func(_ interface{}) {
close(readRequestReceived)
})

e.SendFromServer(&rawtopicreader.ReadResponse{
BytesSize: dataSize,
PartitionData: []rawtopicreader.PartitionData{
{
PartitionSessionID: e.partitionSessionID,
Batches: []rawtopicreader.Batch{
{
Codec: rawtopiccommon.CodecRaw,
ProducerID: "1",
MessageData: []rawtopicreader.MessageData{
{
Offset: lastOffset + 1,
},
},
},
},
},
},
})

e.SendFromServer(&rawtopicreader.ReadResponse{
BytesSize: dataSize,
PartitionData: []rawtopicreader.PartitionData{
{
PartitionSessionID: e.partitionSessionID,
Batches: []rawtopicreader.Batch{
{
Codec: rawtopiccommon.CodecRaw,
ProducerID: "1",
MessageData: []rawtopicreader.MessageData{
{
Offset: lastOffset + 2,
},
},
},
},
},
},
})

opts := newReadMessageBatchOptions()
opts.MinCount = 2
batch, err := e.reader.ReadMessageBatch(e.ctx, opts)
require.NoError(t, err)
require.ErrorIs(t, e.reader.Commit(e.ctx, batch.Messages[1].getCommitRange().priv), ErrWrongCommitOrderInSyncMode)
xtest.WaitChannelClosed(t, readRequestReceived)
})
//xtest.TestManyTimesWithName(t, "wrong order commit with sync commit mode", func(t testing.TB) {
// e := newTopicReaderTestEnv(t)
// e.reader.cfg.CommitMode = CommitModeSync
// e.Start()
//
// lastOffset := e.partitionSession.lastReceivedMessageOffset()
// const dataSize = 4
//
// // request new data portion
// readRequestReceived := make(empty.Chan)
// e.stream.EXPECT().Send(&rawtopicreader.ReadRequest{BytesSize: dataSize * 2}).Do(func(_ interface{}) {
// close(readRequestReceived)
// })
//
// e.SendFromServer(&rawtopicreader.ReadResponse{
// BytesSize: dataSize,
// PartitionData: []rawtopicreader.PartitionData{
// {
// PartitionSessionID: e.partitionSessionID,
// Batches: []rawtopicreader.Batch{
// {
// Codec: rawtopiccommon.CodecRaw,
// ProducerID: "1",
// MessageData: []rawtopicreader.MessageData{
// {
// Offset: lastOffset + 1,
// },
// },
// },
// },
// },
// },
// })
//
// e.SendFromServer(&rawtopicreader.ReadResponse{
// BytesSize: dataSize,
// PartitionData: []rawtopicreader.PartitionData{
// {
// PartitionSessionID: e.partitionSessionID,
// Batches: []rawtopicreader.Batch{
// {
// Codec: rawtopiccommon.CodecRaw,
// ProducerID: "1",
// MessageData: []rawtopicreader.MessageData{
// {
// Offset: lastOffset + 2,
// },
// },
// },
// },
// },
// },
// })
//
// opts := newReadMessageBatchOptions()
// opts.MinCount = 2
// batch, err := e.reader.ReadMessageBatch(e.ctx, opts)
// require.NoError(t, err)
// require.ErrorIs(t, e.reader.Commit(e.ctx, batch.Messages[1].getCommitRange().priv), ErrWrongCommitOrderInSyncMode)
// xtest.WaitChannelClosed(t, readRequestReceived)
//})

xtest.TestManyTimesWithName(t, "CommitAfterGracefulStopPartition", func(t testing.TB) {
e := newTopicReaderTestEnv(t)
Expand Down

0 comments on commit 567432a

Please sign in to comment.