Skip to content

Commit

Permalink
Add support for configuring time units through ArrowReaderBuilder::wi…
Browse files Browse the repository at this point in the history
…th_schema
  • Loading branch information
progval committed Jun 5, 2024
1 parent 749128b commit bdbadb9
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 97 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ The following table lists how ORC data types are read into Arrow data types:
| Binary | Binary | |
| Decimal | Decimal128 | |
| Date | Date32 | |
| Timestamp | Timestamp(Nanosecond, None) | Timestamps before 1677-09-21 or after 2261-04-12 return `OrcError` because they cannot be represented as an i64 of nanoseconds |
| Timestamp instant | Timestamp(Nanosecond, UTC) | Timestamps before 1677-09-21 or after 2261-04-12 return `OrcError` because they cannot be represented as an i64 of nanoseconds |
| Timestamp | Timestamp(Nanosecond, None) | `ArrowReaderBuilder::with_schema` allows configuring different time units. Overflows or loss of precision while decoding results in `OrcError` |
| Timestamp instant | Timestamp(Nanosecond, UTC) | `ArrowReaderBuilder::with_schema` allows configuring different time units. Overflows or loss of precision while decoding results in `OrcError` |
| Struct | Struct | |
| List | List | |
| Map | Map | |
Expand Down
23 changes: 2 additions & 21 deletions src/array_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use arrow::datatypes::{ArrowPrimitiveType, Decimal128Type, UInt64Type};
use arrow::datatypes::{DataType as ArrowDataType, Field};
use arrow::datatypes::{
Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, SchemaRef,
TimeUnit,
};
use arrow::record_batch::RecordBatch;
use snafu::{ensure, ResultExt};
Expand Down Expand Up @@ -456,27 +455,9 @@ pub fn array_decoder_factory(
);
new_decimal_decoder(column, stripe, *precision, *scale)?
}
DataType::Timestamp { .. } => {
// TODO: add support for any precision
ensure!(
field_type == ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
MismatchedSchemaSnafu {
orc_type: column.data_type().clone(),
arrow_type: field_type
}
);
new_timestamp_decoder(column, stripe)?
}
DataType::Timestamp { .. } => new_timestamp_decoder(column, field_type, stripe)?,
DataType::TimestampWithLocalTimezone { .. } => {
// TODO: add support for any precision and for arbitrary timezones
ensure!(
field_type == ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
MismatchedSchemaSnafu {
orc_type: column.data_type().clone(),
arrow_type: field_type
}
);
new_timestamp_instant_decoder(column, stripe)?
new_timestamp_instant_decoder(column, field_type, stripe)?
}

DataType::Date { .. } => {
Expand Down
224 changes: 163 additions & 61 deletions src/array_decoder/timestamp.rs
Original file line number Diff line number Diff line change
@@ -1,106 +1,208 @@
use std::sync::Arc;

use crate::{
array_decoder::ArrowDataType,
column::{get_present_vec, Column},
error::Result,
error::{MismatchedSchemaSnafu, Result},
proto::stream::Kind,
reader::decode::{get_rle_reader, timestamp::TimestampIterator},
stripe::Stripe,
};
use arrow::{array::ArrayRef, datatypes::TimestampNanosecondType};
use arrow::array::ArrayRef;
use arrow::datatypes::{
ArrowTimestampType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
};
use chrono::offset::TimeZone;
use chrono_tz::Tz;
use snafu::ensure;

use super::{ArrayBatchDecoder, PrimitiveArrayDecoder};
use crate::error::UnsupportedTypeVariantSnafu;

/// Statically dispatches to the right ArrowTimestampType based on the value of $time_unit
/// to create a $decoder_type with that type as type parameter and $iter/$present as value
/// parameters, then applies $f to it and $tz.
///
/// $f has to be generic so it cannot be a closure.
macro_rules! decoder_for_time_unit {
($column: expr, $time_unit:expr, $seconds_since_unix_epoch:expr, $stripe:expr, $tz:expr, $f:expr,) => {{
let column = $column;
let stripe = $stripe;
let data = stripe.stream_map().get(column, Kind::Data);
let data = get_rle_reader(column, data)?;

let secondary = stripe.stream_map().get(column, Kind::Secondary);
let secondary = get_rle_reader(column, secondary)?;

let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);

match $time_unit {
TimeUnit::Second => {
let iter = Box::new(TimestampIterator::<TimestampSecondType>::new(
$seconds_since_unix_epoch,
data,
secondary,
));
Ok(Box::new(($f)(
PrimitiveArrayDecoder::<TimestampSecondType>::new(iter, present),
$tz,
)))
}
TimeUnit::Millisecond => {
let iter = Box::new(TimestampIterator::<TimestampMillisecondType>::new(
$seconds_since_unix_epoch,
data,
secondary,
));
Ok(Box::new(($f)(
PrimitiveArrayDecoder::<TimestampMillisecondType>::new(iter, present),
$tz,
)))
}
TimeUnit::Microsecond => {
let iter = Box::new(TimestampIterator::<TimestampMicrosecondType>::new(
$seconds_since_unix_epoch,
data,
secondary,
));
Ok(Box::new(($f)(
PrimitiveArrayDecoder::<TimestampMicrosecondType>::new(iter, present),
$tz,
)))
}
TimeUnit::Nanosecond => {
let iter = Box::new(TimestampIterator::<TimestampNanosecondType>::new(
$seconds_since_unix_epoch,
data,
secondary,
));
Ok(Box::new(($f)(
PrimitiveArrayDecoder::<TimestampNanosecondType>::new(iter, present),
$tz,
)))
}
}
}};
}

