diff --git a/replicators/src/mysql_connector/connector.rs b/replicators/src/mysql_connector/connector.rs index e989215b85..ebe21bba11 100644 --- a/replicators/src/mysql_connector/connector.rs +++ b/replicators/src/mysql_connector/connector.rs @@ -26,6 +26,7 @@ use replication_offset::ReplicationOffset; use rust_decimal::Decimal; use tracing::{error, info, warn}; +use super::utils::mysql_json_print; use crate::mysql_connector::utils::mysql_pad_collation_column; use crate::noria_adapter::{Connector, ReplicationAction}; @@ -1111,7 +1112,7 @@ fn binlog_row_to_noria_row( BinlogValue::Jsonb(val) => { let json: Result = val.clone().try_into(); // urgh no TryFrom impl match json { - Ok(val) => Ok(DfValue::from(val.to_string())), + Ok(val) => Ok(DfValue::from(mysql_json_print(&val))), Err(JsonbToJsonError::Opaque) => match val { jsonb::Value::Opaque(opaque_val) => { // As far as I can *tell* Opaque is just a raw JSON string, which we diff --git a/replicators/src/mysql_connector/utils.rs b/replicators/src/mysql_connector/utils.rs index 26c80ff088..50d04a8047 100644 --- a/replicators/src/mysql_connector/utils.rs +++ b/replicators/src/mysql_connector/utils.rs @@ -1,6 +1,7 @@ use std::string::FromUtf8Error; use std::sync::Arc; +use itertools::Itertools; use mysql_common::collations::{self, Collation, CollationId}; use mysql_srv::ColumnType; use readyset_data::DfValue; @@ -58,6 +59,39 @@ pub fn parse_mysql_version(version: &str) -> mysql_async::Result { Ok(major * 10000 + minor * 100 + patch) } +/// MySQL has its own implementation of json print that deserializes the json and adds a space after +/// the ":" in between key->value and adds a space after the "," in between key->value pairs. +/// This function is a re-implementation of that. +/// More details can be found at [1]. +/// +/// # Arguments +/// * `json` - the json value to print +/// +/// # Returns +/// This function returns a string that represents the printed json +/// +/// [1]: https://linear.app/readyset/issue/REA-4724/ +pub fn mysql_json_print(json: &serde_json::Value) -> String { + match json { + serde_json::Value::Object(obj) => { + let res = obj + .iter() + .map(|(key, value)| format!("\"{}\": {}", key, mysql_json_print(value))) + .join(", "); + format!("{{{}}}", res) + } + serde_json::Value::Array(arr) => { + let res = arr.iter().map(mysql_json_print).join(", "); + + format!("[{}]", res) + } + serde_json::Value::String(s) => format!("\"{}\"", s), + serde_json::Value::Number(n) => n.to_string(), + serde_json::Value::Bool(b) => b.to_string(), + serde_json::Value::Null => "null".to_string(), + } +} + #[cfg(test)] mod tests { use super::*; @@ -76,4 +110,28 @@ mod tests { let version_number = parse_mysql_version(version).unwrap(); assert_eq!(version_number, 80023); } + + #[test] + fn test_mysql_json_print() { + let json = serde_json::json!({ + "key1": "value1", + "key2": { + "key3": "value3", + "key4": { + "key5": "value5" + } + }, + "key6": [ + "value6", + { + "key7": "value7" + } + ] + }); + let pretty_json = mysql_json_print(&json); + assert_eq!( + pretty_json, + r#"{"key1": "value1", "key2": {"key3": "value3", "key4": {"key5": "value5"}}, "key6": ["value6", {"key7": "value7"}]}"# + ); + } } diff --git a/replicators/tests/tests.rs b/replicators/tests/tests.rs index be6a4a7770..4af3b09036 100644 --- a/replicators/tests/tests.rs +++ b/replicators/tests/tests.rs @@ -3542,3 +3542,78 @@ async fn mysql_handle_dml_in_statement_events() { assert_eq!(caches.len(), 0); shutdown_tx.shutdown().await; } + +#[tokio::test(flavor = "multi_thread")] +#[serial_test::serial] +#[slow] +async fn mysql_replicate_json_field() { + readyset_tracing::init_test_logging(); + let url = mysql_url(); + let mut client = DbConnection::connect(&url).await.unwrap(); + + client + .query( + "DROP TABLE IF EXISTS j_table; + CREATE TABLE j_table (id INT PRIMARY KEY, data JSON, c CHAR(1)); + INSERT INTO j_table (id, data, c) VALUES (1, '{\"age\":30,\"car\": [\"Ford\", \"BMW\", \"Fiat\"], \"name\": \"John\"}', 'A'); + INSERT INTO j_table (id, data, c) VALUES (2, NULL, 'A');", + ) + .await + .unwrap(); + + let (mut ctx, shutdown_tx) = TestHandle::start_noria(url.to_string(), None) + .await + .unwrap(); + ctx.notification_channel + .as_mut() + .unwrap() + .snapshot_completed() + .await + .unwrap(); + + // Check that the row is replicated correctly + ctx.check_results( + "j_table", + "Snapshot1", + &[ + &[ + DfValue::Int(1), + DfValue::Text( + "{\"age\": 30, \"car\": [\"Ford\", \"BMW\", \"Fiat\"], \"name\": \"John\"}" + .into(), + ), + DfValue::Text("A".into()), + ], + &[DfValue::Int(2), DfValue::None, DfValue::Text("A".into())], + ], + ) + .await + .unwrap(); + + // Update the JSON data + client + .query("UPDATE j_table SET c = 'B' WHERE id = 1 OR id = 2;") + .await + .unwrap(); + + // Check that the update is replicated correctly + ctx.check_results( + "j_table", + "Replication", + &[ + &[ + DfValue::Int(1), + DfValue::Text( + "{\"age\": 30, \"car\": [\"Ford\", \"BMW\", \"Fiat\"], \"name\": \"John\"}" + .into(), + ), + DfValue::Text("B".into()), + ], + &[DfValue::Int(2), DfValue::None, DfValue::Text("B".into())], + ], + ) + .await + .unwrap(); + + shutdown_tx.shutdown().await; +}