Skip to content

Commit

Permalink
Add ArrowReaderBuilder::schema()
Browse files Browse the repository at this point in the history
This allows getting the schema before building a reader, so that users can
apply transformations to the schema, like this:

```
let reader_builder = reader_builder
    .with_projection(projection.clone())
    .with_batch_size(ORC_BATCH_SIZE);

let schema = transform_schema(&reader_builder.schema());

let reader = reader_builder.with_schema(schema).build();
```

where `transform_schema` could, for example, be a function that changes
the `TimeUnit` of `Timestamp` datatypes.
  • Loading branch information
progval committed Jun 19, 2024
1 parent 95ebf43 commit ebe43b5
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 16 deletions.
36 changes: 22 additions & 14 deletions src/arrow_reader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use arrow::datatypes::{Schema, SchemaRef};
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::{RecordBatch, RecordBatchReader};

Expand Down Expand Up @@ -52,6 +52,26 @@ impl<R> ArrowReaderBuilder<R> {
self.schema_ref = Some(schema);
self
}

/// Returns the currently computed schema
///
/// Unless [`with_schema`](Self::with_schema) was called, this is computed dynamically
/// based on the current projection and the underlying file format.
pub fn schema(&self) -> SchemaRef {
    // If an explicit schema override was supplied via `with_schema`, return it
    // directly. The derivation below (projecting the root type and collecting
    // user metadata) is deliberately deferred into the closure so that no work
    // is wasted when the override is present.
    self.schema_ref.clone().unwrap_or_else(|| {
        let projected_data_type = self
            .file_metadata
            .root_data_type()
            .project(&self.projection);
        // Arrow schema metadata mirrors the file's user-defined key/value
        // pairs; values are raw bytes, decoded lossily to UTF-8 strings.
        let metadata = self
            .file_metadata
            .user_custom_metadata()
            .iter()
            .map(|(key, value)| (key.clone(), String::from_utf8_lossy(value).to_string()))
            .collect::<HashMap<_, _>>();
        Arc::new(projected_data_type.create_arrow_schema(&metadata))
    })
}
}

impl<R: ChunkReader> ArrowReaderBuilder<R> {
Expand All @@ -61,6 +81,7 @@ impl<R: ChunkReader> ArrowReaderBuilder<R> {
}

pub fn build(self) -> ArrowReader<R> {
let schema_ref = self.schema();
let projected_data_type = self
.file_metadata
.root_data_type()
Expand All @@ -71,9 +92,6 @@ impl<R: ChunkReader> ArrowReaderBuilder<R> {
projected_data_type,
stripe_index: 0,
};
let schema_ref = self
.schema_ref
.unwrap_or_else(|| Arc::new(create_arrow_schema(&cursor)));
ArrowReader {
cursor,
schema_ref,
Expand Down Expand Up @@ -111,16 +129,6 @@ impl<R: ChunkReader> ArrowReader<R> {
}
}

/// Builds an Arrow [`Schema`] for the cursor's projected data type, carrying
/// the file's user-defined key/value metadata (values decoded lossily from
/// raw bytes to UTF-8 strings) as Arrow schema metadata.
pub(crate) fn create_arrow_schema<R>(cursor: &Cursor<R>) -> Schema {
    let mut metadata = HashMap::new();
    for (key, value) in cursor.file_metadata.user_custom_metadata() {
        metadata.insert(key.clone(), String::from_utf8_lossy(value).into_owned());
    }
    cursor.projected_data_type.create_arrow_schema(&metadata)
}

impl<R: ChunkReader> RecordBatchReader for ArrowReader<R> {
fn schema(&self) -> SchemaRef {
self.schema_ref.clone()
Expand Down
4 changes: 2 additions & 2 deletions src/async_arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures::{ready, Stream};
use futures_util::FutureExt;

use crate::array_decoder::NaiveStripeDecoder;
use crate::arrow_reader::{create_arrow_schema, Cursor};
use crate::arrow_reader::Cursor;
use crate::error::Result;
use crate::reader::metadata::read_metadata_async;
use crate::reader::AsyncChunkReader;
Expand Down Expand Up @@ -191,13 +191,13 @@ impl<R: AsyncChunkReader + 'static> ArrowReaderBuilder<R> {
.file_metadata()
.root_data_type()
.project(&self.projection);
let schema_ref = self.schema();
let cursor = Cursor {
reader: self.reader,
file_metadata: self.file_metadata,
projected_data_type,
stripe_index: 0,
};
let schema_ref = Arc::new(create_arrow_schema(&cursor));
ArrowStreamReader::new(cursor, self.batch_size, schema_ref)
}
}

0 comments on commit ebe43b5

Please sign in to comment.