-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(source): respect max chunk size in source connector parser #19698
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
SourceCtrlOpts::rate_limit
to split_txn: bool
SourceCtrlOpts::rate_limit
to split_txn: bool
SourceCtrlOpts::rate_limit
to split_txn: bool
a8d2c05
to
185defc
Compare
d3af95f
to
46e5995
Compare
5f97579
to
c7cdfb5
Compare
any context about the refactor? |
still WIP, will add description when it's ready |
3f2f0a0
to
1d9e42e
Compare
9fc46c4
to
10e0ea4
Compare
eeeebb4
to
486f99c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally LGTM
if let Some(ref mut txn) = self.ongoing_txn { | ||
txn.len += 1; | ||
|
||
if txn.len >= MAX_TRANSACTION_SIZE |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a warning log before. Shall we keep it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The warning log is moved to finish_current_chunk
, so that we won't miss any call site of finish_current_chunk
that can cause an interrupt of transaction.
let builder = std::mem::replace(self, Self::with_capacity(descs, next_cap)); | ||
builder.finish() | ||
/// Consumes and returns the ready [`StreamChunk`]s. | ||
pub fn consume_ready_chunks(&mut self) -> impl ExactSizeIterator<Item = StreamChunk> + '_ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rewrite
SourceStreamChunkBuilder
to allow creating chunk during writing toSourceStreamChunkRowWriter
. The trick is to add aready_chunks
field.This is because that it's possible to parse multiple rows from one
SourceMessage
, while the parser API doesn't use async stream to yield rows, instead, it directly write toSourceStreamChunkRowWriter
.
I understand the motivation to return multiple chunks, but what's reason of separating finish()
into finish_current_chunk() + consume_ready_chunks()
? I mean, if you just want multiple chunks, changing finish()
to return Vec<StreamChunk>
can achieve that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because sometimes we need to call consume_ready_chunks
separately even we (the user of SourceStreamChunkBuilder
) don't call finish_current_chunk
explicitly before it. For example 1) in the arm ParseResult::Rows
of match parser.parse_one_with_txn()
; 2) after chunk_builder.commit_transaction(id)
.
Signed-off-by: Richard Chien <stdrc@outlook.com>
…iter Signed-off-by: Richard Chien <stdrc@outlook.com>
… type Signed-off-by: Richard Chien <stdrc@outlook.com>
Signed-off-by: Richard Chien <stdrc@outlook.com>
Signed-off-by: Richard Chien <stdrc@outlook.com>
Signed-off-by: Richard Chien <stdrc@outlook.com>
Signed-off-by: Richard Chien <stdrc@outlook.com>
Signed-off-by: Richard Chien <stdrc@outlook.com>
Signed-off-by: Richard Chien <stdrc@outlook.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good
builders: Vec<ArrayBuilderImpl>, | ||
op_builder: Vec<Op>, | ||
vis_builder: BitmapBuilder, | ||
ongoing_txn: Option<Transaction>, | ||
ready_chunks: SmallVec<[StreamChunk; 1]>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SmallVec
looks good.
Can you please elaborate in which (unusual) case we'll have multiple ready chunks stashed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When chunk_size
is limited to 1
, and somehow we parsed 3 rows from one SourceMessage
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May update the doc comment to reflect it would build multiple chunks instead of only one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a comment for the chunk builder in 307ebc2
d986091
to
b1a892c
Compare
Pull Request is not mergeable
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
In
SourceStreamChunkBuilder
, with the information ofSourceCtrlOps
, we can actually restrict the chunk size early during chunk building, instead emitting large chunks first and cutting them into small ones later inapply_rate_limit
. This has two benefits:apply_rate_limit
used byFetchExecutor
,FsSourceExecutor
,SourceBackfillExecutor
andSourceExecutor
.Main changes happened in this PR:
Change
SourceCtrlOpts::rate_limit: Option<u32>
tosplit_txn: bool
.This is because we already pass in a limited chunk size (basically
min(stream_chunk_size, rate_limit_burst)
) viaSourceCtrlOpts::chunk_size
. We still needsplit_txn
to control whether we need to cut chunks during (CDC) transactions, as concluded here.Simplify
SourceStreamChunkRowWriter
, by directly store a reference toSourceStreamChunkBuilder
inside it.Rewrite
SourceStreamChunkBuilder
to allow creating chunk during writing toSourceStreamChunkRowWriter
. The trick is to add aready_chunks
field.This is because that it's possible to parse multiple rows from one
SourceMessage
, while the parser API doesn't use async stream to yield rows, instead, it directly write toSourceStreamChunkRowWriter
.Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.