Skip to content

Commit

Permalink
Minor refactoring around decompression handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Jefffrey committed May 13, 2024
1 parent 1ee1df9 commit 0617ecb
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 52 deletions.
154 changes: 103 additions & 51 deletions src/reader/decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use bytes::{Bytes, BytesMut};
use fallible_streaming_iterator::FallibleStreamingIterator;
use snafu::ResultExt;

use crate::error::{self, OrcError};
use crate::error::{self, OrcError, Result};
use crate::proto::{self, CompressionKind};

// Spec states default is 256K
Expand Down Expand Up @@ -105,51 +105,94 @@ fn decode_header(bytes: [u8; 3]) -> CompressionHeader {
}
}

fn decompress_block(
compression: Compression,
compressed_bytes: &[u8],
scratch: &mut Vec<u8>,
) -> Result<(), OrcError> {
match compression.compression_type {
CompressionType::Zlib => {
let mut gz = flate2::read::DeflateDecoder::new(compressed_bytes);
scratch.clear();
gz.read_to_end(scratch).context(error::IoSnafu)?;
}
CompressionType::Zstd => {
let mut reader =
zstd::Decoder::new(compressed_bytes).context(error::BuildZstdDecoderSnafu)?;
scratch.clear();
reader.read_to_end(scratch).context(error::IoSnafu)?;
}
CompressionType::Snappy => {
let len = snap::raw::decompress_len(compressed_bytes)
.context(error::BuildSnappyDecoderSnafu)?;
scratch.resize(len, 0);
let mut decoder = snap::raw::Decoder::new();
decoder
.decompress(compressed_bytes, scratch)
.context(error::BuildSnappyDecoderSnafu)?;
}
CompressionType::Lzo => {
let decompressed = lzokay_native::decompress_all(compressed_bytes, None)
.context(error::BuildLzoDecoderSnafu)?;
// TODO: better way to utilize scratch here
scratch.clear();
scratch.extend(decompressed);
}
CompressionType::Lz4 => {
let decompressed = lz4_flex::block::decompress(
compressed_bytes,
compression.max_decompressed_block_size,
)
.context(error::BuildLz4DecoderSnafu)?;
// TODO: better way to utilize scratch here
scratch.clear();
scratch.extend(decompressed);
}
};
Ok(())
pub(crate) trait DecompressorVariant: Send {
fn decompress_block(&self, compressed_bytes: &[u8], scratch: &mut Vec<u8>) -> Result<()>;
}

#[derive(Debug, Clone, Copy)]
struct Zlib;
#[derive(Debug, Clone, Copy)]
struct Zstd;
#[derive(Debug, Clone, Copy)]
struct Snappy;
#[derive(Debug, Clone, Copy)]
struct Lzo;
#[derive(Debug, Clone, Copy)]
struct Lz4 {
max_decompressed_block_size: usize,
}

impl DecompressorVariant for Zlib {
fn decompress_block(&self, compressed_bytes: &[u8], scratch: &mut Vec<u8>) -> Result<()> {
let mut gz = flate2::read::DeflateDecoder::new(compressed_bytes);
scratch.clear();
gz.read_to_end(scratch).context(error::IoSnafu)?;
Ok(())
}
}

impl DecompressorVariant for Zstd {
fn decompress_block(&self, compressed_bytes: &[u8], scratch: &mut Vec<u8>) -> Result<()> {
let mut reader =
zstd::Decoder::new(compressed_bytes).context(error::BuildZstdDecoderSnafu)?;
scratch.clear();
reader.read_to_end(scratch).context(error::IoSnafu)?;
Ok(())
}
}

impl DecompressorVariant for Snappy {
fn decompress_block(&self, compressed_bytes: &[u8], scratch: &mut Vec<u8>) -> Result<()> {
let len =
snap::raw::decompress_len(compressed_bytes).context(error::BuildSnappyDecoderSnafu)?;
scratch.resize(len, 0);
let mut decoder = snap::raw::Decoder::new();
decoder
.decompress(compressed_bytes, scratch)
.context(error::BuildSnappyDecoderSnafu)?;
Ok(())
}
}

impl DecompressorVariant for Lzo {
fn decompress_block(&self, compressed_bytes: &[u8], scratch: &mut Vec<u8>) -> Result<()> {
let decompressed = lzokay_native::decompress_all(compressed_bytes, None)
.context(error::BuildLzoDecoderSnafu)?;
// TODO: better way to utilize scratch here
scratch.clear();
scratch.extend(decompressed);
Ok(())
}
}

impl DecompressorVariant for Lz4 {
fn decompress_block(&self, compressed_bytes: &[u8], scratch: &mut Vec<u8>) -> Result<()> {
let decompressed =
lz4_flex::block::decompress(compressed_bytes, self.max_decompressed_block_size)
.context(error::BuildLz4DecoderSnafu)?;
// TODO: better way to utilize scratch here
scratch.clear();
scratch.extend(decompressed);
Ok(())
}
}

// TODO: push this earlier so we don't check this variant each time
fn get_decompressor_variant(
Compression {
compression_type,
max_decompressed_block_size,
}: Compression,
) -> Box<dyn DecompressorVariant> {
match compression_type {
CompressionType::Zlib => Box::new(Zlib),
CompressionType::Snappy => Box::new(Snappy),
CompressionType::Lzo => Box::new(Lzo),
CompressionType::Lz4 => Box::new(Lz4 {
max_decompressed_block_size,
}),
CompressionType::Zstd => Box::new(Zstd),
}
}

enum State {
Expand All @@ -160,16 +203,16 @@ enum State {
struct DecompressorIter {
stream: BytesMut,
current: Option<State>, // when we have compression but the value is original
compression: Option<Compression>,
compression: Option<Box<dyn DecompressorVariant>>,
scratch: Vec<u8>,
}

impl DecompressorIter {
pub fn new(stream: Bytes, compression: Option<Compression>, scratch: Vec<u8>) -> Self {
fn new(stream: Bytes, compression: Option<Compression>, scratch: Vec<u8>) -> Self {
Self {
stream: BytesMut::from(stream.as_ref()),
current: None,
compression,
compression: compression.map(get_decompressor_variant),
scratch,
}
}
Expand All @@ -187,7 +230,7 @@ impl FallibleStreamingIterator for DecompressorIter {
return Ok(());
}

match self.compression {
match &self.compression {
Some(compression) => {
// TODO: take stratch from current State::Compressed for re-use
let header = self.stream.split_to(3);
Expand All @@ -199,7 +242,7 @@ impl FallibleStreamingIterator for DecompressorIter {
}
CompressionHeader::Compressed(length) => {
let compressed = self.stream.split_to(length as usize);
decompress_block(compression, &compressed, &mut self.scratch)?;
compression.decompress_block(&compressed, &mut self.scratch)?;
self.current = Some(State::Compressed(std::mem::take(&mut self.scratch)));
}
};
Expand Down Expand Up @@ -239,6 +282,15 @@ impl Decompressor {
is_first: true,
}
}

// TODO: remove need for this upstream
pub fn empty() -> Self {
Self {
decompressor: DecompressorIter::new(Bytes::new(), None, vec![]),
offset: 0,
is_first: true,
}
}
}

impl std::io::Read for Decompressor {
Expand Down
2 changes: 1 addition & 1 deletion src/stripe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl StreamMap {
// See the integration::meta_data test for an example of this
// TODO: some better way to handle this?
self.get_opt(column, kind)
.unwrap_or_else(|| Decompressor::new(Bytes::new(), self.compression, vec![]))
.unwrap_or_else(Decompressor::empty)
}

pub fn get_opt(&self, column: &Column, kind: Kind) -> Option<Decompressor> {
Expand Down

0 comments on commit 0617ecb

Please sign in to comment.