Skip to content

Commit

Permalink
reader-map: Add unique identifier to metrics
Browse files Browse the repository at this point in the history
Plumb through the global node index of the reader down to the
reader-map metrics, and add it as a label to the histograms. This is
the only identifier available when creating the reader node during
migration/recovery.

Change-Id: Ic6052036dc92d76d87f797c5af8b5aa42fbfa02c
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/7884
Tested-by: Buildkite CI
Reviewed-by: Sidney Cammeresi <sac@readyset.io>
  • Loading branch information
jasobrown-rs committed Aug 27, 2024
1 parent 3988524 commit 7c63128
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 14 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions reader-map/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ itertools = { workspace = true }
iter-enum = { workspace = true }
left-right = { workspace = true }
metrics = { workspace = true }
petgraph = { workspace = true }
rand = { workspace = true }
smallvec = { workspace = true, features = ["union"] }
thiserror = { workspace = true }
Expand Down
14 changes: 10 additions & 4 deletions reader-map/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use iter_enum::{ExactSizeIterator, Iterator};
use itertools::Either;
use metrics::{register_histogram, Histogram};
use partial_map::PartialMap;
use petgraph::graph::NodeIndex;
use readyset_client::internal::IndexType;
use readyset_util::ranges::{Bound, RangeBounds};

Expand Down Expand Up @@ -321,9 +322,13 @@ pub(crate) struct WriteMetrics {
}

impl WriteMetrics {
fn new() -> Self {
let entry_updated = register_histogram!(READER_MAP_UPDATES);
let lifetime_evict = register_histogram!(READER_MAP_LIFETIMES);
fn new(node_index: Option<NodeIndex>) -> Self {
let idx = match node_index {
Some(idx) => idx.index().to_string(),
None => "-1".to_string(),
};
let entry_updated = register_histogram!(READER_MAP_UPDATES, "node_idx" => idx.clone());
let lifetime_evict = register_histogram!(READER_MAP_LIFETIMES, "node_idx" => idx);

Self {
entry_updated,
Expand Down Expand Up @@ -409,6 +414,7 @@ where
hasher: S,
eviction_strategy: EvictionStrategy,
insertion_order: Option<I>,
node_index: Option<NodeIndex>,
) -> Self {
Inner {
data: Data::with_index_type_and_hasher(index_type, hasher.clone()),
Expand All @@ -418,7 +424,7 @@ where
hasher,
eviction_strategy,
insertion_order,
metrics: WriteMetrics::new(),
metrics: WriteMetrics::new(node_index),
}
}

Expand Down
18 changes: 18 additions & 0 deletions reader-map/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ use std::hash::{BuildHasher, Hash};

pub use eviction::EvictionStrategy;
use partial_map::InsertionOrder;
use petgraph::graph::NodeIndex;
use readyset_client::internal::IndexType;

use crate::inner::Inner;
Expand Down Expand Up @@ -277,6 +278,9 @@ pub struct Options<M, T, S, I> {
capacity: Option<usize>,
eviction_strategy: EvictionStrategy,
insertion_order: Option<I>,

// The global index of the reader node that will hold this `reader-map`.
node_index: Option<NodeIndex>,
}

impl<M, T, S, I> fmt::Debug for Options<M, T, S, I>
Expand All @@ -291,6 +295,7 @@ where
.field("timestamp", &self.timestamp)
.field("capacity", &self.capacity)
.field("order", &self.insertion_order)
.field("node_index", &self.node_index)
.finish_non_exhaustive()
}
}
Expand All @@ -305,6 +310,7 @@ impl Default for Options<(), (), RandomState, DefaultInsertionOrder> {
capacity: None,
eviction_strategy: Default::default(),
insertion_order: None,
node_index: None,
}
}
}
Expand All @@ -320,6 +326,7 @@ impl<M, T, S, I> Options<M, T, S, I> {
capacity: self.capacity,
eviction_strategy: self.eviction_strategy,
insertion_order: self.insertion_order,
node_index: self.node_index,
}
}

Expand All @@ -333,6 +340,7 @@ impl<M, T, S, I> Options<M, T, S, I> {
capacity: self.capacity,
eviction_strategy: self.eviction_strategy,
insertion_order: self.insertion_order,
node_index: self.node_index,
}
}

Expand All @@ -346,6 +354,7 @@ impl<M, T, S, I> Options<M, T, S, I> {
capacity: Some(capacity),
eviction_strategy: self.eviction_strategy,
insertion_order: self.insertion_order,
node_index: self.node_index,
}
}

