Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: Support tuple parameters like (x, y) = $1 #1264

Open
wants to merge 1 commit into
base: Idc6e6952f2a3cd027f4c58082ac770571d3f1cbc
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions logictests/tuples.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
statement ok
CREATE TABLE t (x int, y int, z int)

statement ok
INSERT INTO t (x, y, z) VALUES (1, 1, 3), (1, 2, -1), (3, 2, 1)

query III rowsort
SELECT * FROM t WHERE (x, y, z) = (1, 2, -1);
----
1
2
-1

statement error
SELECT * FROM t WHERE (x, y, z) = (1, 2);
10 changes: 9 additions & 1 deletion readyset-adapter/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ where
data: DB::PrepareData<'_>,
event: &mut QueryExecutionEvent,
) -> Result<PrepareResultInner<DB>, DB::Error> {
let do_noria = select_meta.should_do_noria;
let do_noria = &select_meta.should_do_noria;
let do_migrate = select_meta.must_migrate;

let up_prep: OptionFuture<_> = self
Expand Down Expand Up @@ -1356,6 +1356,7 @@ where
ticket: Option<Timestamp>,
event: &mut QueryExecutionEvent,
) -> ReadySetResult<QueryResult<'a, DB>> {
println!("execute noria");
use noria_connector::PrepareResult::*;

event.destination = Some(QueryDestination::Readyset);
Expand Down Expand Up @@ -1391,6 +1392,7 @@ where
event: &mut QueryExecutionEvent,
is_fallback: bool,
) -> Result<QueryResult<'a, DB>, DB::Error> {
println!("executing upstream");
let upstream = upstream.as_mut().ok_or_else(|| {
ReadySetError::Internal("This condition requires an upstream connector".to_string())
})?;
Expand Down Expand Up @@ -1453,6 +1455,7 @@ where
"Error received from noria, sending query to fallback");
}

println!("here1");
Self::execute_upstream(upstream, upstream_prep, params, exec_meta, event, true)
.await
}
Expand Down Expand Up @@ -1667,6 +1670,7 @@ where

