diff --git a/src/bin/orc-export.rs b/src/bin/orc-export.rs index eab30cd..7c9389c 100644 --- a/src/bin/orc-export.rs +++ b/src/bin/orc-export.rs @@ -1,9 +1,10 @@ use std::{fs::File, io, path::PathBuf}; use anyhow::Result; -use arrow::csv; -use clap::Parser; -use orc_rust::ArrowReaderBuilder; +use arrow::{array::RecordBatch, csv, datatypes::DataType, error::ArrowError, json}; +use clap::{Parser, ValueEnum}; +use json::writer::{JsonFormat, LineDelimited}; +use orc_rust::{projection::ProjectionMask, reader::metadata::read_metadata, ArrowReaderBuilder}; #[derive(Parser)] #[command(name = "orc-export")] @@ -13,30 +14,120 @@ struct Cli { file: PathBuf, /// Output file. If not provided output will be printed on console #[arg(short, long)] - output: Option, - // TODO: head=N - // TODO: convert_dates - // TODO: format=[csv|json] - // TODO: columns="col1,col2" + output_file: Option, + /// Output format. If not provided then the output is csv + #[arg(value_enum, short, long, default_value_t = FileFormat::Csv)] + format: FileFormat, + /// export only first N records + #[arg(short, long, value_name = "N")] + num_rows: Option, + /// export only provided columns. Comma separated list + #[arg(short, long, value_delimiter = ',')] + columns: Option>, +} + +#[derive(Clone, Debug, PartialEq, ValueEnum)] +enum FileFormat { + /// Output data in csv format + Csv, + /// Output data in json format + Json, +} + +enum OutputWriter { + Csv(csv::Writer), + Json(json::Writer), +} + +impl OutputWriter +where + W: io::Write, + F: JsonFormat, +{ + fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> { + match self { + OutputWriter::Csv(w) => w.write(batch), + OutputWriter::Json(w) => w.write(batch), + } + } + + fn finish(&mut self) -> Result<(), ArrowError> { + match self { + OutputWriter::Csv(_) => Ok(()), + OutputWriter::Json(w) => w.finish(), + } + } } fn main() -> Result<()> { let cli = Cli::parse(); - let f = File::open(&cli.file)?; - let output_writer: Box = if let Some(output) = cli.output { + + // Prepare reader + let mut f = File::open(&cli.file)?; + let metadata = read_metadata(&mut f)?; + + // Select columns which should be exported (Binary and Decimal are not supported) + let cols: Vec = metadata + .root_data_type() + .children() + .iter() + .enumerate() + // TODO: handle nested types + .filter(|(_, nc)| match nc.data_type().to_arrow_data_type() { + DataType::Binary => false, + DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => { + matches!(cli.format, FileFormat::Csv) + } + _ => { + if let Some(cols) = &cli.columns { + cols.iter().any(|c| nc.name().eq(c)) + } else { + true + } + } + }) + .map(|(i, _)| i) + .collect(); + + let projection = ProjectionMask::roots(metadata.root_data_type(), cols); + let reader = ArrowReaderBuilder::try_new(f)? + .with_projection(projection) + .build(); + + // Prepare writer + let writer: Box = if let Some(output) = cli.output_file { Box::new(File::create(output)?) } else { Box::new(io::stdout()) }; - let reader = ArrowReaderBuilder::try_new(f)?.build(); - let mut writer = csv::WriterBuilder::new() - .with_header(true) - .build(output_writer); + let mut output_writer = match cli.format { + FileFormat::Json => { + OutputWriter::Json(json::WriterBuilder::new().build::<_, LineDelimited>(writer)) + } + _ => OutputWriter::Csv(csv::WriterBuilder::new().with_header(true).build(writer)), + }; + + // Convert data + let mut num_rows = cli.num_rows.unwrap_or(u64::MAX); + for mut batch in reader.flatten() { + // Restrict rows + if num_rows < batch.num_rows() as u64 { + batch = batch.slice(0, num_rows as usize); + } - for batch in reader.flatten() { - writer.write(&batch)?; + // Save + output_writer.write(&batch)?; + + // Have we reached limit on the number of rows? + if num_rows > batch.num_rows() as u64 { + num_rows = num_rows - batch.num_rows() as u64; + } else { + break; + } } + output_writer.finish()?; + Ok(()) }