Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for decoding Timestamp as Decimal128 #96

Merged
merged 3 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,20 @@ The following table lists how ORC data types are read into Arrow data types:
| Binary | Binary | |
| Decimal | Decimal128 | |
| Date | Date32 | |
| Timestamp | Timestamp(Nanosecond, None) | `ArrowReaderBuilder::with_schema` allows configuring different time units. Overflows or loss of precision while decoding results in `OrcError` |
| Timestamp instant | Timestamp(Nanosecond, UTC) | `ArrowReaderBuilder::with_schema` allows configuring different time units. Overflows or loss of precision while decoding results in `OrcError` |
| Timestamp | Timestamp(Nanosecond, None) | ¹ |
| Timestamp instant | Timestamp(Nanosecond, UTC) | ¹ |
| Struct | Struct | |
| List | List | |
| Map | Map | |
| Union | Union(_, Sparse)* | |
| Union | Union(_, Sparse) | ² |

*Currently only supports a maximum of 127 variants
¹: `ArrowReaderBuilder::with_schema` allows configuring different time units or decoding to
`Decimal128(38, 9)` (i128 of non-leap nanoseconds since UNIX epoch).
Overflows may happen while decoding to a non-Seconds time unit, and results in `OrcError`.
Loss of precision may happen while decoding to a non-Nanosecond time unit, and results in `OrcError`.
`Decimal128(38, 9)` avoids both overflows and loss of precision.

²: Currently only supports a maximum of 127 variants

## Contributing

Expand Down
215 changes: 143 additions & 72 deletions src/array_decoder/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,20 @@ use crate::{
};
use arrow::array::ArrayRef;
use arrow::datatypes::{
ArrowTimestampType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
ArrowTimestampType, Decimal128Type, DecimalType, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
};
use chrono::offset::TimeZone;
use chrono_tz::Tz;
use chrono::TimeDelta;
use chrono_tz::{Tz, UTC};
use snafu::ensure;

use super::{ArrayBatchDecoder, PrimitiveArrayDecoder};
use super::{ArrayBatchDecoder, DecimalArrayDecoder, PrimitiveArrayDecoder};
use crate::error::UnsupportedTypeVariantSnafu;

const NANOSECONDS_IN_SECOND: i128 = 1_000_000_000;
const NANOSECOND_DIGITS: i8 = 9;

