fix: Snapshot QueryableBuffer error
Fixes a bug in the queryable buffer where, if a block of data was missing one of the columns defined in a table's sort key, creating the logical plan to sort and dedupe the data would fail, causing a panic.

Fixes #25670
pauldix committed Dec 17, 2024
1 parent 7d92b75 commit fe1a22a
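
For context, the core idea of the fix (which lands in table_buffer.rs below): when a buffered chunk omits a tag column that belongs to the table's series/sort key, the record batch is padded with an all-null, dictionary-encoded column so the sort/dedupe plan can still be built. The following is a minimal, self-contained sketch of that idea using plain arrow-rs APIs; `pad_missing_tag` is a hypothetical helper written for illustration, not code from this commit (the real change goes through the IOx SchemaBuilder and ColumnId lookups).

use std::sync::Arc;

use arrow::array::{ArrayRef, StringDictionaryBuilder};
use arrow::datatypes::{DataType, Field, Int32Type, Schema};
use arrow::record_batch::RecordBatch;

// Hypothetical helper for illustration: if `tag_name` is absent from `batch`, append an
// all-null, dictionary-encoded string column with one entry per row; otherwise return the
// batch unchanged. (Schema metadata is dropped here for brevity.)
fn pad_missing_tag(batch: &RecordBatch, tag_name: &str) -> RecordBatch {
    if batch.schema().column_with_name(tag_name).is_some() {
        return batch.clone();
    }

    // One null per existing row, mirroring `tag_builder.append_nulls(self.row_count)` in the fix.
    let mut tag_builder: StringDictionaryBuilder<Int32Type> = StringDictionaryBuilder::new();
    tag_builder.append_nulls(batch.num_rows());
    let tag_col: ArrayRef = Arc::new(tag_builder.finish());

    // Extend the schema with the new nullable dictionary column and rebuild the batch.
    let mut fields: Vec<Field> = batch
        .schema()
        .fields()
        .iter()
        .map(|f| f.as_ref().clone())
        .collect();
    fields.push(Field::new(
        tag_name,
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        true,
    ));
    let mut columns = batch.columns().to_vec();
    columns.push(tag_col);

    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
        .expect("field count matches column count")
}

In the scenario the new test exercises, a first write of foo,t1=a,t2=b f1=1i establishes t1 and t2 in the series key, and a later write of foo,t2=b f1=1i omits t1; padding the second chunk's batch with a null t1 column keeps the sort key columns consistent across chunks, so the snapshot's sort/dedupe plan no longer fails.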
Showing 5 changed files with 175 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions Cargo.toml
@@ -127,6 +127,7 @@ authz = { git = "https://github.com/influxdata/influxdb3_core", rev = "7eefba969
clap_blocks = { git = "https://github.com/influxdata/influxdb3_core", rev = "7eefba96954bdfb6fe69b26cd6d1fc9acd8e9160" }
data_types = { git = "https://github.com/influxdata/influxdb3_core", rev = "7eefba96954bdfb6fe69b26cd6d1fc9acd8e9160" }
datafusion_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "7eefba96954bdfb6fe69b26cd6d1fc9acd8e9160" }
executor = { git = "https://github.com/influxdata/influxdb3_core", rev = "7eefba96954bdfb6fe69b26cd6d1fc9acd8e9160" }
influxdb-line-protocol = { git = "https://github.com/influxdata/influxdb3_core", rev = "7eefba96954bdfb6fe69b26cd6d1fc9acd8e9160", features = ["v3"] }
influxdb_influxql_parser = { git = "https://github.com/influxdata/influxdb3_core", rev = "7eefba96954bdfb6fe69b26cd6d1fc9acd8e9160" }
influxdb_iox_client = { git = "https://github.com/influxdata/influxdb3_core", rev = "7eefba96954bdfb6fe69b26cd6d1fc9acd8e9160" }
1 change: 1 addition & 0 deletions influxdb3_write/Cargo.toml
@@ -12,6 +12,7 @@ license.workspace = true
# Core Crates
data_types.workspace = true
datafusion_util.workspace = true
executor.workspace = true
influxdb-line-protocol.workspace = true
iox_catalog.workspace = true
iox_http.workspace = true
156 changes: 156 additions & 0 deletions influxdb3_write/src/write_buffer/queryable_buffer.rs
@@ -691,3 +691,159 @@ async fn sort_dedupe_persist(
}
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::write_buffer::validator::WriteValidator;
    use crate::Precision;
    use datafusion_util::config::register_iox_object_store;
    use executor::{register_current_runtime_for_io, DedicatedExecutor};
    use influxdb3_wal::{Gen1Duration, SnapshotSequenceNumber, WalFileSequenceNumber};
    use iox_query::exec::ExecutorConfig;
    use iox_time::{MockProvider, Time, TimeProvider};
    use object_store::memory::InMemory;
    use object_store::ObjectStore;
    use parquet_file::storage::{ParquetStorage, StorageId};
    use std::num::NonZeroUsize;

    #[tokio::test]
    async fn snapshot_works_with_not_all_columns_in_buffer() {
        let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let metrics = Arc::new(metric::Registry::default());

        let parquet_store =
            ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3"));
        let exec = Arc::new(Executor::new_with_config_and_executor(
            ExecutorConfig {
                target_query_partitions: NonZeroUsize::new(1).unwrap(),
                object_stores: [&parquet_store]
                    .into_iter()
                    .map(|store| (store.id(), Arc::clone(store.object_store())))
                    .collect(),
                metric_registry: Arc::clone(&metrics),
                // Default to 1gb
                mem_pool_size: 1024 * 1024 * 1024, // 1024 (b/kb) * 1024 (kb/mb) * 1024 (mb/gb)
            },
            DedicatedExecutor::new_testing(),
        ));
        let runtime_env = exec.new_context().inner().runtime_env();
        register_iox_object_store(runtime_env, parquet_store.id(), Arc::clone(&object_store));
        register_current_runtime_for_io();

        let catalog = Arc::new(Catalog::new("hosta".into(), "foo".into()));
        let persister = Arc::new(Persister::new(Arc::clone(&object_store), "hosta"));
        let time_provider: Arc<dyn TimeProvider> =
            Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));

        let queryable_buffer_args = QueryableBufferArgs {
            executor: Arc::clone(&exec),
            catalog: Arc::clone(&catalog),
            persister: Arc::clone(&persister),
            last_cache_provider: LastCacheProvider::new_from_catalog(Arc::clone(&catalog)).unwrap(),
            meta_cache_provider: MetaCacheProvider::new_from_catalog(
                Arc::clone(&time_provider),
                Arc::clone(&catalog),
            )
            .unwrap(),
            persisted_files: Arc::new(Default::default()),
            parquet_cache: None,
        };
        let queryable_buffer = QueryableBuffer::new(queryable_buffer_args);

        let db = data_types::NamespaceName::new("testdb").unwrap();

        // create the initial write with two tags
        let val = WriteValidator::initialize(db.clone(), Arc::clone(&catalog), 0).unwrap();
        let lp = "foo,t1=a,t2=b f1=1i 1000000000";

        let lines = val
            .v1_parse_lines_and_update_schema(lp, false, time_provider.now(), Precision::Nanosecond)
            .unwrap()
            .convert_lines_to_buffer(Gen1Duration::new_1m());
        let batch: WriteBatch = lines.into();
        let wal_contents = WalContents {
            persist_timestamp_ms: 0,
            min_timestamp_ns: batch.min_time_ns,
            max_timestamp_ns: batch.max_time_ns,
            wal_file_number: WalFileSequenceNumber::new(1),
            ops: vec![WalOp::Write(batch)],
            snapshot: None,
        };
        let end_time =
            wal_contents.max_timestamp_ns + Gen1Duration::new_1m().as_duration().as_nanos() as i64;

        // write the lp into the buffer
        queryable_buffer.notify(wal_contents);

        // now force a snapshot, persisting the data to parquet file. Also, buffer up a new write
        let snapshot_sequence_number = SnapshotSequenceNumber::new(1);
        let snapshot_details = SnapshotDetails {
            snapshot_sequence_number,
            end_time_marker: end_time,
            last_wal_sequence_number: WalFileSequenceNumber::new(2),
        };

        // create another write, this time with only one tag, in a different gen1 block
        let lp = "foo,t2=b f1=1i 240000000000";
        let val = WriteValidator::initialize(db, Arc::clone(&catalog), 0).unwrap();

        let lines = val
            .v1_parse_lines_and_update_schema(lp, false, time_provider.now(), Precision::Nanosecond)
            .unwrap()
            .convert_lines_to_buffer(Gen1Duration::new_1m());
        let batch: WriteBatch = lines.into();
        let wal_contents = WalContents {
            persist_timestamp_ms: 0,
            min_timestamp_ns: batch.min_time_ns,
            max_timestamp_ns: batch.max_time_ns,
            wal_file_number: WalFileSequenceNumber::new(2),
            ops: vec![WalOp::Write(batch)],
            snapshot: None,
        };
        let end_time =
            wal_contents.max_timestamp_ns + Gen1Duration::new_1m().as_duration().as_nanos() as i64;

        let details = queryable_buffer
            .notify_and_snapshot(wal_contents, snapshot_details)
            .await;
        let _details = details.await.unwrap();

        // validate we have a single persisted file
        let db = catalog.db_schema("testdb").unwrap();
        let table = db.table_definition("foo").unwrap();
        let files = queryable_buffer
            .persisted_files
            .get_files(db.id, table.table_id);
        assert_eq!(files.len(), 1);

        // now force another snapshot, persisting the data to parquet file
        let snapshot_sequence_number = SnapshotSequenceNumber::new(2);
        let snapshot_details = SnapshotDetails {
            snapshot_sequence_number,
            end_time_marker: end_time,
            last_wal_sequence_number: WalFileSequenceNumber::new(3),
        };
        queryable_buffer
            .notify_and_snapshot(
                WalContents {
                    persist_timestamp_ms: 0,
                    min_timestamp_ns: 0,
                    max_timestamp_ns: 0,
                    wal_file_number: WalFileSequenceNumber::new(3),
                    ops: vec![],
                    snapshot: Some(snapshot_details),
                },
                snapshot_details,
            )
            .await
            .await
            .unwrap();

        // validate we have two persisted files
        let files = queryable_buffer
            .persisted_files
            .get_files(db.id, table.table_id);
        assert_eq!(files.len(), 2);
    }
}
18 changes: 16 additions & 2 deletions influxdb3_write/src/write_buffer/table_buffer.rs
@@ -444,7 +444,9 @@ impl MutableTableChunk {
    fn into_schema_record_batch(self, table_def: Arc<TableDefinition>) -> (Schema, RecordBatch) {
        let mut cols = Vec::with_capacity(self.data.len());
        let mut schema_builder = SchemaBuilder::new();
        let mut cols_in_batch = HashSet::new();
        for (col_id, builder) in self.data.into_iter() {
            cols_in_batch.insert(col_id);
            let (col_type, col) = builder.into_influxcol_and_arrow();
            schema_builder.influx_column(
                table_def
@@ -455,6 +457,20 @@
            );
            cols.push(col);
        }

        // ensure that every series key column is present in the batch
        for col_id in &table_def.series_key {
            if !cols_in_batch.contains(col_id) {
                let col_name = table_def
                    .column_id_to_name(col_id)
                    .expect("valid column id");
                schema_builder.influx_column(col_name.as_ref(), InfluxColumnType::Tag);
                let mut tag_builder: StringDictionaryBuilder<Int32Type> =
                    StringDictionaryBuilder::new();
                tag_builder.append_nulls(self.row_count);
                cols.push(Arc::new(tag_builder.finish()));
            }
        }
        let schema = schema_builder
            .build()
            .expect("should always be able to build schema");
@@ -779,8 +795,6 @@ mod tests {
            .partitioned_record_batches(Arc::clone(&table_def), &[])
            .unwrap();

        println!("{partitioned_batches:#?}");

        assert_eq!(10, partitioned_batches.len());

        for t in 0..10 {
