Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query engine integration #2074

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ members = [
"crates/paths",
"crates/physical-plan",
"crates/primitives",
"crates/query",
"crates/sats",
"crates/schema",
"crates/sdk",
Expand Down Expand Up @@ -106,6 +107,7 @@ spacetimedb-metrics = { path = "crates/metrics", version = "1.0.0-rc2" }
spacetimedb-paths = { path = "crates/paths", version = "1.0.0-rc2" }
spacetimedb-physical-plan = { path = "crates/physical-plan", version = "1.0.0-rc2" }
spacetimedb-primitives = { path = "crates/primitives", version = "1.0.0-rc2" }
spacetimedb-query = { path = "crates/query", version = "1.0.0-rc2" }
spacetimedb-sats = { path = "crates/sats", version = "1.0.0-rc2" }
spacetimedb-schema = { path = "crates/schema", version = "1.0.0-rc2" }
spacetimedb-standalone = { path = "crates/standalone", version = "1.0.0-rc2" }
Expand Down
2 changes: 2 additions & 0 deletions crates/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ bench = false
spacetimedb-client-api = { path = "../client-api" }
spacetimedb-core = { path = "../core", features = ["test"] }
spacetimedb-data-structures.workspace = true
spacetimedb-execution = { path = "../execution" }
spacetimedb-lib = { path = "../lib" }
spacetimedb-paths.workspace = true
spacetimedb-primitives = { path = "../primitives" }
spacetimedb-query = { path = "../query" }
spacetimedb-sats = { path = "../sats" }
spacetimedb-schema = { workspace = true, features = ["test"] }
spacetimedb-standalone = { path = "../standalone" }
Expand Down
33 changes: 32 additions & 1 deletion crates/bench/benches/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use spacetimedb::execution_context::Workload;
use spacetimedb::host::module_host::DatabaseTableUpdate;
use spacetimedb::identity::AuthCtx;
use spacetimedb::messages::websocket::BsatnFormat;
use spacetimedb::sql::ast::SchemaViewer;
use spacetimedb::subscription::query::compile_read_only_query;
use spacetimedb::subscription::subscription::ExecutionSet;
use spacetimedb::{db::relational_db::RelationalDB, messages::websocket::Compression};
use spacetimedb_bench::database::BenchDatabase as _;
use spacetimedb_bench::spacetime_raw::SpacetimeRaw;
use spacetimedb_primitives::{col_list, TableId};
use spacetimedb_query::SubscribePlan;
use spacetimedb_sats::{product, AlgebraicType, AlgebraicValue, ProductValue};

fn create_table_location(db: &RelationalDB) -> Result<TableId, DBError> {
Expand Down Expand Up @@ -99,6 +101,22 @@ fn eval(c: &mut Criterion) {
let ins_rhs = insert_op(rhs, "location", new_rhs_row);
let update = [&ins_lhs, &ins_rhs];

// A benchmark runner for the new query engine
let bench_query = |c: &mut Criterion, name, sql| {
c.bench_function(name, |b| {
let tx = raw.db.begin_tx(Workload::Subscribe);
let auth = AuthCtx::for_testing();
let schema_viewer = &SchemaViewer::new(&tx, &auth);
let plan = SubscribePlan::compile(sql, schema_viewer).unwrap();

b.iter(|| {
drop(black_box(
plan.collect_table_update::<BsatnFormat>(Compression::None, &tx),
))
})
});
};

let bench_eval = |c: &mut Criterion, name, sql| {
c.bench_function(name, |b| {
let tx = raw.db.begin_tx(Workload::Update);
Expand All @@ -116,6 +134,19 @@ fn eval(c: &mut Criterion) {
});
};

// Join 1M rows on the left with 12K rows on the right.
// Note, this should use an index join so as not to read the entire footprint table.
let name = format!(
r#"
select f.*
from location l join footprint f on l.entity_id = f.entity_id
where l.chunk_index = {chunk_index}
"#
);

bench_query(c, "footprint-scan", "select * from footprint");
bench_query(c, "footprint-semijoin", &name);

// To profile this benchmark for 30s
// samply record -r 10000000 cargo bench --bench=subscription --profile=profiling -- full-scan --exact --profile-time=30
// Iterate 1M rows.
Expand All @@ -124,7 +155,7 @@ fn eval(c: &mut Criterion) {
// To profile this benchmark for 30s
// samply record -r 10000000 cargo bench --bench=subscription --profile=profiling -- full-join --exact --profile-time=30
// Join 1M rows on the left with 12K rows on the right.
// Note, this should use an index join so as not to read the entire lhs table.
// Note, this should use an index join so as not to read the entire footprint table.
let name = format!(
r#"
select footprint.*
Expand Down
2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ spacetimedb-durability.workspace = true
spacetimedb-metrics.workspace = true
spacetimedb-primitives.workspace = true
spacetimedb-paths.workspace = true
spacetimedb-query.workspace = true
spacetimedb-sats = { workspace = true, features = ["serde"] }
spacetimedb-schema.workspace = true
spacetimedb-table.workspace = true
spacetimedb-vm.workspace = true
spacetimedb-snapshot.workspace = true
spacetimedb-expr.workspace = true
spacetimedb-execution.workspace = true

anyhow = { workspace = true, features = ["backtrace"] }
arrayvec.workspace = true
Expand Down
35 changes: 26 additions & 9 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use crate::util::prometheus_handle::IntGaugeExt;
use crate::worker_metrics::WORKER_METRICS;
use derive_more::From;
use futures::prelude::*;
use spacetimedb_client_api_messages::websocket::{CallReducerFlags, Compression, FormatSwitch};
use spacetimedb_client_api_messages::websocket::{
BsatnFormat, CallReducerFlags, Compression, FormatSwitch, JsonFormat, WebsocketFormat,
};
use spacetimedb_lib::identity::RequestId;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::task::AbortHandle;
Expand Down Expand Up @@ -294,26 +296,41 @@ impl ClientConnection {
.unwrap()
}

pub fn one_off_query(&self, query: &str, message_id: &[u8], timer: Instant) -> Result<(), anyhow::Error> {
let result = self.module.one_off_query(self.id.identity, query.to_owned());
pub fn one_off_query_json(&self, query: &str, message_id: &[u8], timer: Instant) -> Result<(), anyhow::Error> {
let response = self.one_off_query::<JsonFormat>(query, message_id, timer);
self.send_message(response)?;
Ok(())
}

pub fn one_off_query_bsatn(&self, query: &str, message_id: &[u8], timer: Instant) -> Result<(), anyhow::Error> {
let response = self.one_off_query::<BsatnFormat>(query, message_id, timer);
self.send_message(response)?;
Ok(())
}

fn one_off_query<F: WebsocketFormat>(
&self,
query: &str,
message_id: &[u8],
timer: Instant,
) -> OneOffQueryResponseMessage<F> {
let result = self.module.one_off_query::<F>(self.id.identity, query.to_owned());
let message_id = message_id.to_owned();
let total_host_execution_duration = timer.elapsed().as_micros() as u64;
let response = match result {
match result {
Ok(results) => OneOffQueryResponseMessage {
message_id,
error: None,
results,
results: vec![results],
total_host_execution_duration,
},
Err(err) => OneOffQueryResponseMessage {
message_id,
error: Some(format!("{}", err)),
results: Vec::new(),
results: vec![],
total_host_execution_duration,
},
};
self.send_message(response)?;
Ok(())
}
}

pub async fn disconnect(self) {
Expand Down
7 changes: 5 additions & 2 deletions crates/core/src/client/message_handlers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::messages::{SubscriptionUpdateMessage, SwitchedServerMessage, ToProtocol, TransactionUpdateMessage};
use super::{ClientConnection, DataMessage};
use super::{ClientConnection, DataMessage, Protocol};
use crate::energy::EnergyQuanta;
use crate::execution_context::WorkloadType;
use crate::host::module_host::{EventStatus, ModuleEvent, ModuleFunctionCall};
Expand Down Expand Up @@ -91,7 +91,10 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
query_string: query,
message_id,
}) => {
let res = client.one_off_query(&query, &message_id, timer);
let res = match client.config.protocol {
Protocol::Binary => client.one_off_query_bsatn(&query, &message_id, timer),
Protocol::Text => client.one_off_query_json(&query, &message_id, timer),
};
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Sql, &address, "")
Expand Down
64 changes: 31 additions & 33 deletions crates/core/src/client/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ use crate::host::ArgsTuple;
use crate::messages::websocket as ws;
use derive_more::From;
use spacetimedb_client_api_messages::websocket::{
BsatnFormat, Compression, FormatSwitch, JsonFormat, WebsocketFormat, SERVER_MSG_COMPRESSION_TAG_BROTLI,
SERVER_MSG_COMPRESSION_TAG_GZIP, SERVER_MSG_COMPRESSION_TAG_NONE,
BsatnFormat, Compression, FormatSwitch, JsonFormat, OneOffTable, RowListLen, WebsocketFormat,
SERVER_MSG_COMPRESSION_TAG_BROTLI, SERVER_MSG_COMPRESSION_TAG_GZIP, SERVER_MSG_COMPRESSION_TAG_NONE,
};
use spacetimedb_lib::identity::RequestId;
use spacetimedb_lib::ser::serde::SerializeWrapper;
use spacetimedb_lib::Address;
use spacetimedb_sats::bsatn;
use spacetimedb_vm::relation::MemTable;
use std::sync::Arc;
use std::time::Instant;

Expand Down Expand Up @@ -63,7 +62,8 @@ pub fn serialize(msg: impl ToProtocol<Encoded = SwitchedServerMessage>, config:

#[derive(Debug, From)]
pub enum SerializableMessage {
Query(OneOffQueryResponseMessage),
QueryBinary(OneOffQueryResponseMessage<BsatnFormat>),
QueryText(OneOffQueryResponseMessage<JsonFormat>),
Identity(IdentityTokenMessage),
Subscribe(SubscriptionUpdateMessage),
TxUpdate(TransactionUpdateMessage),
Expand All @@ -72,7 +72,8 @@ pub enum SerializableMessage {
impl SerializableMessage {
pub fn num_rows(&self) -> Option<usize> {
match self {
Self::Query(msg) => Some(msg.num_rows()),
Self::QueryBinary(msg) => Some(msg.num_rows()),
Self::QueryText(msg) => Some(msg.num_rows()),
Self::Subscribe(msg) => Some(msg.num_rows()),
Self::TxUpdate(msg) => Some(msg.num_rows()),
Self::Identity(_) => None,
Expand All @@ -81,7 +82,7 @@ impl SerializableMessage {

pub fn workload(&self) -> Option<WorkloadType> {
match self {
Self::Query(_) => Some(WorkloadType::Sql),
Self::QueryBinary(_) | Self::QueryText(_) => Some(WorkloadType::Sql),
Self::Subscribe(_) => Some(WorkloadType::Subscribe),
Self::TxUpdate(_) => Some(WorkloadType::Update),
Self::Identity(_) => None,
Expand All @@ -93,7 +94,8 @@ impl ToProtocol for SerializableMessage {
type Encoded = SwitchedServerMessage;
fn to_protocol(self, protocol: Protocol) -> Self::Encoded {
match self {
SerializableMessage::Query(msg) => msg.to_protocol(protocol),
SerializableMessage::QueryBinary(msg) => msg.to_protocol(protocol),
SerializableMessage::QueryText(msg) => msg.to_protocol(protocol),
SerializableMessage::Identity(msg) => msg.to_protocol(protocol),
SerializableMessage::Subscribe(msg) => msg.to_protocol(protocol),
SerializableMessage::TxUpdate(msg) => msg.to_protocol(protocol),
Expand Down Expand Up @@ -243,42 +245,38 @@ impl ToProtocol for SubscriptionUpdateMessage {
}

#[derive(Debug)]
pub struct OneOffQueryResponseMessage {
pub struct OneOffQueryResponseMessage<F: WebsocketFormat> {
pub message_id: Vec<u8>,
pub error: Option<String>,
pub results: Vec<MemTable>,
pub results: Vec<OneOffTable<F>>,
pub total_host_execution_duration: u64,
}

impl OneOffQueryResponseMessage {
impl<F: WebsocketFormat> OneOffQueryResponseMessage<F> {
fn num_rows(&self) -> usize {
self.results.iter().map(|t| t.data.len()).sum()
self.results.iter().map(|table| table.rows.len()).sum()
}
}

impl ToProtocol for OneOffQueryResponseMessage {
impl ToProtocol for OneOffQueryResponseMessage<BsatnFormat> {
type Encoded = SwitchedServerMessage;
fn to_protocol(self, protocol: Protocol) -> Self::Encoded {
fn convert<F: WebsocketFormat>(msg: OneOffQueryResponseMessage) -> ws::ServerMessage<F> {
let tables = msg
.results
.into_iter()
.map(|table| ws::OneOffTable {
table_name: table.head.table_name.clone(),
rows: F::encode_list(table.data.into_iter()).0,
})
.collect();
ws::ServerMessage::OneOffQueryResponse(ws::OneOffQueryResponse {
message_id: msg.message_id.into(),
error: msg.error.map(Into::into),
tables,
total_host_execution_duration_micros: msg.total_host_execution_duration,
})
}
fn to_protocol(self, _: Protocol) -> Self::Encoded {
FormatSwitch::Bsatn(convert(self))
}
}

match protocol {
Protocol::Text => FormatSwitch::Json(convert(self)),
Protocol::Binary => FormatSwitch::Bsatn(convert(self)),
}
impl ToProtocol for OneOffQueryResponseMessage<JsonFormat> {
type Encoded = SwitchedServerMessage;
fn to_protocol(self, _: Protocol) -> Self::Encoded {
FormatSwitch::Json(convert(self))
}
}

fn convert<F: WebsocketFormat>(msg: OneOffQueryResponseMessage<F>) -> ws::ServerMessage<F> {
ws::ServerMessage::OneOffQueryResponse(ws::OneOffQueryResponse {
message_id: msg.message_id.into(),
error: msg.error.map(Into::into),
tables: msg.results.into_boxed_slice(),
total_host_execution_duration_micros: msg.total_host_execution_duration,
})
}
Loading