/// Statically dispatches to the right ArrowTimestampType based on the value of $time_unit
/// to create a $decoder_type with that type as type parameter and $iter/$present as value
/// parameters, then applies $f to it and $tz.
Expand All @@ -40,7 +44,7 @@ macro_rules! decoder_for_time_unit {

match $time_unit {
TimeUnit::Second => {
let iter = Box::new(TimestampIterator::<TimestampSecondType>::new(
let iter = Box::new(TimestampIterator::<TimestampSecondType, _>::new(
$seconds_since_unix_epoch,
data,
secondary,
Expand All @@ -51,7 +55,7 @@ macro_rules! decoder_for_time_unit {
)))
}
TimeUnit::Millisecond => {
let iter = Box::new(TimestampIterator::<TimestampMillisecondType>::new(
let iter = Box::new(TimestampIterator::<TimestampMillisecondType, _>::new(
$seconds_since_unix_epoch,
data,
secondary,
Expand All @@ -62,7 +66,7 @@ macro_rules! decoder_for_time_unit {
)))
}
TimeUnit::Microsecond => {
let iter = Box::new(TimestampIterator::<TimestampMicrosecondType>::new(
let iter = Box::new(TimestampIterator::<TimestampMicrosecondType, _>::new(
$seconds_since_unix_epoch,
data,
secondary,
Expand All @@ -73,7 +77,7 @@ macro_rules! decoder_for_time_unit {
)))
}
TimeUnit::Nanosecond => {
let iter = Box::new(TimestampIterator::<TimestampNanosecondType>::new(
let iter = Box::new(TimestampIterator::<TimestampNanosecondType, _>::new(
$seconds_since_unix_epoch,
data,
secondary,
Expand All @@ -87,6 +91,56 @@ macro_rules! decoder_for_time_unit {
}};
}

fn decimal128_decoder(
column: &Column,
stripe: &Stripe,
seconds_since_unix_epoch: i64,
writer_tz: Option<Tz>,
) -> Result<DecimalArrayDecoder> {
let data = stripe.stream_map().get(column, Kind::Data);
let data = get_rle_reader(column, data)?;

let secondary = stripe.stream_map().get(column, Kind::Secondary);
let secondary = get_rle_reader(column, secondary)?;

let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);

let iter = TimestampIterator::<TimestampNanosecondType, i128>::new(
seconds_since_unix_epoch,
data,
secondary,
);

let iter: Box<dyn Iterator<Item = _> + Send> = match writer_tz {
Some(UTC) => Box::new(iter), // Avoid overflow-able operations below
Some(writer_tz) => Box::new(iter.map(move |ts| {
let ts = ts?;
let seconds = ts.div_euclid(NANOSECONDS_IN_SECOND);
let nanoseconds = ts.rem_euclid(NANOSECONDS_IN_SECOND);

// The addition may panic, because chrono stores dates in an i32,
// which can be overflowed with an i64 of seconds.
let dt = (writer_tz.timestamp_nanos(0)
+ TimeDelta::new(seconds as i64, nanoseconds as u32)
.expect("TimeDelta duration out of bound"))
.naive_local()
.and_utc();

Ok((dt.timestamp() as i128) * NANOSECONDS_IN_SECOND
+ (dt.timestamp_subsec_nanos() as i128))
})),
None => Box::new(iter),
};

Ok(DecimalArrayDecoder::new(
Decimal128Type::MAX_PRECISION,
NANOSECOND_DIGITS,
iter,
present,
))
}

/// Seconds from ORC epoch of 1 January 2015, which serves as the 0
/// point for all timestamp values, to the UNIX epoch of 1 January 1970.
const ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH: i64 = 1_420_070_400;
Expand All @@ -99,100 +153,117 @@ pub fn new_timestamp_decoder(
field_type: ArrowDataType,
stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
let ArrowDataType::Timestamp(time_unit, None) = field_type else {
MismatchedSchemaSnafu {
orc_type: column.data_type().clone(),
arrow_type: field_type,
}
.fail()?
};

match stripe.writer_tz() {
let seconds_since_unix_epoch = match stripe.writer_tz() {
Some(writer_tz) => {
// If writer timezone exists then we must take the ORC epoch according
// to that timezone, and find seconds since UTC UNIX epoch for the base.
let seconds_since_unix_epoch = writer_tz
writer_tz
.with_ymd_and_hms(2015, 1, 1, 0, 0, 0)
.unwrap()
.timestamp();
.timestamp()
}
None => {
// No writer timezone, we can assume UTC, so we can use known fixed value
// for the base offset.
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH
}
};

fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
writer_tz: Tz,
) -> TimestampOffsetArrayDecoder<T> {
TimestampOffsetArrayDecoder {
inner: decoder,
match field_type {
ArrowDataType::Timestamp(time_unit, None) => match stripe.writer_tz() {
Some(writer_tz) => {
fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
writer_tz: Tz,
) -> TimestampOffsetArrayDecoder<T> {
TimestampOffsetArrayDecoder {
inner: decoder,
writer_tz,
}
}
decoder_for_time_unit!(
column,
time_unit,
seconds_since_unix_epoch,
stripe,
writer_tz,
f,
)
}
None => {
fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
_writer_tz: (),
) -> PrimitiveArrayDecoder<T> {
decoder
}

decoder_for_time_unit!(column, time_unit, seconds_since_unix_epoch, stripe, (), f,)
}
decoder_for_time_unit!(
},
ArrowDataType::Decimal128(Decimal128Type::MAX_PRECISION, NANOSECOND_DIGITS) => {
Ok(Box::new(decimal128_decoder(
column,
time_unit,
seconds_since_unix_epoch,
stripe,
writer_tz,
f,
)
seconds_since_unix_epoch,
stripe.writer_tz(),
)?))
}
None => {
_ => MismatchedSchemaSnafu {
orc_type: column.data_type().clone(),
arrow_type: field_type,
}
.fail(),
}
}

/// Decodes a TIMESTAMP_INSTANT column stripe into batches of
/// Timestamp{Nano,Micro,Milli,}secondArrays with UTC timezone.
pub fn new_timestamp_instant_decoder(
column: &Column,
field_type: ArrowDataType,
stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
match field_type {
ArrowDataType::Timestamp(time_unit, Some(tz)) => {
ensure!(
tz.as_ref() == "UTC",
UnsupportedTypeVariantSnafu {
msg: "Non-UTC Arrow timestamps"
}
);

fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
_writer_tz: (),
) -> PrimitiveArrayDecoder<T> {
decoder
) -> TimestampInstantArrayDecoder<T> {
TimestampInstantArrayDecoder(decoder)
}

decoder_for_time_unit!(
column,
time_unit,
// No writer timezone, we can assume UTC, so we can use known fixed value
// for the base offset.
// TIMESTAMP_INSTANT is encoded as UTC so we don't check writer timezone in stripe
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
stripe,
(),
f,
)
}
}
}

/// Decodes a TIMESTAMP_INSTANT column stripe into batches of
/// Timestamp{Nano,Micro,Milli,}secondArrays with UTC timezone.
pub fn new_timestamp_instant_decoder(
column: &Column,
field_type: ArrowDataType,
stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
let ArrowDataType::Timestamp(time_unit, Some(tz)) = field_type else {
MismatchedSchemaSnafu {
ArrowDataType::Decimal128(Decimal128Type::MAX_PRECISION, NANOSECOND_DIGITS) => {
Ok(Box::new(decimal128_decoder(
column,
stripe,
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
None,
)?))
}
_ => MismatchedSchemaSnafu {
orc_type: column.data_type().clone(),
arrow_type: field_type,
}
.fail()?
};
ensure!(
tz.as_ref() == "UTC",
UnsupportedTypeVariantSnafu {
msg: "Non-UTC Arrow timestamps"
}
);

fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
_writer_tz: (),
) -> TimestampInstantArrayDecoder<T> {
TimestampInstantArrayDecoder(decoder)
.fail()?,
}

decoder_for_time_unit!(
column,
time_unit,
// TIMESTAMP_INSTANT is encoded as UTC so we don't check writer timezone in stripe
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
stripe,
(),
f,
)
}

/// Wrapper around PrimitiveArrayDecoder to decode timestamps which are encoded in
Expand Down
17 changes: 9 additions & 8 deletions src/reader/decode/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ use crate::error::{DecodeTimestampSnafu, Result};

const NANOSECONDS_IN_SECOND: i64 = 1_000_000_000;

pub struct TimestampIterator<T: ArrowTimestampType> {
pub struct TimestampIterator<T: ArrowTimestampType, Item: TryFrom<i128>> {
base_from_epoch: i64,
data: Box<dyn Iterator<Item = Result<i64>> + Send>,
secondary: Box<dyn Iterator<Item = Result<u64>> + Send>,
_marker: PhantomData<T>,
_marker: PhantomData<(T, Item)>,
}

impl<T: ArrowTimestampType> TimestampIterator<T> {
impl<T: ArrowTimestampType, Item: TryFrom<i128>> TimestampIterator<T, Item> {
pub fn new(
base_from_epoch: i64,
data: Box<dyn Iterator<Item = Result<i64>> + Send>,
Expand All @@ -29,22 +29,23 @@ impl<T: ArrowTimestampType> TimestampIterator<T> {
}
}

impl<T: ArrowTimestampType> Iterator for TimestampIterator<T> {
type Item = Result<i64>;
impl<T: ArrowTimestampType, Item: TryFrom<i128>> Iterator for TimestampIterator<T, Item> {
type Item = Result<Item>;

fn next(&mut self) -> Option<Self::Item> {
// TODO: throw error for mismatched stream lengths?
let (seconds_since_orc_base, nanoseconds) =
self.data.by_ref().zip(self.secondary.by_ref()).next()?;
decode_timestamp::<T>(self.base_from_epoch, seconds_since_orc_base, nanoseconds).transpose()
decode_timestamp::<T, _>(self.base_from_epoch, seconds_since_orc_base, nanoseconds)
.transpose()
}
}

fn decode_timestamp<T: ArrowTimestampType>(
fn decode_timestamp<T: ArrowTimestampType, Ret: TryFrom<i128>>(
base: i64,
seconds_since_orc_base: Result<i64>,
nanoseconds: Result<u64>,
) -> Result<Option<i64>> {
) -> Result<Option<Ret>> {
let data = seconds_since_orc_base?;
let mut nanoseconds = nanoseconds?;
// Last 3 bits indicate how many trailing zeros were truncated
Expand Down
Loading
Loading