diff --git a/README.md b/README.md index c356e9dd..bce2ddd4 100644 --- a/README.md +++ b/README.md @@ -68,8 +68,8 @@ The following table lists how ORC data types are read into Arrow data types: | Binary | Binary | | | Decimal | Decimal128 | | | Date | Date32 | | -| Timestamp | Timestamp(Nanosecond, None) | Timestamps before 1677-09-21 or after 2261-04-12 return `OrcError` because they cannot be represented as an i64 of nanoseconds | -| Timestamp instant | Timestamp(Nanosecond, UTC) | Timestamps before 1677-09-21 or after 2261-04-12 return `OrcError` because they cannot be represented as an i64 of nanoseconds | +| Timestamp | Timestamp(Nanosecond, None) | `ArrowReaderBuilder::with_schema` allows configuring different time units. Overflows or loss of precision while decoding results in `OrcError` | +| Timestamp instant | Timestamp(Nanosecond, UTC) | `ArrowReaderBuilder::with_schema` allows configuring different time units. Overflows or loss of precision while decoding results in `OrcError` | | Struct | Struct | | | List | List | | | Map | Map | | diff --git a/src/array_decoder/mod.rs b/src/array_decoder/mod.rs index 1ef5c9f2..755e920c 100644 --- a/src/array_decoder/mod.rs +++ b/src/array_decoder/mod.rs @@ -6,7 +6,6 @@ use arrow::datatypes::{ArrowPrimitiveType, Decimal128Type, UInt64Type}; use arrow::datatypes::{DataType as ArrowDataType, Field}; use arrow::datatypes::{ Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, SchemaRef, - TimeUnit, }; use arrow::record_batch::RecordBatch; use snafu::{ensure, ResultExt}; @@ -456,27 +455,9 @@ pub fn array_decoder_factory( ); new_decimal_decoder(column, stripe, *precision, *scale)? } - DataType::Timestamp { .. } => { - // TODO: add support for any precision - ensure!( - field_type == ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), - MismatchedSchemaSnafu { - orc_type: column.data_type().clone(), - arrow_type: field_type - } - ); - new_timestamp_decoder(column, stripe)? - } + DataType::Timestamp { .. } => new_timestamp_decoder(column, field_type, stripe)?, DataType::TimestampWithLocalTimezone { .. } => { - // TODO: add support for any precision and for arbitrary timezones - ensure!( - field_type == ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())), - MismatchedSchemaSnafu { - orc_type: column.data_type().clone(), - arrow_type: field_type - } - ); - new_timestamp_instant_decoder(column, stripe)? + new_timestamp_instant_decoder(column, field_type, stripe)? } DataType::Date { .. } => { diff --git a/src/array_decoder/timestamp.rs b/src/array_decoder/timestamp.rs index 38e4c0b6..6922eea1 100644 --- a/src/array_decoder/timestamp.rs +++ b/src/array_decoder/timestamp.rs @@ -1,106 +1,208 @@ use std::sync::Arc; use crate::{ + array_decoder::ArrowDataType, column::{get_present_vec, Column}, - error::Result, + error::{MismatchedSchemaSnafu, Result}, proto::stream::Kind, reader::decode::{get_rle_reader, timestamp::TimestampIterator}, stripe::Stripe, }; -use arrow::{array::ArrayRef, datatypes::TimestampNanosecondType}; +use arrow::array::ArrayRef; +use arrow::datatypes::{ + ArrowTimestampType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, + TimestampNanosecondType, TimestampSecondType, +}; use chrono::offset::TimeZone; +use chrono_tz::Tz; +use snafu::ensure; use super::{ArrayBatchDecoder, PrimitiveArrayDecoder}; +use crate::error::UnsupportedTypeVariantSnafu; + +/// Statically dispatches to the right ArrowTimestampType based on the value of $time_unit +/// to create a $decoder_type with that type as type parameter and $iter/$present as value +/// parameters, then applies $f to it and $tz. +/// +/// $f has to be generic so it cannot be a closure. +macro_rules! decoder_for_time_unit { + ($column: expr, $time_unit:expr, $seconds_since_unix_epoch:expr, $stripe:expr, $tz:expr, $f:expr,) => {{ + let column = $column; + let stripe = $stripe; + let data = stripe.stream_map().get(column, Kind::Data); + let data = get_rle_reader(column, data)?; + + let secondary = stripe.stream_map().get(column, Kind::Secondary); + let secondary = get_rle_reader(column, secondary)?; + + let present = get_present_vec(column, stripe)? + .map(|iter| Box::new(iter.into_iter()) as Box + Send>); + + match $time_unit { + TimeUnit::Second => { + let iter = Box::new(TimestampIterator::::new( + $seconds_since_unix_epoch, + data, + secondary, + )); + Ok(Box::new(($f)( + PrimitiveArrayDecoder::::new(iter, present), + $tz, + ))) + } + TimeUnit::Millisecond => { + let iter = Box::new(TimestampIterator::::new( + $seconds_since_unix_epoch, + data, + secondary, + )); + Ok(Box::new(($f)( + PrimitiveArrayDecoder::::new(iter, present), + $tz, + ))) + } + TimeUnit::Microsecond => { + let iter = Box::new(TimestampIterator::::new( + $seconds_since_unix_epoch, + data, + secondary, + )); + Ok(Box::new(($f)( + PrimitiveArrayDecoder::::new(iter, present), + $tz, + ))) + } + TimeUnit::Nanosecond => { + let iter = Box::new(TimestampIterator::::new( + $seconds_since_unix_epoch, + data, + secondary, + )); + Ok(Box::new(($f)( + PrimitiveArrayDecoder::::new(iter, present), + $tz, + ))) + } + } + }}; +} /// Seconds from ORC epoch of 1 January 2015, which serves as the 0 /// point for all timestamp values, to the UNIX epoch of 1 January 1970. const ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH: i64 = 1_420_070_400; -/// Decodes a TIMESTAMP column stripe into batches of TimestampNanosecondArrays with no -/// timezone. Will convert timestamps from writer timezone to UTC if a writer timezone +/// Decodes a TIMESTAMP column stripe into batches of Timestamp{Nano,Micro,Milli,}secondArrays +/// with no timezone. Will convert timestamps from writer timezone to UTC if a writer timezone /// is specified for the stripe. pub fn new_timestamp_decoder( column: &Column, + field_type: ArrowDataType, stripe: &Stripe, ) -> Result> { - let data = stripe.stream_map().get(column, Kind::Data); - let data = get_rle_reader(column, data)?; - - let secondary = stripe.stream_map().get(column, Kind::Secondary); - let secondary = get_rle_reader(column, secondary)?; - - let present = get_present_vec(column, stripe)? - .map(|iter| Box::new(iter.into_iter()) as Box + Send>); + let ArrowDataType::Timestamp(time_unit, None) = field_type else { + MismatchedSchemaSnafu { + orc_type: column.data_type().clone(), + arrow_type: field_type, + } + .fail()? + }; match stripe.writer_tz() { - Some(tz) => { + Some(writer_tz) => { // If writer timezone exists then we must take the ORC epoch according // to that timezone, and find seconds since UTC UNIX epoch for the base. - let seconds_since_unix_epoch = tz + let seconds_since_unix_epoch = writer_tz .with_ymd_and_hms(2015, 1, 1, 0, 0, 0) .unwrap() .timestamp(); - let iter = Box::new(TimestampIterator::new( + + fn f( + decoder: PrimitiveArrayDecoder, + writer_tz: Tz, + ) -> TimestampOffsetArrayDecoder { + TimestampOffsetArrayDecoder { + inner: decoder, + writer_tz, + } + } + decoder_for_time_unit!( + column, + time_unit, seconds_since_unix_epoch, - data, - secondary, - )); - let decoder = RawTimestampArrayDecoder::new(iter, present); - Ok(Box::new(TimestampOffsetArrayDecoder { - inner: decoder, - writer_tz: tz, - })) + stripe, + writer_tz, + f, + ) } None => { - // No writer timezone, we can assume UTC, so we casn use known fixed value - // for the base offset. - let iter = Box::new(TimestampIterator::new( + fn f( + decoder: PrimitiveArrayDecoder, + _writer_tz: (), + ) -> PrimitiveArrayDecoder { + decoder + } + + decoder_for_time_unit!( + column, + time_unit, + // No writer timezone, we can assume UTC, so we can use known fixed value + // for the base offset. ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH, - data, - secondary, - )); - let decoder = RawTimestampArrayDecoder::new(iter, present); - Ok(Box::new(decoder)) + stripe, + (), + f, + ) } } } -/// Decodes a TIMESTAMP_INSTANT column stripe into batches of TimestampNanosecondArrays with -/// UTC timezone. +/// Decodes a TIMESTAMP_INSTANT column stripe into batches of +/// Timestamp{Nano,Micro,Milli,}secondArrays with UTC timezone. pub fn new_timestamp_instant_decoder( column: &Column, + field_type: ArrowDataType, stripe: &Stripe, ) -> Result> { - let data = stripe.stream_map().get(column, Kind::Data); - let data = get_rle_reader(column, data)?; - - let secondary = stripe.stream_map().get(column, Kind::Secondary); - let secondary = get_rle_reader(column, secondary)?; + let ArrowDataType::Timestamp(time_unit, Some(tz)) = field_type else { + MismatchedSchemaSnafu { + orc_type: column.data_type().clone(), + arrow_type: field_type, + } + .fail()? + }; + ensure!( + tz.as_ref() == "UTC", + UnsupportedTypeVariantSnafu { + msg: "Non-UTC Arrow timestamps" + } + ); - let present = get_present_vec(column, stripe)? - .map(|iter| Box::new(iter.into_iter()) as Box + Send>); + fn f( + decoder: PrimitiveArrayDecoder, + _writer_tz: (), + ) -> TimestampInstantArrayDecoder { + TimestampInstantArrayDecoder(decoder) + } - // TIMESTAMP_INSTANT is encoded as UTC so we don't check writer timezone in stripe - let iter = Box::new(TimestampIterator::new( + decoder_for_time_unit!( + column, + time_unit, + // TIMESTAMP_INSTANT is encoded as UTC so we don't check writer timezone in stripe ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH, - data, - secondary, - )); - - let decoder = RawTimestampArrayDecoder::new(iter, present); - Ok(Box::new(TimestampInstantArrayDecoder(decoder))) + stripe, + (), + f, + ) } -type RawTimestampArrayDecoder = PrimitiveArrayDecoder; - -/// Wrapper around RawTimestampArrayDecoder to decode timestamps which are encoded in +/// Wrapper around PrimitiveArrayDecoder to decode timestamps which are encoded in /// timezone of the writer to their UTC value. -struct TimestampOffsetArrayDecoder { - inner: RawTimestampArrayDecoder, +struct TimestampOffsetArrayDecoder { + inner: PrimitiveArrayDecoder, writer_tz: chrono_tz::Tz, } -impl ArrayBatchDecoder for TimestampOffsetArrayDecoder { +impl ArrayBatchDecoder for TimestampOffsetArrayDecoder { fn next_batch( &mut self, batch_size: usize, @@ -121,21 +223,21 @@ impl ArrayBatchDecoder for TimestampOffsetArrayDecoder { }; let array = array // first try to convert all non-nullable batches to non-nullable batches - .try_unary::<_, TimestampNanosecondType, _>(|ts| convert_timezone(ts).ok_or(())) - // in the rare case one of the values was out of the 1677-2262 range (see + .try_unary::<_, T, _>(|ts| convert_timezone(ts).ok_or(())) + // in the rare case one of the values was out of the timeunit's range (eg. see // ), - // try again by allowing a nullable batch as output - .unwrap_or_else(|()| array.unary_opt::<_, TimestampNanosecondType>(convert_timezone)); + // for nanoseconds), try again by allowing a nullable batch as output + .unwrap_or_else(|()| array.unary_opt::<_, T>(convert_timezone)); let array = Arc::new(array) as ArrayRef; Ok(array) } } -/// Wrapper around RawTimestampArrayDecoder to allow specifying the timezone of the output +/// Wrapper around PrimitiveArrayDecoder to allow specifying the timezone of the output /// timestamp array as UTC. -struct TimestampInstantArrayDecoder(RawTimestampArrayDecoder); +struct TimestampInstantArrayDecoder(PrimitiveArrayDecoder); -impl ArrayBatchDecoder for TimestampInstantArrayDecoder { +impl ArrayBatchDecoder for TimestampInstantArrayDecoder { fn next_batch( &mut self, batch_size: usize, diff --git a/src/arrow_reader.rs b/src/arrow_reader.rs index 513398df..71ba2f57 100644 --- a/src/arrow_reader.rs +++ b/src/arrow_reader.rs @@ -20,6 +20,7 @@ pub struct ArrowReaderBuilder { pub(crate) file_metadata: Arc, pub(crate) batch_size: usize, pub(crate) projection: ProjectionMask, + pub(crate) schema_ref: Option, } impl ArrowReaderBuilder { @@ -29,6 +30,7 @@ impl ArrowReaderBuilder { file_metadata, batch_size: DEFAULT_BATCH_SIZE, projection: ProjectionMask::all(), + schema_ref: None, } } @@ -45,6 +47,11 @@ impl ArrowReaderBuilder { self.projection = projection; self } + + pub fn with_schema(mut self, schema: SchemaRef) -> Self { + self.schema_ref = Some(schema); + self + } } impl ArrowReaderBuilder { @@ -64,7 +71,9 @@ impl ArrowReaderBuilder { projected_data_type, stripe_index: 0, }; - let schema_ref = Arc::new(create_arrow_schema(&cursor)); + let schema_ref = self + .schema_ref + .unwrap_or_else(|| Arc::new(create_arrow_schema(&cursor))); ArrowReader { cursor, schema_ref, diff --git a/src/reader/decode/timestamp.rs b/src/reader/decode/timestamp.rs index fa69a7a9..5f846dd8 100644 --- a/src/reader/decode/timestamp.rs +++ b/src/reader/decode/timestamp.rs @@ -1,14 +1,20 @@ +use std::marker::PhantomData; + +use arrow::datatypes::{ArrowTimestampType, TimeUnit}; +use snafu::ensure; + use crate::error::{DecodeTimestampSnafu, Result}; const NANOSECONDS_IN_SECOND: i64 = 1_000_000_000; -pub struct TimestampIterator { +pub struct TimestampIterator { base_from_epoch: i64, data: Box> + Send>, secondary: Box> + Send>, + _marker: PhantomData, } -impl TimestampIterator { +impl TimestampIterator { pub fn new( base_from_epoch: i64, data: Box> + Send>, @@ -18,22 +24,23 @@ impl TimestampIterator { base_from_epoch, data, secondary, + _marker: PhantomData, } } } -impl Iterator for TimestampIterator { +impl Iterator for TimestampIterator { type Item = Result; fn next(&mut self) -> Option { // TODO: throw error for mismatched stream lengths? let (seconds_since_orc_base, nanoseconds) = self.data.by_ref().zip(self.secondary.by_ref()).next()?; - decode_timestamp(self.base_from_epoch, seconds_since_orc_base, nanoseconds).transpose() + decode_timestamp::(self.base_from_epoch, seconds_since_orc_base, nanoseconds).transpose() } } -fn decode_timestamp( +fn decode_timestamp( base: i64, seconds_since_orc_base: Result, nanoseconds: Result, @@ -58,18 +65,37 @@ fn decode_timestamp( }; // Convert into nanoseconds since epoch, which Arrow uses as native representation // of timestamps - // The timestamp may overflow as ORC encodes them as a pair of (seconds, nanoseconds) + // The timestamp may overflow i64 as ORC encodes them as a pair of (seconds, nanoseconds) // while we encode them as a single i64 of nanoseconds in Arrow. - let nanoseconds_since_epoch = seconds - .checked_mul(NANOSECONDS_IN_SECOND) - .and_then(|seconds_in_ns| seconds_in_ns.checked_add(nanoseconds as i64)) - .ok_or(()) - .or_else(|()| { + let nanoseconds_since_epoch = + (seconds as i128 * NANOSECONDS_IN_SECOND as i128) + (nanoseconds as i128); + + let nanoseconds_in_timeunit = match T::UNIT { + TimeUnit::Second => 1_000_000_000, + TimeUnit::Millisecond => 1_000_000, + TimeUnit::Microsecond => 1_000, + TimeUnit::Nanosecond => 1, + }; + + // Error if loss of precision + ensure!( + nanoseconds_since_epoch % nanoseconds_in_timeunit == 0, + DecodeTimestampSnafu { + seconds, + nanoseconds, + } + ); + + // Convert to i64 and error if overflow + let num_since_epoch = (nanoseconds_since_epoch / nanoseconds_in_timeunit) + .try_into() + .or_else(|_| { DecodeTimestampSnafu { seconds, nanoseconds, } .fail() })?; - Ok(Some(nanoseconds_since_epoch)) + + Ok(Some(num_since_epoch)) } diff --git a/tests/basic/main.rs b/tests/basic/main.rs index e0620ccb..402bfc47 100644 --- a/tests/basic/main.rs +++ b/tests/basic/main.rs @@ -419,6 +419,44 @@ pub fn overflowing_timestamps_test() { assert!(reader.collect::, _>>().is_err()); } +#[test] +pub fn second_timestamps_test() { + custom_precision_timestamps_test(TimeUnit::Second) +} + +#[test] +pub fn millisecond_timestamps_test() { + custom_precision_timestamps_test(TimeUnit::Millisecond) +} + +#[test] +pub fn microsecond_timestamps_test() { + custom_precision_timestamps_test(TimeUnit::Microsecond) +} + +fn custom_precision_timestamps_test(time_unit: TimeUnit) { + let path = basic_path("overflowing_timestamps.orc"); + let f = File::open(path).expect("no file found"); + let reader = ArrowReaderBuilder::try_new(f) + .unwrap() + .with_schema(Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("ts", DataType::Timestamp(time_unit, None), true), + ]))) + .build(); + let batch = reader.collect::, _>>().unwrap(); + let expected = [ + "+----+---------------------+", + "| id | ts |", + "+----+---------------------+", + "| 1 | 1970-05-23T21:21:18 |", + "| 2 | 0001-01-01T00:00:00 |", + "| 3 | 1970-05-23T21:21:18 |", + "+----+---------------------+", + ]; + assert_batches_eq(&batch, &expected); +} + // From https://github.com/apache/arrow-rs/blob/7705acad845e8b2a366a08640f7acb4033ed7049/arrow-flight/src/sql/metadata/mod.rs#L67-L75 pub fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) { let formatted = pretty::pretty_format_batches(batches).unwrap().to_string();