Skip to content

Commit

Permalink
fix(sink): fix mogodb write error handling (#19869)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Dec 24, 2024
1 parent 2d75798 commit b7e62d8
Showing 1 changed file with 62 additions and 0 deletions.
62 changes: 62 additions & 0 deletions src/connector/src/sink/mongodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,68 @@ use crate::sink::{
SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT,
};

<<<<<<< HEAD
=======
mod send_bulk_write_command_future {
use core::future::Future;

use anyhow::anyhow;
use mongodb::bson::Document;
use mongodb::Database;

use crate::sink::{Result, SinkError};

pub(super) type SendBulkWriteCommandFuture = impl Future<Output = Result<()>> + 'static;

pub(super) fn send_bulk_write_commands(
db: Database,
upsert: Option<Document>,
delete: Option<Document>,
) -> SendBulkWriteCommandFuture {
async move {
if let Some(upsert) = upsert {
send_bulk_write_command(db.clone(), upsert).await?;
}
if let Some(delete) = delete {
send_bulk_write_command(db, delete).await?;
}
Ok(())
}
}

async fn send_bulk_write_command(db: Database, command: Document) -> Result<()> {
let result = db.run_command(command, None).await.map_err(|err| {
SinkError::Mongodb(anyhow!(err).context(format!(
"sending bulk write command failed, database: {}",
db.name()
)))
})?;

if let Ok(ok) = result.get_i32("ok")
&& ok != 1
{
return Err(SinkError::Mongodb(anyhow!("bulk write write errors")));
}

if let Ok(write_errors) = result.get_array("writeErrors") {
return Err(SinkError::Mongodb(anyhow!(
"bulk write respond with write errors: {:?}",
write_errors,
)));
}

if let Ok(write_concern_error) = result.get_array("writeConcernError") {
return Err(SinkError::Mongodb(anyhow!(
"bulk write respond with write errors: {:?}",
write_concern_error,
)));
}

Ok(())
}
}

>>>>>>> 1b9b5a7973 (fix(sink): fix mogodb write error handling (#19869))
pub const MONGODB_SINK: &str = "mongodb";

// 65536 seems like a reasonable limit, but we may consider setting this limit to 100,000,
Expand Down

0 comments on commit b7e62d8

Please sign in to comment.