Skip to content

Commit

Permalink
replicators: Implement MySQL JSON print
Browse files Browse the repository at this point in the history
MySQL has its own implementation for printing JSON, which is
implicitly called when executing a SELECT on a table that has a JSON
column. This implementation adds spaces after the colon and comma in
each key-value pair.
During snapshot we retrieve the data via SELECT * FROM table, which
returns the JSON data in the MySQL own format.
When MySQL sends data via binlog, it sends the json without the extra
spaces. Our current implementation to lookup a row, requires a full
match of the row values, which fails when the JSON data is not in the
MySQL format.

This commit adds a new function to the MySQL connector to print
the JSON data using MySQL format. This function is used in the
replicator to transform the JSON data before converting it to DfValue.

Fixes: REA-4724
Fixes: #1361

Release-Note-Core: Fixes an issue where the MySQL replicator fails to
  match rows when the table has JSON fields, causing the record
  to become stalled.

Change-Id: I62e1885b2ee08f498e621e52ac4044bcc1664019
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/7896
Tested-by: Buildkite CI
Reviewed-by: Johnathan Davis <jcd@readyset.io>
  • Loading branch information
altmannmarcelo committed Aug 27, 2024
1 parent 7c63128 commit 7094779
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 1 deletion.
3 changes: 2 additions & 1 deletion replicators/src/mysql_connector/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use replication_offset::ReplicationOffset;
use rust_decimal::Decimal;
use tracing::{error, info, warn};

use super::utils::mysql_json_print;
use crate::mysql_connector::utils::mysql_pad_collation_column;
use crate::noria_adapter::{Connector, ReplicationAction};

Expand Down Expand Up @@ -1111,7 +1112,7 @@ fn binlog_row_to_noria_row(
BinlogValue::Jsonb(val) => {
let json: Result<serde_json::Value, _> = val.clone().try_into(); // urgh no TryFrom impl
match json {
Ok(val) => Ok(DfValue::from(val.to_string())),
Ok(val) => Ok(DfValue::from(mysql_json_print(&val))),
Err(JsonbToJsonError::Opaque) => match val {
jsonb::Value::Opaque(opaque_val) => {
// As far as I can *tell* Opaque is just a raw JSON string, which we
Expand Down
58 changes: 58 additions & 0 deletions replicators/src/mysql_connector/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::string::FromUtf8Error;
use std::sync::Arc;

use itertools::Itertools;
use mysql_common::collations::{self, Collation, CollationId};
use mysql_srv::ColumnType;
use readyset_data::DfValue;
Expand Down Expand Up @@ -58,6 +59,39 @@ pub fn parse_mysql_version(version: &str) -> mysql_async::Result<u32> {
Ok(major * 10000 + minor * 100 + patch)
}

/// MySQL has its own implementation of json print that deserializes the json and adds a space after
/// the ":" in between key->value and adds a space after the "," in between key->value pairs.
/// This function is a re-implementation of that.
/// More details can be found at [1].
///
/// # Arguments
/// * `json` - the json value to print
///
/// # Returns
/// This function returns a string that represents the printed json
///
/// [1]: https://linear.app/readyset/issue/REA-4724/
pub fn mysql_json_print(json: &serde_json::Value) -> String {
match json {
serde_json::Value::Object(obj) => {
let res = obj
.iter()
.map(|(key, value)| format!("\"{}\": {}", key, mysql_json_print(value)))
.join(", ");
format!("{{{}}}", res)
}
serde_json::Value::Array(arr) => {
let res = arr.iter().map(mysql_json_print).join(", ");

format!("[{}]", res)
}
serde_json::Value::String(s) => format!("\"{}\"", s),
serde_json::Value::Number(n) => n.to_string(),
serde_json::Value::Bool(b) => b.to_string(),
serde_json::Value::Null => "null".to_string(),
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -76,4 +110,28 @@ mod tests {
let version_number = parse_mysql_version(version).unwrap();
assert_eq!(version_number, 80023);
}

#[test]
fn test_mysql_json_print() {
let json = serde_json::json!({
"key1": "value1",
"key2": {
"key3": "value3",
"key4": {
"key5": "value5"
}
},
"key6": [
"value6",
{
"key7": "value7"
}
]
});
let pretty_json = mysql_json_print(&json);
assert_eq!(
pretty_json,
r#"{"key1": "value1", "key2": {"key3": "value3", "key4": {"key5": "value5"}}, "key6": ["value6", {"key7": "value7"}]}"#
);
}
}
75 changes: 75 additions & 0 deletions replicators/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3542,3 +3542,78 @@ async fn mysql_handle_dml_in_statement_events() {
assert_eq!(caches.len(), 0);
shutdown_tx.shutdown().await;
}

#[tokio::test(flavor = "multi_thread")]
#[serial_test::serial]
#[slow]
async fn mysql_replicate_json_field() {
readyset_tracing::init_test_logging();
let url = mysql_url();
let mut client = DbConnection::connect(&url).await.unwrap();

client
.query(
"DROP TABLE IF EXISTS j_table;
CREATE TABLE j_table (id INT PRIMARY KEY, data JSON, c CHAR(1));
INSERT INTO j_table (id, data, c) VALUES (1, '{\"age\":30,\"car\": [\"Ford\", \"BMW\", \"Fiat\"], \"name\": \"John\"}', 'A');
INSERT INTO j_table (id, data, c) VALUES (2, NULL, 'A');",
)
.await
.unwrap();

let (mut ctx, shutdown_tx) = TestHandle::start_noria(url.to_string(), None)
.await
.unwrap();
ctx.notification_channel
.as_mut()
.unwrap()
.snapshot_completed()
.await
.unwrap();

// Check that the row is replicated correctly
ctx.check_results(
"j_table",
"Snapshot1",
&[
&[
DfValue::Int(1),
DfValue::Text(
"{\"age\": 30, \"car\": [\"Ford\", \"BMW\", \"Fiat\"], \"name\": \"John\"}"
.into(),
),
DfValue::Text("A".into()),
],
&[DfValue::Int(2), DfValue::None, DfValue::Text("A".into())],
],
)
.await
.unwrap();

// Update the JSON data
client
.query("UPDATE j_table SET c = 'B' WHERE id = 1 OR id = 2;")
.await
.unwrap();

// Check that the update is replicated correctly
ctx.check_results(
"j_table",
"Replication",
&[
&[
DfValue::Int(1),
DfValue::Text(
"{\"age\": 30, \"car\": [\"Ford\", \"BMW\", \"Fiat\"], \"name\": \"John\"}"
.into(),
),
DfValue::Text("B".into()),
],
&[DfValue::Int(2), DfValue::None, DfValue::Text("B".into())],
],
)
.await
.unwrap();

shutdown_tx.shutdown().await;
}

0 comments on commit 7094779

Please sign in to comment.