From f225acbdfaa64f71e255c54a4d9118ee1910f8ba Mon Sep 17 00:00:00 2001 From: Jefffrey Date: Wed, 25 Sep 2024 07:11:30 +1000 Subject: [PATCH] Use PrimitiveValueDecoder::decode_spaced in string/list/map lengths decoding --- src/array_decoder/list.rs | 25 +++++++------------------ src/array_decoder/map.rs | 25 +++++++------------------ src/array_decoder/mod.rs | 23 ----------------------- src/array_decoder/string.rs | 18 ++++++++---------- 4 files changed, 22 insertions(+), 69 deletions(-) diff --git a/src/array_decoder/list.rs b/src/array_decoder/list.rs index 27e6bd2..5ab427f 100644 --- a/src/array_decoder/list.rs +++ b/src/array_decoder/list.rs @@ -22,7 +22,7 @@ use arrow::buffer::{NullBuffer, OffsetBuffer}; use arrow::datatypes::{Field, FieldRef}; use snafu::ResultExt; -use crate::array_decoder::{derive_present_vec, populate_lengths_with_nulls}; +use crate::array_decoder::derive_present_vec; use crate::column::{get_present_vec, Column}; use crate::encoding::{get_unsigned_rle_reader, PrimitiveValueDecoder}; use crate::proto::stream::Kind; @@ -67,27 +67,16 @@ impl ArrayBatchDecoder for ListArrayDecoder { ) -> Result { let present = derive_present_vec(&mut self.present, parent_present, batch_size); - // How many lengths we need to fetch - let elements_to_fetch = if let Some(present) = &present { - present.iter().filter(|&&is_present| is_present).count() + let mut lengths = vec![0; batch_size]; + if let Some(present) = &present { + self.lengths.decode_spaced(&mut lengths, present)?; } else { - batch_size - }; - let lengths = self - .lengths - .by_ref() - .take(elements_to_fetch) - .collect::>>()?; - debug_assert_eq!( - lengths.len(), - elements_to_fetch, - "less lengths than expected in ListArray" - ); + self.lengths.decode(&mut lengths)?; + } let total_length: i64 = lengths.iter().sum(); // Fetch child array as one Array with total_length elements let child_array = self.inner.next_batch(total_length as usize, None)?; - let lengths = populate_lengths_with_nulls(lengths, batch_size, &present); - let offsets = OffsetBuffer::from_lengths(lengths); + let offsets = OffsetBuffer::from_lengths(lengths.into_iter().map(|l| l as usize)); let null_buffer = present.map(NullBuffer::from); let array = ListArray::try_new(self.field.clone(), offsets, child_array, null_buffer) diff --git a/src/array_decoder/map.rs b/src/array_decoder/map.rs index c41f168..e2a3419 100644 --- a/src/array_decoder/map.rs +++ b/src/array_decoder/map.rs @@ -22,7 +22,7 @@ use arrow::buffer::{NullBuffer, OffsetBuffer}; use arrow::datatypes::{Field, Fields}; use snafu::ResultExt; -use crate::array_decoder::{derive_present_vec, populate_lengths_with_nulls}; +use crate::array_decoder::derive_present_vec; use crate::column::{get_present_vec, Column}; use crate::encoding::{get_unsigned_rle_reader, PrimitiveValueDecoder}; use crate::error::{ArrowSnafu, Result}; @@ -78,22 +78,12 @@ impl ArrayBatchDecoder for MapArrayDecoder { ) -> Result { let present = derive_present_vec(&mut self.present, parent_present, batch_size); - // How many lengths we need to fetch - let elements_to_fetch = if let Some(present) = &present { - present.iter().filter(|&&is_present| is_present).count() + let mut lengths = vec![0; batch_size]; + if let Some(present) = &present { + self.lengths.decode_spaced(&mut lengths, present)?; } else { - batch_size - }; - let lengths = self - .lengths - .by_ref() - .take(elements_to_fetch) - .collect::>>()?; - debug_assert_eq!( - lengths.len(), - elements_to_fetch, - "less lengths than expected in MapArray" - ); + self.lengths.decode(&mut lengths)?; + } let total_length: i64 = lengths.iter().sum(); // Fetch key and value arrays, each with total_length elements // Fetch child array as one Array with total_length elements @@ -103,8 +93,7 @@ impl ArrayBatchDecoder for MapArrayDecoder { let entries = StructArray::try_new(self.fields.clone(), vec![keys_array, values_array], None) .context(ArrowSnafu)?; - let lengths = populate_lengths_with_nulls(lengths, batch_size, &present); - let offsets = OffsetBuffer::from_lengths(lengths); + let offsets = OffsetBuffer::from_lengths(lengths.into_iter().map(|l| l as usize)); let null_buffer = present.map(NullBuffer::from); let field = Arc::new(Field::new_struct("entries", self.fields.clone(), false)); diff --git a/src/array_decoder/mod.rs b/src/array_decoder/mod.rs index 612f873..c271a1f 100644 --- a/src/array_decoder/mod.rs +++ b/src/array_decoder/mod.rs @@ -241,29 +241,6 @@ fn derive_present_vec( } } -/// Fix the lengths to account for nulls (represented as 0 length) -fn populate_lengths_with_nulls( - lengths: Vec, - batch_size: usize, - present: &Option>, -) -> Vec { - if let Some(present) = present { - let mut lengths_with_nulls = Vec::with_capacity(batch_size); - let mut lengths = lengths.iter(); - for &is_present in present { - if is_present { - let length = *lengths.next().unwrap(); - lengths_with_nulls.push(length as usize); - } else { - lengths_with_nulls.push(0); - } - } - lengths_with_nulls - } else { - lengths.into_iter().map(|l| l as usize).collect() - } -} - fn create_null_buffer(present: Option>) -> Option { match present { // Edge case where keys of map cannot have a null buffer diff --git a/src/array_decoder/string.rs b/src/array_decoder/string.rs index e75e323..a227c45 100644 --- a/src/array_decoder/string.rs +++ b/src/array_decoder/string.rs @@ -25,7 +25,7 @@ use arrow::compute::kernels::cast; use arrow::datatypes::{ByteArrayType, DataType, GenericBinaryType, GenericStringType}; use snafu::ResultExt; -use crate::array_decoder::{create_null_buffer, derive_present_vec, populate_lengths_with_nulls}; +use crate::array_decoder::{create_null_buffer, derive_present_vec}; use crate::column::{get_present_vec, Column}; use crate::compression::Decompressor; use crate::encoding::{get_unsigned_rle_reader, PrimitiveValueDecoder}; @@ -116,14 +116,12 @@ impl GenericByteArrayDecoder { ) -> Result> { let present = derive_present_vec(&mut self.present, parent_present, batch_size); - // How many lengths we need to fetch - let elements_to_fetch = if let Some(present) = &present { - present.iter().filter(|&&is_present| is_present).count() + let mut lengths = vec![0; batch_size]; + if let Some(present) = &present { + self.lengths.decode_spaced(&mut lengths, present)?; } else { - batch_size - }; - let mut lengths = vec![0; elements_to_fetch]; - self.lengths.decode(&mut lengths)?; + self.lengths.decode(&mut lengths)?; + } let total_length: i64 = lengths.iter().sum(); // Fetch all data bytes at once let mut bytes = Vec::with_capacity(total_length as usize); @@ -133,8 +131,8 @@ impl GenericByteArrayDecoder { .read_to_end(&mut bytes) .context(IoSnafu)?; let bytes = Buffer::from(bytes); - let lengths = populate_lengths_with_nulls(lengths, batch_size, &present); - let offsets = OffsetBuffer::::from_lengths(lengths); + let offsets = + OffsetBuffer::::from_lengths(lengths.into_iter().map(|l| l as usize)); let null_buffer = create_null_buffer(present); let array =