/// Seconds from ORC epoch of 1 January 2015, which serves as the 0
/// point for all timestamp values, to the UNIX epoch of 1 January 1970.
const ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH: i64 = 1_420_070_400;

/// Decodes a TIMESTAMP column stripe into batches of TimestampNanosecondArrays with no
/// timezone. Will convert timestamps from writer timezone to UTC if a writer timezone
/// Decodes a TIMESTAMP column stripe into batches of Timestamp{Nano,Micro,Milli,}secondArrays
/// with no timezone. Will convert timestamps from writer timezone to UTC if a writer timezone
/// is specified for the stripe.
pub fn new_timestamp_decoder(
column: &Column,
field_type: ArrowDataType,
stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
let data = stripe.stream_map().get(column, Kind::Data);
let data = get_rle_reader(column, data)?;

let secondary = stripe.stream_map().get(column, Kind::Secondary);
let secondary = get_rle_reader(column, secondary)?;

let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);
let ArrowDataType::Timestamp(time_unit, None) = field_type else {
MismatchedSchemaSnafu {
orc_type: column.data_type().clone(),
arrow_type: field_type,
}
.fail()?
};

match stripe.writer_tz() {
Some(tz) => {
Some(writer_tz) => {
// If writer timezone exists then we must take the ORC epoch according
// to that timezone, and find seconds since UTC UNIX epoch for the base.
let seconds_since_unix_epoch = tz
let seconds_since_unix_epoch = writer_tz
.with_ymd_and_hms(2015, 1, 1, 0, 0, 0)
.unwrap()
.timestamp();
let iter = Box::new(TimestampIterator::new(

fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
writer_tz: Tz,
) -> TimestampOffsetArrayDecoder<T> {
TimestampOffsetArrayDecoder {
inner: decoder,
writer_tz,
}
}
decoder_for_time_unit!(
column,
time_unit,
seconds_since_unix_epoch,
data,
secondary,
));
let decoder = RawTimestampArrayDecoder::new(iter, present);
Ok(Box::new(TimestampOffsetArrayDecoder {
inner: decoder,
writer_tz: tz,
}))
stripe,
writer_tz,
f,
)
}
None => {
// No writer timezone, we can assume UTC, so we casn use known fixed value
// for the base offset.
let iter = Box::new(TimestampIterator::new(
fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
_writer_tz: (),
) -> PrimitiveArrayDecoder<T> {
decoder
}

decoder_for_time_unit!(
column,
time_unit,
// No writer timezone, we can assume UTC, so we can use known fixed value
// for the base offset.
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
data,
secondary,
));
let decoder = RawTimestampArrayDecoder::new(iter, present);
Ok(Box::new(decoder))
stripe,
(),
f,
)
}
}
}

/// Decodes a TIMESTAMP_INSTANT column stripe into batches of TimestampNanosecondArrays with
/// UTC timezone.
/// Decodes a TIMESTAMP_INSTANT column stripe into batches of
/// Timestamp{Nano,Micro,Milli,}secondArrays with UTC timezone.
pub fn new_timestamp_instant_decoder(
column: &Column,
field_type: ArrowDataType,
stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
let data = stripe.stream_map().get(column, Kind::Data);
let data = get_rle_reader(column, data)?;

let secondary = stripe.stream_map().get(column, Kind::Secondary);
let secondary = get_rle_reader(column, secondary)?;
let ArrowDataType::Timestamp(time_unit, Some(tz)) = field_type else {
MismatchedSchemaSnafu {
orc_type: column.data_type().clone(),
arrow_type: field_type,
}
.fail()?
};
ensure!(
tz.as_ref() == "UTC",
UnsupportedTypeVariantSnafu {
msg: "Non-UTC Arrow timestamps"
}
);

let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);
fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
_writer_tz: (),
) -> TimestampInstantArrayDecoder<T> {
TimestampInstantArrayDecoder(decoder)
}

