Skip to content

Commit

Permalink
#62 added filtering by rows and columns
Browse files Browse the repository at this point in the history
  • Loading branch information
klangner committed Apr 21, 2024
1 parent 98b4a1e commit c376118
Showing 1 changed file with 106 additions and 16 deletions.
122 changes: 106 additions & 16 deletions src/bin/orc-export.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::{fs::File, io, path::PathBuf};

use anyhow::Result;
use arrow::csv;
use clap::Parser;
use orc_rust::ArrowReaderBuilder;
use arrow::{array::RecordBatch, csv, datatypes::DataType, error::ArrowError, json};
use clap::{Parser, ValueEnum};
use json::writer::{JsonFormat, LineDelimited};
use orc_rust::{reader::metadata::read_metadata, ArrowReaderBuilder};

// NOTE(review): this span is scraped from a GitHub diff view — it interleaves
// removed (pre-change) lines with added (post-change) lines, so it is not
// valid Rust as displayed. Stale pre-change lines are flagged inline.
//
// Command-line interface for `orc-export`: reads an ORC file and exports it
// as CSV (default) or line-delimited JSON, optionally limiting rows/columns.
#[derive(Parser)]
#[command(name = "orc-export")]
Expand All @@ -13,30 +14,119 @@ struct Cli { // NOTE(review): GitHub "Expand All" UI text fused with the diff hunk header and the struct opening line
    // Input ORC file (positional argument).
    file: PathBuf,
    /// Output file. If not provided output will be printed on console
    #[arg(short, long)]
    output: Option<PathBuf>, // NOTE(review): stale pre-change field — renamed to `output_file` below
    // NOTE(review): the four TODO lines below are stale pre-change comments,
    // removed by this commit (head/format/columns are now implemented).
    // TODO: head=N
    // TODO: convert_dates
    // TODO: format=[csv|json]
    // TODO: columns="col1,col2"
    output_file: Option<PathBuf>,
    /// Output format. If not provided then the output is csv
    #[arg(value_enum, short, long)]
    format: Option<FileFormat>,
    /// export only first N records
    #[arg(short, long, value_name = "N")]
    num_rows: Option<i64>, // NOTE(review): i64 accepts a negative N at parse time; u64 would reject it — confirm intent
    /// export only provided columns. Comma separated list
    #[arg(short, long, value_delimiter = ',')]
    columns: Option<Vec<String>>,
}

/// Output serialization format selected with `--format`.
///
/// CSV is the default when the flag is absent (the `match` in `main` falls
/// through to `Csv`). Variant doc comments double as clap `--help` text,
/// so they are left unchanged.
// `Copy` and `Eq` added: a fieldless enum is trivially copyable, and deriving
// `PartialEq` without `Eq` trips clippy's `derive_partial_eq_without_eq`.
// Both additions are backward-compatible for all existing callers.
#[derive(Clone, Copy, Debug, PartialEq, Eq, ValueEnum)]
enum FileFormat {
    /// Output data in csv format
    Csv,
    /// Output data in json format
    Json,
}

/// Sink abstraction over the two supported Arrow writers, letting `main`
/// drive CSV and JSON output through a single type.
///
/// `W` is the byte sink (file or stdout). `F` is the JSON layout (e.g.
/// `LineDelimited`) and only matters for the `Json` variant; the `Csv`
/// variant ignores it.
enum OutputWriter<W: io::Write, F: JsonFormat> {
    Csv(csv::Writer<W>),
    Json(json::Writer<W, F>),
}

impl<W, F> OutputWriter<W, F>
where
    W: io::Write,
    F: JsonFormat,
{
    /// Forward one Arrow record batch to whichever underlying writer
    /// this value wraps.
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        match self {
            Self::Csv(inner) => inner.write(batch),
            Self::Json(inner) => inner.write(batch),
        }
    }

    /// Emit any format trailer and flush. CSV has no trailer, so only the
    /// JSON writer gets a real `finish` call; CSV is a no-op success.
    fn finish(&mut self) -> Result<(), ArrowError> {
        if let Self::Json(inner) = self {
            inner.finish()
        } else {
            Ok(())
        }
    }
}