Expand All @@ -359,6 +368,7 @@ impl<M, T, S, I> Options<M, T, S, I> {
capacity: self.capacity,
eviction_strategy: self.eviction_strategy,
insertion_order: self.insertion_order,
node_index: self.node_index,
}
}

Expand All @@ -372,6 +382,7 @@ impl<M, T, S, I> Options<M, T, S, I> {
capacity: self.capacity,
eviction_strategy: self.eviction_strategy,
insertion_order,
node_index: self.node_index,
}
}

Expand All @@ -387,6 +398,12 @@ impl<M, T, S, I> Options<M, T, S, I> {
self
}

/// Sets the eviction strategy for the map.
pub fn with_node_index(mut self, node_index: NodeIndex) -> Self {
self.node_index = Some(node_index);
self
}

/// Create the map, and construct the read and write handles used to access it.
#[allow(clippy::type_complexity)]
pub fn construct<K, V>(self) -> (WriteHandle<K, V, I, M, T, S>, ReadHandle<K, V, I, M, T, S>)
Expand All @@ -405,6 +422,7 @@ impl<M, T, S, I> Options<M, T, S, I> {
self.hasher,
self.eviction_strategy,
self.insertion_order,
self.node_index,
);

let (mut w, r) = left_right::new_from_empty(inner);
Expand Down
68 changes: 60 additions & 8 deletions readyset-dataflow/src/backlog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,16 @@ pub(crate) fn new(
cols: usize,
index: Index,
reader_processing: ReaderProcessing,
node_index: NodeIndex,
) -> (SingleReadHandle, WriteHandle) {
new_inner(cols, index, None, EvictionKind::Random, reader_processing)
new_inner(
cols,
index,
None,
EvictionKind::Random,
reader_processing,
node_index,
)
}

/// Allocate a new partially materialized end-user facing result table.
Expand All @@ -55,6 +63,7 @@ pub(crate) fn new_partial<F>(
trigger: F,
eviction_kind: EvictionKind,
reader_processing: ReaderProcessing,
node_index: NodeIndex,
) -> (SingleReadHandle, WriteHandle)
where
F: Fn(&mut dyn Iterator<Item = KeyComparison>, Relation) -> bool + 'static + Send + Sync,
Expand All @@ -65,6 +74,7 @@ where
Some(Arc::new(trigger)),
eviction_kind,
reader_processing,
node_index,
)
}

