Skip to content

Commit

Permalink
Add support for decoding Timestamp as Decimal128
Browse files Browse the repository at this point in the history
This allows support for the full range of ORC timestamp, with full precision.
  • Loading branch information
progval committed Jun 19, 2024
1 parent 16b5704 commit 90c7ee3
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 84 deletions.
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,20 @@ The following table lists how ORC data types are read into Arrow data types:
| Binary | Binary | |
| Decimal | Decimal128 | |
| Date | Date32 | |
| Timestamp | Timestamp(Nanosecond, None) | `ArrowReaderBuilder::with_schema` allows configuring different time units. Overflows or loss of precision while decoding results in `OrcError` |
| Timestamp instant | Timestamp(Nanosecond, UTC) | `ArrowReaderBuilder::with_schema` allows configuring different time units. Overflows or loss of precision while decoding results in `OrcError` |
| Timestamp | Timestamp(Nanosecond, None) | ¹ |
| Timestamp instant | Timestamp(Nanosecond, UTC) | ¹ |
| Struct | Struct | |
| List | List | |
| Map | Map | |
| Union | Union(_, Sparse)* | |
| Union | Union(_, Sparse) | ² |

*Currently only supports a maximum of 127 variants
¹: `ArrowReaderBuilder::with_schema` allows configuring different time units or decoding to
`Decimal128(38, 9)` (i128 of non-leap nanoseconds since UNIX epoch).
Overflows may happen while decoding to a non-Seconds time unit, and results in `OrcError`.
Loss of precision may happen while deconding to a non-Nanosecond time unit, and results in `OrcError`.

Check warning on line 81 in README.md

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"deconding" should be "decoding".
`Decimal128(38, 9)` avoids both overflows and loss of precision.

²: Currently only supports a maximum of 127 variants

## Contributing

Expand Down
212 changes: 141 additions & 71 deletions src/array_decoder/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,20 @@ use crate::{
};
use arrow::array::ArrayRef;
use arrow::datatypes::{
ArrowTimestampType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
TimestampNanosecondType, TimestampSecondType,
ArrowTimestampType, Decimal128Type, DecimalType, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
};
use chrono::offset::TimeZone;
use chrono::TimeDelta;
use chrono_tz::Tz;
use snafu::ensure;

use super::{ArrayBatchDecoder, PrimitiveArrayDecoder};
use super::{ArrayBatchDecoder, DecimalArrayDecoder, PrimitiveArrayDecoder};
use crate::error::UnsupportedTypeVariantSnafu;

const NANOSECONDS_IN_SECOND: i128 = 1_000_000_000;
const NANOSECOND_DIGITS: i8 = 9;