// TIMESTAMP_INSTANT is encoded as UTC so we don't check writer timezone in stripe
let iter = Box::new(TimestampIterator::new(
decoder_for_time_unit!(
column,
time_unit,
// TIMESTAMP_INSTANT is encoded as UTC so we don't check writer timezone in stripe
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
data,
secondary,
));

let decoder = RawTimestampArrayDecoder::new(iter, present);
Ok(Box::new(TimestampInstantArrayDecoder(decoder)))
stripe,
(),
f,
)
}

type RawTimestampArrayDecoder = PrimitiveArrayDecoder<TimestampNanosecondType>;

/// Wrapper around RawTimestampArrayDecoder to decode timestamps which are encoded in
/// Wrapper around PrimitiveArrayDecoder to decode timestamps which are encoded in
/// timezone of the writer to their UTC value.
struct TimestampOffsetArrayDecoder {
inner: RawTimestampArrayDecoder,
struct TimestampOffsetArrayDecoder<T: ArrowTimestampType> {
inner: PrimitiveArrayDecoder<T>,
writer_tz: chrono_tz::Tz,
}

impl ArrayBatchDecoder for TimestampOffsetArrayDecoder {
impl<T: ArrowTimestampType> ArrayBatchDecoder for TimestampOffsetArrayDecoder<T> {
fn next_batch(
&mut self,
batch_size: usize,
Expand All @@ -121,21 +223,21 @@ impl ArrayBatchDecoder for TimestampOffsetArrayDecoder {
};
let array = array
// first try to convert all non-nullable batches to non-nullable batches
.try_unary::<_, TimestampNanosecondType, _>(|ts| convert_timezone(ts).ok_or(()))
// in the rare case one of the values was out of the 1677-2262 range (see
.try_unary::<_, T, _>(|ts| convert_timezone(ts).ok_or(()))
// in the rare case one of the values was out of the timeunit's range (eg. see
// <https://docs.rs/chrono/latest/chrono/struct.DateTime.html#method.timestamp_nanos_opt>),
// try again by allowing a nullable batch as output
.unwrap_or_else(|()| array.unary_opt::<_, TimestampNanosecondType>(convert_timezone));
// for nanoseconds), try again by allowing a nullable batch as output
.unwrap_or_else(|()| array.unary_opt::<_, T>(convert_timezone));
let array = Arc::new(array) as ArrayRef;
Ok(array)
}
}

/// Wrapper around RawTimestampArrayDecoder to allow specifying the timezone of the output
/// Wrapper around PrimitiveArrayDecoder to allow specifying the timezone of the output
/// timestamp array as UTC.
struct TimestampInstantArrayDecoder(RawTimestampArrayDecoder);
struct TimestampInstantArrayDecoder<T: ArrowTimestampType>(PrimitiveArrayDecoder<T>);

impl ArrayBatchDecoder for TimestampInstantArrayDecoder {
impl<T: ArrowTimestampType> ArrayBatchDecoder for TimestampInstantArrayDecoder<T> {
fn next_batch(
&mut self,
batch_size: usize,
Expand Down
11 changes: 10 additions & 1 deletion src/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub struct ArrowReaderBuilder<R> {
pub(crate) file_metadata: Arc<FileMetadata>,
pub(crate) batch_size: usize,
pub(crate) projection: ProjectionMask,
pub(crate) schema_ref: Option<SchemaRef>,
}

impl<R> ArrowReaderBuilder<R> {
Expand All @@ -29,6 +30,7 @@ impl<R> ArrowReaderBuilder<R> {
file_metadata,
batch_size: DEFAULT_BATCH_SIZE,
projection: ProjectionMask::all(),
schema_ref: None,
}
}

Expand All @@ -45,6 +47,11 @@ impl<R> ArrowReaderBuilder<R> {
self.projection = projection;
self
}

pub fn with_schema(mut self, schema: SchemaRef) -> Self {
self.schema_ref = Some(schema);
self
}
}

impl<R: ChunkReader> ArrowReaderBuilder<R> {
Expand All @@ -64,7 +71,9 @@ impl<R: ChunkReader> ArrowReaderBuilder<R> {
projected_data_type,
stripe_index: 0,
};
let schema_ref = Arc::new(create_arrow_schema(&cursor));
let schema_ref = self
.schema_ref
.unwrap_or_else(|| Arc::new(create_arrow_schema(&cursor)));
ArrowReader {
cursor,
schema_ref,
Expand Down
Loading

0 comments on commit bdbadb9

Please sign in to comment.