Expand All @@ -85,6 +95,7 @@ fn new_inner(
>,
eviction_kind: EvictionKind,
reader_processing: ReaderProcessing,
node_index: NodeIndex,
) -> (SingleReadHandle, WriteHandle) {
let contiguous = {
let mut contiguous = true;
Expand Down Expand Up @@ -117,6 +128,7 @@ fn new_inner(
use reader_map;
let (mut w, r) = reader_map::Options::default()
.with_meta(-1)
.with_node_index(node_index)
.with_timestamp(Timestamp::default())
.with_hasher(RandomState::default())
.with_index_type(index.index_type)
Expand Down Expand Up @@ -615,7 +627,12 @@ mod tests {
fn store_works() {
let a = vec![1i32.into(), "a".into()].into_boxed_slice();

let (r, mut w) = new(2, Index::hash_map(vec![0]), ReaderProcessing::default());
let (r, mut w) = new(
2,
Index::hash_map(vec![0]),
ReaderProcessing::default(),
Default::default(),
);

w.swap();

Expand All @@ -639,7 +656,12 @@ mod tests {
use std::thread;

let n = 1_000;
let (r, mut w) = new(1, Index::hash_map(vec![0]), ReaderProcessing::default());
let (r, mut w) = new(
1,
Index::hash_map(vec![0]),
ReaderProcessing::default(),
Default::default(),
);
let jh = thread::spawn(move || {
for i in 0..n {
w.add(vec![Record::Positive(vec![i.into()])]);
Expand Down Expand Up @@ -668,7 +690,12 @@ mod tests {
let a = vec![1i32.into(), "a".into()].into_boxed_slice();
let b = vec![1i32.into(), "b".into()].into_boxed_slice();

let (r, mut w) = new(2, Index::hash_map(vec![0]), ReaderProcessing::default());
let (r, mut w) = new(
2,
Index::hash_map(vec![0]),
ReaderProcessing::default(),
Default::default(),
);
w.add(vec![Record::Positive(a.to_vec())]);
w.swap();
w.add(vec![Record::Positive(b.to_vec())]);
Expand All @@ -683,7 +710,12 @@ mod tests {
let b = vec![1i32.into(), "b".into()].into_boxed_slice();
let c = vec![1i32.into(), "c".into()].into_boxed_slice();

let (r, mut w) = new(2, Index::hash_map(vec![0]), ReaderProcessing::default());
let (r, mut w) = new(
2,
Index::hash_map(vec![0]),
ReaderProcessing::default(),
Default::default(),
);
w.add(vec![Record::Positive(a.to_vec())]);
w.add(vec![Record::Positive(b.to_vec())]);
w.swap();
Expand All @@ -699,7 +731,12 @@ mod tests {
let a = vec![1i32.into(), "a".into()].into_boxed_slice();
let b = vec![1i32.into(), "b".into()].into_boxed_slice();

let (r, mut w) = new(2, Index::hash_map(vec![0]), ReaderProcessing::default());
let (r, mut w) = new(
2,
Index::hash_map(vec![0]),
ReaderProcessing::default(),
Default::default(),
);
w.add(vec![Record::Positive(a.to_vec())]);
w.add(vec![Record::Positive(b.to_vec())]);
w.add(vec![Record::Negative(a.to_vec())]);
Expand All @@ -714,7 +751,12 @@ mod tests {
let a = vec![1i32.into(), "a".into()].into_boxed_slice();
let b = vec![1i32.into(), "b".into()].into_boxed_slice();

let (r, mut w) = new(2, Index::hash_map(vec![0]), ReaderProcessing::default());
let (r, mut w) = new(
2,
Index::hash_map(vec![0]),
ReaderProcessing::default(),
Default::default(),
);
w.add(vec![Record::Positive(a.to_vec())]);
w.add(vec![Record::Positive(b.to_vec())]);
w.swap();
Expand All @@ -731,7 +773,12 @@ mod tests {
let b = vec![1i32.into(), "b".into()].into_boxed_slice();
let c = vec![1i32.into(), "c".into()].into_boxed_slice();

let (r, mut w) = new(2, Index::hash_map(vec![0]), ReaderProcessing::default());
let (r, mut w) = new(
2,
Index::hash_map(vec![0]),
ReaderProcessing::default(),
Default::default(),
);
w.add(vec![
Record::Positive(a.to_vec()),
Record::Positive(b.to_vec()),
Expand Down Expand Up @@ -761,6 +808,7 @@ mod tests {
|_: &mut dyn Iterator<Item = KeyComparison>, _| true,
EvictionKind::Random,
ReaderProcessing::default(),
Default::default(),
);
w.swap();

Expand All @@ -786,6 +834,7 @@ mod tests {
|_: &mut dyn Iterator<Item = KeyComparison>, _| true,
EvictionKind::Random,
ReaderProcessing::default(),
Default::default(),
);
w.swap();

Expand All @@ -805,6 +854,7 @@ mod tests {
|_: &mut dyn Iterator<Item = KeyComparison>, _| true,
EvictionKind::Random,
ReaderProcessing::default(),
Default::default(),
);
w.swap();

Expand Down Expand Up @@ -835,6 +885,7 @@ mod tests {
|_: &mut dyn Iterator<Item = KeyComparison>, _| true,
EvictionKind::Random,
ReaderProcessing::default(),
Default::default(),
);
w.swap();

Expand All @@ -856,6 +907,7 @@ mod tests {
|_: &mut dyn Iterator<Item = KeyComparison>, _| true,
EvictionKind::Random,
ReaderProcessing::default(),
Default::default(),
);
w.swap();

Expand Down
9 changes: 7 additions & 2 deletions readyset-dataflow/src/domain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1846,6 +1846,7 @@ impl Domain {
},
self.eviction_kind,
r.reader_processing().clone(),
node_index,
);

let shard = *self.shard.as_ref().unwrap_or(&0);
Expand Down Expand Up @@ -1895,8 +1896,12 @@ impl Domain {
expected_type: NodeType::Reader,
})?;

let (r_part, w_part) =
backlog::new(num_columns, index, r.reader_processing().clone());
let (r_part, w_part) = backlog::new(
num_columns,
index,
r.reader_processing().clone(),
node_index,
);

let shard = *self.shard.as_ref().unwrap_or(&0);
// TODO(ENG-838): Don't recreate every single node on leader failure.
Expand Down

0 comments on commit 7c63128

Please sign in to comment.