From c3761186278628383594a25cc95afcdabaae9c7e Mon Sep 17 00:00:00 2001
From: klangner
Date: Sun, 21 Apr 2024 20:55:18 +0200
Subject: [PATCH 1/2] #62 added filtering by rows and columns

---
 src/bin/orc-export.rs | 122 ++++++++++++++++++++++++++++++++++++------
 1 file changed, 106 insertions(+), 16 deletions(-)

diff --git a/src/bin/orc-export.rs b/src/bin/orc-export.rs
index eab30cd..9f29634 100644
--- a/src/bin/orc-export.rs
+++ b/src/bin/orc-export.rs
@@ -1,9 +1,10 @@
 use std::{fs::File, io, path::PathBuf};
 
 use anyhow::Result;
-use arrow::csv;
-use clap::Parser;
-use orc_rust::ArrowReaderBuilder;
+use arrow::{array::RecordBatch, csv, datatypes::DataType, error::ArrowError, json};
+use clap::{Parser, ValueEnum};
+use json::writer::{JsonFormat, LineDelimited};
+use orc_rust::{reader::metadata::read_metadata, ArrowReaderBuilder};
 
 #[derive(Parser)]
 #[command(name = "orc-export")]
@@ -13,30 +14,119 @@ struct Cli {
     file: PathBuf,
     /// Output file. If not provided output will be printed on console
     #[arg(short, long)]
-    output: Option<PathBuf>,
-    // TODO: head=N
-    // TODO: convert_dates
-    // TODO: format=[csv|json]
-    // TODO: columns="col1,col2"
+    output_file: Option<PathBuf>,
+    /// Output format. If not provided then the output is csv
+    #[arg(value_enum, short, long)]
+    format: Option<FileFormat>,
+    /// export only first N records
+    #[arg(short, long, value_name = "N")]
+    num_rows: Option<i64>,
+    /// export only provided columns. Comma separated list
+    #[arg(short, long, value_delimiter = ',')]
+    columns: Option<Vec<String>>,
+}
+
+#[derive(Clone, Debug, PartialEq, ValueEnum)]
+enum FileFormat {
+    /// Output data in csv format
+    Csv,
+    /// Output data in json format
+    Json,
+}
+
+enum OutputWriter<W: io::Write, F: JsonFormat> {
+    Csv(csv::Writer<W>),
+    Json(json::Writer<W, F>),
+}
+
+impl<W, F> OutputWriter<W, F>
+where
+    W: io::Write,
+    F: JsonFormat,
+{
+    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
+        match self {
+            OutputWriter::Csv(w) => w.write(batch),
+            OutputWriter::Json(w) => w.write(batch),
+        }
+    }
+
+    fn finish(&mut self) -> Result<(), ArrowError> {
+        match self {
+            OutputWriter::Csv(_) => Ok(()),
+            OutputWriter::Json(w) => w.finish(),
+        }
+    }
 }
 
 fn main() -> Result<()> {
     let cli = Cli::parse();
-    let f = File::open(&cli.file)?;
-    let output_writer: Box<dyn io::Write> = if let Some(output) = cli.output {
+
+    // Prepare reader
+    let mut f = File::open(&cli.file)?;
+    let metadata = read_metadata(&mut f)?;
+    let reader = ArrowReaderBuilder::try_new(f)?.build();
+
+    // Prepare writer
+    let writer: Box<dyn io::Write> = if let Some(output) = cli.output_file {
         Box::new(File::create(output)?)
     } else {
         Box::new(io::stdout())
     };
 
-    let reader = ArrowReaderBuilder::try_new(f)?.build();
-    let mut writer = csv::WriterBuilder::new()
-        .with_header(true)
-        .build(output_writer);
+    let mut output_writer = match cli.format {
+        Some(FileFormat::Json) => {
+            OutputWriter::Json(json::WriterBuilder::new().build::<_, LineDelimited>(writer))
+        }
+        _ => OutputWriter::Csv(csv::WriterBuilder::new().with_header(true).build(writer)),
+    };
+
+    // Select columns which should be exported (Binary and Decimal are not supported)
+    let is_json = cli.format == Some(FileFormat::Json);
+    let skip_cols: Vec<usize> = metadata
+        .root_data_type()
+        .children()
+        .iter()
+        .enumerate()
+        .filter(|(_, nc)| match nc.data_type().to_arrow_data_type() {
+            DataType::Binary => true,
+            DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => is_json,
+            _ => {
+                if let Some(cols) = &cli.columns {
+                    !cols.iter().any(|c| nc.name().eq(c))
+                } else {
+                    false
+                }
+            }
+        })
+        .map(|(i, _)| i)
+        .rev()
+        .collect();
 
-    for batch in reader.flatten() {
-        writer.write(&batch)?;
+    // Convert data
+    let mut num_rows = cli.num_rows.unwrap_or(i64::MAX);
+    for mut batch in reader.flatten() {
+        // Restrict rows
+        if num_rows < batch.num_rows() as i64 {
+            batch = batch.slice(0, num_rows as usize);
+        }
+
+        // Restrict column
+        for i in &skip_cols {
+            batch.remove_column(*i);
+        }
+
+        // Save
+        output_writer.write(&batch)?;
+
+        // Have we reached limit on number of rows?
+        num_rows = num_rows - batch.num_rows() as i64;
+        if num_rows <= 0 {
+            break;
+        }
     }
 
+    output_writer.finish()?;
+
     Ok(())
 }

From d798c4ab654bcadec2d50fe2aa7b7db514650ed9 Mon Sep 17 00:00:00 2001
From: klangner
Date: Mon, 22 Apr 2024 21:17:52 +0200
Subject: [PATCH 2/2] CR fixes

---
 src/bin/orc-export.rs | 73 ++++++++++++++++++++++---------------------
 1 file changed, 37 insertions(+), 36 deletions(-)

diff --git a/src/bin/orc-export.rs b/src/bin/orc-export.rs
index 9f29634..7c9389c 100644
--- a/src/bin/orc-export.rs
+++ b/src/bin/orc-export.rs
@@ -4,7 +4,7 @@ use anyhow::Result;
 use arrow::{array::RecordBatch, csv, datatypes::DataType, error::ArrowError, json};
 use clap::{Parser, ValueEnum};
 use json::writer::{JsonFormat, LineDelimited};
-use orc_rust::{reader::metadata::read_metadata, ArrowReaderBuilder};
+use orc_rust::{projection::ProjectionMask, reader::metadata::read_metadata, ArrowReaderBuilder};
 
 #[derive(Parser)]
 #[command(name = "orc-export")]
@@ -16,11 +16,11 @@ struct Cli {
     #[arg(short, long)]
     output_file: Option<PathBuf>,
     /// Output format. If not provided then the output is csv
-    #[arg(value_enum, short, long)]
-    format: Option<FileFormat>,
+    #[arg(value_enum, short, long, default_value_t = FileFormat::Csv)]
+    format: FileFormat,
     /// export only first N records
     #[arg(short, long, value_name = "N")]
-    num_rows: Option<i64>,
+    num_rows: Option<u64>,
     /// export only provided columns. Comma separated list
     #[arg(short, long, value_delimiter = ',')]
     columns: Option<Vec<String>>,
@@ -65,63 +65,64 @@ fn main() -> Result<()> {
     // Prepare reader
     let mut f = File::open(&cli.file)?;
     let metadata = read_metadata(&mut f)?;
-    let reader = ArrowReaderBuilder::try_new(f)?.build();
-
-    // Prepare writer
-    let writer: Box<dyn io::Write> = if let Some(output) = cli.output_file {
-        Box::new(File::create(output)?)
-    } else {
-        Box::new(io::stdout())
-    };
-
-    let mut output_writer = match cli.format {
-        Some(FileFormat::Json) => {
-            OutputWriter::Json(json::WriterBuilder::new().build::<_, LineDelimited>(writer))
-        }
-        _ => OutputWriter::Csv(csv::WriterBuilder::new().with_header(true).build(writer)),
-    };
 
     // Select columns which should be exported (Binary and Decimal are not supported)
-    let is_json = cli.format == Some(FileFormat::Json);
-    let skip_cols: Vec<usize> = metadata
+    let cols: Vec<usize> = metadata
         .root_data_type()
         .children()
        .iter()
         .enumerate()
+        // TODO: handle nested types
         .filter(|(_, nc)| match nc.data_type().to_arrow_data_type() {
-            DataType::Binary => true,
-            DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => is_json,
+            DataType::Binary => false,
+            DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => {
+                matches!(cli.format, FileFormat::Csv)
+            }
             _ => {
                 if let Some(cols) = &cli.columns {
-                    !cols.iter().any(|c| nc.name().eq(c))
+                    cols.iter().any(|c| nc.name().eq(c))
                 } else {
-                    false
+                    true
                 }
             }
         })
         .map(|(i, _)| i)
-        .rev()
         .collect();
 
+    let projection = ProjectionMask::roots(metadata.root_data_type(), cols);
+    let reader = ArrowReaderBuilder::try_new(f)?
+        .with_projection(projection)
+        .build();
+
+    // Prepare writer
+    let writer: Box<dyn io::Write> = if let Some(output) = cli.output_file {
+        Box::new(File::create(output)?)
+    } else {
+        Box::new(io::stdout())
+    };
+
+    let mut output_writer = match cli.format {
+        FileFormat::Json => {
+            OutputWriter::Json(json::WriterBuilder::new().build::<_, LineDelimited>(writer))
+        }
+        _ => OutputWriter::Csv(csv::WriterBuilder::new().with_header(true).build(writer)),
+    };
+
     // Convert data
-    let mut num_rows = cli.num_rows.unwrap_or(i64::MAX);
+    let mut num_rows = cli.num_rows.unwrap_or(u64::MAX);
     for mut batch in reader.flatten() {
         // Restrict rows
-        if num_rows < batch.num_rows() as i64 {
+        if num_rows < batch.num_rows() as u64 {
             batch = batch.slice(0, num_rows as usize);
         }
 
-        // Restrict column
-        for i in &skip_cols {
-            batch.remove_column(*i);
-        }
-
         // Save
         output_writer.write(&batch)?;
 
-        // Have we reached limit on number of rows?
-        num_rows = num_rows - batch.num_rows() as i64;
-        if num_rows <= 0 {
+        // Have we reached limit on the number of rows?
+        if num_rows > batch.num_rows() as u64 {
+            num_rows = num_rows - batch.num_rows() as u64;
+        } else {
             break;
         }
     }
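
Usage sketch for the resulting CLI, assuming clap's default flag derivation (kebab-case long options from the field names, first-letter short options, and lowercase csv/json values produced by the ValueEnum derive); the file names and column names below are placeholders:

    orc-export data.orc
    orc-export data.orc --format json --num-rows 100 --output-file out.jsonl
    orc-export data.orc -f csv -c col1,col2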