fix(catalog): consistent ordering of catalog operations
jacksonrnewhouse committed Dec 19, 2024
1 parent 5657640 commit 06f6bda
Showing 8 changed files with 361 additions and 215 deletions.
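
Before the file-by-file diffs, the shape of the fix in brief: apply_catalog_batch now stamps every batch it applies with the catalog's current sequence number, wrapping it in an OrderedCatalogBatch, so that WAL replay can sort operations back into the order they were originally applied. A minimal, self-contained sketch of that idea, using toy stand-in types rather than the repository's real ones:

// Toy model of the ordering fix: stamp each batch with the sequence
// number it was applied at, and sort by that stamp before replaying.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct OrderedBatch {
    sequence: u32,    // assigned by the catalog at apply time
    op: &'static str, // stand-in for a real CatalogOp
}

fn main() {
    // Replay input arrives out of order...
    let mut wal = vec![
        OrderedBatch { sequence: 2, op: "add field tag_2 to table bar" },
        OrderedBatch { sequence: 1, op: "create table bar" },
    ];
    // ...but sorting on the stamped sequence number restores apply order,
    // so "create table" replays before "add field".
    wal.sort();
    assert_eq!(wal[0].op, "create table bar");
    for batch in &wal {
        println!("replay seq {}: {}", batch.sequence, batch.op);
    }
}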
236 changes: 139 additions & 97 deletions influxdb3_catalog/src/catalog.rs
@@ -1,16 +1,17 @@
//! Implementation of the Catalog that sits entirely in memory.
use crate::catalog::Error::{
ProcessingEngineCallExists, ProcessingEngineTriggerExists, TableNotFound,
CatalogUpdatedElsewhere, ProcessingEngineCallExists, ProcessingEngineTriggerExists,
TableNotFound,
};
use bimap::BiHashMap;
use bimap::{BiHashMap, Overwritten};
use hashbrown::HashMap;
use indexmap::IndexMap;
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb3_wal::{
CatalogBatch, CatalogOp, DeleteDatabaseDefinition, DeleteTableDefinition, FieldAdditions,
FieldDefinition, LastCacheDefinition, LastCacheDelete, MetaCacheDefinition, MetaCacheDelete,
PluginDefinition, TriggerDefinition,
OrderedCatalogBatch, PluginDefinition, TriggerDefinition,
};
use influxdb_line_protocol::FieldValue;
use iox_time::Time;
@@ -184,10 +185,26 @@ impl Catalog {
}
}

pub fn apply_catalog_batch(&self, catalog_batch: &CatalogBatch) -> Result<()> {
pub fn apply_catalog_batch(
&self,
catalog_batch: CatalogBatch,
) -> Result<Option<OrderedCatalogBatch>> {
self.inner.write().apply_catalog_batch(catalog_batch)
}

// Checks the sequence number to see if it needs to be applied.
pub fn apply_ordered_catalog_batch(
&self,
batch: OrderedCatalogBatch,
) -> Result<Option<CatalogBatch>> {
if batch.sequence_number() >= self.sequence_number().as_u32() {
if let Some(catalog_batch) = self.apply_catalog_batch(batch.batch())? {
return Ok(Some(catalog_batch.batch()));
}
}
Ok(None)
}

pub fn db_or_create(&self, db_name: &str) -> Result<Arc<DatabaseSchema>> {
let db = match self.db_schema(db_name) {
Some(db) => db,
@@ -259,81 +276,6 @@ impl Catalog {
self.inner.read().clone()
}

pub fn add_meta_cache(&self, db_id: DbId, table_id: TableId, meta_cache: MetaCacheDefinition) {
let mut inner = self.inner.write();
let mut db = inner
.databases
.get(&db_id)
.expect("db should exist")
.as_ref()
.clone();
let mut table = db
.table_definition_by_id(&table_id)
.expect("table should exist")
.as_ref()
.clone();
table.add_meta_cache(meta_cache);
db.insert_table(table_id, Arc::new(table));
inner.upsert_db(db);
}

pub fn remove_meta_cache(&self, db_id: &DbId, table_id: &TableId, name: &str) {
let mut inner = self.inner.write();
let mut db = inner
.databases
.get(db_id)
.expect("db should exist")
.as_ref()
.clone();
let mut table = db
.tables
.get(table_id)
.expect("table should exist")
.as_ref()
.clone();
table.remove_meta_cache(name);
db.insert_table(*table_id, Arc::new(table));
inner.upsert_db(db);
}

pub fn add_last_cache(&self, db_id: DbId, table_id: TableId, last_cache: LastCacheDefinition) {
let mut inner = self.inner.write();
let mut db = inner
.databases
.get(&db_id)
.expect("db should exist")
.as_ref()
.clone();
let mut table = db
.tables
.get(&table_id)
.expect("table should exist")
.as_ref()
.clone();
table.add_last_cache(last_cache);
db.insert_table(table_id, Arc::new(table));
inner.upsert_db(db);
}

pub fn delete_last_cache(&self, db_id: DbId, table_id: TableId, name: &str) {
let mut inner = self.inner.write();
let mut db = inner
.databases
.get(&db_id)
.expect("db should exist")
.as_ref()
.clone();
let mut table = db
.tables
.get(&table_id)
.expect("table should exist")
.as_ref()
.clone();
table.remove_last_cache(name);
db.insert_table(table_id, Arc::new(table));
inner.upsert_db(db);
}

pub fn instance_id(&self) -> Arc<str> {
Arc::clone(&self.inner.read().instance_id)
}
@@ -478,24 +420,31 @@ impl InnerCatalog {

/// Applies the `CatalogBatch` while validating that all updates are compatible. If updates
/// have already been applied, the sequence number and updated tracker are not updated.
pub fn apply_catalog_batch(&mut self, catalog_batch: &CatalogBatch) -> Result<()> {
pub fn apply_catalog_batch(
&mut self,
catalog_batch: CatalogBatch,
) -> Result<Option<OrderedCatalogBatch>> {
let table_count = self.table_count();

if let Some(db) = self.databases.get(&catalog_batch.database_id) {
if let Some(new_db) = DatabaseSchema::new_if_updated_from_batch(db, catalog_batch)? {
if let Some(new_db) = DatabaseSchema::new_if_updated_from_batch(db, &catalog_batch)? {
check_overall_table_count(Some(db), &new_db, table_count)?;
self.upsert_db(new_db);
} else {
return Ok(None);
}
} else {
if self.databases.len() >= Catalog::NUM_DBS_LIMIT {
return Err(Error::TooManyDbs);
}
let new_db = DatabaseSchema::new_from_batch(catalog_batch)?;
let new_db = DatabaseSchema::new_from_batch(&catalog_batch)?;
check_overall_table_count(None, &new_db, table_count)?;
self.upsert_db(new_db);
}

Ok(())
Ok(Some(OrderedCatalogBatch::new(
catalog_batch,
self.sequence.0,
)))
}

pub fn db_exists(&self, db_id: DbId) -> bool {
@@ -596,10 +545,21 @@ impl DatabaseSchema {
&mut self,
table_id: TableId,
table_def: Arc<TableDefinition>,
) -> Option<Arc<TableDefinition>> {
self.table_map
.insert(table_id, Arc::clone(&table_def.table_name));
self.tables.insert(table_id, table_def)
) -> Result<Option<Arc<TableDefinition>>> {
match self
.table_map
.insert(table_id, Arc::clone(&table_def.table_name))
{
Overwritten::Left(_, _) | Overwritten::Right(_, _) | Overwritten::Both(_, _) => {
// This will happen if another table was inserted with the same name between checking
// for existence and insertion.
// We'd like this to be automatically handled by the system,
// but for now it is better to error than get into an inconsistent state.
return Err(CatalogUpdatedElsewhere);
}
Overwritten::Neither | Overwritten::Pair(_, _) => {}
}
Ok(self.tables.insert(table_id, table_def))
}

pub fn table_schema(&self, table_name: impl Into<Arc<str>>) -> Option<Schema> {
@@ -722,14 +682,14 @@ impl UpdateDatabaseSchema for influxdb3_wal::TableDefinition {
if let Cow::Owned(updated_table) = existing_table.check_and_add_new_fields(self)? {
database_schema
.to_mut()
.insert_table(self.table_id, Arc::new(updated_table));
.insert_table(self.table_id, Arc::new(updated_table))?;
}
}
None => {
let new_table = TableDefinition::new_from_op(self);
database_schema
.to_mut()
.insert_table(new_table.table_id, Arc::new(new_table));
.insert_table(new_table.table_id, Arc::new(new_table))?;
}
}
Ok(database_schema)
@@ -1146,7 +1106,7 @@ impl<T: TableUpdate> UpdateDatabaseSchema for T {
if let Cow::Owned(new_table) = self.update_table(Cow::Borrowed(table.as_ref()))? {
schema
.to_mut()
.insert_table(new_table.table_id, Arc::new(new_table));
.insert_table(new_table.table_id, Arc::new(new_table))?;
}
Ok(schema)
}
@@ -1278,12 +1238,15 @@ pub fn influx_column_type_from_field_value(fv: &FieldValue<'_>) -> InfluxColumnType {

#[cfg(test)]
mod tests {
use influxdb3_wal::{create, FieldDataType};
use super::*;
use influxdb3_wal::object_store::WalObjectStore;
use influxdb3_wal::CatalogOp::CreateTable;
use influxdb3_wal::{create, FieldDataType, Gen1Duration, WalConfig, WalFileNotifier, WalOp};
use iox_time::{MockProvider, TimeProvider};
use pretty_assertions::assert_eq;
use std::time::Duration;
use test_helpers::assert_contains;

use super::*;

#[test]
fn catalog_serialization() {
let host_id = Arc::from("sample-host-id");
@@ -1697,7 +1660,7 @@ mod tests {
let catalog = Catalog::new(Arc::from("host"), Arc::from("instance"));
catalog.insert_database(DatabaseSchema::new(DbId::new(), Arc::from("foo")));
let db_id = catalog.db_name_to_id("foo").unwrap();
let catalog_batch = create::catalog_batch_op(
let catalog_batch = create::catalog_batch(
db_id,
"foo",
0,
@@ -1714,7 +1677,7 @@
)],
);
let err = catalog
.apply_catalog_batch(catalog_batch.as_catalog().unwrap())
.apply_catalog_batch(catalog_batch)
.expect_err("should fail to apply AddFields operation for non-existent table");
assert_contains!(err.to_string(), "Table banana not in DB schema for foo");
}
@@ -1765,7 +1728,9 @@
.unwrap(),
);

database.insert_table(deleted_table_id, table_defn);
database
.insert_table(deleted_table_id, table_defn)
.expect("should be able to insert");
let new_db = DatabaseSchema::new_if_updated_from_batch(
&database,
&CatalogBatch {
@@ -1788,4 +1753,81 @@ mod tests {
assert!(deleted_table.deleted);
assert!(!deleted_table.series_key.is_empty());
}

// tests that sorting catalog ops by the sequence number returned from apply_catalog_batch
// fixes potential ordering issues.
#[test]
fn test_out_of_order_ops() -> Result<()> {
let catalog = Catalog::new(Arc::from("host"), Arc::from("instance"));
let db_id = DbId::new();
let db_name = Arc::from("foo");
let table_id = TableId::new();
let table_name = Arc::from("bar");
let table_definition = influxdb3_wal::TableDefinition {
database_id: db_id,
database_name: Arc::clone(&db_name),
table_name: Arc::clone(&table_name),
table_id,
field_definitions: vec![
FieldDefinition::new(ColumnId::from(0), "tag_1", FieldDataType::Tag),
FieldDefinition::new(ColumnId::from(1), "time", FieldDataType::Timestamp),
FieldDefinition::new(ColumnId::from(2), "field", FieldDataType::String),
],
key: vec![ColumnId::from(0)],
};
let create_op = CatalogBatch {
database_id: db_id,
database_name: Arc::clone(&db_name),
time_ns: 0,
ops: vec![CreateTable(table_definition.clone())],
};
let add_column_op = CatalogBatch {
database_id: db_id,
database_name: Arc::clone(&db_name),
time_ns: 0,
ops: vec![CatalogOp::AddFields(FieldAdditions {
database_name: Arc::clone(&db_name),
database_id: db_id,
table_name,
table_id,
field_definitions: vec![FieldDefinition::new(
ColumnId::from(3),
"tag_2",
FieldDataType::Tag,
)],
})],
};
let create_ordered_op = catalog
.apply_catalog_batch(create_op)?
.expect("should be able to create");
let add_column_op = catalog
.apply_catalog_batch(add_column_op)?
.expect("should produce operation");
let mut ops = vec![
WalOp::Catalog(add_column_op),
WalOp::Catalog(create_ordered_op),
];
ops.sort();

let replayed_catalog = Catalog::new(Arc::from("host"), Arc::from("instance"));
for op in ops {
let WalOp::Catalog(catalog_batch) = op else {
panic!("should produce operation");
};
replayed_catalog.apply_catalog_batch(catalog_batch.batch())?;
}
let original_table = catalog
.db_schema_by_id(&db_id)
.unwrap()
.table_definition_by_id(&table_id)
.unwrap();
let replayed_table = catalog
.db_schema_by_id(&db_id)
.unwrap()
.table_definition_by_id(&table_id)
.unwrap();

assert_eq!(original_table, replayed_table);
Ok(())
}
}
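
The new insert_table above keys its conflict check off bimap::Overwritten, the value returned by BiHashMap::insert. A small standalone sketch of those semantics, using the real bimap crate but toy id/name values in place of the catalog's table_map:

use bimap::{BiHashMap, Overwritten};

fn main() {
    let mut table_map: BiHashMap<u32, &str> = BiHashMap::new();

    // Fresh pair: nothing is displaced. This and Pair are the only
    // outcomes the new insert_table treats as clean.
    assert_eq!(table_map.insert(1, "bar"), Overwritten::Neither);

    // Re-inserting the identical pair displaces only itself.
    assert_eq!(table_map.insert(1, "bar"), Overwritten::Pair(1, "bar"));

    // Same id, different name: the old (1, "bar") pairing is lost. In the
    // catalog this means the mapping changed between the existence check
    // and the insert -- the CatalogUpdatedElsewhere error case.
    assert_eq!(table_map.insert(1, "baz"), Overwritten::Left(1, "bar"));

    // Different id, existing name: the old (1, "baz") pairing is lost.
    assert_eq!(table_map.insert(2, "baz"), Overwritten::Right(1, "baz"));
}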
8 changes: 4 additions & 4 deletions influxdb3_wal/src/create.rs
@@ -43,18 +43,18 @@ pub fn write_batch_op(write_batch: WriteBatch) -> WalOp {
WalOp::Write(write_batch)
}

pub fn catalog_batch_op(
pub fn catalog_batch(
db_id: DbId,
db_name: impl Into<Arc<str>>,
time_ns: i64,
ops: impl IntoIterator<Item = CatalogOp>,
) -> WalOp {
WalOp::Catalog(CatalogBatch {
) -> CatalogBatch {
CatalogBatch {
database_id: db_id,
database_name: db_name.into(),
time_ns,
ops: ops.into_iter().collect(),
})
}
}

pub fn add_fields_op(
[Diffs for the remaining 6 changed files did not load and are not shown.]
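
End to end, the two diffs above compose roughly as follows. This is a hedged sketch, not code from the commit: the names create::catalog_batch, apply_catalog_batch, apply_ordered_catalog_batch, WalOp, and OrderedCatalogBatch all appear in the diffs, but the import paths and surrounding setup are assumed from the test_out_of_order_ops test rather than quoted:

// Assumed imports; actual visibility and paths may differ.
use influxdb3_catalog::catalog::{Catalog, Result};
use influxdb3_wal::{CatalogBatch, WalOp};

fn log_catalog_change(
    catalog: &Catalog,
    batch: CatalogBatch,
    wal_ops: &mut Vec<WalOp>,
) -> Result<()> {
    // apply_catalog_batch now consumes the batch; when it actually changed
    // the catalog, it returns the batch stamped with the sequence number.
    if let Some(ordered) = catalog.apply_catalog_batch(batch)? {
        wal_ops.push(WalOp::Catalog(ordered));
    }
    Ok(())
}

fn replay(catalog: &Catalog, mut wal_ops: Vec<WalOp>) -> Result<()> {
    // WalOp's ordering sorts catalog batches by their stamped sequence
    // number, so replay matches the original apply order.
    wal_ops.sort();
    for op in wal_ops {
        if let WalOp::Catalog(ordered) = op {
            // Batches the replaying catalog has already moved past are
            // skipped by the sequence-number check inside this call.
            catalog.apply_ordered_catalog_batch(ordered)?;
        }
    }
    Ok(())
}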