Skip to content

Commit

Permalink
sql: Parameterization of aggregates with WHERE IN
Browse files Browse the repository at this point in the history
Allow parameterization of a WHERE IN clause when the SELECT
contains aggregations. The aggregation across the keys in from the
WHERE IN still must execute as a post-lookup operation.

Release-Note-Core: Allow parameterizing WHERE IN
  clauses when the query contains aggregations.
Change-Id: Iaf28fb394c4964e5d7e9869b3741fc2017c492d5
  • Loading branch information
jasobrown-rs committed May 13, 2024
1 parent 7aa9c35 commit a4bd73f
Show file tree
Hide file tree
Showing 40 changed files with 119 additions and 26 deletions.
2 changes: 2 additions & 0 deletions readyset-clustertest/src/readyset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ async fn replicated_readers() {
let mut deployment = DeploymentBuilder::new(DatabaseType::MySQL, "ct_replicated_readers")
.with_servers(2, ServerParams::default())
.reader_replicas(2)
.enable_post_lookups()
.start()
.await
.unwrap();
Expand Down Expand Up @@ -292,6 +293,7 @@ async fn replicated_readers_with_unions() {
DeploymentBuilder::new(DatabaseType::MySQL, "ct_replicated_readers_with_unions")
.with_servers(2, ServerParams::default())
.reader_replicas(2)
.enable_post_lookups()
.start()
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions readyset-clustertest/src/readyset_mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1649,6 +1649,7 @@ async fn enable_experimental_placeholder_inlining() {
.with_servers(1, ServerParams::default())
.explicit_migrations(500)
.enable_experimental_placeholder_inlining()
.enable_post_lookups()
.start()
.await
.unwrap();
Expand Down
3 changes: 3 additions & 0 deletions readyset-clustertest/src/readyset_postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ async fn embedded_readers_adapters_lt_replicas() {
.with_servers(1, ServerParams::default().no_readers())
.embedded_readers(true)
.allow_full_materialization()
.enable_post_lookups()
.start()
.await
.unwrap();
Expand Down Expand Up @@ -308,6 +309,7 @@ async fn reader_domain_panic_handling() {
.with_servers(1, ServerParams::default().no_readers())
.embedded_readers(true)
.allow_full_materialization()
.enable_post_lookups()
.start()
.await
.unwrap();
Expand Down Expand Up @@ -425,6 +427,7 @@ async fn base_domain_panic_handling() {
.with_servers(1, ServerParams::default().no_readers())
.embedded_readers(true)
.allow_full_materialization()
.enable_post_lookups()
.start()
.await
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion readyset-logictest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl Parse {
}

/// Run a test script, or all test scripts in a directory, against either ReadySet or a reference
/// MySQL database
/// upstream database
#[derive(Parser)]
struct Verify {
#[command(flatten)]
Expand Down
41 changes: 36 additions & 5 deletions readyset-server/src/controller/sql/mir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2238,10 +2238,41 @@ impl SqlToMirConverter {
project_order,
);

let aggregates = if view_key.index_type != IndexType::HashMap {
post_lookup_aggregates(query_graph, query_name)?
let post_lookup_aggregates = if view_key.index_type == IndexType::HashMap {
// If we have aggregates under the IndexType::HashMap, they aren't necessarily
// post-lookup operations. For example, `select sum(col2) from t where col1 =
// ?`, the aggregate will be handled in the dataflow graph.
// But if the query originally contained a `where col1 in
// (?, ?)`, the aggregate does need to be executed as a
// post-lookup. Adding a post-lookup is necessary for `where in` for correctly
// aggregating results, but a mild perf impediment for aggregates with a simple
// equality (we'll run an aggregation on a single row). However, we've lost the
// "did this come from a `where in` information" way above, as it's rewritten in
// the adapter. Hence, to avoid that penalty on all users,
// only add the post-lookup to users who have opted in to
// using post-lookups.
if self.config.allow_post_lookup {
match post_lookup_aggregates(query_graph, query_name) {
Ok(aggs) => aggs,
// This part is a hack. When we get an ReadySetError::Unsupported,
// that is because the aggregate was a AVG, COUNT(DISTINCT..), or
// SUM(DISTINCT..). We can only support those (currently!) when the
// query contained an equality clause, and
// not a `where in` clause (that was
// rewritten as an equality). As mentioned above, we don't know which
// one the original query had, thus this
// code opts to preserve the functionality
// of the simple equality. Once again, this only applies if the user
// opted in to using "experimental"
// post-lookups.
Err(ReadySetError::Unsupported(..)) => None,
Err(e) => return Err(e),
}
} else {
None
}
} else {
None
post_lookup_aggregates(query_graph, query_name)?
};

let order_by = query_graph
Expand All @@ -2252,7 +2283,7 @@ impl SqlToMirConverter {
let limit = query_graph.pagination.as_ref().map(|p| p.limit);

if !self.config.allow_post_lookup
&& (aggregates.is_some() || order_by.is_some() || limit.is_some())
&& (post_lookup_aggregates.is_some() || order_by.is_some() || limit.is_some())
{
unsupported!("Queries which perform operations post-lookup are not supported");
}
Expand All @@ -2269,7 +2300,7 @@ impl SqlToMirConverter {
limit,
returned_cols: Some(returned_cols),
default_row: query_graph.default_row.clone(),
aggregates,
aggregates: post_lookup_aggregates,
},
),
&[leaf_project_reorder_node],
Expand Down
5 changes: 3 additions & 2 deletions readyset-server/src/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4337,6 +4337,7 @@ async fn correct_nested_view_schema() {
("swvc.id".into(), DfType::Int),
("swvc.content".into(), DfType::DEFAULT_TEXT),
("swvc.vc".into(), DfType::BigInt),
("swvc.story".into(), DfType::Int),
];
assert_eq!(
q.schema()
Expand Down Expand Up @@ -8306,9 +8307,9 @@ async fn reroutes_count() {
assert_eq!(
r2.into_vec(),
vec![
vec![DfValue::Int(2)],
vec![DfValue::Int(1)],
vec![DfValue::Int(1)],
vec![DfValue::Int(2)]
vec![DfValue::Int(1)]
]
);

Expand Down
8 changes: 3 additions & 5 deletions readyset-sql-passes/src/adapter_rewrites/autoparameterize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ struct AutoParameterizeVisitor {
autoparameterize_equals: bool,
autoparameterize_ranges: bool,
out: Vec<(usize, Literal)>,
has_aggregates: bool,
in_supported_position: bool,
param_index: usize,
query_depth: u8,
Expand Down Expand Up @@ -116,7 +115,7 @@ impl<'ast> VisitorMut<'ast> for AutoParameterizeVisitor {
e,
Expr::Literal(lit) if !matches!(lit, Literal::Placeholder(_))
)
}) && !self.has_aggregates =>
}) =>
{
if self.autoparameterize_equals {
let exprs = mem::replace(
Expand Down Expand Up @@ -365,7 +364,6 @@ pub fn auto_parameterize_query(
let mut visitor = AutoParameterizeVisitor {
autoparameterize_equals,
autoparameterize_ranges,
has_aggregates: query.contains_aggregate_select(),
..Default::default()
};
#[allow(clippy::unwrap_used)] // error is !, which can never be returned
Expand Down Expand Up @@ -555,8 +553,8 @@ mod tests {
fn in_with_aggregates() {
test_auto_parameterize_mysql(
"SELECT count(*) FROM users WHERE id = 1 AND x IN (1, 2)",
"SELECT count(*) FROM users WHERE id = ? AND x IN (1, 2)",
vec![(0, 1.into())],
"SELECT count(*) FROM users WHERE id = ? AND x IN (?, ?)",
vec![(0, 1.into()), (1, 1.into()), (2, 2.into())],
);
}

Expand Down
15 changes: 2 additions & 13 deletions readyset-sql-passes/src/adapter_rewrites/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,25 +351,14 @@ impl<'ast> VisitorMut<'ast> for CollapseWhereInVisitor {
/// by regular filter nodes in dataflow
fn collapse_where_in(query: &mut SelectStatement) -> ReadySetResult<Vec<RewrittenIn>> {
let mut res = vec![];
let distinct = query.distinct;
let has_aggregates = query.contains_aggregate_select();

if let Some(ref mut w) = query.where_clause {
let mut visitor = CollapseWhereInVisitor::default();
visitor.visit_expr(w)?;
res = visitor.out;

// When a `SELECT` statement contains aggregates, such as `SUM` or `COUNT` (or `DISTINCT`,
// which is implemented via COUNT), we can't use placeholders, as those will aggregate key
// lookups into a multi row response, as opposed to a single row response required by
// aggregates. We could support this pretty easily, but for now it's not in-scope
if !res.is_empty() {
if has_aggregates {
unsupported!("Aggregates with parameterized IN are not supported");
}
if distinct {
unsupported!("DISTINCT with parameterized IN is not supported");
}
if !res.is_empty() && query.distinct {
unsupported!("DISTINCT with parameterized IN is not supported");
}
}
Ok(res)
Expand Down
15 changes: 15 additions & 0 deletions system-benchmarks/bench_data/mysql/aggregates/count_group_by.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
distributions:
- name: ids
range:
start: 0
end: 100
zipf: 1.15
queries:
- spec: >-
SELECT count(*) FROM ints WHERE v = ? GROUP BY i
params:
- sql_type: bigint
distribution: ids
col: 0
weight: 1
migrate: true
8 changes: 8 additions & 0 deletions system-benchmarks/bench_data/mysql/aggregates/simple.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
SET @ints_rows = 50000;
SET @num_ints = 97;


CREATE TABLE ints (
i int primary key,
v int COMMENT 'UNIFORM 0 @num_ints'
) COMMENT = 'ROWS=@ints_rows';
15 changes: 15 additions & 0 deletions system-benchmarks/bench_data/mysql/aggregates/simple_count.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
distributions:
- name: ids
range:
start: 0
end: 100
zipf: 1.15
queries:
- spec: >-
SELECT count(*) FROM ints WHERE v = ?
params:
- sql_type: bigint
distribution: ids
col: 0
weight: 1
migrate: true
15 changes: 15 additions & 0 deletions system-benchmarks/bench_data/mysql/aggregates/simple_sum.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
distributions:
- name: ids
range:
start: 0
end: 100
zipf: 1.15
queries:
- spec: >-
SELECT sum(v) FROM ints WHERE i = ?
params:
- sql_type: bigint
distribution: ids
col: 0
weight: 1
migrate: true
15 changes: 15 additions & 0 deletions system-benchmarks/bench_data/mysql/aggregates/sum_group_by.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
distributions:
- name: ids
range:
start: 0
end: 100
zipf: 1.15
queries:
- spec: >-
SELECT sum(v) FROM ints WHERE v = ? GROUP BY i
params:
- sql_type: bigint
distribution: ids
col: 0
weight: 1
migrate: true

0 comments on commit a4bd73f

Please sign in to comment.