Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: Add GROUP BY benchmark and short circuit #1255

Open
wants to merge 3 commits into
base: Iaf28fb394c4964e5d7e9869b3741fc2017c492d5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions readyset-client/src/view/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ impl ResultIterator {
let limit = adapter_limit.or(*limit); // Limit specifies total number of results to return

let inner = match (order_by, aggregates) {
(_, _) if data.len() == 1 => ResultIteratorInner::MultiKey(MultiKeyIterator::new(data)),
// No specific order is required, simply iterate over each result set one by one
(None, None) => ResultIteratorInner::MultiKey(MultiKeyIterator::new(data)),
(Some(order_by), None) => {
Expand Down
22 changes: 22 additions & 0 deletions readyset-clustertest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,8 @@ pub struct ServerStartParams {
wait_for_failpoint: bool,
/// Whether to allow full materialization nodes or not
allow_full_materialization: bool,
/// Whether to allow post-lookup operations
enable_post_lookups: bool,
}

/// Set of parameters defining an entire cluster's topology.
Expand Down Expand Up @@ -451,6 +453,8 @@ pub struct DeploymentBuilder {
enable_experimental_placeholder_inlining: bool,
/// Whether to allow fully materialized nodes or not
allow_full_materialization: bool,
/// Whether to enable post-lookup operations
enable_post_lookups: bool,
/// Whether to allow prometheus metrics
prometheus_metrics: bool,
/// How to execute the adapter and server processes.
Expand Down Expand Up @@ -528,6 +532,7 @@ impl DeploymentBuilder {
cleanup: false,
enable_experimental_placeholder_inlining: false,
allow_full_materialization: false,
enable_post_lookups: false,
prometheus_metrics: true,
}
}
Expand Down Expand Up @@ -685,6 +690,12 @@ impl DeploymentBuilder {
self
}

/// Turns on post-lookup operations for this builder.
///
/// Consumes the builder, sets its `enable_post_lookups` flag to `true`, and
/// hands the builder back so that further configuration calls can be chained.
pub fn enable_post_lookups(self) -> Self {
    let mut builder = self;
    builder.enable_post_lookups = true;
    builder
}

/// Sets whether or not to automatically create inlined caches for queries with unsupported
/// placeholders
pub fn enable_experimental_placeholder_inlining(mut self) -> Self {
Expand Down Expand Up @@ -729,6 +740,7 @@ impl DeploymentBuilder {
enable_experimental_placeholder_inlining: self.enable_experimental_placeholder_inlining,
deployment_mode: self.deployment_mode,
allow_full_materialization: self.allow_full_materialization,
enable_post_lookups: self.enable_post_lookups,
prometheus_metrics: self.prometheus_metrics,
}
}
Expand All @@ -750,6 +762,7 @@ impl DeploymentBuilder {
auto_restart: self.auto_restart,
wait_for_failpoint,
allow_full_materialization: self.allow_full_materialization,
enable_post_lookups: self.enable_post_lookups,
}
}

Expand Down Expand Up @@ -1546,6 +1559,8 @@ pub struct AdapterStartParams {
deployment_mode: DeploymentMode,
/// Whether or not to allow full materializations
allow_full_materialization: bool,
/// Whether or not to enable post-lookup operations
enable_post_lookups: bool,
/// Whether to enable prometheus metrics
prometheus_metrics: bool,
}
Expand Down Expand Up @@ -1592,6 +1607,9 @@ async fn start_server(
if server_start_params.allow_full_materialization {
builder = builder.allow_full_materialization();
}
if server_start_params.enable_post_lookups {
builder = builder.enable_post_lookups();
}
let addr = Url::parse(&format!("http://127.0.0.1:{}", port)).unwrap();
Ok(ServerHandle {
addr,
Expand Down Expand Up @@ -1688,6 +1706,10 @@ async fn start_adapter(
builder = builder.allow_full_materialization();
}

if params.enable_post_lookups {
builder = builder.enable_post_lookups();
}

if params.prometheus_metrics {
builder = builder.prometheus_metrics()
}
Expand Down
2 changes: 2 additions & 0 deletions readyset-clustertest/src/readyset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ async fn replicated_readers() {
let mut deployment = DeploymentBuilder::new(DatabaseType::MySQL, "ct_replicated_readers")
.with_servers(2, ServerParams::default())
.reader_replicas(2)
.enable_post_lookups()
.start()
.await
.unwrap();
Expand Down Expand Up @@ -292,6 +293,7 @@ async fn replicated_readers_with_unions() {
DeploymentBuilder::new(DatabaseType::MySQL, "ct_replicated_readers_with_unions")
.with_servers(2, ServerParams::default())
.reader_replicas(2)
.enable_post_lookups()
.start()
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions readyset-clustertest/src/readyset_mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1649,6 +1649,7 @@ async fn enable_experimental_placeholder_inlining() {
.with_servers(1, ServerParams::default())
.explicit_migrations(500)
.enable_experimental_placeholder_inlining()
.enable_post_lookups()
.start()
.await
.unwrap();
Expand Down
3 changes: 3 additions & 0 deletions readyset-clustertest/src/readyset_postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ async fn embedded_readers_adapters_lt_replicas() {
.with_servers(1, ServerParams::default().no_readers())
.embedded_readers(true)
.allow_full_materialization()
.enable_post_lookups()
.start()
.await
.unwrap();
Expand Down Expand Up @@ -308,6 +309,7 @@ async fn reader_domain_panic_handling() {
.with_servers(1, ServerParams::default().no_readers())
.embedded_readers(true)
.allow_full_materialization()
.enable_post_lookups()
.start()
.await
.unwrap();
Expand Down Expand Up @@ -425,6 +427,7 @@ async fn base_domain_panic_handling() {
.with_servers(1, ServerParams::default().no_readers())
.embedded_readers(true)
.allow_full_materialization()
.enable_post_lookups()
.start()
.await
.unwrap();
Expand Down
8 changes: 8 additions & 0 deletions readyset-clustertest/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ impl ReadysetServerBuilder {
pub fn allow_full_materialization(self) -> Self {
self.push_arg("--allow-full-materialization")
}

/// Passes the `--enable-experimental-post-lookup` flag to `push_arg` and
/// returns the builder it yields, so calls can be chained.
pub fn enable_post_lookups(self) -> Self {
    const FLAG: &str = "--enable-experimental-post-lookup";
    self.push_arg(FLAG)
}
}

/// Manages running a readyset binary with the correct arguments.
Expand Down Expand Up @@ -349,6 +353,10 @@ impl AdapterBuilder {
self.push_arg("--allow-full-materialization")
}

/// Passes the `--enable-experimental-post-lookup` flag to `push_arg` and
/// returns the builder it yields, so calls can be chained.
pub fn enable_post_lookups(self) -> Self {
    const FLAG: &str = "--enable-experimental-post-lookup";
    self.push_arg(FLAG)
}

/// Passes the `--prometheus-metrics` flag to `push_arg` and returns the
/// builder it yields, so calls can be chained.
pub fn prometheus_metrics(self) -> Self {
    const FLAG: &str = "--prometheus-metrics";
    self.push_arg(FLAG)
}
Expand Down
2 changes: 1 addition & 1 deletion readyset-logictest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl Parse {
}

/// Run a test script, or all test scripts in a directory, against either ReadySet or a reference
/// MySQL database
/// upstream database
#[derive(Parser)]
struct Verify {
#[command(flatten)]
Expand Down
41 changes: 36 additions & 5 deletions readyset-server/src/controller/sql/mir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2238,10 +2238,41 @@ impl SqlToMirConverter {
project_order,
);

let aggregates = if view_key.index_type != IndexType::HashMap {
post_lookup_aggregates(query_graph, query_name)?
let post_lookup_aggregates = if view_key.index_type == IndexType::HashMap {
// Aggregates under an IndexType::HashMap index aren't necessarily post-lookup
// operations. For example, in `select sum(col2) from t where col1 = ?` the
// aggregate is handled in the dataflow graph. But if the query originally
// contained `where col1 in (?, ?)`, the aggregate does need to be executed as
// a post-lookup. Adding a post-lookup is necessary for `where in` to aggregate
// results correctly, but it is a mild perf impediment for aggregates with a
// simple equality (we'll run an aggregation on a single row). However, the
// "did this come from a `where in`?" information was lost far upstream, as the
// query is rewritten in the adapter. Hence, to avoid imposing that penalty on
// all users, only add the post-lookup for users who have opted in to using
// post-lookups.
if self.config.allow_post_lookup {
match post_lookup_aggregates(query_graph, query_name) {
Ok(aggs) => aggs,
// This part is a hack. When we get a ReadySetError::Unsupported, it is
// because the aggregate was an AVG, COUNT(DISTINCT..), or SUM(DISTINCT..).
// We can only support those (currently!) when the query contained an
// equality clause, and not a `where in` clause (that was rewritten as an
// equality). As mentioned above, we don't know which one the original query
// had, so this code opts to preserve the functionality of the simple
// equality. Once again, this only applies if the user opted in to using
// "experimental" post-lookups.
Err(ReadySetError::Unsupported(..)) => None,
Err(e) => return Err(e),
}
} else {
None
}
} else {
None
post_lookup_aggregates(query_graph, query_name)?
};

let order_by = query_graph
Expand All @@ -2252,7 +2283,7 @@ impl SqlToMirConverter {
let limit = query_graph.pagination.as_ref().map(|p| p.limit);

if !self.config.allow_post_lookup
&& (aggregates.is_some() || order_by.is_some() || limit.is_some())
&& (post_lookup_aggregates.is_some() || order_by.is_some() || limit.is_some())
{
unsupported!("Queries which perform operations post-lookup are not supported");
}
Expand All @@ -2269,7 +2300,7 @@ impl SqlToMirConverter {
limit,
returned_cols: Some(returned_cols),
default_row: query_graph.default_row.clone(),
aggregates,
aggregates: post_lookup_aggregates,
},
),
&[leaf_project_reorder_node],
Expand Down
5 changes: 3 additions & 2 deletions readyset-server/src/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4337,6 +4337,7 @@ async fn correct_nested_view_schema() {
("swvc.id".into(), DfType::Int),
("swvc.content".into(), DfType::DEFAULT_TEXT),
("swvc.vc".into(), DfType::BigInt),
("swvc.story".into(), DfType::Int),
];
assert_eq!(
q.schema()
Expand Down Expand Up @@ -8306,9 +8307,9 @@ async fn reroutes_count() {
assert_eq!(
r2.into_vec(),
vec![
vec![DfValue::Int(2)],
vec![DfValue::Int(1)],
vec![DfValue::Int(1)],
vec![DfValue::Int(2)]
vec![DfValue::Int(1)]
]
);

Expand Down
8 changes: 3 additions & 5 deletions readyset-sql-passes/src/adapter_rewrites/autoparameterize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ struct AutoParameterizeVisitor {
autoparameterize_equals: bool,
autoparameterize_ranges: bool,
out: Vec<(usize, Literal)>,
has_aggregates: bool,
in_supported_position: bool,
param_index: usize,
query_depth: u8,
Expand Down Expand Up @@ -116,7 +115,7 @@ impl<'ast> VisitorMut<'ast> for AutoParameterizeVisitor {
e,
Expr::Literal(lit) if !matches!(lit, Literal::Placeholder(_))
)
}) && !self.has_aggregates =>
}) =>
{
if self.autoparameterize_equals {
let exprs = mem::replace(
Expand Down Expand Up @@ -365,7 +364,6 @@ pub fn auto_parameterize_query(
let mut visitor = AutoParameterizeVisitor {
autoparameterize_equals,
autoparameterize_ranges,
has_aggregates: query.contains_aggregate_select(),
..Default::default()
};
#[allow(clippy::unwrap_used)] // error is !, which can never be returned
Expand Down Expand Up @@ -555,8 +553,8 @@ mod tests {
fn in_with_aggregates() {
test_auto_parameterize_mysql(
"SELECT count(*) FROM users WHERE id = 1 AND x IN (1, 2)",
"SELECT count(*) FROM users WHERE id = ? AND x IN (1, 2)",
vec![(0, 1.into())],
"SELECT count(*) FROM users WHERE id = ? AND x IN (?, ?)",
vec![(0, 1.into()), (1, 1.into()), (2, 2.into())],
);
}

Expand Down
15 changes: 2 additions & 13 deletions readyset-sql-passes/src/adapter_rewrites/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,25 +351,14 @@ impl<'ast> VisitorMut<'ast> for CollapseWhereInVisitor {
/// by regular filter nodes in dataflow
fn collapse_where_in(query: &mut SelectStatement) -> ReadySetResult<Vec<RewrittenIn>> {
let mut res = vec![];
let distinct = query.distinct;
let has_aggregates = query.contains_aggregate_select();

if let Some(ref mut w) = query.where_clause {
let mut visitor = CollapseWhereInVisitor::default();
visitor.visit_expr(w)?;
res = visitor.out;

// When a `SELECT` statement contains aggregates, such as `SUM` or `COUNT` (or `DISTINCT`,
// which is implemented via COUNT), we can't use placeholders, as those will aggregate key
// lookups into a multi row response, as opposed to a single row response required by
// aggregates. We could support this pretty easily, but for now it's not in-scope
if !res.is_empty() {
if has_aggregates {
unsupported!("Aggregates with parameterized IN are not supported");
}
if distinct {
unsupported!("DISTINCT with parameterized IN is not supported");
}
if !res.is_empty() && query.distinct {
unsupported!("DISTINCT with parameterized IN is not supported");
}
}
Ok(res)
Expand Down
23 changes: 23 additions & 0 deletions system-benchmarks/bench_data/mysql/aggregates/agg_queries.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
distributions:
- name: ids
range:
start: 0
end: 100
zipf: 1.15
queries:
- spec: >-
SELECT sum(v) name FROM ints WHERE i = ?
params:
- sql_type: bigint
distribution: ids
col: 0
weight: 1
migrate: true
- spec: >-
SELECT count(*) name FROM ints WHERE i = ?
params:
- sql_type: bigint
distribution: ids
col: 0
weight: 1
migrate: true
6 changes: 6 additions & 0 deletions system-benchmarks/bench_data/mysql/aggregates/simple.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
SET @ints_rows = 1000;

CREATE TABLE ints (
i int primary key,
v int COMMENT 'UNIFORM 0 @ints_rows'
) COMMENT = 'ROWS=@ints_rows';
15 changes: 15 additions & 0 deletions system-benchmarks/bench_data/mysql/group_by/agg_queries.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
distributions:
- name: ids
range:
start: 0
end: 100
zipf: 1.15
queries:
- spec: >-
SELECT count(v) FROM ints WHERE v = ? GROUP BY i
params:
- sql_type: bigint
distribution: ids
col: 0
weight: 1
migrate: true
7 changes: 7 additions & 0 deletions system-benchmarks/bench_data/mysql/group_by/simple.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
SET @ints_rows = 500000;
SET @num_ints = 100;

CREATE TABLE ints (
i int primary key,
v int COMMENT 'UNIFORM 0 @num_ints'
) COMMENT = 'ROWS=@ints_rows';