Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for configuring time units through ArrowReaderBuilder::with_schema #93

Merged
merged 2 commits into from
Jun 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ The following table lists how ORC data types are read into Arrow data types:
| Binary | Binary | |
| Decimal | Decimal128 | |
| Date | Date32 | |
| Timestamp | Timestamp(Nanosecond, None) | Timestamps before 1677-09-21 or after 2261-04-12 return `OrcError` because they cannot be represented as an i64 of nanoseconds |
| Timestamp instant | Timestamp(Nanosecond, UTC) | Timestamps before 1677-09-21 or after 2261-04-12 return `OrcError` because they cannot be represented as an i64 of nanoseconds |
| Timestamp | Timestamp(Nanosecond, None) | `ArrowReaderBuilder::with_schema` allows configuring different time units. Overflows or loss of precision while decoding results in `OrcError` |
| Timestamp instant | Timestamp(Nanosecond, UTC) | `ArrowReaderBuilder::with_schema` allows configuring different time units. Overflows or loss of precision while decoding results in `OrcError` |
| Struct | Struct | |
| List | List | |
| Map | Map | |
Expand Down
23 changes: 2 additions & 21 deletions src/array_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use arrow::datatypes::{ArrowPrimitiveType, Decimal128Type, UInt64Type};
use arrow::datatypes::{DataType as ArrowDataType, Field};
use arrow::datatypes::{
Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, SchemaRef,
TimeUnit,
};
use arrow::record_batch::RecordBatch;
use snafu::{ensure, ResultExt};
Expand Down Expand Up @@ -456,27 +455,9 @@ pub fn array_decoder_factory(
);
new_decimal_decoder(column, stripe, *precision, *scale)?
}
DataType::Timestamp { .. } => {
// TODO: add support for any precision
ensure!(
field_type == ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
MismatchedSchemaSnafu {
orc_type: column.data_type().clone(),
arrow_type: field_type
}
);
new_timestamp_decoder(column, stripe)?
}
DataType::Timestamp { .. } => new_timestamp_decoder(column, field_type, stripe)?,
DataType::TimestampWithLocalTimezone { .. } => {
// TODO: add support for any precision and for arbitrary timezones
ensure!(
field_type == ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
MismatchedSchemaSnafu {
orc_type: column.data_type().clone(),
arrow_type: field_type
}
);
new_timestamp_instant_decoder(column, stripe)?
new_timestamp_instant_decoder(column, field_type, stripe)?
}

DataType::Date { .. } => {
Expand Down
224 changes: 163 additions & 61 deletions src/array_decoder/timestamp.rs
Original file line number Diff line number Diff line change
@@ -1,106 +1,208 @@
use std::sync::Arc;

use crate::{
array_decoder::ArrowDataType,
column::{get_present_vec, Column},
error::Result,
error::{MismatchedSchemaSnafu, Result},
proto::stream::Kind,
reader::decode::{get_rle_reader, timestamp::TimestampIterator},
stripe::Stripe,
};
use arrow::{array::ArrayRef, datatypes::TimestampNanosecondType};
use arrow::array::ArrayRef;
use arrow::datatypes::{
ArrowTimestampType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
};
use chrono::offset::TimeZone;
use chrono_tz::Tz;
use snafu::ensure;

use super::{ArrayBatchDecoder, PrimitiveArrayDecoder};
use crate::error::UnsupportedTypeVariantSnafu;

/// Statically dispatches to the right ArrowTimestampType based on the value of $time_unit
/// to create a $decoder_type with that type as type parameter and $iter/$present as value
/// parameters, then applies $f to it and $tz.
///
/// $f has to be generic so it cannot be a closure.
macro_rules! decoder_for_time_unit {
($column: expr, $time_unit:expr, $seconds_since_unix_epoch:expr, $stripe:expr, $tz:expr, $f:expr,) => {{
let column = $column;
let stripe = $stripe;
let data = stripe.stream_map().get(column, Kind::Data);
let data = get_rle_reader(column, data)?;

let secondary = stripe.stream_map().get(column, Kind::Secondary);
let secondary = get_rle_reader(column, secondary)?;

let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);

match $time_unit {
TimeUnit::Second => {
let iter = Box::new(TimestampIterator::<TimestampSecondType>::new(
$seconds_since_unix_epoch,
data,
secondary,
));
Ok(Box::new(($f)(
PrimitiveArrayDecoder::<TimestampSecondType>::new(iter, present),
$tz,
)))
}
TimeUnit::Millisecond => {
let iter = Box::new(TimestampIterator::<TimestampMillisecondType>::new(
$seconds_since_unix_epoch,
data,
secondary,
));
Ok(Box::new(($f)(
PrimitiveArrayDecoder::<TimestampMillisecondType>::new(iter, present),
$tz,
)))
}
TimeUnit::Microsecond => {
let iter = Box::new(TimestampIterator::<TimestampMicrosecondType>::new(
$seconds_since_unix_epoch,
data,
secondary,
));
Ok(Box::new(($f)(
PrimitiveArrayDecoder::<TimestampMicrosecondType>::new(iter, present),
$tz,
)))
}
TimeUnit::Nanosecond => {
let iter = Box::new(TimestampIterator::<TimestampNanosecondType>::new(
$seconds_since_unix_epoch,
data,
secondary,
));
Ok(Box::new(($f)(
PrimitiveArrayDecoder::<TimestampNanosecondType>::new(iter, present),
$tz,
)))
}
}
}};
}