/// Statically dispatches to the right ArrowTimestampType based on the value of $time_unit
/// to create a $decoder_type with that type as type parameter and $iter/$present as value
/// parameters, then applies $f to it and $tz.
Expand All @@ -40,7 +44,7 @@ macro_rules! decoder_for_time_unit {

match $time_unit {
TimeUnit::Second => {
let iter = Box::new(TimestampIterator::<TimestampSecondType>::new(
let iter = Box::new(TimestampIterator::<TimestampSecondType, _>::new(
$seconds_since_unix_epoch,
data,
secondary,
Expand All @@ -51,7 +55,7 @@ macro_rules! decoder_for_time_unit {
)))
}
TimeUnit::Millisecond => {
let iter = Box::new(TimestampIterator::<TimestampMillisecondType>::new(
let iter = Box::new(TimestampIterator::<TimestampMillisecondType, _>::new(
$seconds_since_unix_epoch,
data,
secondary,
Expand All @@ -62,7 +66,7 @@ macro_rules! decoder_for_time_unit {
)))
}
TimeUnit::Microsecond => {
let iter = Box::new(TimestampIterator::<TimestampMicrosecondType>::new(
let iter = Box::new(TimestampIterator::<TimestampMicrosecondType, _>::new(
$seconds_since_unix_epoch,
data,
secondary,
Expand All @@ -73,7 +77,7 @@ macro_rules! decoder_for_time_unit {
)))
}
TimeUnit::Nanosecond => {
let iter = Box::new(TimestampIterator::<TimestampNanosecondType>::new(
let iter = Box::new(TimestampIterator::<TimestampNanosecondType, _>::new(
$seconds_since_unix_epoch,
data,
secondary,
Expand All @@ -87,6 +91,54 @@ macro_rules! decoder_for_time_unit {
}};
}

fn decimal128_decoder(
column: &Column,
stripe: &Stripe,
seconds_since_unix_epoch: i64,
writer_tz: Option<Tz>,
) -> Result<DecimalArrayDecoder> {
println!("col {:?}", column);
let data = stripe.stream_map().get(column, Kind::Data);
let data = get_rle_reader(column, data)?;

let secondary = stripe.stream_map().get(column, Kind::Secondary);
let secondary = get_rle_reader(column, secondary)?;

let present = get_present_vec(column, stripe)?
.map(|iter| Box::new(iter.into_iter()) as Box<dyn Iterator<Item = bool> + Send>);

let iter = TimestampIterator::<TimestampNanosecondType, i128>::new(
seconds_since_unix_epoch,
data,
secondary,
);

let iter: Box<dyn Iterator<Item = _> + Send> = match writer_tz {
Some(writer_tz) => Box::new(iter.map(move |ts| {
let ts = ts?;
let seconds = ts.div_euclid(NANOSECONDS_IN_SECOND);
let nanoseconds = ts.rem_euclid(NANOSECONDS_IN_SECOND);
println!("{} ns = {} s + {} ns", ts, seconds, nanoseconds);
let dt = (writer_tz.timestamp_nanos(0)
+ TimeDelta::new(seconds as i64, nanoseconds as u32)
.expect("TimeDelta duration out of bound"))
.naive_local()
.and_utc();

Ok((dt.timestamp() as i128) * NANOSECONDS_IN_SECOND
+ (dt.timestamp_subsec_nanos() as i128))
})),
None => Box::new(iter),
};

Ok(DecimalArrayDecoder::new(
Decimal128Type::MAX_PRECISION,
NANOSECOND_DIGITS,
iter,
present,
))
}

/// Seconds from ORC epoch of 1 January 2015, which serves as the 0
/// point for all timestamp values, to the UNIX epoch of 1 January 1970.
const ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH: i64 = 1_420_070_400;
Expand All @@ -99,100 +151,118 @@ pub fn new_timestamp_decoder(
field_type: ArrowDataType,
stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
let ArrowDataType::Timestamp(time_unit, None) = field_type else {
MismatchedSchemaSnafu {
orc_type: column.data_type().clone(),
arrow_type: field_type,
}
.fail()?
};

match stripe.writer_tz() {
let seconds_since_unix_epoch = match stripe.writer_tz() {
Some(writer_tz) => {
// If writer timezone exists then we must take the ORC epoch according
// to that timezone, and find seconds since UTC UNIX epoch for the base.
let seconds_since_unix_epoch = writer_tz
writer_tz
.with_ymd_and_hms(2015, 1, 1, 0, 0, 0)
.unwrap()
.timestamp();
.timestamp()
}
None => {
// No writer timezone, we can assume UTC, so we can use known fixed value
// for the base offset.
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH
}
};

fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
writer_tz: Tz,
) -> TimestampOffsetArrayDecoder<T> {
TimestampOffsetArrayDecoder {
inner: decoder,
match field_type {
ArrowDataType::Timestamp(time_unit, None) => match stripe.writer_tz() {
Some(writer_tz) => {
fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
writer_tz: Tz,
) -> TimestampOffsetArrayDecoder<T> {
TimestampOffsetArrayDecoder {
inner: decoder,
writer_tz,
}
}
decoder_for_time_unit!(
column,
time_unit,
seconds_since_unix_epoch,
stripe,
writer_tz,
f,
)
}
None => {
fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
_writer_tz: (),
) -> PrimitiveArrayDecoder<T> {
decoder
}

decoder_for_time_unit!(column, time_unit, seconds_since_unix_epoch, stripe, (), f,)
}
decoder_for_time_unit!(
},
ArrowDataType::Decimal128(Decimal128Type::MAX_PRECISION, NANOSECOND_DIGITS) => {
// TODO: add support for non-None writer TZ
Ok(Box::new(decimal128_decoder(
column,
time_unit,
seconds_since_unix_epoch,
stripe,
writer_tz,
f,
)
seconds_since_unix_epoch,
stripe.writer_tz(),
)?))
}
None => {
_ => MismatchedSchemaSnafu {
orc_type: column.data_type().clone(),
arrow_type: field_type,
}
.fail(),
}
}

/// Decodes a TIMESTAMP_INSTANT column stripe into batches of
/// Timestamp{Nano,Micro,Milli,}secondArrays with UTC timezone.
pub fn new_timestamp_instant_decoder(
column: &Column,
field_type: ArrowDataType,
stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
match field_type {
ArrowDataType::Timestamp(time_unit, Some(tz)) => {
ensure!(
tz.as_ref() == "UTC",
UnsupportedTypeVariantSnafu {
msg: "Non-UTC Arrow timestamps"
}
);

fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
_writer_tz: (),
) -> PrimitiveArrayDecoder<T> {
decoder
) -> TimestampInstantArrayDecoder<T> {
TimestampInstantArrayDecoder(decoder)
}

decoder_for_time_unit!(
column,
time_unit,
// No writer timezone, we can assume UTC, so we can use known fixed value
// for the base offset.
// TIMESTAMP_INSTANT is encoded as UTC so we don't check writer timezone in stripe
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
stripe,
(),
f,
)
}
}
}

/// Decodes a TIMESTAMP_INSTANT column stripe into batches of
/// Timestamp{Nano,Micro,Milli,}secondArrays with UTC timezone.
pub fn new_timestamp_instant_decoder(
column: &Column,
field_type: ArrowDataType,
stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
let ArrowDataType::Timestamp(time_unit, Some(tz)) = field_type else {
MismatchedSchemaSnafu {
ArrowDataType::Decimal128(Decimal128Type::MAX_PRECISION, NANOSECOND_DIGITS) => {
Ok(Box::new(decimal128_decoder(
column,
stripe,
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
None,
)?))
}
_ => MismatchedSchemaSnafu {
orc_type: column.data_type().clone(),
arrow_type: field_type,
}
.fail()?
};
ensure!(
tz.as_ref() == "UTC",
UnsupportedTypeVariantSnafu {
msg: "Non-UTC Arrow timestamps"
}
);

fn f<T: ArrowTimestampType>(
decoder: PrimitiveArrayDecoder<T>,
_writer_tz: (),
) -> TimestampInstantArrayDecoder<T> {
TimestampInstantArrayDecoder(decoder)
.fail()?,
}

decoder_for_time_unit!(
column,
time_unit,
// TIMESTAMP_INSTANT is encoded as UTC so we don't check writer timezone in stripe
ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
stripe,
(),
f,
)
}

/// Wrapper around PrimitiveArrayDecoder to decode timestamps which are encoded in
Expand Down
17 changes: 9 additions & 8 deletions src/reader/decode/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ use crate::error::{DecodeTimestampSnafu, Result};

const NANOSECONDS_IN_SECOND: i64 = 1_000_000_000;

pub struct TimestampIterator<T: ArrowTimestampType> {
pub struct TimestampIterator<T: ArrowTimestampType, Item: TryFrom<i128>> {
base_from_epoch: i64,
data: Box<dyn Iterator<Item = Result<i64>> + Send>,
secondary: Box<dyn Iterator<Item = Result<u64>> + Send>,
_marker: PhantomData<T>,
_marker: PhantomData<(T, Item)>,
}

impl<T: ArrowTimestampType> TimestampIterator<T> {
impl<T: ArrowTimestampType, Item: TryFrom<i128>> TimestampIterator<T, Item> {
pub fn new(
base_from_epoch: i64,
data: Box<dyn Iterator<Item = Result<i64>> + Send>,
Expand All @@ -29,22 +29,23 @@ impl<T: ArrowTimestampType> TimestampIterator<T> {
}
}

impl<T: ArrowTimestampType> Iterator for TimestampIterator<T> {
type Item = Result<i64>;
impl<T: ArrowTimestampType, Item: TryFrom<i128>> Iterator for TimestampIterator<T, Item> {
type Item = Result<Item>;

fn next(&mut self) -> Option<Self::Item> {
// TODO: throw error for mismatched stream lengths?
let (seconds_since_orc_base, nanoseconds) =
self.data.by_ref().zip(self.secondary.by_ref()).next()?;
decode_timestamp::<T>(self.base_from_epoch, seconds_since_orc_base, nanoseconds).transpose()
decode_timestamp::<T, _>(self.base_from_epoch, seconds_since_orc_base, nanoseconds)
.transpose()
}
}

fn decode_timestamp<T: ArrowTimestampType>(
fn decode_timestamp<T: ArrowTimestampType, Ret: TryFrom<i128>>(
base: i64,
seconds_since_orc_base: Result<i64>,
nanoseconds: Result<u64>,
) -> Result<Option<i64>> {
) -> Result<Option<Ret>> {
let data = seconds_since_orc_base?;
let mut nanoseconds = nanoseconds?;
// Last 3 bits indicate how many trailing zeros were truncated
Expand Down
Loading

0 comments on commit 90c7ee3

Please sign in to comment.