diff --git a/src/reader/decompress.rs b/src/reader/decompress.rs index a4b5b58..da25aa6 100644 --- a/src/reader/decompress.rs +++ b/src/reader/decompress.rs @@ -7,7 +7,7 @@ use bytes::{Bytes, BytesMut}; use fallible_streaming_iterator::FallibleStreamingIterator; use snafu::ResultExt; -use crate::error::{self, OrcError}; +use crate::error::{self, OrcError, Result}; use crate::proto::{self, CompressionKind}; // Spec states default is 256K @@ -105,51 +105,94 @@ fn decode_header(bytes: [u8; 3]) -> CompressionHeader { } } -fn decompress_block( - compression: Compression, - compressed_bytes: &[u8], - scratch: &mut Vec, -) -> Result<(), OrcError> { - match compression.compression_type { - CompressionType::Zlib => { - let mut gz = flate2::read::DeflateDecoder::new(compressed_bytes); - scratch.clear(); - gz.read_to_end(scratch).context(error::IoSnafu)?; - } - CompressionType::Zstd => { - let mut reader = - zstd::Decoder::new(compressed_bytes).context(error::BuildZstdDecoderSnafu)?; - scratch.clear(); - reader.read_to_end(scratch).context(error::IoSnafu)?; - } - CompressionType::Snappy => { - let len = snap::raw::decompress_len(compressed_bytes) - .context(error::BuildSnappyDecoderSnafu)?; - scratch.resize(len, 0); - let mut decoder = snap::raw::Decoder::new(); - decoder - .decompress(compressed_bytes, scratch) - .context(error::BuildSnappyDecoderSnafu)?; - } - CompressionType::Lzo => { - let decompressed = lzokay_native::decompress_all(compressed_bytes, None) - .context(error::BuildLzoDecoderSnafu)?; - // TODO: better way to utilize scratch here - scratch.clear(); - scratch.extend(decompressed); - } - CompressionType::Lz4 => { - let decompressed = lz4_flex::block::decompress( - compressed_bytes, - compression.max_decompressed_block_size, - ) - .context(error::BuildLz4DecoderSnafu)?; - // TODO: better way to utilize scratch here - scratch.clear(); - scratch.extend(decompressed); - } - }; - Ok(()) +pub(crate) trait DecompressorVariant: Send { + fn decompress_block(&self, compressed_bytes: &[u8], scratch: &mut Vec) -> Result<()>; +} + +#[derive(Debug, Clone, Copy)] +struct Zlib; +#[derive(Debug, Clone, Copy)] +struct Zstd; +#[derive(Debug, Clone, Copy)] +struct Snappy; +#[derive(Debug, Clone, Copy)] +struct Lzo; +#[derive(Debug, Clone, Copy)] +struct Lz4 { + max_decompressed_block_size: usize, +} + +impl DecompressorVariant for Zlib { + fn decompress_block(&self, compressed_bytes: &[u8], scratch: &mut Vec) -> Result<()> { + let mut gz = flate2::read::DeflateDecoder::new(compressed_bytes); + scratch.clear(); + gz.read_to_end(scratch).context(error::IoSnafu)?; + Ok(()) + } +} + +impl DecompressorVariant for Zstd { + fn decompress_block(&self, compressed_bytes: &[u8], scratch: &mut Vec) -> Result<()> { + let mut reader = + zstd::Decoder::new(compressed_bytes).context(error::BuildZstdDecoderSnafu)?; + scratch.clear(); + reader.read_to_end(scratch).context(error::IoSnafu)?; + Ok(()) + } +} + +impl DecompressorVariant for Snappy { + fn decompress_block(&self, compressed_bytes: &[u8], scratch: &mut Vec) -> Result<()> { + let len = + snap::raw::decompress_len(compressed_bytes).context(error::BuildSnappyDecoderSnafu)?; + scratch.resize(len, 0); + let mut decoder = snap::raw::Decoder::new(); + decoder + .decompress(compressed_bytes, scratch) + .context(error::BuildSnappyDecoderSnafu)?; + Ok(()) + } +} + +impl DecompressorVariant for Lzo { + fn decompress_block(&self, compressed_bytes: &[u8], scratch: &mut Vec) -> Result<()> { + let decompressed = lzokay_native::decompress_all(compressed_bytes, None) + .context(error::BuildLzoDecoderSnafu)?; + // TODO: better way to utilize scratch here + scratch.clear(); + scratch.extend(decompressed); + Ok(()) + } +} + +impl DecompressorVariant for Lz4 { + fn decompress_block(&self, compressed_bytes: &[u8], scratch: &mut Vec) -> Result<()> { + let decompressed = + lz4_flex::block::decompress(compressed_bytes, self.max_decompressed_block_size) + .context(error::BuildLz4DecoderSnafu)?; + // TODO: better way to utilize scratch here + scratch.clear(); + scratch.extend(decompressed); + Ok(()) + } +} + +// TODO: push this earlier so we don't check this variant each time +fn get_decompressor_variant( + Compression { + compression_type, + max_decompressed_block_size, + }: Compression, +) -> Box { + match compression_type { + CompressionType::Zlib => Box::new(Zlib), + CompressionType::Snappy => Box::new(Snappy), + CompressionType::Lzo => Box::new(Lzo), + CompressionType::Lz4 => Box::new(Lz4 { + max_decompressed_block_size, + }), + CompressionType::Zstd => Box::new(Zstd), + } } enum State { @@ -160,16 +203,16 @@ enum State { struct DecompressorIter { stream: BytesMut, current: Option, // when we have compression but the value is original - compression: Option, + compression: Option>, scratch: Vec, } impl DecompressorIter { - pub fn new(stream: Bytes, compression: Option, scratch: Vec) -> Self { + fn new(stream: Bytes, compression: Option, scratch: Vec) -> Self { Self { stream: BytesMut::from(stream.as_ref()), current: None, - compression, + compression: compression.map(get_decompressor_variant), scratch, } } @@ -187,7 +230,7 @@ impl FallibleStreamingIterator for DecompressorIter { return Ok(()); } - match self.compression { + match &self.compression { Some(compression) => { // TODO: take stratch from current State::Compressed for re-use let header = self.stream.split_to(3); @@ -199,7 +242,7 @@ impl FallibleStreamingIterator for DecompressorIter { } CompressionHeader::Compressed(length) => { let compressed = self.stream.split_to(length as usize); - decompress_block(compression, &compressed, &mut self.scratch)?; + compression.decompress_block(&compressed, &mut self.scratch)?; self.current = Some(State::Compressed(std::mem::take(&mut self.scratch))); } }; @@ -239,6 +282,15 @@ impl Decompressor { is_first: true, } } + + // TODO: remove need for this upstream + pub fn empty() -> Self { + Self { + decompressor: DecompressorIter::new(Bytes::new(), None, vec![]), + offset: 0, + is_first: true, + } + } } impl std::io::Read for Decompressor { diff --git a/src/stripe.rs b/src/stripe.rs index 106d8db..0b54e8c 100644 --- a/src/stripe.rs +++ b/src/stripe.rs @@ -230,7 +230,7 @@ impl StreamMap { // See the integration::meta_data test for an example of this // TODO: some better way to handle this? self.get_opt(column, kind) - .unwrap_or_else(|| Decompressor::new(Bytes::new(), self.compression, vec![])) + .unwrap_or_else(Decompressor::empty) } pub fn get_opt(&self, column: &Column, kind: Kind) -> Option {