diff --git a/Cargo.lock b/Cargo.lock index a0ed055ce6e5..f11864048ceb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11148,6 +11148,7 @@ dependencies = [ "serde_with 3.8.1", "serde_yaml", "simd-json", + "smallvec", "sqlx", "strum 0.26.3", "strum_macros 0.26.4", diff --git a/src/batch/executors/src/executor/source.rs b/src/batch/executors/src/executor/source.rs index 3bf7bea9bd82..aa145b4e4a43 100644 --- a/src/batch/executors/src/executor/source.rs +++ b/src/batch/executors/src/executor/source.rs @@ -150,7 +150,7 @@ impl SourceExecutor { self.metrics, SourceCtrlOpts { chunk_size: self.chunk_size, - rate_limit: None, + split_txn: false, }, ConnectorProperties::default(), None, diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs index b004329a084c..307283726c34 100644 --- a/src/common/src/array/stream_chunk.rs +++ b/src/common/src/array/stream_chunk.rs @@ -101,8 +101,6 @@ impl Op { } } -pub type Ops<'a> = &'a [Op]; - /// `StreamChunk` is used to pass data over the streaming pathway. #[derive(Clone, PartialEq)] pub struct StreamChunk { diff --git a/src/common/src/array/stream_record.rs b/src/common/src/array/stream_record.rs index cfd474017d93..7682a62d3f33 100644 --- a/src/common/src/array/stream_record.rs +++ b/src/common/src/array/stream_record.rs @@ -27,6 +27,17 @@ pub enum RecordType { Update, } +impl RecordType { + /// Get the corresponding `Op`s for this record type. + pub fn ops(self) -> &'static [Op] { + match self { + RecordType::Insert => &[Op::Insert], + RecordType::Delete => &[Op::Delete], + RecordType::Update => &[Op::UpdateDelete, Op::UpdateInsert], + } + } +} + /// Generic type to represent a row change. #[derive(Debug, Clone, Copy)] pub enum Record { diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 8e3c58bbc2d8..5341d4bb40b7 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -138,6 +138,7 @@ serde_derive = "1" serde_json = "1" serde_with = { version = "3", features = ["json"] } simd-json = { version = "0.14.2", features = ["hints"] } +smallvec = "1" sqlx = { workspace = true } strum = "0.26" strum_macros = "0.26" diff --git a/src/connector/benches/debezium_json_parser.rs b/src/connector/benches/debezium_json_parser.rs index 73e356012aeb..239a1788bfac 100644 --- a/src/connector/benches/debezium_json_parser.rs +++ b/src/connector/benches/debezium_json_parser.rs @@ -22,6 +22,7 @@ use json_common::*; use paste::paste; use rand::Rng; use risingwave_connector::parser::{DebeziumParser, SourceStreamChunkBuilder}; +use risingwave_connector::source::SourceCtrlOpts; fn generate_debezium_json_row(rng: &mut impl Rng, change_event: &str) -> String { let source = r#"{"version":"1.7.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1639547113601,"snapshot":"true","db":"inventory","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null}"#; @@ -57,7 +58,10 @@ macro_rules! 
create_debezium_bench_helpers { || (block_on(DebeziumParser::new_for_test(get_descs())).unwrap(), records.clone()) , | (mut parser, records) | async move { let mut builder = - SourceStreamChunkBuilder::with_capacity(get_descs(), NUM_RECORDS); + SourceStreamChunkBuilder::new(get_descs(), SourceCtrlOpts { + chunk_size: NUM_RECORDS, + split_txn: false, + }); for record in records { let writer = builder.row_writer(); parser.parse_inner(None, record, writer).await.unwrap(); diff --git a/src/connector/benches/json_vs_plain_parser.rs b/src/connector/benches/json_vs_plain_parser.rs index b9a1139dcb03..88bcb144a728 100644 --- a/src/connector/benches/json_vs_plain_parser.rs +++ b/src/connector/benches/json_vs_plain_parser.rs @@ -23,7 +23,7 @@ use json_common::*; use old_json_parser::JsonParser; use risingwave_connector::parser::plain_parser::PlainParser; use risingwave_connector::parser::{SourceStreamChunkBuilder, SpecificParserConfig}; -use risingwave_connector::source::SourceContext; +use risingwave_connector::source::{SourceContext, SourceCtrlOpts}; // The original implementation used to parse JSON prior to #13707. mod old_json_parser { @@ -130,7 +130,13 @@ fn bench_plain_parser_and_json_parser(c: &mut Criterion) { (parser, records.clone()) }, |(mut parser, records)| async move { - let mut builder = SourceStreamChunkBuilder::with_capacity(get_descs(), NUM_RECORDS); + let mut builder = SourceStreamChunkBuilder::new( + get_descs(), + SourceCtrlOpts { + chunk_size: NUM_RECORDS, + split_txn: false, + }, + ); for record in records { let writer = builder.row_writer(); parser @@ -155,7 +161,13 @@ fn bench_plain_parser_and_json_parser(c: &mut Criterion) { (parser, records.clone()) }, |(parser, records)| async move { - let mut builder = SourceStreamChunkBuilder::with_capacity(get_descs(), NUM_RECORDS); + let mut builder = SourceStreamChunkBuilder::new( + get_descs(), + SourceCtrlOpts { + chunk_size: NUM_RECORDS, + split_txn: false, + }, + ); for record in records { let writer = builder.row_writer(); parser.parse_inner(record, writer).await.unwrap(); diff --git a/src/connector/src/parser/bytes_parser.rs b/src/connector/src/parser/bytes_parser.rs index eeccf17be7a2..841dbfea4ab3 100644 --- a/src/connector/src/parser/bytes_parser.rs +++ b/src/connector/src/parser/bytes_parser.rs @@ -54,7 +54,7 @@ mod tests { BytesProperties, EncodingProperties, ProtocolProperties, SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig, }; - use crate::source::SourceContext; + use crate::source::{SourceContext, SourceCtrlOpts}; fn get_payload() -> Vec> { vec![br#"t"#.to_vec(), br#"random"#.to_vec()] @@ -70,7 +70,7 @@ mod tests { .await .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2); + let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test()); for payload in get_payload() { let writer = builder.row_writer(); @@ -80,7 +80,8 @@ mod tests { .unwrap(); } - let chunk = builder.finish(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); let mut rows = chunk.rows(); { let (op, row) = rows.next().unwrap(); diff --git a/src/connector/src/parser/canal/simd_json_parser.rs b/src/connector/src/parser/canal/simd_json_parser.rs index 780b30654f22..9f299b45c61e 100644 --- a/src/connector/src/parser/canal/simd_json_parser.rs +++ b/src/connector/src/parser/canal/simd_json_parser.rs @@ -141,6 +141,7 @@ mod tests { use super::*; use crate::parser::SourceStreamChunkBuilder; + use crate::source::SourceCtrlOpts; #[tokio::test] async fn 
test_data_types() { @@ -162,12 +163,15 @@ mod tests { ) .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 1); + let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test()); - let writer = builder.row_writer(); - parser.parse_inner(payload.to_vec(), writer).await.unwrap(); + parser + .parse_inner(payload.to_vec(), builder.row_writer()) + .await + .unwrap(); - let chunk = builder.finish(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); let (op, row) = chunk.rows().next().unwrap(); assert_eq!(op, Op::Insert); assert_eq!(row.datum_at(0).to_owned_datum(), Some(ScalarImpl::Int32(1))); @@ -233,12 +237,15 @@ mod tests { ) .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2); + let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test()); - let writer = builder.row_writer(); - parser.parse_inner(payload.to_vec(), writer).await.unwrap(); + parser + .parse_inner(payload.to_vec(), builder.row_writer()) + .await + .unwrap(); - let chunk = builder.finish(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); let mut rows = chunk.rows(); @@ -287,12 +294,15 @@ mod tests { ) .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2); + let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test()); - let writer = builder.row_writer(); - parser.parse_inner(payload.to_vec(), writer).await.unwrap(); + parser + .parse_inner(payload.to_vec(), builder.row_writer()) + .await + .unwrap(); - let chunk = builder.finish(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); let mut rows = chunk.rows(); diff --git a/src/connector/src/parser/chunk_builder.rs b/src/connector/src/parser/chunk_builder.rs index 574a1a5bacac..27b825ac90ed 100644 --- a/src/connector/src/parser/chunk_builder.rs +++ b/src/connector/src/parser/chunk_builder.rs @@ -14,6 +14,7 @@ use std::sync::LazyLock; +use risingwave_common::array::stream_record::RecordType; use risingwave_common::array::{ArrayBuilderImpl, Op, StreamChunk}; use risingwave_common::bitmap::BitmapBuilder; use risingwave_common::log::LogSuppresser; @@ -21,6 +22,7 @@ use risingwave_common::types::{Datum, DatumCow, ScalarRefImpl}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_connector_codec::decoder::{AccessError, AccessResult}; use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType; +use smallvec::SmallVec; use thiserror_ext::AsReport; use super::MessageMeta; @@ -28,69 +30,185 @@ use crate::parser::utils::{ extract_cdc_meta_column, extract_header_inner_from_meta, extract_headers_from_meta, extract_subject_from_meta, extract_timestamp_from_meta, }; -use crate::source::{SourceColumnDesc, SourceColumnType, SourceMeta}; +use crate::source::{SourceColumnDesc, SourceColumnType, SourceCtrlOpts, SourceMeta}; + +/// Maximum number of rows in a transaction. If a transaction is larger than this, it will be force +/// committed to avoid potential OOM. +const MAX_TRANSACTION_SIZE: usize = 4096; + +/// Represents an ongoing transaction. +struct Transaction { + id: Box, + len: usize, +} /// A builder for building a [`StreamChunk`] from [`SourceColumnDesc`]. 
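Reviewer note: for anyone unfamiliar with the new API surface, the `with_capacity`/`finish` pair is replaced by `new`/`finish_current_chunk`/`consume_ready_chunks`, as exercised by the updated tests above. Below is a minimal, hand-written sketch of the new call sequence; the two-column schema and the `SourceColumnDesc::simple` constructor are illustrative assumptions and not part of this patch.

```rust
use risingwave_common::catalog::ColumnId;
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_connector::parser::SourceStreamChunkBuilder;
use risingwave_connector::source::{SourceColumnDesc, SourceCtrlOpts};

fn demo() {
    // Hypothetical two-column schema, purely for illustration.
    let columns = vec![
        SourceColumnDesc::simple("id", DataType::Int32, ColumnId::new(0)),
        SourceColumnDesc::simple("name", DataType::Varchar, ColumnId::new(1)),
    ];

    // The builder now owns the chunking policy via `SourceCtrlOpts`.
    let mut builder = SourceStreamChunkBuilder::new(
        columns,
        SourceCtrlOpts { chunk_size: 256, split_txn: false },
    );

    // Parsers write rows through a `SourceStreamChunkRowWriter`; one row by hand here.
    builder
        .row_writer()
        .do_insert(|desc| {
            Ok(match desc.name.as_str() {
                "id" => Some(ScalarImpl::Int32(1)),
                _ => Some(ScalarImpl::Utf8("foo".into())),
            })
        })
        .unwrap();

    // Instead of consuming the builder with `finish()`, cut the current chunk and
    // drain whatever chunks are ready (there may be several when `split_txn` is on).
    builder.finish_current_chunk();
    for chunk in builder.consume_ready_chunks() {
        assert_eq!(chunk.cardinality(), 1);
    }
}
```
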
pub struct SourceStreamChunkBuilder { - descs: Vec, + column_descs: Vec, + source_ctrl_opts: SourceCtrlOpts, builders: Vec, op_builder: Vec, vis_builder: BitmapBuilder, + ongoing_txn: Option, + ready_chunks: SmallVec<[StreamChunk; 1]>, } impl SourceStreamChunkBuilder { - pub fn with_capacity(descs: Vec, cap: usize) -> Self { - let builders = descs - .iter() - .map(|desc| desc.data_type.create_array_builder(cap)) - .collect(); + pub fn new(column_descs: Vec, source_ctrl_opts: SourceCtrlOpts) -> Self { + let (builders, op_builder, vis_builder) = + Self::create_builders(&column_descs, source_ctrl_opts.chunk_size); Self { - descs, + column_descs, + source_ctrl_opts, builders, - op_builder: Vec::with_capacity(cap), - vis_builder: BitmapBuilder::with_capacity(cap), + op_builder, + vis_builder, + ongoing_txn: None, + ready_chunks: SmallVec::new(), + } + } + + fn create_builders( + column_descs: &[SourceColumnDesc], + chunk_size: usize, + ) -> (Vec, Vec, BitmapBuilder) { + let reserved_capacity = chunk_size + 1; // it's possible to have an additional `U-` at the end + let builders = column_descs + .iter() + .map(|desc| desc.data_type.create_array_builder(reserved_capacity)) + .collect(); + let op_builder = Vec::with_capacity(reserved_capacity); + let vis_builder = BitmapBuilder::with_capacity(reserved_capacity); + (builders, op_builder, vis_builder) + } + + /// Begin a (CDC) transaction with the given `txn_id`. + pub fn begin_transaction(&mut self, txn_id: Box) { + if let Some(ref txn) = self.ongoing_txn { + tracing::warn!( + ongoing_txn_id = txn.id, + new_txn_id = txn_id, + "already in a transaction" + ); + } + tracing::debug!(txn_id, "begin upstream transaction"); + self.ongoing_txn = Some(Transaction { id: txn_id, len: 0 }); + } + + /// Commit the ongoing transaction with the given `txn_id`. + pub fn commit_transaction(&mut self, txn_id: Box) { + if let Some(txn) = self.ongoing_txn.take() { + if txn.id != txn_id { + tracing::warn!( + expected_txn_id = txn.id, + actual_txn_id = txn_id, + "unexpected transaction id" + ); + } + tracing::debug!(txn_id, "commit upstream transaction"); + + if self.current_chunk_len() >= self.source_ctrl_opts.chunk_size { + // if `split_txn` is on, we should've finished the chunk already + assert!(!self.source_ctrl_opts.split_txn); + self.finish_current_chunk(); + } + } else { + tracing::warn!(txn_id, "no ongoing transaction to commit"); } } + /// Check if the builder is in an ongoing transaction. + pub fn is_in_transaction(&self) -> bool { + self.ongoing_txn.is_some() + } + + /// Get a row writer for parser to write records to the builder. pub fn row_writer(&mut self) -> SourceStreamChunkRowWriter<'_> { SourceStreamChunkRowWriter { - descs: &self.descs, - builders: &mut self.builders, - op_builder: &mut self.op_builder, - vis_builder: &mut self.vis_builder, + builder: self, visible: true, // write visible rows by default row_meta: None, } } - /// Consumes the builder and returns a [`StreamChunk`]. - pub fn finish(self) -> StreamChunk { - StreamChunk::with_visibility( - self.op_builder, - self.builders + /// Write a heartbeat record to the builder. The builder will decide whether to finish the + /// current chunk or not. Currently it ensures that heartbeats are always in separate chunks. + pub fn heartbeat(&mut self, meta: MessageMeta<'_>) { + if self.current_chunk_len() > 0 { + // If there are records in the chunk, finish it first. + // If there's an ongoing transaction, `finish_current_chunk` will handle it properly. 
+ // Note this + self.finish_current_chunk(); + } + + _ = self + .row_writer() + .invisible() + .with_meta(meta) + .do_insert(|_| Ok(Datum::None)); + self.finish_current_chunk(); // each heartbeat should be a separate chunk + } + + /// Finish and build a [`StreamChunk`] from the current pending records in the builder, + /// no matter whether the builder is in a transaction or not, `split_txn` or not. The + /// built chunk will be appended to the `ready_chunks` and the builder will be reset. + pub fn finish_current_chunk(&mut self) { + if self.op_builder.is_empty() { + return; + } + + let (builders, op_builder, vis_builder) = + Self::create_builders(&self.column_descs, self.source_ctrl_opts.chunk_size); + let chunk = StreamChunk::with_visibility( + std::mem::replace(&mut self.op_builder, op_builder), + std::mem::replace(&mut self.builders, builders) .into_iter() .map(|builder| builder.finish().into()) .collect(), - self.vis_builder.finish(), - ) + std::mem::replace(&mut self.vis_builder, vis_builder).finish(), + ); + self.ready_chunks.push(chunk); + + if let Some(ref mut txn) = self.ongoing_txn { + tracing::warn!( + txn_id = txn.id, + len = txn.len, + "splitting an ongoing transaction" + ); + txn.len = 0; + } } - /// Resets the builder and returns a [`StreamChunk`], while reserving `next_cap` capacity for - /// the builders of the next [`StreamChunk`]. - #[must_use] - pub fn take_and_reserve(&mut self, next_cap: usize) -> StreamChunk { - let descs = std::mem::take(&mut self.descs); // we don't use `descs` in `finish` - let builder = std::mem::replace(self, Self::with_capacity(descs, next_cap)); - builder.finish() + /// Consumes and returns the ready [`StreamChunk`]s. + pub fn consume_ready_chunks(&mut self) -> impl ExactSizeIterator + '_ { + self.ready_chunks.drain(..) } - pub fn len(&self) -> usize { + fn current_chunk_len(&self) -> usize { self.op_builder.len() } - pub fn is_empty(&self) -> bool { - self.op_builder.is_empty() + /// Commit a newly-written record by appending `op` and `vis` to the corresponding builders. + /// This is supposed to be called via the `row_writer` only. + fn commit_record(&mut self, op: Op, vis: bool) { + self.op_builder.push(op); + self.vis_builder.append(vis); + + let curr_chunk_size = self.current_chunk_len(); + let max_chunk_size = self.source_ctrl_opts.chunk_size; + + if let Some(ref mut txn) = self.ongoing_txn { + txn.len += 1; + + if txn.len >= MAX_TRANSACTION_SIZE + || (self.source_ctrl_opts.split_txn && curr_chunk_size >= max_chunk_size) + { + self.finish_current_chunk(); + } + } else if curr_chunk_size >= max_chunk_size { + self.finish_current_chunk(); + } } } @@ -104,10 +222,7 @@ impl SourceStreamChunkBuilder { /// - errors for non-primary key columns will be ignored and filled with default value instead; /// - other errors will be propagated. pub struct SourceStreamChunkRowWriter<'a> { - descs: &'a [SourceColumnDesc], - builders: &'a mut [ArrayBuilderImpl], - op_builder: &'a mut Vec, - vis_builder: &'a mut BitmapBuilder, + builder: &'a mut SourceStreamChunkBuilder, /// Whether the rows written by this writer should be visible in output `StreamChunk`. 
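Reviewer note: the behavioral core of this file is `commit_record` above. Ops now come from `A::RECORD_TYPE.ops()` (`[Insert]`, `[Delete]`, or `[UpdateDelete, UpdateInsert]`, so an update still contributes two ops per record), and the builder itself decides when a chunk is cut. A condensed, free-standing restatement of that decision, with names paraphrased and `MAX_TRANSACTION_SIZE` as defined above:

```rust
const MAX_TRANSACTION_SIZE: usize = 4096;

/// Condensed restatement of the cut-a-chunk decision made in `commit_record`
/// after every appended record. Not the actual implementation, just the policy.
fn should_finish_current_chunk(
    in_transaction: bool,
    txn_len: usize,    // rows appended since the transaction began
    chunk_len: usize,  // rows in the chunk being built (incl. this record)
    chunk_size: usize, // SourceCtrlOpts::chunk_size
    split_txn: bool,   // SourceCtrlOpts::split_txn
) -> bool {
    if in_transaction {
        // Inside a transaction the chunk is normally kept open until commit;
        // it is cut early only to avoid OOM, or when `split_txn` permits it.
        txn_len >= MAX_TRANSACTION_SIZE || (split_txn && chunk_len >= chunk_size)
    } else {
        // Outside a transaction, cut as soon as the chunk is full.
        chunk_len >= chunk_size
    }
}
```
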
visible: bool, @@ -139,12 +254,7 @@ impl<'a> SourceStreamChunkRowWriter<'a> { } impl SourceStreamChunkRowWriter<'_> { - fn append_op(&mut self, op: Op) { - self.op_builder.push(op); - self.vis_builder.append(self.visible); - } - - fn do_action<'a, A: OpAction>( + fn do_action<'a, A: RowWriterAction>( &'a mut self, mut f: impl FnMut(&SourceColumnDesc) -> AccessResult>, ) -> AccessResult<()> { @@ -296,8 +406,8 @@ impl SourceStreamChunkRowWriter<'_> { // Columns that changes have been applied to. Used to rollback when an error occurs. let mut applied_columns = 0; - let result = (self.descs.iter()) - .zip_eq_fast(self.builders.iter_mut()) + let result = (self.builder.column_descs.iter()) + .zip_eq_fast(self.builder.builders.iter_mut()) .try_for_each(|(desc, builder)| { wrapped_f(desc).map(|output| { A::apply(builder, output); @@ -307,12 +417,16 @@ impl SourceStreamChunkRowWriter<'_> { match result { Ok(_) => { - A::finish(self); + // commit the action by appending `Op`s and visibility + for op in A::RECORD_TYPE.ops() { + self.builder.commit_record(*op, self.visible); + } + Ok(()) } Err(e) => { for i in 0..applied_columns { - A::rollback(&mut self.builders[i]); + A::rollback(&mut self.builder.builders[i]); } Err(e) } @@ -331,7 +445,7 @@ impl SourceStreamChunkRowWriter<'_> { where D: Into>, { - self.do_action::(|desc| f(desc).map(Into::into)) + self.do_action::(|desc| f(desc).map(Into::into)) } /// Write a `Delete` record to the [`StreamChunk`], with the given fallible closure that @@ -346,7 +460,7 @@ impl SourceStreamChunkRowWriter<'_> { where D: Into>, { - self.do_action::(|desc| f(desc).map(Into::into)) + self.do_action::(|desc| f(desc).map(Into::into)) } /// Write a `Update` record to the [`StreamChunk`], with the given fallible closure that @@ -362,27 +476,28 @@ impl SourceStreamChunkRowWriter<'_> { D1: Into>, D2: Into>, { - self.do_action::(|desc| f(desc).map(|(old, new)| (old.into(), new.into()))) + self.do_action::(|desc| f(desc).map(|(old, new)| (old.into(), new.into()))) } } -trait OpAction { +trait RowWriterAction { type Output<'a>; + const RECORD_TYPE: RecordType; fn output_for<'a>(datum: impl Into>) -> Self::Output<'a>; fn apply(builder: &mut ArrayBuilderImpl, output: Self::Output<'_>); fn rollback(builder: &mut ArrayBuilderImpl); - - fn finish(writer: &mut SourceStreamChunkRowWriter<'_>); } -struct OpActionInsert; +struct InsertAction; -impl OpAction for OpActionInsert { +impl RowWriterAction for InsertAction { type Output<'a> = DatumCow<'a>; + const RECORD_TYPE: RecordType = RecordType::Insert; + #[inline(always)] fn output_for<'a>(datum: impl Into>) -> Self::Output<'a> { datum.into() @@ -397,18 +512,15 @@ impl OpAction for OpActionInsert { fn rollback(builder: &mut ArrayBuilderImpl) { builder.pop().unwrap() } - - #[inline(always)] - fn finish(writer: &mut SourceStreamChunkRowWriter<'_>) { - writer.append_op(Op::Insert); - } } -struct OpActionDelete; +struct DeleteAction; -impl OpAction for OpActionDelete { +impl RowWriterAction for DeleteAction { type Output<'a> = DatumCow<'a>; + const RECORD_TYPE: RecordType = RecordType::Delete; + #[inline(always)] fn output_for<'a>(datum: impl Into>) -> Self::Output<'a> { datum.into() @@ -423,18 +535,15 @@ impl OpAction for OpActionDelete { fn rollback(builder: &mut ArrayBuilderImpl) { builder.pop().unwrap() } - - #[inline(always)] - fn finish(writer: &mut SourceStreamChunkRowWriter<'_>) { - writer.append_op(Op::Delete); - } } -struct OpActionUpdate; +struct UpdateAction; -impl OpAction for OpActionUpdate { +impl RowWriterAction for 
UpdateAction { type Output<'a> = (DatumCow<'a>, DatumCow<'a>); + const RECORD_TYPE: RecordType = RecordType::Update; + #[inline(always)] fn output_for<'a>(datum: impl Into>) -> Self::Output<'a> { let datum = datum.into(); @@ -452,10 +561,4 @@ impl OpAction for OpActionUpdate { builder.pop().unwrap(); builder.pop().unwrap(); } - - #[inline(always)] - fn finish(writer: &mut SourceStreamChunkRowWriter<'_>) { - writer.append_op(Op::UpdateDelete); - writer.append_op(Op::UpdateInsert); - } } diff --git a/src/connector/src/parser/csv_parser.rs b/src/connector/src/parser/csv_parser.rs index 1024f22770a6..3bb16b4bf60e 100644 --- a/src/connector/src/parser/csv_parser.rs +++ b/src/connector/src/parser/csv_parser.rs @@ -181,6 +181,8 @@ mod tests { use super::*; use crate::parser::SourceStreamChunkBuilder; + use crate::source::SourceCtrlOpts; + #[tokio::test] async fn test_csv_without_headers() { let data = vec![ @@ -204,14 +206,15 @@ mod tests { SourceContext::dummy().into(), ) .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4); + let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test()); for item in data { parser .parse_inner(item.as_bytes().to_vec(), builder.row_writer()) .await .unwrap(); } - let chunk = builder.finish(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); let mut rows = chunk.rows(); { let (op, row) = rows.next().unwrap(); @@ -311,13 +314,14 @@ mod tests { SourceContext::dummy().into(), ) .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4); + let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test()); for item in data { let _ = parser .parse_inner(item.as_bytes().to_vec(), builder.row_writer()) .await; } - let chunk = builder.finish(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); let mut rows = chunk.rows(); { let (op, row) = rows.next().unwrap(); diff --git a/src/connector/src/parser/debezium/avro_parser.rs b/src/connector/src/parser/debezium/avro_parser.rs index 4058f1d331c4..d5ccccb7977d 100644 --- a/src/connector/src/parser/debezium/avro_parser.rs +++ b/src/connector/src/parser/debezium/avro_parser.rs @@ -207,7 +207,7 @@ mod tests { use super::*; use crate::parser::{DebeziumParser, SourceStreamChunkBuilder, SpecificParserConfig}; - use crate::source::{SourceColumnDesc, SourceContext}; + use crate::source::{SourceColumnDesc, SourceContext, SourceCtrlOpts}; use crate::WithOptionsSecResolved; const DEBEZIUM_AVRO_DATA: &[u8] = b"\x00\x00\x00\x00\x06\x00\x02\xd2\x0f\x0a\x53\x61\x6c\x6c\x79\x0c\x54\x68\x6f\x6d\x61\x73\x2a\x73\x61\x6c\x6c\x79\x2e\x74\x68\x6f\x6d\x61\x73\x40\x61\x63\x6d\x65\x2e\x63\x6f\x6d\x16\x32\x2e\x31\x2e\x32\x2e\x46\x69\x6e\x61\x6c\x0a\x6d\x79\x73\x71\x6c\x12\x64\x62\x73\x65\x72\x76\x65\x72\x31\xc0\xb4\xe8\xb7\xc9\x61\x00\x30\x66\x69\x72\x73\x74\x5f\x69\x6e\x5f\x64\x61\x74\x61\x5f\x63\x6f\x6c\x6c\x65\x63\x74\x69\x6f\x6e\x12\x69\x6e\x76\x65\x6e\x74\x6f\x72\x79\x00\x02\x12\x63\x75\x73\x74\x6f\x6d\x65\x72\x73\x00\x00\x20\x6d\x79\x73\x71\x6c\x2d\x62\x69\x6e\x2e\x30\x30\x30\x30\x30\x33\x8c\x06\x00\x00\x00\x02\x72\x02\x92\xc3\xe8\xb7\xc9\x61\x00"; @@ -225,15 +225,13 @@ mod tests { columns: Vec, payload: Vec, ) -> Vec<(Op, OwnedRow)> { - let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2); - { - let writer = builder.row_writer(); - parser - .parse_inner(None, Some(payload), writer) - .await - .unwrap(); - } - let chunk = builder.finish(); + let mut 
builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test()); + parser + .parse_inner(None, Some(payload), builder.row_writer()) + .await + .unwrap(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); chunk .rows() .map(|(op, row_ref)| (op, row_ref.into_owned_row())) diff --git a/src/connector/src/parser/debezium/debezium_parser.rs b/src/connector/src/parser/debezium/debezium_parser.rs index fe917de7d369..d17f34add005 100644 --- a/src/connector/src/parser/debezium/debezium_parser.rs +++ b/src/connector/src/parser/debezium/debezium_parser.rs @@ -217,7 +217,7 @@ mod tests { use super::*; use crate::parser::{JsonProperties, SourceStreamChunkBuilder, TransactionControl}; - use crate::source::{ConnectorProperties, DataType}; + use crate::source::{ConnectorProperties, DataType, SourceCtrlOpts}; #[tokio::test] async fn test_parse_transaction_metadata() { @@ -249,7 +249,7 @@ mod tests { let mut parser = DebeziumParser::new(props, columns.clone(), Arc::new(source_ctx)) .await .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); + let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test()); // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. the format is txID:LSN let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"35352:3962948040","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#; @@ -258,7 +258,7 @@ mod tests { .parse_one_with_txn( None, Some(begin_msg.as_bytes().to_vec()), - builder.row_writer(), + dummy_builder.row_writer(), ) .await; match res { @@ -271,7 +271,7 @@ mod tests { .parse_one_with_txn( None, Some(commit_msg.as_bytes().to_vec()), - builder.row_writer(), + dummy_builder.row_writer(), ) .await; match res { @@ -321,7 +321,7 @@ mod tests { let mut parser = DebeziumParser::new(props, columns.clone(), Arc::new(source_ctx)) .await .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); + let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test()); let payload = r#"{ "payload": { "before": null, "after": { "O_ORDERKEY": 5, "O_CUSTKEY": 44485, "O_ORDERSTATUS": "F", "O_TOTALPRICE": "144659.20", "O_ORDERDATE": "1994-07-30" }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "RW_CDC_1002", "ts_ms": 1695277757000, "snapshot": "last", "db": "mydb", "sequence": null, "table": "orders_new", "server_id": 0, "gtid": null, "file": "binlog.000008", "pos": 3693, "row": 0, "thread": null, "query": null }, "op": "c", "ts_ms": 1695277757017, "transaction": null } }"#; @@ -334,7 +334,8 @@ mod tests { .await; match res { Ok(ParseResult::Rows) => { - let chunk = builder.finish(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); for (_, row) in chunk.rows() { let commit_ts = row.datum_at(5).unwrap().into_timestamptz(); assert_eq!(commit_ts, Timestamptz::from_millis(1695277757000).unwrap()); diff --git a/src/connector/src/parser/debezium/mongo_json_parser.rs b/src/connector/src/parser/debezium/mongo_json_parser.rs index 4e3a679c3181..1d18fa5c53f0 100644 --- a/src/connector/src/parser/debezium/mongo_json_parser.rs +++ b/src/connector/src/parser/debezium/mongo_json_parser.rs @@ -144,6 +144,7 @@ mod tests { use super::*; use crate::parser::unified::debezium::extract_bson_id; use crate::parser::SourceStreamChunkBuilder; + use crate::source::SourceCtrlOpts; #[test] fn 
test_parse_bson_value_id_int() { let data = r#"{"_id":{"$numberInt":"2345"}}"#; @@ -183,13 +184,14 @@ mod tests { ]; let mut parser = DebeziumMongoJsonParser::new(columns.clone(), SourceContext::dummy().into()).unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 3); - let writer = builder.row_writer(); + let mut builder = + SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test()); parser - .parse_inner(Some(key), Some(payload), writer) + .parse_inner(Some(key), Some(payload), builder.row_writer()) .await .unwrap(); - let chunk = builder.finish(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); let mut rows = chunk.rows(); let (op, row) = rows.next().unwrap(); @@ -221,11 +223,15 @@ mod tests { DebeziumMongoJsonParser::new(columns.clone(), SourceContext::dummy().into()) .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 3); + let mut builder = + SourceStreamChunkBuilder::new(columns.clone(), SourceCtrlOpts::for_test()); - let writer = builder.row_writer(); - parser.parse_inner(None, Some(data), writer).await.unwrap(); - let chunk = builder.finish(); + parser + .parse_inner(None, Some(data), builder.row_writer()) + .await + .unwrap(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); let mut rows = chunk.rows(); let (op, row) = rows.next().unwrap(); assert_eq!(op, Op::Insert); diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index b0ad67530281..7e713e1a29e7 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -120,7 +120,7 @@ mod tests { DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, ProtocolProperties, SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig, }; - use crate::source::SourceContext; + use crate::source::{SourceContext, SourceCtrlOpts}; fn assert_json_eq(parse_result: &Option, json_str: &str) { if let Some(ScalarImpl::Jsonb(json_val)) = parse_result { @@ -153,15 +153,13 @@ mod tests { columns: Vec, payload: Vec, ) -> Vec<(Op, OwnedRow)> { - let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2); - { - let writer = builder.row_writer(); - parser - .parse_inner(None, Some(payload), writer) - .await - .unwrap(); - } - let chunk = builder.finish(); + let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test()); + parser + .parse_inner(None, Some(payload), builder.row_writer()) + .await + .unwrap(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); chunk .rows() .map(|(op, row_ref)| (op, row_ref.into_owned_row())) @@ -509,7 +507,8 @@ mod tests { ]; let mut parser = build_parser(columns.clone()).await; - let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 2); + let mut dummy_builder = + SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test()); let normal_values = ["111", "1", "33", "444", "555.0", "666.0"]; let overflow_values = [ @@ -530,7 +529,7 @@ mod tests { ).as_bytes().to_vec(); let res = parser - .parse_inner(None, Some(data), builder.row_writer()) + .parse_inner(None, Some(data), dummy_builder.row_writer()) .await; if i < 5 { // For other overflow, the parsing succeeds but the type conversion fails diff --git a/src/connector/src/parser/maxwell/simd_json_parser.rs 
b/src/connector/src/parser/maxwell/simd_json_parser.rs index e5bc7291ccf0..e3943480959c 100644 --- a/src/connector/src/parser/maxwell/simd_json_parser.rs +++ b/src/connector/src/parser/maxwell/simd_json_parser.rs @@ -23,7 +23,7 @@ mod tests { EncodingProperties, JsonProperties, ProtocolProperties, SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig, }; - use crate::source::SourceContext; + use crate::source::{SourceContext, SourceCtrlOpts}; #[tokio::test] async fn test_json_parser() { @@ -45,7 +45,7 @@ mod tests { .await .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4); + let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test()); let payloads = vec![ br#"{"database":"test","table":"t","type":"insert","ts":1666937996,"xid":1171,"commit":true,"data":{"id":1,"name":"tom","is_adult":0,"birthday":"2017-12-31 16:00:01"}}"#.to_vec(), br#"{"database":"test","table":"t","type":"insert","ts":1666938023,"xid":1254,"commit":true,"data":{"id":2,"name":"alex","is_adult":1,"birthday":"1999-12-31 16:00:01"}}"#.to_vec(), @@ -53,11 +53,14 @@ mod tests { ]; for payload in payloads { - let writer = builder.row_writer(); - parser.parse_inner(payload, writer).await.unwrap(); + parser + .parse_inner(payload, builder.row_writer()) + .await + .unwrap(); } - let chunk = builder.finish(); + builder.finish_current_chunk(); + let chunk = builder.consume_ready_chunks().next().unwrap(); let mut rows = chunk.rows(); diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 30e1d1ef92ba..410a062f6875 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -31,7 +31,7 @@ use risingwave_common::array::StreamChunk; use risingwave_common::catalog::{KAFKA_TIMESTAMP_COLUMN_NAME, TABLE_NAME_COLUMN_NAME}; use risingwave_common::log::LogSuppresser; use risingwave_common::metrics::GLOBAL_ERROR_METRICS; -use risingwave_common::types::{Datum, DatumCow, DatumRef}; +use risingwave_common::types::{DatumCow, DatumRef}; use risingwave_common::util::tracing::InstrumentStream; use risingwave_connector_codec::decoder::avro::MapHandling; use thiserror_ext::AsReport; @@ -43,13 +43,13 @@ pub use self::sql_server::{sql_server_row_to_owned_row, ScalarImplTiberiusWrappe pub use self::unified::json::{JsonAccess, TimestamptzHandling}; pub use self::unified::Access; use self::upsert_parser::UpsertParser; -use crate::error::{ConnectorError, ConnectorResult}; +use crate::error::ConnectorResult; use crate::parser::maxwell::MaxwellParser; use crate::schema::schema_registry::SchemaRegistryAuth; use crate::source::monitor::GLOBAL_SOURCE_METRICS; use crate::source::{ BoxSourceStream, ChunkSourceStream, SourceColumnDesc, SourceColumnType, SourceContext, - SourceContextRef, SourceMessage, SourceMeta, + SourceContextRef, SourceCtrlOpts, SourceMeta, }; mod access_builder; @@ -198,80 +198,48 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static { self.parse_one(key, payload, writer) .map_ok(|_| ParseResult::Rows) } - - fn append_empty_row<'a>(&'a mut self, mut writer: SourceStreamChunkRowWriter<'a>) { - _ = writer.do_insert(|_column| Ok(Datum::None)); - } -} - -#[try_stream(ok = Vec, error = ConnectorError)] -async fn ensure_largest_at_rate_limit(stream: BoxSourceStream, rate_limit: u32) { - #[for_await] - for batch in stream { - let mut batch = batch?; - let mut start = 0; - let end = batch.len(); - while start < end { - let next = std::cmp::min(start + rate_limit as usize, end); - yield std::mem::take(&mut 
batch[start..next].as_mut()).to_vec(); - start = next; - } - } } #[easy_ext::ext(SourceParserIntoStreamExt)] impl P { - /// Parse a data stream of one source split into a stream of [`StreamChunk`]. + /// Parse a stream of vectors of `SourceMessage` into a stream of [`StreamChunk`]. /// /// # Arguments - /// - `data_stream`: A data stream of one source split. - /// To be able to split multiple messages from mq, so it is not a pure byte stream + /// + /// - `msg_stream`: A stream of vectors of `SourceMessage`. /// /// # Returns /// - /// A [`ChunkSourceStream`] which is a stream of parsed messages. - pub fn into_stream(self, data_stream: BoxSourceStream) -> impl ChunkSourceStream { + /// A [`ChunkSourceStream`] which is a stream of parsed chunks. Each of the parsed chunks + /// are guaranteed to have less than or equal to `source_ctrl_opts.chunk_size` rows, unless + /// there's a large transaction and `source_ctrl_opts.split_txn` is false. + pub fn into_stream(self, msg_stream: BoxSourceStream) -> impl ChunkSourceStream { let actor_id = self.source_ctx().actor_id; let source_id = self.source_ctx().source_id.table_id(); - // Ensure chunk size is smaller than rate limit - let data_stream = if let Some(rate_limit) = &self.source_ctx().source_ctrl_opts.rate_limit { - Box::pin(ensure_largest_at_rate_limit(data_stream, *rate_limit)) - } else { - data_stream - }; - - // The parser stream will be long-lived. We use `instrument_with` here to create + // The stream will be long-lived. We use `instrument_with` here to create // a new span for the polling of each chunk. - into_chunk_stream_inner(self, data_stream) + let source_ctrl_opts = self.source_ctx().source_ctrl_opts; + into_chunk_stream_inner(self, msg_stream, source_ctrl_opts) .instrument_with(move || tracing::info_span!("source_parse_chunk", actor_id, source_id)) } } -/// Maximum number of rows in a transaction. If a transaction is larger than this, it will be force -/// committed to avoid potential OOM. -const MAX_TRANSACTION_SIZE: usize = 4096; - // TODO: when upsert is disabled, how to filter those empty payload // Currently, an err is returned for non upsert with empty payload #[try_stream(ok = StreamChunk, error = crate::error::ConnectorError)] async fn into_chunk_stream_inner( mut parser: P, - data_stream: BoxSourceStream, + msg_stream: BoxSourceStream, + source_ctrl_opts: SourceCtrlOpts, ) { - let columns = parser.columns().to_vec(); + let mut chunk_builder = + SourceStreamChunkBuilder::new(parser.columns().to_vec(), source_ctrl_opts); - let mut chunk_builder = SourceStreamChunkBuilder::with_capacity(columns, 0); - - struct Transaction { - id: Box, - len: usize, - } - let mut current_transaction = None; let mut direct_cdc_event_lag_latency_metrics = HashMap::new(); #[for_await] - for batch in data_stream { + for batch in msg_stream { // It's possible that the split is not active, which means the next batch may arrive // very lately, so we should prefer emitting all records in current batch before the end // of each iteration, instead of merging them with the next batch. An exception is when @@ -292,49 +260,30 @@ async fn into_chunk_stream_inner( // heartbeat message. Note that all messages in `batch` should belong to the same // split, so we don't have to do a split to heartbeats mapping here. 
- if let Some(Transaction { id, len }) = &mut current_transaction { - // if there's an ongoing transaction, something may be wrong - tracing::warn!( - id, - len, - "got a batch of empty messages during an ongoing transaction" - ); - // for the sake of simplicity, let's force emit the partial transaction chunk - if *len > 0 { - *len = 0; // reset `len` while keeping `id` - yield chunk_builder.take_and_reserve(1); // next chunk will only contain the heartbeat - } - } - - // According to the invariant we mentioned at the beginning of the `for batch` loop, - // there should be no data of previous batch in `chunk_builder`. - assert!(chunk_builder.is_empty()); - let heartbeat_msg = batch.last().unwrap(); tracing::debug!( offset = heartbeat_msg.offset, - "emitting a heartbeat message" + "handling a heartbeat message" ); - // TODO(rc): should be `chunk_builder.append_heartbeat` instead, which is simpler - parser.append_empty_row(chunk_builder.row_writer().invisible().with_meta( - MessageMeta { - meta: &heartbeat_msg.meta, - split_id: &heartbeat_msg.split_id, - offset: &heartbeat_msg.offset, - }, - )); - yield chunk_builder.take_and_reserve(batch_len); - - continue; + chunk_builder.heartbeat(MessageMeta { + meta: &heartbeat_msg.meta, + split_id: &heartbeat_msg.split_id, + offset: &heartbeat_msg.offset, + }); + + for chunk in chunk_builder.consume_ready_chunks() { + yield chunk; + } + continue; // continue to next batch } // When we reach here, there is at least one data message in the batch. We should ignore all // heartbeat messages. - let mut txn_started_in_last_batch = current_transaction.is_some(); + let mut txn_started_in_last_batch = chunk_builder.is_in_transaction(); let process_time_ms = chrono::Utc::now().timestamp_millis(); - for (i, msg) in batch.into_iter().enumerate() { + for msg in batch { if msg.is_cdc_heartbeat() { // ignore heartbeat messages continue; @@ -355,7 +304,9 @@ async fn into_chunk_stream_inner( direct_cdc_event_lag_latency.observe(lag_ms as f64); } - let old_len = chunk_builder.len(); + // Parse the message and write to the chunk builder, it's possible that the message + // contains multiple rows. When the chunk size reached the limit during parsing, the + // chunk builder may yield the chunk to `ready_chunks` and start a new chunk. match parser .parse_one_with_txn( msg.key, @@ -371,12 +322,6 @@ async fn into_chunk_stream_inner( // It's possible that parsing multiple rows in a single message PARTIALLY failed. // We still have to maintain the row number in this case. res @ (Ok(ParseResult::Rows) | Err(_)) => { - // Aggregate the number of new rows into the current transaction. - if let Some(Transaction { len, .. }) = &mut current_transaction { - let n_new_rows = chunk_builder.len() - old_len; - *len += n_new_rows; - } - if let Err(error) = res { // TODO: not using tracing span to provide `split_id` and `offset` due to performance concern, // see #13105 @@ -401,28 +346,30 @@ async fn into_chunk_stream_inner( context.fragment_id.to_string(), ]); } + + for chunk in chunk_builder.consume_ready_chunks() { + yield chunk; + } } Ok(ParseResult::TransactionControl(txn_ctl)) => match txn_ctl { TransactionControl::Begin { id } => { - if let Some(Transaction { id: current_id, .. 
}) = ¤t_transaction { - tracing::warn!(current_id, id, "already in transaction"); - } - tracing::debug!(id, "begin upstream transaction"); - current_transaction = Some(Transaction { id, len: 0 }); + chunk_builder.begin_transaction(id); } TransactionControl::Commit { id } => { - let current_id = current_transaction.as_ref().map(|t| &t.id); - if current_id != Some(&id) { - tracing::warn!(?current_id, id, "transaction id mismatch"); - } - tracing::debug!(id, "commit upstream transaction"); - current_transaction = None; + chunk_builder.commit_transaction(id); + assert!(!chunk_builder.is_in_transaction()); if txn_started_in_last_batch { - yield chunk_builder.take_and_reserve(batch_len - (i + 1)); + // If a transaction is across multiple batches, we yield the chunk + // immediately after the transaction is committed. + chunk_builder.finish_current_chunk(); txn_started_in_last_batch = false; } + + for chunk in chunk_builder.consume_ready_chunks() { + yield chunk; + } } }, @@ -450,22 +397,12 @@ async fn into_chunk_stream_inner( } } - if let Some(Transaction { len, id }) = &mut current_transaction { - // in transaction, check whether it's too large - if *len > MAX_TRANSACTION_SIZE { - // force commit - tracing::warn!( - id, - len, - "transaction is larger than {MAX_TRANSACTION_SIZE} rows, force commit" - ); - *len = 0; // reset `len` while keeping `id` - yield chunk_builder.take_and_reserve(batch_len); // use curr batch len as next capacity, just a hint - } - // TODO(rc): we will have better chunk size control later - } else if !chunk_builder.is_empty() { - // not in transaction, yield the chunk now - yield chunk_builder.take_and_reserve(batch_len); // use curr batch len as next capacity, just a hint + // Finish the remaining records in the batch. + if !chunk_builder.is_in_transaction() { + chunk_builder.finish_current_chunk(); + } + for chunk in chunk_builder.consume_ready_chunks() { + yield chunk; } } } @@ -476,7 +413,7 @@ pub enum EncodingType { Value, } -/// The entrypoint of parsing. It parses [`SourceMessage`] stream (byte stream) into [`StreamChunk`] stream. +/// The entrypoint of parsing. It parses `SourceMessage` stream (byte stream) into [`StreamChunk`] stream. /// Used by [`crate::source::into_chunk_stream`]. #[derive(Debug)] pub enum ByteStreamSourceParserImpl { @@ -490,7 +427,7 @@ pub enum ByteStreamSourceParserImpl { } impl ByteStreamSourceParserImpl { - /// Converts [`SourceMessage`] stream into [`StreamChunk`] stream. + /// Converts `SourceMessage` vec stream into [`StreamChunk`] stream. pub fn into_stream(self, msg_stream: BoxSourceStream) -> impl ChunkSourceStream + Unpin { #[auto_enum(futures03::Stream)] let stream = match self { @@ -558,10 +495,11 @@ impl ByteStreamSourceParserImpl { /// Test utilities for [`ByteStreamSourceParserImpl`]. 
#[cfg(test)] pub mod test_utils { - use futures::StreamExt as _; - use itertools::Itertools as _; + use futures::StreamExt; + use itertools::Itertools; use super::*; + use crate::source::SourceMessage; #[easy_ext::ext(ByteStreamSourceParserImplTestExt)] pub(crate) impl ByteStreamSourceParserImpl { diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index fe0113798e41..fcd8c9f308b6 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -217,7 +217,7 @@ mod tests { use super::*; use crate::parser::{MessageMeta, SourceStreamChunkBuilder, TransactionControl}; use crate::source::cdc::DebeziumCdcMeta; - use crate::source::{ConnectorProperties, DataType, SourceMessage, SplitId}; + use crate::source::{ConnectorProperties, DataType, SourceCtrlOpts, SourceMessage, SplitId}; #[tokio::test] async fn test_emit_transactional_chunk() { @@ -252,7 +252,11 @@ mod tests { let mut transactional = false; // for untransactional source, we expect emit a chunk for each message batch let message_stream = source_message_stream(transactional); - let chunk_stream = crate::parser::into_chunk_stream_inner(parser, message_stream.boxed()); + let chunk_stream = crate::parser::into_chunk_stream_inner( + parser, + message_stream.boxed(), + SourceCtrlOpts::for_test(), + ); let output: std::result::Result, _> = block_on(chunk_stream.collect::>()) .into_iter() .collect(); @@ -289,7 +293,11 @@ mod tests { // for transactional source, we expect emit a single chunk for the transaction transactional = true; let message_stream = source_message_stream(transactional); - let chunk_stream = crate::parser::into_chunk_stream_inner(parser, message_stream.boxed()); + let chunk_stream = crate::parser::into_chunk_stream_inner( + parser, + message_stream.boxed(), + SourceCtrlOpts::for_test(), + ); let output: std::result::Result, _> = block_on(chunk_stream.collect::>()) .into_iter() .collect(); @@ -406,7 +414,7 @@ mod tests { ) .await .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); + let mut builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test()); // "id":"35352:3962948040" Postgres transaction ID itself and LSN of given operation separated by colon, i.e. 
the format is txID:LSN let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#; @@ -451,8 +459,8 @@ mod tests { _ => panic!("unexpected parse result: {:?}", res), } - let output = builder.take_and_reserve(10); - assert_eq!(0, output.cardinality()); + builder.finish_current_chunk(); + assert!(builder.consume_ready_chunks().next().is_none()); } #[tokio::test] @@ -483,7 +491,7 @@ mod tests { ) .await .unwrap(); - let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); + let mut dummy_builder = SourceStreamChunkBuilder::new(columns, SourceCtrlOpts::for_test()); let msg = r#"{"schema":null,"payload": { "databaseName": "mydb", "ddl": "ALTER TABLE test add column v2 varchar(32)", "schemaName": null, "source": { "connector": "mysql", "db": "mydb", "file": "binlog.000065", "gtid": null, "name": "RW_CDC_0", "pos": 234, "query": null, "row": 0, "sequence": null, "server_id": 1, "snapshot": "false", "table": "test", "thread": null, "ts_ms": 1718354727000, "version": "2.4.2.Final" }, "tableChanges": [ { "id": "\"mydb\".\"test\"", "table": { "columns": [ { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 4, "length": null, "name": "id", "nativeType": null, "optional": false, "position": 1, "scale": null, "typeExpression": "INT", "typeName": "INT" }, { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 2014, "length": null, "name": "v1", "nativeType": null, "optional": true, "position": 2, "scale": null, "typeExpression": "TIMESTAMP", "typeName": "TIMESTAMP" }, { "autoIncremented": false, "charsetName": "utf8mb4", "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 12, "length": 32, "name": "v2", "nativeType": null, "optional": true, "position": 3, "scale": null, "typeExpression": "VARCHAR", "typeName": "VARCHAR" } ], "comment": null, "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ "id" ] }, "type": "ALTER" } ], "ts_ms": 1718354727594 }}"#; let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new( @@ -501,7 +509,7 @@ mod tests { .parse_one_with_txn( None, Some(msg.as_bytes().to_vec()), - builder.row_writer().with_meta(msg_meta), + dummy_builder.row_writer().with_meta(msg_meta), ) .await; diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 3525152a35c2..1742334ee088 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -142,18 +142,28 @@ pub type SourceEnumeratorContextRef = Arc; /// The max size of a chunk yielded by source stream. pub const MAX_CHUNK_SIZE: usize = 1024; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub struct SourceCtrlOpts { /// The max size of a chunk yielded by source stream. pub chunk_size: usize, - /// Rate limit of source - pub rate_limit: Option, + /// Whether to allow splitting a transaction into multiple chunks to meet the `max_chunk_size`. + pub split_txn: bool, } // The options in `SourceCtrlOpts` are so important that we don't want to impl `Default` for it, // so that we can prevent any unintentional use of the default value. 
impl !Default for SourceCtrlOpts {} +impl SourceCtrlOpts { + #[cfg(test)] + pub fn for_test() -> Self { + SourceCtrlOpts { + chunk_size: 256, + split_txn: false, + } + } +} + #[derive(Debug)] pub struct SourceEnumeratorContext { pub info: SourceEnumeratorInfo, @@ -226,7 +236,7 @@ impl SourceContext { Arc::new(SourceMetrics::default()), SourceCtrlOpts { chunk_size: MAX_CHUNK_SIZE, - rate_limit: None, + split_txn: false, }, ConnectorProperties::default(), None, diff --git a/src/stream/src/common/rate_limit.rs b/src/stream/src/common/rate_limit.rs index 6144ae6810eb..57c518c971c9 100644 --- a/src/stream/src/common/rate_limit.rs +++ b/src/stream/src/common/rate_limit.rs @@ -13,9 +13,9 @@ // limitations under the License. /// Get the rate-limited max chunk size. -pub(crate) fn limited_chunk_size(rate_limit: Option) -> usize { +pub(crate) fn limited_chunk_size(rate_limit_burst: Option) -> usize { let config_chunk_size = crate::config::chunk_size(); - rate_limit - .map(|limit| config_chunk_size.min(limit as usize)) + rate_limit_burst + .map(|burst| config_chunk_size.min(burst as usize)) .unwrap_or(config_chunk_size) } diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 356540884e16..ea665d0cf2f0 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -22,14 +22,12 @@ use itertools::Itertools; use risingwave_common::array::DataChunk; use risingwave_common::bail; use risingwave_common::catalog::ColumnDesc; -use risingwave_common::row::RowExt; -use risingwave_common::util::iter_util::ZipEqFast; use risingwave_connector::parser::{ ByteStreamSourceParser, DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties, ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig, }; use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReaderImpl}; -use risingwave_connector::source::{SourceColumnDesc, SourceContext}; +use risingwave_connector::source::{SourceColumnDesc, SourceContext, SourceCtrlOpts}; use rw_futures_util::pausable; use thiserror_ext::AsReport; use tracing::Instrument; @@ -822,24 +820,31 @@ async fn parse_debezium_chunk( parser: &mut DebeziumParser, chunk: &StreamChunk, ) -> StreamExecutorResult { - // here we transform the input chunk in (payload varchar, _rw_offset varchar, _rw_table_name varchar) schema + // here we transform the input chunk in `(payload varchar, _rw_offset varchar, _rw_table_name varchar)` schema // to chunk with downstream table schema `info.schema` of MergeNode contains the schema of the // table job with `_rw_offset` in the end // see `gen_create_table_plan_for_cdc_source` for details - let mut builder = - SourceStreamChunkBuilder::with_capacity(parser.columns().to_vec(), chunk.capacity()); - // The schema of input chunk (payload varchar, _rw_offset varchar, _rw_table_name varchar, _row_id) + // use `SourceStreamChunkBuilder` for convenience + let mut builder = SourceStreamChunkBuilder::new( + parser.columns().to_vec(), + SourceCtrlOpts { + chunk_size: chunk.capacity(), + split_txn: false, + }, + ); + + // The schema of input chunk `(payload varchar, _rw_offset varchar, _rw_table_name varchar, _row_id)` // We should use the debezium parser to parse the first column, // then chain the parsed row with `_rw_offset` row to get a new row. 
- let payloads = chunk.data_chunk().project(vec![0].as_slice()); - let offset_columns = chunk.data_chunk().project(vec![1].as_slice()); + let payloads = chunk.data_chunk().project(&[0]); + let offsets = chunk.data_chunk().project(&[1]).compact(); // TODO: preserve the transaction semantics for payload in payloads.rows() { let ScalarRefImpl::Jsonb(jsonb_ref) = payload.datum_at(0).expect("payload must exist") else { - unreachable!("payload must be jsonb"); + panic!("payload must be jsonb"); }; parser @@ -851,31 +856,23 @@ async fn parse_debezium_chunk( .await .unwrap(); } + builder.finish_current_chunk(); - let parsed_chunk = builder.finish(); - let (data_chunk, ops) = parsed_chunk.into_parts(); - - // concat the rows in the parsed chunk with the _rw_offset column, we should also retain the Op column - let mut new_rows = Vec::with_capacity(chunk.capacity()); - let offset_columns = offset_columns.compact(); - for (data_row, offset_row) in data_chunk - .rows_with_holes() - .zip_eq_fast(offset_columns.rows_with_holes()) - { - let combined = data_row.chain(offset_row); - new_rows.push(combined); - } + let parsed_chunk = { + let mut iter = builder.consume_ready_chunks(); + assert_eq!(1, iter.len()); + iter.next().unwrap() + }; + assert_eq!(parsed_chunk.capacity(), chunk.capacity()); // each payload is expected to generate one row + let (ops, mut columns, vis) = parsed_chunk.into_inner(); + // note that `vis` is not necessarily the same as the original chunk's visibilities - let data_types = parser - .columns() - .iter() - .map(|col| col.data_type.clone()) - .chain(std::iter::once(DataType::Varchar)) // _rw_offset column - .collect_vec(); + // concat the rows in the parsed chunk with the `_rw_offset` column + columns.extend(offsets.into_parts().0); Ok(StreamChunk::from_parts( ops, - DataChunk::from_rows(new_rows.as_slice(), data_types.as_slice()), + DataChunk::from_parts(columns.into(), vis), )) } diff --git a/src/stream/src/executor/row_id_gen.rs b/src/stream/src/executor/row_id_gen.rs index 216d62432191..5b59181af8db 100644 --- a/src/stream/src/executor/row_id_gen.rs +++ b/src/stream/src/executor/row_id_gen.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use risingwave_common::array::stream_chunk::Ops; use risingwave_common::array::{Array, ArrayBuilder, ArrayRef, Op, SerialArrayBuilder}; use risingwave_common::bitmap::Bitmap; use risingwave_common::hash::VnodeBitmapExt; @@ -57,7 +56,7 @@ impl RowIdGenExecutor { fn gen_row_id_column_by_op( &mut self, column: &ArrayRef, - ops: Ops<'_>, + ops: &'_ [Op], vis: &Bitmap, ) -> ArrayRef { let len = column.len(); diff --git a/src/stream/src/executor/source/fetch_executor.rs b/src/stream/src/executor/source/fetch_executor.rs index f1866379aee3..650bd3ece867 100644 --- a/src/stream/src/executor/source/fetch_executor.rs +++ b/src/stream/src/executor/source/fetch_executor.rs @@ -182,7 +182,7 @@ impl FsFetchExecutor { source_desc.metrics.clone(), SourceCtrlOpts { chunk_size: limited_chunk_size(self.rate_limit_rps), - rate_limit: self.rate_limit_rps, + split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn }, source_desc.source.config.clone(), None, diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index b6c8e888f26a..a1c2fe503c7a 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -105,7 +105,7 @@ impl FsSourceExecutor { source_desc.metrics.clone(), SourceCtrlOpts { chunk_size: limited_chunk_size(self.rate_limit_rps), - rate_limit: self.rate_limit_rps, + split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn }, source_desc.source.config.clone(), None, diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index 4613b794055f..1cb1dd95f38a 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -297,7 +297,7 @@ impl SourceBackfillExecutorInner { source_desc.metrics.clone(), SourceCtrlOpts { chunk_size: limited_chunk_size(self.rate_limit_rps), - rate_limit: self.rate_limit_rps, + split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn }, source_desc.source.config.clone(), None, diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 96dd4bd3f581..545f281279f2 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -203,7 +203,7 @@ impl SourceExecutor { source_desc.metrics.clone(), SourceCtrlOpts { chunk_size: limited_chunk_size(self.rate_limit_rps), - rate_limit: self.rate_limit_rps, + split_txn: self.rate_limit_rps.is_some(), // when rate limiting, we may split txn }, source_desc.source.config.clone(), schema_change_tx,
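Reviewer note: across the four stream executors touched above, the old `rate_limit: self.rate_limit_rps` field is replaced by the same two-line pattern, while burst-based chunk sizing stays in `limited_chunk_size`. A sketch of that mapping in isolation, with the configured chunk size passed in explicitly and a hypothetical rate-limit value:

```rust
use risingwave_connector::source::SourceCtrlOpts;

/// Mirror of `limited_chunk_size` in src/stream/src/common/rate_limit.rs,
/// with the configured chunk size made an explicit parameter for the sketch.
fn limited_chunk_size(rate_limit_burst: Option<u32>, config_chunk_size: usize) -> usize {
    rate_limit_burst
        .map(|burst| config_chunk_size.min(burst as usize))
        .unwrap_or(config_chunk_size)
}

fn source_ctrl_opts_for_executor(rate_limit_rps: Option<u32>) -> SourceCtrlOpts {
    SourceCtrlOpts {
        // chunk size is capped by the rate-limit burst, as before
        chunk_size: limited_chunk_size(rate_limit_rps, 1024),
        // when rate limiting, transactions may be split so no chunk exceeds the cap
        split_txn: rate_limit_rps.is_some(),
    }
}

// e.g. a 50 rows/s limit yields chunks of at most 50 rows and allows txn splitting:
// source_ctrl_opts_for_executor(Some(50)) -> SourceCtrlOpts { chunk_size: 50, split_txn: true }
```
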