// NOTE(review): this span is scraped from a GitHub diff view — it interleaves
// removed (pre-change) lines with added (post-change) lines, so it is not
// valid Rust as displayed. Stale pre-change lines are flagged inline.
//
// Entry point: open the ORC file, build an Arrow reader, and stream record
// batches to a CSV or JSON writer, honoring --num-rows and --columns.
fn main() -> Result<()> {
    let cli = Cli::parse();
    let f = File::open(&cli.file)?; // NOTE(review): stale pre-change line, superseded by the `let mut f` below
    let output_writer: Box<dyn io::Write> = if let Some(output) = cli.output { // NOTE(review): stale pre-change line (old `cli.output` field)

    // Prepare reader
    // The file metadata is read first (for the schema used in column
    // filtering below), then the same handle is consumed by the Arrow reader.
    let mut f = File::open(&cli.file)?;
    let metadata = read_metadata(&mut f)?;
    let reader = ArrowReaderBuilder::try_new(f)?.build();

    // Prepare writer
    // Write to the requested output file, or to stdout when none is given.
    let writer: Box<dyn io::Write> = if let Some(output) = cli.output_file {
        Box::new(File::create(output)?)
    } else {
        Box::new(io::stdout())
    };

    let reader = ArrowReaderBuilder::try_new(f)?.build(); // NOTE(review): stale pre-change line (duplicate reader build)
    let mut writer = csv::WriterBuilder::new() // NOTE(review): stale pre-change lines — old CSV-only writer setup
    .with_header(true)
    .build(output_writer);
    // CSV is the default; JSON output is line-delimited (one object per row).
    let mut output_writer = match cli.format {
        Some(FileFormat::Json) => {
            OutputWriter::Json(json::WriterBuilder::new().build::<_, LineDelimited>(writer))
        }
        _ => OutputWriter::Csv(csv::WriterBuilder::new().with_header(true).build(writer)),
    };

    // Select columns which should be exported (Binary and Decimal are not supported)
    // Binary columns are always skipped; Decimal columns only when writing
    // JSON (the `is_json` gate). Any column absent from --columns (when the
    // flag is given) is also skipped.
    let is_json = cli.format == Some(FileFormat::Json);
    let skip_cols: Vec<usize> = metadata
        .root_data_type()
        .children()
        .iter()
        .enumerate()
        .filter(|(_, nc)| match nc.data_type().to_arrow_data_type() {
            DataType::Binary => true,
            DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => is_json,
            _ => {
                if let Some(cols) = &cli.columns {
                    !cols.iter().any(|c| nc.name().eq(c))
                } else {
                    false
                }
            }
        })
        .map(|(i, _)| i)
        // Reversed so that removing a column does not shift the indices of
        // columns still waiting to be removed in the loop below.
        .rev()
        .collect();

    for batch in reader.flatten() { // NOTE(review): stale pre-change lines — old export loop head (no closing brace in the scrape)
    writer.write(&batch)?;
    // Convert data
    // i64::MAX acts as "no limit" when --num-rows is absent.
    let mut num_rows = cli.num_rows.unwrap_or(i64::MAX);
    for mut batch in reader.flatten() {
        // Restrict rows
        // Trim the batch so the total written never exceeds the remaining quota.
        if num_rows < batch.num_rows() as i64 {
            batch = batch.slice(0, num_rows as usize);
        }

        // Restrict column
        // Indices were collected in reverse, so each removal stays valid.
        for i in &skip_cols {
            batch.remove_column(*i);
        }

        // Save
        output_writer.write(&batch)?;

        // Have we reached limit on number of rows?
        // Decrement by the (possibly trimmed) batch size and stop at zero.
        num_rows = num_rows - batch.num_rows() as i64;
        if num_rows <= 0 {
            break;
        }
    }

    // Flush the format trailer (a no-op for CSV, finish() for JSON).
    output_writer.finish()?;

    Ok(())
}

0 comments on commit c376118

Please sign in to comment.