/// Seconds from ORC epoch of 1 January 2015, which serves as the 0
/// point for all timestamp values, to the UNIX epoch of 1 January 1970.
const ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH: i64 = 1_420_070_400;

/// Decodes a TIMESTAMP column stripe into batches of TimestampNanosecondArrays with no
/// timezone. Will convert timestamps from writer timezone to UTC if a writer timezone
/// Decodes a TIMESTAMP column stripe into batches of Timestamp{Nano,Micro,Milli,}secondArrays
/// with no timezone. Will convert timestamps from writer timezone to UTC if a writer timezone
/// is specified for the stripe.
pub fn new_timestamp_decoder(
column: &Column,
field_type: ArrowDataType,
stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
let data = stripe.stream_map().get(column, Kind::Data);
let data = get_rle_reader(column, data)?;

let secondary = stripe.stream_map().get(column, Kind::Secondary);
let secondary = get_rle_reader(column, secondary)?;

let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);
let ArrowDataType::Timestamp(time_unit, None) = field_type else {
MismatchedSchemaSnafu {
orc_type: column.data_type().clone(),
arrow_type: field_type,
}
.fail()?
};

match stripe.writer_tz() {
Some(tz) => {
Some(writer_tz) => {
// If writer timezone exists then we must take the ORC epoch according
// to that timezone, and find seconds since UTC UNIX epoch for the base.
let seconds_since_unix_epoch = tz
let seconds_since_unix_epoch = writer_tz
.with_ymd_and_hms(2015, 1, 1, 0, 0, 0)
.unwrap()
.timestamp();
let iter = Box::new(TimestampIterator::new(

fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
writer_tz: Tz,
) -> TimestampOffsetArrayDecoder<T> {
TimestampOffsetArrayDecoder {
inner: decoder,
writer_tz,
}
}
decoder_for_time_unit!(
column,
time_unit,
seconds_since_unix_epoch,
data,
secondary,
));
let decoder = RawTimestampArrayDecoder::new(iter, present);
Ok(Box::new(TimestampOffsetArrayDecoder {
inner: decoder,
writer_tz: tz,
}))
stripe,
writer_tz,
f,
)
}
None => {
// No writer timezone, we can assume UTC, so we casn use known fixed value
// for the base offset.
let iter = Box::new(TimestampIterator::new(
fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
_writer_tz: (),
) -> PrimitiveArrayDecoder<T> {
decoder
}

decoder_for_time_unit!(
column,
time_unit,
// No writer timezone, we can assume UTC, so we can use known fixed value
// for the base offset.
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
data,
secondary,
));
let decoder = RawTimestampArrayDecoder::new(iter, present);
Ok(Box::new(decoder))
stripe,
(),
f,
)
}
}
}

/// Decodes a TIMESTAMP_INSTANT column stripe into batches of TimestampNanosecondArrays with
/// UTC timezone.
/// Decodes a TIMESTAMP_INSTANT column stripe into batches of
/// Timestamp{Nano,Micro,Milli,}secondArrays with UTC timezone.
pub fn new_timestamp_instant_decoder(
column: &Column,
field_type: ArrowDataType,
stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
let data = stripe.stream_map().get(column, Kind::Data);
let data = get_rle_reader(column, data)?;

let secondary = stripe.stream_map().get(column, Kind::Secondary);
let secondary = get_rle_reader(column, secondary)?;
let ArrowDataType::Timestamp(time_unit, Some(tz)) = field_type else {
MismatchedSchemaSnafu {
orc_type: column.data_type().clone(),
arrow_type: field_type,
}
.fail()?
};
ensure!(
tz.as_ref() == "UTC",
UnsupportedTypeVariantSnafu {
msg: "Non-UTC Arrow timestamps"
}
);

let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);
fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
_writer_tz: (),
) -> TimestampInstantArrayDecoder<T> {
TimestampInstantArrayDecoder(decoder)
}

