Skip to content

Commit

Permalink
feat(processing_engine): respond to PR feedback, put python code behi…
Browse files Browse the repository at this point in the history
…nd system-py feature flag
  • Loading branch information
jacksonrnewhouse committed Dec 13, 2024
1 parent e12a5cd commit a119b65
Show file tree
Hide file tree
Showing 20 changed files with 288 additions and 423 deletions.
13 changes: 0 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ members = [
"influxdb3_id",
"influxdb3_load_generator",
"influxdb3_process",
"influxdb3_processing_engine",
"influxdb3_py_api",
"influxdb3_server",
"influxdb3_telemetry",
Expand Down
1 change: 1 addition & 0 deletions influxdb3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ tokio_console = ["console-subscriber", "tokio/tracing", "observability_deps/rele

# Use jemalloc as the default allocator.
jemalloc_replacing_malloc = ["influxdb3_process/jemalloc_replacing_malloc"]
system-py = ["influxdb3_write/system-py"]

[dev-dependencies]
# Core Crates
Expand Down
1 change: 0 additions & 1 deletion influxdb3_catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ iox_time.workspace = true
# Local deps
influxdb3_id = { path = "../influxdb3_id" }
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_processing_engine = { path = "../influxdb3_processing_engine" }

# crates.io dependencies
arrow.workspace = true
Expand Down
42 changes: 10 additions & 32 deletions influxdb3_catalog/src/catalog.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
//! Implementation of the Catalog that sits entirely in memory.
use crate::catalog::Error::{
ProcessingEngineCallExists, ProcessingEnginePluginNotFound, ProcessingEngineTriggerExists,
TableNotFound,
ProcessingEngineCallExists, ProcessingEngineTriggerExists, TableNotFound,
};
use bimap::BiHashMap;
use hashbrown::HashMap;
use indexmap::IndexMap;
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb3_processing_engine::processing_engine_plugins::{
ProcessingEnginePlugin, ProcessingEngineTrigger,
};
use influxdb3_wal::{
CatalogBatch, CatalogOp, DeleteDatabaseDefinition, DeleteTableDefinition, FieldAdditions,
FieldDefinition, LastCacheDefinition, LastCacheDelete, MetaCacheDefinition, MetaCacheDelete,
Expand Down Expand Up @@ -519,9 +515,9 @@ pub struct DatabaseSchema {
/// The database is a map of tables
pub tables: SerdeVecMap<TableId, Arc<TableDefinition>>,
pub table_map: BiHashMap<TableId, Arc<str>>,
pub processing_engine_plugins: HashMap<String, ProcessingEnginePlugin>,
pub processing_engine_plugins: HashMap<String, PluginDefinition>,
// TODO: care about performance of triggers
pub processing_engine_triggers: HashMap<String, ProcessingEngineTrigger>,
pub processing_engine_triggers: HashMap<String, TriggerDefinition>,
pub deleted: bool,
}

Expand Down Expand Up @@ -758,21 +754,19 @@ impl UpdateDatabaseSchema for PluginDefinition {
&self,
mut schema: Cow<'a, DatabaseSchema>,
) -> Result<Cow<'a, DatabaseSchema>> {
let plugin: ProcessingEnginePlugin = self.into();

match schema.processing_engine_plugins.get(&self.plugin_name) {
Some(current) if plugin.eq(current) => {}
Some(current) if self.eq(current) => {}
Some(_) => {
return Err(ProcessingEngineCallExists {
database_name: schema.name.to_string(),
call_name: plugin.plugin_name.to_string(),
call_name: self.plugin_name.to_string(),
})
}
None => {
schema
.to_mut()
.processing_engine_plugins
.insert(self.plugin_name.to_string(), plugin);
.insert(self.plugin_name.to_string(), self.clone());
}
}

Expand All @@ -785,35 +779,19 @@ impl UpdateDatabaseSchema for TriggerDefinition {
&self,
mut schema: Cow<'a, DatabaseSchema>,
) -> Result<Cow<'a, DatabaseSchema>> {
let trigger_name = self.trigger_name.to_string();
let Some(plugin) = schema
.processing_engine_plugins
.get(&self.plugin_name)
.cloned()
else {
return Err(ProcessingEnginePluginNotFound {
plugin_name: self.plugin_name.to_string(),
database_name: schema.name.to_string(),
});
};
let trigger = ProcessingEngineTrigger {
trigger_name: trigger_name.to_string(),
plugin,
trigger: (&self.trigger).into(),
};
if let Some(current) = schema.processing_engine_triggers.get(&trigger_name) {
if current == &trigger {
if let Some(current) = schema.processing_engine_triggers.get(&self.trigger_name) {
if current == self {
return Ok(schema);
}
return Err(ProcessingEngineTriggerExists {
database_name: schema.name.to_string(),
trigger_name,
trigger_name: self.trigger_name.to_string(),
});
}
schema
.to_mut()
.processing_engine_triggers
.insert(trigger_name, trigger);
.insert(self.trigger_name.to_string(), self.clone());
Ok(schema)
}
}
Expand Down
22 changes: 11 additions & 11 deletions influxdb3_catalog/src/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ use influxdb3_id::ColumnId;
use influxdb3_id::DbId;
use influxdb3_id::SerdeVecMap;
use influxdb3_id::TableId;
use influxdb3_processing_engine::processing_engine_plugins::{
ProcessingEnginePlugin, ProcessingEngineTrigger,
use influxdb3_wal::{
LastCacheDefinition, LastCacheValueColumnsDef, PluginDefinition, TriggerDefinition,
};
use influxdb3_wal::{LastCacheDefinition, LastCacheValueColumnsDef};
use schema::InfluxColumnType;
use schema::InfluxFieldType;
use schema::TIME_DATA_TIMEZONE;
Expand Down Expand Up @@ -95,14 +94,15 @@ impl From<DatabaseSnapshot> for DatabaseSchema {
.into_iter()
.map(|(name, trigger)| {
// TODO: Decide whether to handle errors
let plugin = processing_engine_plugins
let plugin: PluginDefinition = processing_engine_plugins
.get(&trigger.plugin_name)
.cloned()
.expect("should have plugin");
(
name,
ProcessingEngineTrigger {
TriggerDefinition {
trigger_name: trigger.trigger_name,
plugin_name: plugin.plugin_name.to_string(),
plugin,
trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(),
},
Expand Down Expand Up @@ -405,8 +405,8 @@ impl From<TableSnapshot> for TableDefinition {
}
}

impl From<&ProcessingEnginePlugin> for ProcessingEnginePluginSnapshot {
fn from(plugin: &ProcessingEnginePlugin) -> Self {
impl From<&PluginDefinition> for ProcessingEnginePluginSnapshot {
fn from(plugin: &PluginDefinition) -> Self {
Self {
plugin_name: plugin.plugin_name.to_string(),
code: plugin.code.to_string(),
Expand All @@ -416,7 +416,7 @@ impl From<&ProcessingEnginePlugin> for ProcessingEnginePluginSnapshot {
}
}

impl From<ProcessingEnginePluginSnapshot> for ProcessingEnginePlugin {
impl From<ProcessingEnginePluginSnapshot> for PluginDefinition {
fn from(plugin: ProcessingEnginePluginSnapshot) -> Self {
Self {
plugin_name: plugin.plugin_type.to_string(),
Expand All @@ -427,11 +427,11 @@ impl From<ProcessingEnginePluginSnapshot> for ProcessingEnginePlugin {
}
}

impl From<&ProcessingEngineTrigger> for ProcessingEngineTriggerSnapshot {
fn from(trigger: &ProcessingEngineTrigger) -> Self {
impl From<&TriggerDefinition> for ProcessingEngineTriggerSnapshot {
fn from(trigger: &TriggerDefinition) -> Self {
ProcessingEngineTriggerSnapshot {
trigger_name: trigger.trigger_name.to_string(),
plugin_name: trigger.plugin.plugin_name.to_string(),
plugin_name: trigger.plugin_name.to_string(),
trigger_specification: serde_json::to_string(&trigger.trigger)
.expect("should be able to serialize trigger specification"),
}
Expand Down
14 changes: 0 additions & 14 deletions influxdb3_processing_engine/Cargo.toml

This file was deleted.

1 change: 0 additions & 1 deletion influxdb3_processing_engine/src/lib.rs

This file was deleted.

102 changes: 0 additions & 102 deletions influxdb3_processing_engine/src/processing_engine_plugins.rs

This file was deleted.

6 changes: 5 additions & 1 deletion influxdb3_py_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@ authors.workspace = true
edition.workspace = true
license.workspace = true


[features]
system-py = ["pyo3"]
[dependencies]
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_catalog = {path = "../influxdb3_catalog"}
influxdb-line-protocol = { workspace = true }
schema = { workspace = true }

[dependencies.pyo3]
version = "0.23.3"
# this is necessary to automatically initialize the Python interpreter
features = ["auto-initialize"]
optional = true


[lints]
workspace = true
Loading

0 comments on commit a119b65

Please sign in to comment.