let result = match &cached_statement.prep.inner {
PrepareResultInner::Noria(prep) => {
println!("executing noria");
Self::execute_noria(noria, prep, params, ticket, &mut event)
.await
.map_err(Into::into)
Expand All @@ -1678,9 +1682,11 @@ where
.query_status_cache
.inlined_cache_miss(cached_statement.as_view_request()?, params.to_vec())
}
println!("here2");
Self::execute_upstream(upstream, prep, params, exec_meta, &mut event, false).await
}
PrepareResultInner::Both(.., uprep) if should_fallback => {
println!("here3");
Self::execute_upstream(upstream, uprep, params, exec_meta, &mut event, false).await
}
PrepareResultInner::Both(nprep, uprep) => {
Expand Down Expand Up @@ -1868,6 +1874,7 @@ where
}
}
// Now migrate the new query
println!("doing adapter rewrites");
adapter_rewrites::process_query(&mut stmt, self.noria.rewrite_params())?;
let migration_state = match self
.noria
Expand Down Expand Up @@ -2506,6 +2513,7 @@ where
event: &mut QueryExecutionEvent,
processed_query_params: ProcessedQueryParams,
) -> Result<QueryResult<'a, DB>, DB::Error> {
println!("adhoc");
let mut status = status.unwrap_or(QueryStatus {
migration_state: MigrationState::Unsupported,
execution_info: None,
Expand Down
2 changes: 2 additions & 0 deletions readyset-adapter/src/backend/noria_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1396,6 +1396,7 @@ impl NoriaConnector {
create_if_not_exist: bool,
override_schema_search_path: Option<Vec<SqlIdentifier>>,
) -> ReadySetResult<PrepareResult> {
println!("noria prepare select");
// extract parameter columns *for the client*
// note that we have to do this *before* processing the query, otherwise the
// client will be confused about the number of parameters it's supposed to
Expand Down Expand Up @@ -1484,6 +1485,7 @@ impl NoriaConnector {
ticket: Option<Timestamp>,
event: &mut readyset_client_metrics::QueryExecutionEvent,
) -> ReadySetResult<QueryResult<'_>> {
println!("execute select");
let start = Instant::now();
let (qname, processed_query_params, params) = match ctx {
ExecuteSelectContext::Prepared {
Expand Down
1 change: 1 addition & 0 deletions readyset-client/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1539,6 +1539,7 @@ impl ReaderHandle {
blocking_read: bool,
dialect: Dialect,
) -> ReadySetResult<ViewQuery> {
dbg!(&raw_keys);
trace!("select::lookup");

let (keys, filters) = if raw_keys.is_empty() {
Expand Down
63 changes: 63 additions & 0 deletions readyset-psql/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2076,6 +2076,69 @@ WHERE
shutdown_tx.shutdown().await;
}

async fn assert_last_statement_readyset(conn: &Client) {
let (destination, status) = match conn
.simple_query("EXPLAIN LAST STATEMENT")
.await
.unwrap()
.into_iter()
.next()
.unwrap()
{
SimpleQueryMessage::Row(row) => (
row.get(0).unwrap().to_owned(),
row.get(1).unwrap().to_owned(),
),
_ => panic!(),
};

assert_eq!(destination, "readyset");
assert_eq!(status, "ok");
}

#[tokio::test(flavor = "multi_thread")]
async fn tuple_cache_reuse() {
let (opts, _handle, shutdown_tx) = setup().await;
let conn = connect(opts).await;

conn.simple_query("DROP TABLE IF EXISTS t").await.unwrap();
conn.simple_query("CREATE TABLE t (x int, y int, z int)")
.await
.unwrap();
conn.simple_query("INSERT INTO t (x, y, z) VALUES (1, 1, 3), (1, 2, -1), (3, 2, 1)")
.await
.unwrap();

eventually!(conn
.simple_query("CREATE CACHE FROM SELECT * FROM t WHERE (x, y, z) = $1")
.await
.is_ok());

eventually! {
let len = conn
.query("SELECT * FROM t WHERE (x, y, z) = (1, 1, 3)", &[])
.await
.unwrap()
.len();
len == 1
}

assert_last_statement_readyset(&conn).await;

eventually! {
let len = conn
.query("SELECT * FROM t WHERE x = 3 AND y = 2 AND z = 1", &[])
.await
.unwrap()
.len();
len == 1
}

assert_last_statement_readyset(&conn).await;

shutdown_tx.shutdown().await;
}

mod http_tests {
use super::*;
#[tokio::test(flavor = "multi_thread")]
Expand Down
1 change: 1 addition & 0 deletions readyset-server/src/controller/mir_to_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ pub(super) fn mir_node_to_flow_parts(
internal!("Encountered dependent join when lowering to dataflow")
}
MirNodeInner::ViewKey { ref key } => {
println!("mir to flow");
return Err(ReadySetError::UnsupportedPlaceholders {
placeholders: key.mapped_ref(
|ViewKeyColumn {
Expand Down
1 change: 1 addition & 0 deletions readyset-server/src/controller/sql/mir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2308,6 +2308,7 @@ impl SqlToMirConverter {
&[final_node],
)
} else if let Ok(placeholders) = unsupported_placeholders.try_into() {
println!("named query to mir");
return Err(ReadySetError::UnsupportedPlaceholders { placeholders });
} else {
final_node
Expand Down
182 changes: 182 additions & 0 deletions readyset-sql-passes/src/adapter_rewrites/expand_tuples.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
use std::mem;

use nom_sql::analysis::visit_mut::{self, VisitorMut};
use nom_sql::{BinaryOperator, Expr, ItemPlaceholder, Literal, SelectStatement};
use readyset_errors::{invalid_query_err, ReadySetError};

#[derive(Default)]
struct ExpandTuplesVisitor;

impl<'ast> VisitorMut<'ast> for ExpandTuplesVisitor {
type Error = ReadySetError;

// TODO ethan we need to make sure this only touches the where clause; seems like visit_expr is
// called in HAVING clauses as well
fn visit_expr(&mut self, expression: &'ast mut Expr) -> Result<(), Self::Error> {
match expression {
Expr::BinaryOp {
// TODO ethan test that using ROW(...) works in actual postgres/mysql
lhs: box Expr::Row {
exprs: lhs_exprs, ..
},
op: BinaryOperator::Equal,
rhs: box Expr::Row {
exprs: rhs_exprs, ..
},
} => {
debug_assert!(!lhs_exprs.is_empty());
debug_assert!(!rhs_exprs.is_empty());

if lhs_exprs.len() == rhs_exprs.len() {
if !lhs_exprs.is_empty() {
let last_expr = Expr::BinaryOp {
lhs: Box::new(lhs_exprs.pop().unwrap()),
op: BinaryOperator::Equal,
rhs: Box::new(rhs_exprs.pop().unwrap()),
};
let lhs_iter = lhs_exprs.iter_mut();
let rhs_iter = rhs_exprs.iter_mut();

// create new expr
// iterate over zipped lhs and rhs, anding everything tg
let new_expression = lhs_iter
.zip(rhs_iter)
.map(|(lhs, rhs)| Expr::BinaryOp {
lhs: Box::new(lhs.take()),
op: BinaryOperator::Equal,
rhs: Box::new(rhs.take()),
})
.rfold(last_expr, |acc, expr| Expr::BinaryOp {
lhs: Box::new(expr),
op: BinaryOperator::And,
rhs: Box::new(acc),
});
let _ = mem::replace(expression, new_expression);
}
} else {
return Err(invalid_query_err!(
"Cannot compare row expressions of unequal lengths"
));
}
}
// TODO ethan test with empty Row; also do same for above
Expr::BinaryOp {
lhs: box Expr::Row {
exprs: lhs_exprs, ..
},
op: BinaryOperator::Equal,
rhs: box Expr::Literal(Literal::Placeholder(_)),
} if lhs_exprs.iter().all(|expr| matches!(expr, Expr::Column(_))) => {
let last_expr = lhs_exprs.pop().unwrap();
let lhs_iter = lhs_exprs.iter_mut();

let last_expr = Expr::BinaryOp {
lhs: Box::new(last_expr),
op: BinaryOperator::Equal,
rhs: Box::new(Expr::Literal(Literal::Placeholder(
ItemPlaceholder::QuestionMark,
))),
};

let new_expression = lhs_iter
.map(|lhs| Expr::BinaryOp {
lhs: Box::new(lhs.take()),
op: BinaryOperator::Equal,
rhs: Box::new(Expr::Literal(Literal::Placeholder(
ItemPlaceholder::QuestionMark,
))),
})
.rfold(last_expr, |acc, expr| Expr::BinaryOp {
lhs: Box::new(expr),
op: BinaryOperator::And,
rhs: Box::new(acc),
});
let _ = mem::replace(expression, new_expression);
}
_ => (),
};

visit_mut::walk_expr(self, expression)?;
Ok(())
}
}

pub(super) fn expand_tuples(query: &mut SelectStatement) {
let mut visitor = ExpandTuplesVisitor;
visitor.visit_select_statement(query).unwrap();
}

#[cfg(test)]
mod tests {
use nom_sql::{Dialect, DialectDisplay};

use super::*;

const DIALECT: Dialect = Dialect::PostgreSQL;

fn try_parse_select_statement(q: &str) -> Result<SelectStatement, String> {
nom_sql::parse_select_statement(DIALECT, q)
}

fn parse_select_statement(q: &str) -> SelectStatement {
try_parse_select_statement(q).unwrap()
}

fn assert_expected(query: &str, expected: &str) {
let mut actual = parse_select_statement(query);
super::expand_tuples(&mut actual);

assert_eq!(
expected,
actual.where_clause.unwrap().display(DIALECT).to_string(),
);
}

#[test]
fn test_placeholder() {
assert_expected(
"SELECT * FROM t WHERE (w, x) = $1",
r#"(("w" = ?) AND ("x" = ?))"#,
);
}

#[test]
fn test_placeholder_many_columns() {
assert_expected(
"SELECT * FROM t WHERE (w, x, y, z) = $1",
r#"(("w" = ?) AND (("x" = ?) AND (("y" = ?) AND ("z" = ?))))"#,
);
}

#[test]
fn test_placeholder_row_syntax() {
assert_expected(
"SELECT * FROM t WHERE ROW(w, x, y, z) = $1",
r#"(("w" = ?) AND (("x" = ?) AND (("y" = ?) AND ("z" = ?))))"#,
);
}

#[test]
fn test_tuple_equal_tuple() {
assert_expected(
"SELECT * FROM t WHERE (w, x) = (1, 2)",
r#"(("w" = 1) AND ("x" = 2))"#,
);
}

#[test]
fn test_tuple_equal_tuple_many_columns() {
assert_expected(
"SELECT * FROM t WHERE (w, x, y, z) = (1, 2, 3, 4)",
r#"(("w" = 1) AND (("x" = 2) AND (("y" = 3) AND ("z" = 4))))"#,
);
}

#[test]
fn test_tuple_equal_tuple_row_syntax() {
assert_expected(
"SELECT * FROM t WHERE ROW(w, x, y, z) = ROW(1, 2, 3, 4)",
r#"(("w" = 1) AND (("x" = 2) AND (("y" = 3) AND ("z" = 4))))"#,
);
}
}
Loading