diff --git a/README.md b/README.md index bce2ddd4..6532c32d 100644 --- a/README.md +++ b/README.md @@ -68,14 +68,20 @@ The following table lists how ORC data types are read into Arrow data types: | Binary | Binary | | | Decimal | Decimal128 | | | Date | Date32 | | -| Timestamp | Timestamp(Nanosecond, None) | `ArrowReaderBuilder::with_schema` allows configuring different time units. Overflows or loss of precision while decoding results in `OrcError` | -| Timestamp instant | Timestamp(Nanosecond, UTC) | `ArrowReaderBuilder::with_schema` allows configuring different time units. Overflows or loss of precision while decoding results in `OrcError` | +| Timestamp | Timestamp(Nanosecond, None) | ¹ | +| Timestamp instant | Timestamp(Nanosecond, UTC) | ¹ | | Struct | Struct | | | List | List | | | Map | Map | | -| Union | Union(_, Sparse)* | | +| Union | Union(_, Sparse) | ² | -*Currently only supports a maximum of 127 variants +¹: `ArrowReaderBuilder::with_schema` allows configuring different time units or decoding to +`Decimal128(38, 9)` (i128 of non-leap nanoseconds since UNIX epoch). +Overflows may happen while decoding to a non-Seconds time unit, and result in `OrcError`. +Loss of precision may happen while decoding to a non-Nanosecond time unit, and result in `OrcError`. +`Decimal128(38, 9)` avoids both overflows and loss of precision. 
+ +²: Currently only supports a maximum of 127 variants ## Contributing diff --git a/src/array_decoder/timestamp.rs b/src/array_decoder/timestamp.rs index 6922eea1..bc39c04a 100644 --- a/src/array_decoder/timestamp.rs +++ b/src/array_decoder/timestamp.rs @@ -10,16 +10,20 @@ use crate::{ }; use arrow::array::ArrayRef; use arrow::datatypes::{ - ArrowTimestampType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, - TimestampNanosecondType, TimestampSecondType, + ArrowTimestampType, Decimal128Type, DecimalType, TimeUnit, TimestampMicrosecondType, + TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, }; use chrono::offset::TimeZone; -use chrono_tz::Tz; +use chrono::TimeDelta; +use chrono_tz::{Tz, UTC}; use snafu::ensure; -use super::{ArrayBatchDecoder, PrimitiveArrayDecoder}; +use super::{ArrayBatchDecoder, DecimalArrayDecoder, PrimitiveArrayDecoder}; use crate::error::UnsupportedTypeVariantSnafu; +const NANOSECONDS_IN_SECOND: i128 = 1_000_000_000; +const NANOSECOND_DIGITS: i8 = 9; + /// Statically dispatches to the right ArrowTimestampType based on the value of $time_unit /// to create a $decoder_type with that type as type parameter and $iter/$present as value /// parameters, then applies $f to it and $tz. @@ -40,7 +44,7 @@ macro_rules! decoder_for_time_unit { match $time_unit { TimeUnit::Second => { - let iter = Box::new(TimestampIterator::::new( + let iter = Box::new(TimestampIterator::::new( $seconds_since_unix_epoch, data, secondary, @@ -51,7 +55,7 @@ macro_rules! decoder_for_time_unit { ))) } TimeUnit::Millisecond => { - let iter = Box::new(TimestampIterator::::new( + let iter = Box::new(TimestampIterator::::new( $seconds_since_unix_epoch, data, secondary, @@ -62,7 +66,7 @@ macro_rules! decoder_for_time_unit { ))) } TimeUnit::Microsecond => { - let iter = Box::new(TimestampIterator::::new( + let iter = Box::new(TimestampIterator::::new( $seconds_since_unix_epoch, data, secondary, @@ -73,7 +77,7 @@ macro_rules! 
decoder_for_time_unit { ))) } TimeUnit::Nanosecond => { - let iter = Box::new(TimestampIterator::::new( + let iter = Box::new(TimestampIterator::::new( $seconds_since_unix_epoch, data, secondary, @@ -87,6 +91,56 @@ macro_rules! decoder_for_time_unit { }}; } +fn decimal128_decoder( + column: &Column, + stripe: &Stripe, + seconds_since_unix_epoch: i64, + writer_tz: Option, +) -> Result { + let data = stripe.stream_map().get(column, Kind::Data); + let data = get_rle_reader(column, data)?; + + let secondary = stripe.stream_map().get(column, Kind::Secondary); + let secondary = get_rle_reader(column, secondary)?; + + let present = get_present_vec(column, stripe)? + .map(|iter| Box::new(iter.into_iter()) as Box + Send>); + + let iter = TimestampIterator::::new( + seconds_since_unix_epoch, + data, + secondary, + ); + + let iter: Box + Send> = match writer_tz { + Some(UTC) => Box::new(iter), // Avoid overflow-able operations below + Some(writer_tz) => Box::new(iter.map(move |ts| { + let ts = ts?; + let seconds = ts.div_euclid(NANOSECONDS_IN_SECOND); + let nanoseconds = ts.rem_euclid(NANOSECONDS_IN_SECOND); + + // The addition may panic, because chrono stores dates in an i32, + // which can be overflowed with an i64 of seconds. + let dt = (writer_tz.timestamp_nanos(0) + + TimeDelta::new(seconds as i64, nanoseconds as u32) + .expect("TimeDelta duration out of bound")) + .naive_local() + .and_utc(); + + Ok((dt.timestamp() as i128) * NANOSECONDS_IN_SECOND + + (dt.timestamp_subsec_nanos() as i128)) + })), + None => Box::new(iter), + }; + + Ok(DecimalArrayDecoder::new( + Decimal128Type::MAX_PRECISION, + NANOSECOND_DIGITS, + iter, + present, + )) +} + /// Seconds from ORC epoch of 1 January 2015, which serves as the 0 /// point for all timestamp values, to the UNIX epoch of 1 January 1970. 
const ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH: i64 = 1_420_070_400; @@ -99,100 +153,117 @@ pub fn new_timestamp_decoder( field_type: ArrowDataType, stripe: &Stripe, ) -> Result> { - let ArrowDataType::Timestamp(time_unit, None) = field_type else { - MismatchedSchemaSnafu { - orc_type: column.data_type().clone(), - arrow_type: field_type, - } - .fail()? - }; - - match stripe.writer_tz() { + let seconds_since_unix_epoch = match stripe.writer_tz() { Some(writer_tz) => { // If writer timezone exists then we must take the ORC epoch according // to that timezone, and find seconds since UTC UNIX epoch for the base. - let seconds_since_unix_epoch = writer_tz + writer_tz .with_ymd_and_hms(2015, 1, 1, 0, 0, 0) .unwrap() - .timestamp(); + .timestamp() + } + None => { + // No writer timezone, we can assume UTC, so we can use known fixed value + // for the base offset. + ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH + } + }; - fn f( - decoder: PrimitiveArrayDecoder, - writer_tz: Tz, - ) -> TimestampOffsetArrayDecoder { - TimestampOffsetArrayDecoder { - inner: decoder, + match field_type { + ArrowDataType::Timestamp(time_unit, None) => match stripe.writer_tz() { + Some(writer_tz) => { + fn f( + decoder: PrimitiveArrayDecoder, + writer_tz: Tz, + ) -> TimestampOffsetArrayDecoder { + TimestampOffsetArrayDecoder { + inner: decoder, + writer_tz, + } + } + decoder_for_time_unit!( + column, + time_unit, + seconds_since_unix_epoch, + stripe, writer_tz, + f, + ) + } + None => { + fn f( + decoder: PrimitiveArrayDecoder, + _writer_tz: (), + ) -> PrimitiveArrayDecoder { + decoder } + + decoder_for_time_unit!(column, time_unit, seconds_since_unix_epoch, stripe, (), f,) } - decoder_for_time_unit!( + }, + ArrowDataType::Decimal128(Decimal128Type::MAX_PRECISION, NANOSECOND_DIGITS) => { + Ok(Box::new(decimal128_decoder( column, - time_unit, - seconds_since_unix_epoch, stripe, - writer_tz, - f, - ) + seconds_since_unix_epoch, + stripe.writer_tz(), + )?)) } - None => { + _ => MismatchedSchemaSnafu { + 
orc_type: column.data_type().clone(), + arrow_type: field_type, + } + .fail(), + } +} + +/// Decodes a TIMESTAMP_INSTANT column stripe into batches of +/// Timestamp{Nano,Micro,Milli,}secondArrays with UTC timezone. +pub fn new_timestamp_instant_decoder( + column: &Column, + field_type: ArrowDataType, + stripe: &Stripe, +) -> Result> { + match field_type { + ArrowDataType::Timestamp(time_unit, Some(tz)) => { + ensure!( + tz.as_ref() == "UTC", + UnsupportedTypeVariantSnafu { + msg: "Non-UTC Arrow timestamps" + } + ); + fn f( decoder: PrimitiveArrayDecoder, _writer_tz: (), - ) -> PrimitiveArrayDecoder { - decoder + ) -> TimestampInstantArrayDecoder { + TimestampInstantArrayDecoder(decoder) } decoder_for_time_unit!( column, time_unit, - // No writer timezone, we can assume UTC, so we can use known fixed value - // for the base offset. + // TIMESTAMP_INSTANT is encoded as UTC so we don't check writer timezone in stripe ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH, stripe, (), f, ) } - } -} - -/// Decodes a TIMESTAMP_INSTANT column stripe into batches of -/// Timestamp{Nano,Micro,Milli,}secondArrays with UTC timezone. -pub fn new_timestamp_instant_decoder( - column: &Column, - field_type: ArrowDataType, - stripe: &Stripe, -) -> Result> { - let ArrowDataType::Timestamp(time_unit, Some(tz)) = field_type else { - MismatchedSchemaSnafu { + ArrowDataType::Decimal128(Decimal128Type::MAX_PRECISION, NANOSECOND_DIGITS) => { + Ok(Box::new(decimal128_decoder( + column, + stripe, + ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH, + None, + )?)) + } + _ => MismatchedSchemaSnafu { orc_type: column.data_type().clone(), arrow_type: field_type, } - .fail()? 
- }; - ensure!( - tz.as_ref() == "UTC", - UnsupportedTypeVariantSnafu { - msg: "Non-UTC Arrow timestamps" - } - ); - - fn f( - decoder: PrimitiveArrayDecoder, - _writer_tz: (), - ) -> TimestampInstantArrayDecoder { - TimestampInstantArrayDecoder(decoder) + .fail()?, } - - decoder_for_time_unit!( - column, - time_unit, - // TIMESTAMP_INSTANT is encoded as UTC so we don't check writer timezone in stripe - ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH, - stripe, - (), - f, - ) } /// Wrapper around PrimitiveArrayDecoder to decode timestamps which are encoded in diff --git a/src/reader/decode/timestamp.rs b/src/reader/decode/timestamp.rs index 5f846dd8..834d0166 100644 --- a/src/reader/decode/timestamp.rs +++ b/src/reader/decode/timestamp.rs @@ -7,14 +7,14 @@ use crate::error::{DecodeTimestampSnafu, Result}; const NANOSECONDS_IN_SECOND: i64 = 1_000_000_000; -pub struct TimestampIterator { +pub struct TimestampIterator> { base_from_epoch: i64, data: Box> + Send>, secondary: Box> + Send>, - _marker: PhantomData, + _marker: PhantomData<(T, Item)>, } -impl TimestampIterator { +impl> TimestampIterator { pub fn new( base_from_epoch: i64, data: Box> + Send>, @@ -29,22 +29,23 @@ impl TimestampIterator { } } -impl Iterator for TimestampIterator { - type Item = Result; +impl> Iterator for TimestampIterator { + type Item = Result; fn next(&mut self) -> Option { // TODO: throw error for mismatched stream lengths? 
let (seconds_since_orc_base, nanoseconds) = self.data.by_ref().zip(self.secondary.by_ref()).next()?; - decode_timestamp::(self.base_from_epoch, seconds_since_orc_base, nanoseconds).transpose() + decode_timestamp::(self.base_from_epoch, seconds_since_orc_base, nanoseconds) + .transpose() } } -fn decode_timestamp( +fn decode_timestamp>( base: i64, seconds_since_orc_base: Result, nanoseconds: Result, -) -> Result> { +) -> Result> { let data = seconds_since_orc_base?; let mut nanoseconds = nanoseconds?; // Last 3 bits indicate how many trailing zeros were truncated diff --git a/tests/basic/main.rs b/tests/basic/main.rs index 402bfc47..9a541f7b 100644 --- a/tests/basic/main.rs +++ b/tests/basic/main.rs @@ -1,7 +1,7 @@ use std::fs::File; use std::sync::Arc; -use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; +use arrow::datatypes::{DataType, Decimal128Type, DecimalType, Field, Schema, TimeUnit}; use arrow::record_batch::{RecordBatch, RecordBatchReader}; use arrow::util::pretty; #[cfg(feature = "async")] @@ -457,6 +457,35 @@ fn custom_precision_timestamps_test(time_unit: TimeUnit) { assert_batches_eq(&batch, &expected); } +#[test] +pub fn decimal128_timestamps_test() { + let path = basic_path("overflowing_timestamps.orc"); + let f = File::open(path).expect("no file found"); + let reader = ArrowReaderBuilder::try_new(f) + .unwrap() + .with_schema(Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new( + "ts", + DataType::Decimal128(Decimal128Type::MAX_PRECISION, 9), + true, + ), + ]))) + .build(); + let batch = reader.collect::, _>>().unwrap(); + println!("{:?}", batch[0].column(1)); + let expected = [ + "+----+------------------------+", + "| id | ts |", + "+----+------------------------+", + "| 1 | 12345678.000000000 |", + "| 2 | -62135596800.000000000 |", + "| 3 | 12345678.000000000 |", + "+----+------------------------+", + ]; + assert_batches_eq(&batch, &expected); +} + // From 
https://github.com/apache/arrow-rs/blob/7705acad845e8b2a366a08640f7acb4033ed7049/arrow-flight/src/sql/metadata/mod.rs#L67-L75 pub fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) { let formatted = pretty::pretty_format_batches(batches).unwrap().to_string();