Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support projection pushdown in metadata cache #25675

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 40 additions & 16 deletions influxdb3_cache/src/meta_cache/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,23 +218,38 @@ impl MetaCache {
/// filtered out of the result.
pub(crate) fn to_record_batch(
&self,
schema: SchemaRef,
predicates: &IndexMap<ColumnId, Predicate>,
projection: Option<&[usize]>,
limit: Option<usize>,
) -> Result<RecordBatch, ArrowError> {
let n_columns = projection
.as_ref()
.and_then(|p| p.iter().max().copied())
.unwrap_or(self.column_ids.len())
+ 1;

// predicates may not be provided for all columns in the cache, or not be provided in the
// order of columns in the cache. This re-orders them to the correct order, and fills in any
// gaps with None.
let predicates: Vec<Option<&Predicate>> = self
.column_ids
.iter()
.map(|id| predicates.get(id))
// if a projection was included, we only want to take as many columns as necessary:
.take(n_columns)
.collect();

// Uses a [`StringViewBuilder`] to compose the set of [`RecordBatch`]es. This is convenient for
// the sake of nested caches, where a predicate on a higher branch in the cache will need to have
// its value in the outputted batches duplicated.
let mut builders: Vec<StringViewBuilder> = (0..self.column_ids.len())
.map(|_| StringViewBuilder::new())
let mut builders: Vec<Option<StringViewBuilder>> = (0..self.column_ids.len())
.map(|i| {
projection
.map(|p| p.contains(&i).then(StringViewBuilder::new))
.unwrap_or_else(|| Some(StringViewBuilder::new()))
})
.take(n_columns)
.collect();

let expired_time_ns = self.expired_time_ns();
Expand All @@ -248,9 +263,10 @@ impl MetaCache {
);

RecordBatch::try_new(
Arc::clone(&self.schema),
schema,
builders
.into_iter()
.flatten()
.map(|mut builder| Arc::new(builder.finish()) as ArrayRef)
.collect(),
)
Expand Down Expand Up @@ -419,12 +435,12 @@ impl Node {
expired_time_ns: i64,
predicates: &[Option<&Predicate>],
mut limit: usize,
builders: &mut [StringViewBuilder],
builders: &mut [Option<StringViewBuilder>],
) -> usize {
let mut total_count = 0;
let (predicate, next_predicates) = predicates
.split_first()
.expect("predicates should not be empty");
let Some((predicate, next_predicates)) = predicates.split_first() else {
return total_count;
};
// if there is a predicate, evaluate it, otherwise, just grab everything from the node:
let values_and_nodes = if let Some(predicate) = predicate {
self.evaluate_predicate(expired_time_ns, predicate, limit)
Expand All @@ -450,24 +466,32 @@ impl Node {
next_builders,
);
if count > 0 {
// we are not on a terminal node in the cache, so create a block, as this value
// repeated `count` times, i.e., depending on how many values come out of
// subsequent nodes:
let block = builder.append_block(value.0.as_bytes().into());
for _ in 0..count {
builder
.try_append_view(block, 0u32, value.0.as_bytes().len() as u32)
.expect("append view for known valid block, offset and length");
if let Some(builder) = builder {
// we are not on a terminal node in the cache, so create a block, as this value
// repeated `count` times, i.e., depending on how many values come out of
// subsequent nodes:
let block = builder.append_block(value.0.as_bytes().into());
for _ in 0..count {
builder
.try_append_view(block, 0u32, value.0.as_bytes().len() as u32)
.expect("append view for known valid block, offset and length");
}
}
total_count += count;
} else if next_predicates.is_empty() && next_builders.is_empty() {
if let Some(builder) = builder {
builder.append_value(value.0);
}
}
if let Some(new_limit) = limit.checked_sub(count) {
limit = new_limit;
} else {
break;
}
} else {
builder.append_value(value.0);
if let Some(builder) = builder {
builder.append_value(value.0);
}
total_count += 1;
}
}
Expand Down
73 changes: 44 additions & 29 deletions influxdb3_cache/src/meta_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ mod tests {

for tc in test_cases {
let records = cache
.to_record_batch(&tc.predicates, None)
.to_record_batch(cache.arrow_schema(), &tc.predicates, None, None)
.expect("get record batches");
println!("{}", tc.desc);
assert_batches_sorted_eq!(tc.expected, &[records]);
Expand Down Expand Up @@ -241,7 +241,9 @@ mod tests {
// check the cache before prune:
// NOTE: this does not include entries that have surpassed the max_age of the cache, though,
// there are still more than the cache's max cardinality, as it has not yet been pruned.
let records = cache.to_record_batch(&Default::default(), None).unwrap();
let records = cache
.to_record_batch(cache.arrow_schema(), &Default::default(), None, None)
.unwrap();
assert_batches_sorted_eq!(
[
"+-----------+------+",
Expand All @@ -267,7 +269,9 @@ mod tests {
&[records]
);
cache.prune();
let records = cache.to_record_batch(&Default::default(), None).unwrap();
let records = cache
.to_record_batch(cache.arrow_schema(), &Default::default(), None, None)
.unwrap();
assert_batches_sorted_eq!(
[
"+-----------+------+",
Expand Down Expand Up @@ -330,7 +334,9 @@ mod tests {
}

// no limit produces all records in the cache:
let batches = cache.to_record_batch(&Default::default(), None).unwrap();
let batches = cache
.to_record_batch(cache.arrow_schema(), &Default::default(), None, None)
.unwrap();
assert_batches_eq!(
[
"+---------+------+",
Expand All @@ -354,7 +360,9 @@ mod tests {
);

// applying a limit only returns that number of records from the cache:
let batches = cache.to_record_batch(&Default::default(), Some(5)).unwrap();
let batches = cache
.to_record_batch(cache.arrow_schema(), &Default::default(), None, Some(5))
.unwrap();
assert_batches_eq!(
[
"+---------+------+",
Expand Down Expand Up @@ -499,7 +507,7 @@ mod tests {
"| us-west | d |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "MetaCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -513,7 +521,7 @@ mod tests {
"| us-east | b |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: predicates=[[region@0 IN (us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -527,7 +535,7 @@ mod tests {
"| us-east | a |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: predicates=[[region@0 IN (us-east)], [host@1 IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -542,7 +550,7 @@ mod tests {
"| us-east | b |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -556,7 +564,7 @@ mod tests {
"| us-east | b |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: predicates=[[region@0 IN (us-east)], [host@1 NOT IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 NOT IN (a)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -575,7 +583,7 @@ mod tests {
"| us-west | d |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: predicates=[[region@0 IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -594,7 +602,7 @@ mod tests {
"| eu-west | l |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: predicates=[[region@0 NOT IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 NOT IN (ca-cent,ca-east,us-east,us-west)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -610,7 +618,7 @@ mod tests {
"| us-east | b |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: predicates=[[region@0 IN (ca-east,us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-east,us-east)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -625,7 +633,7 @@ mod tests {
"| us-west | d |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: predicates=[[host@1 IN (d,e)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[host@1 IN (d,e)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -640,7 +648,7 @@ mod tests {
"| us-east | b |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "MetaCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -655,7 +663,7 @@ mod tests {
"| us-east | b |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (us-east)], [host@1 IN (a,b)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -668,18 +676,13 @@ mod tests {
"| ca-cent |",
"| ca-east |",
"| ca-west |",
"| ca-west |",
"| eu-cent |",
"| eu-cent |",
"| eu-west |",
"| eu-west |",
"| us-east |",
"| us-east |",
"| us-west |",
"| us-west |",
"+---------+",
],
explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "MetaCacheExec: projection=[region@0] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -698,7 +701,7 @@ mod tests {
"| us-west |",
"+---------+",
],
explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "MetaCacheExec: projection=[region@0] inner=MemoryExec: partitions=1, partition_sizes=[1",
// it seems that DISTINCT changes around the order of results
use_sorted_assert: true,
},
Expand All @@ -723,12 +726,25 @@ mod tests {
"| l |", // commenting for no new line
"+------+", // commenting for no new line
],
explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "MetaCacheExec: projection=[host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]",
// this column will not be sorted since the order of elements depends on the next level
// up in the cache, so the `region` column is iterated over in order, but the nested
// `host` values, although sorted within `region`s, will not be globally sorted.
use_sorted_assert: true,
},
TestCase {
_desc: "project host column",
sql: "SELECT host FROM meta_cache('cpu') WHERE region = 'ca-cent'",
expected: &[
"+------+", // commenting for no new line
"| host |", // commenting for no new line
"+------+", // commenting for no new line
"| f |", // commenting for no new line
"+------+", // commenting for no new line
],
explain_contains: "MetaCacheExec: projection=[region@0, host@1] predicates=[[region@0 IN (ca-cent)]] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
_desc: "limit clause",
sql: "SELECT * FROM meta_cache('cpu') LIMIT 8",
Expand All @@ -746,7 +762,7 @@ mod tests {
"| eu-west | l |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: limit=8 inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "MetaCacheExec: projection=[region@0, host@1] limit=8 inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -762,7 +778,7 @@ mod tests {
"| us-west | d |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: limit=16 inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "MetaCacheExec: projection=[region@0, host@1] limit=16 inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
TestCase {
Expand All @@ -779,15 +795,14 @@ mod tests {
"| us-west | d |",
"+---------+------+",
],
explain_contains: "MetaCacheExec: inner=MemoryExec: partitions=1, partition_sizes=[1]",
explain_contains: "MetaCacheExec: projection=[region@0, host@1] inner=MemoryExec: partitions=1, partition_sizes=[1]",
use_sorted_assert: false,
},
];

for tc in test_cases {
println!("test case: {}\n{}\n", tc._desc, tc.sql);
let results = ctx.sql(tc.sql).await.unwrap().collect().await.unwrap();
// Uncommenting may make figuring out which test case is failing easier:
println!("test case: {}", tc._desc);
if tc.use_sorted_assert {
assert_batches_sorted_eq!(tc.expected, &results);
} else {
Expand Down
Loading
Loading