diff --git a/src/connector/src/parser/chunk_builder.rs b/src/connector/src/parser/chunk_builder.rs
index 80a7c38b84450..d8f7037a73143 100644
--- a/src/connector/src/parser/chunk_builder.rs
+++ b/src/connector/src/parser/chunk_builder.rs
@@ -79,7 +79,7 @@ impl SourceStreamChunkBuilder {
     /// Resets the builder and returns a [`StreamChunk`], while reserving `next_cap` capacity for
     /// the builders of the next [`StreamChunk`].
     #[must_use]
-    pub fn take(&mut self, next_cap: usize) -> StreamChunk {
+    pub fn take_and_reserve(&mut self, next_cap: usize) -> StreamChunk {
         let descs = std::mem::take(&mut self.descs); // we don't use `descs` in `finish`
         let builder = std::mem::replace(self, Self::with_capacity(descs, next_cap));
         builder.finish()
diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index fbda50b2fd52f..e6d4d5c5588b1 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -199,7 +199,7 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
             .map_ok(|_| ParseResult::Rows)
     }
 
-    fn emit_empty_row<'a>(&'a mut self, mut writer: SourceStreamChunkRowWriter<'a>) {
+    fn append_empty_row<'a>(&'a mut self, mut writer: SourceStreamChunkRowWriter<'a>) {
         _ = writer.do_insert(|_column| Ok(Datum::None));
     }
 }
@@ -250,7 +250,7 @@ impl P {
 
 /// Maximum number of rows in a transaction. If a transaction is larger than this, it will be force
 /// committed to avoid potential OOM.
-const MAX_ROWS_FOR_TRANSACTION: usize = 4096;
+const MAX_TRANSACTION_SIZE: usize = 4096;
 
 // TODO: when upsert is disabled, how to filter those empty payload
 // Currently, an err is returned for non upsert with empty payload
@@ -261,8 +261,7 @@ async fn into_chunk_stream_inner(
 ) {
     let columns = parser.columns().to_vec();
 
-    let mut heartbeat_builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 0);
-    let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0);
+    let mut chunk_builder = SourceStreamChunkBuilder::with_capacity(columns, 0);
 
     struct Transaction {
         id: Box<str>,
@@ -273,45 +272,71 @@ async fn into_chunk_stream_inner(
 
     #[for_await]
     for batch in data_stream {
+        // It's possible that the split is not active, which means the next batch may arrive
+        // very late, so we should prefer emitting all records in the current batch before the
+        // end of each iteration, instead of merging them with the next batch. An exception is
+        // an uncommitted transaction, in which case we yield only when it is committed.
+
        let batch = batch?;
        let batch_len = batch.len();
 
-        let mut last_batch_not_yielded = false;
-        if let Some(Transaction { len, id }) = &mut current_transaction {
-            // Dirty state. The last batch is not yielded due to uncommitted transaction.
-            if *len > MAX_ROWS_FOR_TRANSACTION {
-                // Large transaction. Force commit.
+        if batch_len == 0 {
+            continue;
+        }
+
+        if batch.iter().all(|msg| msg.is_cdc_heartbeat()) {
+            // This `.iter().all(...)` will short-circuit after seeing the first `false`, so in
+            // normal cases, this should only involve a constant-time cost.
+
+            // Now we know that there is no data message in the batch, so let's just emit the
+            // latest heartbeat message. Note that all messages in `batch` should belong to the
+            // same split, so we don't have to maintain a split-to-heartbeats mapping here.
+
+            if let Some(Transaction { id, len }) = &mut current_transaction {
+                // if there's an ongoing transaction, something may be wrong
                 tracing::warn!(
                     id,
                     len,
-                    "transaction is larger than {MAX_ROWS_FOR_TRANSACTION} rows, force commit"
+                    "got a batch of empty messages during an ongoing transaction"
                 );
-                *len = 0; // reset `len` while keeping `id`
-                yield builder.take(batch_len);
-            } else {
-                last_batch_not_yielded = true
+                // for the sake of simplicity, let's force-emit the partial transaction chunk
+                if *len > 0 {
+                    *len = 0; // reset `len` while keeping `id`
+                    yield chunk_builder.take_and_reserve(1); // next chunk will only contain the heartbeat
+                }
             }
-        } else {
-            // Clean state. Reserve capacity for the builder.
-            assert!(builder.is_empty());
-            let _ = builder.take(batch_len);
+
+            // According to the invariant we mentioned at the beginning of the `for batch` loop,
+            // there should be no data from the previous batch in `chunk_builder`.
+            assert!(chunk_builder.is_empty());
+
+            let heartbeat_msg = batch.last().unwrap();
+            tracing::debug!(
+                offset = heartbeat_msg.offset,
+                "emitting a heartbeat message"
+            );
+            // TODO(rc): should be `chunk_builder.append_heartbeat` instead, which is simpler
+            parser.append_empty_row(chunk_builder.row_writer().invisible().with_meta(
+                MessageMeta {
+                    meta: &heartbeat_msg.meta,
+                    split_id: &heartbeat_msg.split_id,
+                    offset: &heartbeat_msg.offset,
+                },
+            ));
+            yield chunk_builder.take_and_reserve(batch_len);
+
+            continue;
         }
 
+        // When we reach here, there is at least one data message in the batch. We should ignore
+        // all heartbeat messages.
+
+        let mut txn_started_in_last_batch = current_transaction.is_some();
         let process_time_ms = chrono::Utc::now().timestamp_millis();
+
         for (i, msg) in batch.into_iter().enumerate() {
-            if msg.key.is_none() && msg.payload.is_none() {
-                tracing::debug!(
-                    offset = msg.offset,
-                    "got a empty message, could be a heartbeat"
-                );
-                // Emit an empty invisible row for the heartbeat message.
-                parser.emit_empty_row(heartbeat_builder.row_writer().invisible().with_meta(
-                    MessageMeta {
-                        meta: &msg.meta,
-                        split_id: &msg.split_id,
-                        offset: &msg.offset,
-                    },
-                ));
+            if msg.is_cdc_heartbeat() {
+                // ignore heartbeat messages
                 continue;
             }
 
@@ -330,12 +355,12 @@ async fn into_chunk_stream_inner(
                 direct_cdc_event_lag_latency.observe(lag_ms as f64);
             }
 
-            let old_len = builder.len();
+            let old_len = chunk_builder.len();
             match parser
                 .parse_one_with_txn(
                     msg.key,
                     msg.payload,
-                    builder.row_writer().with_meta(MessageMeta {
+                    chunk_builder.row_writer().with_meta(MessageMeta {
                         meta: &msg.meta,
                         split_id: &msg.split_id,
                         offset: &msg.offset,
@@ -348,7 +373,7 @@ async fn into_chunk_stream_inner(
                 res @ (Ok(ParseResult::Rows) | Err(_)) => {
                     // Aggregate the number of new rows into the current transaction.
                     if let Some(Transaction { len, .. }) = &mut current_transaction {
-                        let n_new_rows = builder.len() - old_len;
+                        let n_new_rows = chunk_builder.len() - old_len;
                         *len += n_new_rows;
                     }
 
@@ -394,9 +419,9 @@ async fn into_chunk_stream_inner(
                             tracing::debug!(id, "commit upstream transaction");
                             current_transaction = None;
 
-                            if last_batch_not_yielded {
-                                yield builder.take(batch_len - (i + 1));
-                                last_batch_not_yielded = false;
+                            if txn_started_in_last_batch {
+                                yield chunk_builder.take_and_reserve(batch_len - (i + 1));
+                                txn_started_in_last_batch = false;
                             }
                         }
                     },
@@ -425,16 +450,22 @@ async fn into_chunk_stream_inner(
             }
         }
 
-        // emit heartbeat for each message batch
-        // we must emit heartbeat chunk before the data chunk,
-        // otherwise the source offset could be backward due to the heartbeat
-        if !heartbeat_builder.is_empty() {
-            yield heartbeat_builder.take(0);
-        }
-
-        // If we are not in a transaction, we should yield the chunk now.
-        if current_transaction.is_none() {
-            yield builder.take(0);
+        if let Some(Transaction { len, id }) = &mut current_transaction {
+            // in transaction, check whether it's too large
+            if *len > MAX_TRANSACTION_SIZE {
+                // force commit
+                tracing::warn!(
+                    id,
+                    len,
+                    "transaction is larger than {MAX_TRANSACTION_SIZE} rows, force commit"
+                );
+                *len = 0; // reset `len` while keeping `id`
+                yield chunk_builder.take_and_reserve(batch_len); // use curr batch len as next capacity, just a hint
+            }
+            // TODO(rc): we will have better chunk size control later
+        } else if !chunk_builder.is_empty() {
+            // not in transaction, yield the chunk now
+            yield chunk_builder.take_and_reserve(batch_len); // use curr batch len as next capacity, just a hint
         }
     }
 }
diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs
index 86a1d3d831b6c..1144fdf00fe4b 100644
--- a/src/connector/src/parser/plain_parser.rs
+++ b/src/connector/src/parser/plain_parser.rs
@@ -451,7 +451,7 @@ mod tests {
             _ => panic!("unexpected parse result: {:?}", res),
         }
 
-        let output = builder.take(10);
+        let output = builder.take_and_reserve(10);
         assert_eq!(0, output.cardinality());
     }
 
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index ec01730ee463a..40b547579f788 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -637,6 +637,11 @@ impl SourceMessage {
             meta: SourceMeta::Empty,
         }
     }
+
+    /// Check whether the source message is a CDC heartbeat message.
+    pub fn is_cdc_heartbeat(&self) -> bool {
+        self.key.is_none() && self.payload.is_none()
+    }
 }
 
 #[derive(Debug, Clone)]
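
Note on the `take` -> `take_and_reserve` rename: the method both finishes the current chunk and pre-allocates the builders for the next one in a single call on `&mut self`, which the old name under-described. Below is a minimal standalone sketch of that pattern with a toy builder over `Vec<String>`; the types here are hypothetical stand-ins, not the real `SourceStreamChunkBuilder`.

use std::mem;

struct ChunkBuilder {
    rows: Vec<String>, // stand-in for the real per-column array builders
}

impl ChunkBuilder {
    fn with_capacity(cap: usize) -> Self {
        Self { rows: Vec::with_capacity(cap) }
    }

    /// Returns the accumulated rows and resets `self`, reserving `next_cap`
    /// capacity for the next chunk, all in one step.
    #[must_use]
    fn take_and_reserve(&mut self, next_cap: usize) -> Vec<String> {
        mem::replace(self, Self::with_capacity(next_cap)).rows
    }
}

fn main() {
    let mut builder = ChunkBuilder::with_capacity(2);
    builder.rows.push("row-1".into());
    builder.rows.push("row-2".into());
    let chunk = builder.take_and_reserve(8); // the capacity is just a hint
    assert_eq!(chunk.len(), 2);
    assert!(builder.rows.is_empty() && builder.rows.capacity() >= 8);
}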
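
The new control flow in `into_chunk_stream_inner` classifies each batch into two cases: an all-heartbeat batch emits a single invisible row derived from the latest message, while a mixed batch drops heartbeats inline and parses the rest. A self-contained model of that classification follows, with a hypothetical `Msg` type standing in for `SourceMessage`.

#[derive(Debug)]
struct Msg {
    key: Option<Vec<u8>>,
    payload: Option<Vec<u8>>,
    offset: u64,
}

impl Msg {
    // mirrors the new `SourceMessage::is_cdc_heartbeat`: no key and no payload
    fn is_cdc_heartbeat(&self) -> bool {
        self.key.is_none() && self.payload.is_none()
    }
}

fn classify(batch: &[Msg]) {
    if batch.is_empty() {
        return; // mirrors the `batch_len == 0` guard
    }
    // `all` short-circuits at the first data message, so batches that carry
    // data early on are classified at effectively constant cost
    if batch.iter().all(Msg::is_cdc_heartbeat) {
        let latest = batch.last().unwrap();
        println!("emit one invisible heartbeat row at offset {}", latest.offset);
    } else {
        for msg in batch.iter().filter(|m| !m.is_cdc_heartbeat()) {
            println!("parse data message at offset {}", msg.offset);
        }
    }
}

fn main() {
    let hb = |offset| Msg { key: None, payload: None, offset };
    let data = |offset| Msg { key: None, payload: Some(b"{}".to_vec()), offset };
    classify(&[hb(1), hb(2)]); // heartbeat-only batch: emit from offset 2
    classify(&[hb(3), data(4)]); // mixed batch: the heartbeat at 3 is ignored
}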
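
Finally, the end-of-batch logic: rows of an uncommitted transaction are held back, but only up to `MAX_TRANSACTION_SIZE`, after which they are flushed early to bound memory while the transaction `id` is kept so the eventual commit still matches. A rough sketch of that guard, with the `yield` reduced to a counter reset for brevity; `on_batch_end` and `buffered_rows` are hypothetical names, not the actual code.

const MAX_TRANSACTION_SIZE: usize = 4096;

struct Transaction {
    id: Box<str>,
    len: usize, // rows accumulated for this transaction so far
}

// `buffered_rows` stands in for the rows sitting in `chunk_builder`, and
// resetting it stands in for `yield chunk_builder.take_and_reserve(..)`
fn on_batch_end(current_transaction: &mut Option<Transaction>, buffered_rows: &mut usize) {
    if let Some(Transaction { id, len }) = current_transaction {
        if *len > MAX_TRANSACTION_SIZE {
            eprintln!("transaction {id} is larger than {MAX_TRANSACTION_SIZE} rows, force commit");
            *len = 0; // reset `len` while keeping `id`
            *buffered_rows = 0; // flush the partial transaction chunk
        }
        // otherwise keep holding the rows until the commit message arrives
    } else if *buffered_rows > 0 {
        *buffered_rows = 0; // not in a transaction: emit the chunk right away
    }
}

fn main() {
    let mut txn = Some(Transaction { id: "txn-1".into(), len: 5000 });
    let mut buffered_rows = 5000;
    on_batch_end(&mut txn, &mut buffered_rows);
    assert_eq!(buffered_rows, 0); // force-committed
    assert!(txn.is_some()); // `id` survives so the real commit can still match
}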