Skip to content

Commit

Permalink
refactor(source): respect max chunk size in source connector parser (#…
Browse files Browse the repository at this point in the history
…19698)

Signed-off-by: Richard Chien <stdrc@outlook.com>
  • Loading branch information
stdrc authored Dec 19, 2024
1 parent 29d697f commit f3d8e0d
Show file tree
Hide file tree
Showing 26 changed files with 416 additions and 312 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/batch/executors/src/executor/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl SourceExecutor {
self.metrics,
SourceCtrlOpts {
chunk_size: self.chunk_size,
rate_limit: None,
split_txn: false,
},
ConnectorProperties::default(),
None,
Expand Down
2 changes: 0 additions & 2 deletions src/common/src/array/stream_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ impl Op {
}
}

pub type Ops<'a> = &'a [Op];

/// `StreamChunk` is used to pass data over the streaming pathway.
#[derive(Clone, PartialEq)]
pub struct StreamChunk {
Expand Down
11 changes: 11 additions & 0 deletions src/common/src/array/stream_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ pub enum RecordType {
Update,
}

impl RecordType {
/// Get the corresponding `Op`s for this record type.
pub fn ops(self) -> &'static [Op] {
match self {
RecordType::Insert => &[Op::Insert],
RecordType::Delete => &[Op::Delete],
RecordType::Update => &[Op::UpdateDelete, Op::UpdateInsert],
}
}
}

/// Generic type to represent a row change.
#[derive(Debug, Clone, Copy)]
pub enum Record<R: Row> {
Expand Down
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ serde_derive = "1"
serde_json = "1"
serde_with = { version = "3", features = ["json"] }
simd-json = { version = "0.14.2", features = ["hints"] }
smallvec = "1"
sqlx = { workspace = true }
strum = "0.26"
strum_macros = "0.26"
Expand Down
6 changes: 5 additions & 1 deletion src/connector/benches/debezium_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use json_common::*;
use paste::paste;
use rand::Rng;
use risingwave_connector::parser::{DebeziumParser, SourceStreamChunkBuilder};
use risingwave_connector::source::SourceCtrlOpts;

fn generate_debezium_json_row(rng: &mut impl Rng, change_event: &str) -> String {
let source = r#"{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null}"#;
Expand Down Expand Up @@ -57,7 +58,10 @@ macro_rules! create_debezium_bench_helpers {
|| (block_on(DebeziumParser::new_for_test(get_descs())).unwrap(), records.clone()) ,
| (mut parser, records) | async move {
let mut builder =
SourceStreamChunkBuilder::with_capacity(get_descs(), NUM_RECORDS);
SourceStreamChunkBuilder::new(get_descs(), SourceCtrlOpts {
chunk_size: NUM_RECORDS,
split_txn: false,
});
for record in records {
let writer = builder.row_writer();
parser.parse_inner(None, record, writer).await.unwrap();
Expand Down
18 changes: 15 additions & 3 deletions src/connector/benches/json_vs_plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use json_common::*;
use old_json_parser::JsonParser;
use risingwave_connector::parser::plain_parser::PlainParser;
use risingwave_connector::parser::{SourceStreamChunkBuilder, SpecificParserConfig};
use risingwave_connector::source::SourceContext;
use risingwave_connector::source::{SourceContext, SourceCtrlOpts};

// The original implementation used to parse JSON prior to #13707.
mod old_json_parser {
Expand Down Expand Up @@ -130,7 +130,13 @@ fn bench_plain_parser_and_json_parser(c: &mut Criterion) {
(parser, records.clone())
},
|(mut parser, records)| async move {
let mut builder = SourceStreamChunkBuilder::with_capacity(get_descs(), NUM_RECORDS);
let mut builder = SourceStreamChunkBuilder::new(
get_descs(),
SourceCtrlOpts {
chunk_size: NUM_RECORDS,
split_txn: false,
},
);
for record in records {
let writer = builder.row_writer();
parser
Expand All @@ -155,7 +161,13 @@ fn bench_plain_parser_and_json_parser(c: &mut Criterion) {
(parser, records.clone())
},
|(parser, records)| async move {
let mut builder = SourceStreamChunkBuilder::with_capacity(get_descs(), NUM_RECORDS);
let mut builder = SourceStreamChunkBuilder::new(
get_descs(),
SourceCtrlOpts {
chunk_size: NUM_RECORDS,
split_txn: false,
},
);
for record in records {
let writer = builder.row_writer();
parser.parse_inner(record, writer).await.unwrap();
Expand Down
7 changes: 4 additions & 3 deletions src/connector/src/parser/bytes_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ mod tests {
BytesProperties, EncodingProperties, ProtocolProperties, SourceColumnDesc,
SourceStreamChunkBuilder, SpecificParserConfig,
};
use crate::source::SourceContext;
use crate::source::{SourceContext, SourceCtrlOpts};

fn get_payload() -> Vec<Vec<u8>> {
vec![br#"t"#.to_vec(), br#"random"#.to_vec()]
Expand All @@ -70,7 +70,7 @@ mod tests {
.await
.unwrap();

let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);
let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());

for payload in get_payload() {
let writer = builder.row_writer();
Expand All @@ -80,7 +80,8 @@ mod tests {
.unwrap();
}

let chunk = builder.finish();
builder.finish_current_chunk();
let chunk = builder.consume_ready_chunks().next().unwrap();
let mut rows = chunk.rows();
{
let (op, row) = rows.next().unwrap();
Expand Down
34 changes: 22 additions & 12 deletions src/connector/src/parser/canal/simd_json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ mod tests {

use super::*;
use crate::parser::SourceStreamChunkBuilder;
use crate::source::SourceCtrlOpts;

#[tokio::test]
async fn test_data_types() {
Expand All @@ -162,12 +163,15 @@ mod tests {
)
.unwrap();

let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 1);
let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());

let writer = builder.row_writer();
parser.parse_inner(payload.to_vec(), writer).await.unwrap();
parser
.parse_inner(payload.to_vec(), builder.row_writer())
.await
.unwrap();

let chunk = builder.finish();
builder.finish_current_chunk();
let chunk = builder.consume_ready_chunks().next().unwrap();
let (op, row) = chunk.rows().next().unwrap();
assert_eq!(op, Op::Insert);
assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1)));
Expand Down Expand Up @@ -233,12 +237,15 @@ mod tests {
)
.unwrap();

let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);
let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());

let writer = builder.row_writer();
parser.parse_inner(payload.to_vec(), writer).await.unwrap();
parser
.parse_inner(payload.to_vec(), builder.row_writer())
.await
.unwrap();

let chunk = builder.finish();
builder.finish_current_chunk();
let chunk = builder.consume_ready_chunks().next().unwrap();

let mut rows = chunk.rows();

Expand Down Expand Up @@ -287,12 +294,15 @@ mod tests {
)
.unwrap();

let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);
let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());

let writer = builder.row_writer();
parser.parse_inner(payload.to_vec(), writer).await.unwrap();
parser
.parse_inner(payload.to_vec(), builder.row_writer())
.await
.unwrap();

let chunk = builder.finish();
builder.finish_current_chunk();
let chunk = builder.consume_ready_chunks().next().unwrap();

let mut rows = chunk.rows();

Expand Down
Loading

0 comments on commit f3d8e0d

Please sign in to comment.