// TIMESTAMP_INSTANT is encoded as UTC so we don't check writer timezone in stripe
let iter = Box::new(TimestampIterator::new(
decoder_for_time_unit!(
column,
time_unit,
// TIMESTAMP_INSTANT is encoded as UTC so we don't check writer timezone in stripe
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
data,
secondary,
));

let decoder = RawTimestampArrayDecoder::new(iter, present);
Ok(Box::new(TimestampInstantArrayDecoder(decoder)))
stripe,
(),
f,
)
}

type RawTimestampArrayDecoder = PrimitiveArrayDecoder<TimestampNanosecondType>;

/// Wrapper around RawTimestampArrayDecoder to decode timestamps which are encoded in
/// Wrapper around PrimitiveArrayDecoder to decode timestamps which are encoded in
/// timezone of the writer to their UTC value.
struct TimestampOffsetArrayDecoder {
inner: RawTimestampArrayDecoder,
struct TimestampOffsetArrayDecoder<T: ArrowTimestampType> {
inner: PrimitiveArrayDecoder<T>,
writer_tz: chrono_tz::Tz,
}

impl ArrayBatchDecoder for TimestampOffsetArrayDecoder {
impl<T: ArrowTimestampType> ArrayBatchDecoder for TimestampOffsetArrayDecoder<T> {
fn next_batch(
&mut self,
batch_size: usize,
Expand All @@ -121,21 +223,21 @@ impl ArrayBatchDecoder for TimestampOffsetArrayDecoder {
};
let array = array
// first try to convert all non-nullable batches to non-nullable batches
.try_unary::<_, TimestampNanosecondType, _>(|ts| convert_timezone(ts).ok_or(()))
// in the rare case one of the values was out of the 1677-2262 range (see
.try_unary::<_, T, _>(|ts| convert_timezone(ts).ok_or(()))
// in the rare case one of the values was out of the timeunit's range (eg. see
// <https://docs.rs/chrono/latest/chrono/struct.DateTime.html#method.timestamp_nanos_opt>),
// try again by allowing a nullable batch as output
.unwrap_or_else(|()| array.unary_opt::<_, TimestampNanosecondType>(convert_timezone));
// for nanoseconds), try again by allowing a nullable batch as output
.unwrap_or_else(|()| array.unary_opt::<_, T>(convert_timezone));
let array = Arc::new(array) as ArrayRef;
Ok(array)
}
}

/// Wrapper around RawTimestampArrayDecoder to allow specifying the timezone of the output
/// Wrapper around PrimitiveArrayDecoder to allow specifying the timezone of the output
/// timestamp array as UTC.
struct TimestampInstantArrayDecoder(RawTimestampArrayDecoder);
struct TimestampInstantArrayDecoder<T: ArrowTimestampType>(PrimitiveArrayDecoder<T>);

impl ArrayBatchDecoder for TimestampInstantArrayDecoder {
impl<T: ArrowTimestampType> ArrayBatchDecoder for TimestampInstantArrayDecoder<T> {
fn next_batch(
&mut self,
batch_size: usize,
Expand Down
11 changes: 10 additions & 1 deletion src/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub struct ArrowReaderBuilder<R> {
pub(crate) file_metadata: Arc<FileMetadata>,
pub(crate) batch_size: usize,
pub(crate) projection: ProjectionMask,
pub(crate) schema_ref: Option<SchemaRef>,
}

impl<R> ArrowReaderBuilder<R> {
Expand All @@ -29,6 +30,7 @@ impl<R> ArrowReaderBuilder<R> {
file_metadata,
batch_size: DEFAULT_BATCH_SIZE,
projection: ProjectionMask::all(),
schema_ref: None,
}
}

Expand All @@ -45,6 +47,11 @@ impl<R> ArrowReaderBuilder<R> {
self.projection = projection;
self
}

pub fn with_schema(mut self, schema: SchemaRef) -> Self {
self.schema_ref = Some(schema);
self
}
}

impl<R: ChunkReader> ArrowReaderBuilder<R> {
Expand All @@ -64,7 +71,9 @@ impl<R: ChunkReader> ArrowReaderBuilder<R> {
projected_data_type,
stripe_index: 0,
};
let schema_ref = Arc::new(create_arrow_schema(&cursor));
let schema_ref = self
.schema_ref
.unwrap_or_else(|| Arc::new(create_arrow_schema(&cursor)));
ArrowReader {
cursor,
schema_ref,
Expand Down
Loading
Loading