From 7c631283216ba4dc1e7b0890f67d9929620b3753 Mon Sep 17 00:00:00 2001 From: Jason Brown Date: Sat, 24 Aug 2024 07:20:34 -0700 Subject: [PATCH] reader-map: Add unique identifier to metrics Plumb through the global node index of the reader down to the reader-map metrics, and add it as a label to the histograms. This is the only identifier available when creating the reader node during migration/recovery. Change-Id: Ic6052036dc92d76d87f797c5af8b5aa42fbfa02c Reviewed-on: https://gerrit.readyset.name/c/readyset/+/7884 Tested-by: Buildkite CI Reviewed-by: Sidney Cammeresi --- Cargo.lock | 1 + reader-map/Cargo.toml | 1 + reader-map/src/inner.rs | 14 ++++-- reader-map/src/lib.rs | 18 ++++++++ readyset-dataflow/src/backlog/mod.rs | 68 ++++++++++++++++++++++++---- readyset-dataflow/src/domain/mod.rs | 9 +++- 6 files changed, 97 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fe8d507462..e53b622105 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4084,6 +4084,7 @@ dependencies = [ "left-right", "metrics", "partial-map", + "petgraph", "proptest", "rand", "readyset-client", diff --git a/reader-map/Cargo.toml b/reader-map/Cargo.toml index 897baed648..d4580f663a 100644 --- a/reader-map/Cargo.toml +++ b/reader-map/Cargo.toml @@ -10,6 +10,7 @@ itertools = { workspace = true } iter-enum = { workspace = true } left-right = { workspace = true } metrics = { workspace = true } +petgraph = { workspace = true } rand = { workspace = true } smallvec = { workspace = true, features = ["union"] } thiserror = { workspace = true } diff --git a/reader-map/src/inner.rs b/reader-map/src/inner.rs index a3fd255848..d817a90dfe 100644 --- a/reader-map/src/inner.rs +++ b/reader-map/src/inner.rs @@ -7,6 +7,7 @@ use iter_enum::{ExactSizeIterator, Iterator}; use itertools::Either; use metrics::{register_histogram, Histogram}; use partial_map::PartialMap; +use petgraph::graph::NodeIndex; use readyset_client::internal::IndexType; use readyset_util::ranges::{Bound, RangeBounds}; @@ 
-321,9 +322,13 @@ pub(crate) struct WriteMetrics { } impl WriteMetrics { - fn new() -> Self { - let entry_updated = register_histogram!(READER_MAP_UPDATES); - let lifetime_evict = register_histogram!(READER_MAP_LIFETIMES); + fn new(node_index: Option) -> Self { + let idx = match node_index { + Some(idx) => idx.index().to_string(), + None => "-1".to_string(), + }; + let entry_updated = register_histogram!(READER_MAP_UPDATES, "node_idx" => idx.clone()); + let lifetime_evict = register_histogram!(READER_MAP_LIFETIMES, "node_idx" => idx); Self { entry_updated, @@ -409,6 +414,7 @@ where hasher: S, eviction_strategy: EvictionStrategy, insertion_order: Option, + node_index: Option, ) -> Self { Inner { data: Data::with_index_type_and_hasher(index_type, hasher.clone()), @@ -418,7 +424,7 @@ where hasher, eviction_strategy, insertion_order, - metrics: WriteMetrics::new(), + metrics: WriteMetrics::new(node_index), } } diff --git a/reader-map/src/lib.rs b/reader-map/src/lib.rs index afb398182e..42a2b4888a 100644 --- a/reader-map/src/lib.rs +++ b/reader-map/src/lib.rs @@ -230,6 +230,7 @@ use std::hash::{BuildHasher, Hash}; pub use eviction::EvictionStrategy; use partial_map::InsertionOrder; +use petgraph::graph::NodeIndex; use readyset_client::internal::IndexType; use crate::inner::Inner; @@ -277,6 +278,9 @@ pub struct Options { capacity: Option, eviction_strategy: EvictionStrategy, insertion_order: Option, + + // The global index of the reader node that will hold this `reader-map`. 
+ node_index: Option, } impl fmt::Debug for Options @@ -291,6 +295,7 @@ where .field("timestamp", &self.timestamp) .field("capacity", &self.capacity) .field("order", &self.insertion_order) + .field("node_index", &self.node_index) .finish_non_exhaustive() } } @@ -305,6 +310,7 @@ impl Default for Options<(), (), RandomState, DefaultInsertionOrder> { capacity: None, eviction_strategy: Default::default(), insertion_order: None, + node_index: None, } } } @@ -320,6 +326,7 @@ impl Options { capacity: self.capacity, eviction_strategy: self.eviction_strategy, insertion_order: self.insertion_order, + node_index: self.node_index, } } @@ -333,6 +340,7 @@ impl Options { capacity: self.capacity, eviction_strategy: self.eviction_strategy, insertion_order: self.insertion_order, + node_index: self.node_index, } } @@ -346,6 +354,7 @@ impl Options { capacity: Some(capacity), eviction_strategy: self.eviction_strategy, insertion_order: self.insertion_order, + node_index: self.node_index, } } @@ -359,6 +368,7 @@ impl Options { capacity: self.capacity, eviction_strategy: self.eviction_strategy, insertion_order: self.insertion_order, + node_index: self.node_index, } } @@ -372,6 +382,7 @@ impl Options { capacity: self.capacity, eviction_strategy: self.eviction_strategy, insertion_order, + node_index: self.node_index, } } @@ -387,6 +398,12 @@ impl Options { self } + /// Sets the global index of the reader node that will hold this map. + pub fn with_node_index(mut self, node_index: NodeIndex) -> Self { + self.node_index = Some(node_index); + self + } + /// Create the map, and construct the read and write handles used to access it. 
#[allow(clippy::type_complexity)] pub fn construct(self) -> (WriteHandle, ReadHandle) @@ -405,6 +422,7 @@ impl Options { self.hasher, self.eviction_strategy, self.insertion_order, + self.node_index, ); let (mut w, r) = left_right::new_from_empty(inner); diff --git a/readyset-dataflow/src/backlog/mod.rs b/readyset-dataflow/src/backlog/mod.rs index f062396cad..072be769b9 100644 --- a/readyset-dataflow/src/backlog/mod.rs +++ b/readyset-dataflow/src/backlog/mod.rs @@ -32,8 +32,16 @@ pub(crate) fn new( cols: usize, index: Index, reader_processing: ReaderProcessing, + node_index: NodeIndex, ) -> (SingleReadHandle, WriteHandle) { - new_inner(cols, index, None, EvictionKind::Random, reader_processing) + new_inner( + cols, + index, + None, + EvictionKind::Random, + reader_processing, + node_index, + ) } /// Allocate a new partially materialized end-user facing result table. @@ -55,6 +63,7 @@ pub(crate) fn new_partial( trigger: F, eviction_kind: EvictionKind, reader_processing: ReaderProcessing, + node_index: NodeIndex, ) -> (SingleReadHandle, WriteHandle) where F: Fn(&mut dyn Iterator, Relation) -> bool + 'static + Send + Sync, @@ -65,6 +74,7 @@ where Some(Arc::new(trigger)), eviction_kind, reader_processing, + node_index, ) } @@ -85,6 +95,7 @@ fn new_inner( >, eviction_kind: EvictionKind, reader_processing: ReaderProcessing, + node_index: NodeIndex, ) -> (SingleReadHandle, WriteHandle) { let contiguous = { let mut contiguous = true; @@ -117,6 +128,7 @@ fn new_inner( use reader_map; let (mut w, r) = reader_map::Options::default() .with_meta(-1) + .with_node_index(node_index) .with_timestamp(Timestamp::default()) .with_hasher(RandomState::default()) .with_index_type(index.index_type) @@ -615,7 +627,12 @@ mod tests { fn store_works() { let a = vec![1i32.into(), "a".into()].into_boxed_slice(); - let (r, mut w) = new(2, Index::hash_map(vec![0]), ReaderProcessing::default()); + let (r, mut w) = new( + 2, + Index::hash_map(vec![0]), + ReaderProcessing::default(), + 
Default::default(), + ); w.swap(); @@ -639,7 +656,12 @@ mod tests { use std::thread; let n = 1_000; - let (r, mut w) = new(1, Index::hash_map(vec![0]), ReaderProcessing::default()); + let (r, mut w) = new( + 1, + Index::hash_map(vec![0]), + ReaderProcessing::default(), + Default::default(), + ); let jh = thread::spawn(move || { for i in 0..n { w.add(vec![Record::Positive(vec![i.into()])]); @@ -668,7 +690,12 @@ mod tests { let a = vec![1i32.into(), "a".into()].into_boxed_slice(); let b = vec![1i32.into(), "b".into()].into_boxed_slice(); - let (r, mut w) = new(2, Index::hash_map(vec![0]), ReaderProcessing::default()); + let (r, mut w) = new( + 2, + Index::hash_map(vec![0]), + ReaderProcessing::default(), + Default::default(), + ); w.add(vec![Record::Positive(a.to_vec())]); w.swap(); w.add(vec![Record::Positive(b.to_vec())]); @@ -683,7 +710,12 @@ mod tests { let b = vec![1i32.into(), "b".into()].into_boxed_slice(); let c = vec![1i32.into(), "c".into()].into_boxed_slice(); - let (r, mut w) = new(2, Index::hash_map(vec![0]), ReaderProcessing::default()); + let (r, mut w) = new( + 2, + Index::hash_map(vec![0]), + ReaderProcessing::default(), + Default::default(), + ); w.add(vec![Record::Positive(a.to_vec())]); w.add(vec![Record::Positive(b.to_vec())]); w.swap(); @@ -699,7 +731,12 @@ mod tests { let a = vec![1i32.into(), "a".into()].into_boxed_slice(); let b = vec![1i32.into(), "b".into()].into_boxed_slice(); - let (r, mut w) = new(2, Index::hash_map(vec![0]), ReaderProcessing::default()); + let (r, mut w) = new( + 2, + Index::hash_map(vec![0]), + ReaderProcessing::default(), + Default::default(), + ); w.add(vec![Record::Positive(a.to_vec())]); w.add(vec![Record::Positive(b.to_vec())]); w.add(vec![Record::Negative(a.to_vec())]); @@ -714,7 +751,12 @@ mod tests { let a = vec![1i32.into(), "a".into()].into_boxed_slice(); let b = vec![1i32.into(), "b".into()].into_boxed_slice(); - let (r, mut w) = new(2, Index::hash_map(vec![0]), ReaderProcessing::default()); + let (r, mut w) 
= new( + 2, + Index::hash_map(vec![0]), + ReaderProcessing::default(), + Default::default(), + ); w.add(vec![Record::Positive(a.to_vec())]); w.add(vec![Record::Positive(b.to_vec())]); w.swap(); @@ -731,7 +773,12 @@ mod tests { let b = vec![1i32.into(), "b".into()].into_boxed_slice(); let c = vec![1i32.into(), "c".into()].into_boxed_slice(); - let (r, mut w) = new(2, Index::hash_map(vec![0]), ReaderProcessing::default()); + let (r, mut w) = new( + 2, + Index::hash_map(vec![0]), + ReaderProcessing::default(), + Default::default(), + ); w.add(vec![ Record::Positive(a.to_vec()), Record::Positive(b.to_vec()), @@ -761,6 +808,7 @@ mod tests { |_: &mut dyn Iterator, _| true, EvictionKind::Random, ReaderProcessing::default(), + Default::default(), ); w.swap(); @@ -786,6 +834,7 @@ mod tests { |_: &mut dyn Iterator, _| true, EvictionKind::Random, ReaderProcessing::default(), + Default::default(), ); w.swap(); @@ -805,6 +854,7 @@ mod tests { |_: &mut dyn Iterator, _| true, EvictionKind::Random, ReaderProcessing::default(), + Default::default(), ); w.swap(); @@ -835,6 +885,7 @@ mod tests { |_: &mut dyn Iterator, _| true, EvictionKind::Random, ReaderProcessing::default(), + Default::default(), ); w.swap(); @@ -856,6 +907,7 @@ mod tests { |_: &mut dyn Iterator, _| true, EvictionKind::Random, ReaderProcessing::default(), + Default::default(), ); w.swap(); diff --git a/readyset-dataflow/src/domain/mod.rs b/readyset-dataflow/src/domain/mod.rs index ac87b8145c..3c1e80a687 100644 --- a/readyset-dataflow/src/domain/mod.rs +++ b/readyset-dataflow/src/domain/mod.rs @@ -1846,6 +1846,7 @@ impl Domain { }, self.eviction_kind, r.reader_processing().clone(), + node_index, ); let shard = *self.shard.as_ref().unwrap_or(&0); @@ -1895,8 +1896,12 @@ impl Domain { expected_type: NodeType::Reader, })?; - let (r_part, w_part) = - backlog::new(num_columns, index, r.reader_processing().clone()); + let (r_part, w_part) = backlog::new( + num_columns, + index, + r.reader_processing().clone(), + 
node_index, + ); let shard = *self.shard.as_ref().unwrap_or(&0); // TODO(ENG-838): Don't recreate every single node on leader failure.