Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#62 added filtering by rows and columns #87

Merged
merged 2 commits into from
Apr 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 107 additions & 16 deletions src/bin/orc-export.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::{fs::File, io, path::PathBuf};

use anyhow::Result;
use arrow::{array::RecordBatch, csv, datatypes::DataType, error::ArrowError, json};
use clap::{Parser, ValueEnum};
use json::writer::{JsonFormat, LineDelimited};
use orc_rust::{projection::ProjectionMask, reader::metadata::read_metadata, ArrowReaderBuilder};

#[derive(Parser)]
#[command(name = "orc-export")]
Expand All @@ -13,30 +14,120 @@ struct Cli {
file: PathBuf,
/// Output file. If not provided output will be printed on console
#[arg(short, long)]
output: Option<PathBuf>,
// TODO: head=N
// TODO: convert_dates
// TODO: format=[csv|json]
// TODO: columns="col1,col2"
output_file: Option<PathBuf>,
/// Output format. If not provided then the output is csv
#[arg(value_enum, short, long, default_value_t = FileFormat::Csv)]
format: FileFormat,
/// export only first N records
#[arg(short, long, value_name = "N")]
num_rows: Option<u64>,
/// export only provided columns. Comma separated list
#[arg(short, long, value_delimiter = ',')]
columns: Option<Vec<String>>,
}

/// Output formats supported by the exporter; parsed from `--format`
/// via clap's `ValueEnum` derive.
#[derive(Clone, Debug, PartialEq, ValueEnum)]
enum FileFormat {
    /// Output data in csv format
    Csv,
    /// Output data in json format
    Json,
}

/// Abstraction over the two concrete Arrow writers so the conversion
/// loop can stream record batches without caring about the format.
enum OutputWriter<W: io::Write, F: JsonFormat> {
    /// CSV writer wrapping the output sink.
    Csv(csv::Writer<W>),
    /// JSON writer; `F` selects the JSON layout (e.g. line-delimited).
    Json(json::Writer<W, F>),
}

impl<W, F> OutputWriter<W, F>
where
    W: io::Write,
    F: JsonFormat,
{
    /// Forward a record batch to whichever underlying writer is active.
    fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        match self {
            Self::Csv(writer) => writer.write(batch),
            Self::Json(writer) => writer.write(batch),
        }
    }

    /// Finalize the output stream. Only JSON needs an explicit trailer;
    /// CSV is complete after the last batch, so it is a no-op.
    fn finish(&mut self) -> Result<(), ArrowError> {
        if let Self::Json(writer) = self {
            writer.finish()
        } else {
            Ok(())
        }
    }
}

fn main() -> Result<()> {
let cli = Cli::parse();
let f = File::open(&cli.file)?;
let output_writer: Box<dyn io::Write> = if let Some(output) = cli.output {

// Prepare reader
let mut f = File::open(&cli.file)?;
let metadata = read_metadata(&mut f)?;

// Select columns which should be exported (Binary and Decimal are not supported)
let cols: Vec<usize> = metadata
.root_data_type()
.children()
.iter()
.enumerate()
// TODO: handle nested types
.filter(|(_, nc)| match nc.data_type().to_arrow_data_type() {
Jefffrey marked this conversation as resolved.
Show resolved Hide resolved
DataType::Binary => false,
DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => {
matches!(cli.format, FileFormat::Csv)
}
_ => {
if let Some(cols) = &cli.columns {
cols.iter().any(|c| nc.name().eq(c))
} else {
true
}
}
})
.map(|(i, _)| i)
.collect();

let projection = ProjectionMask::roots(metadata.root_data_type(), cols);
let reader = ArrowReaderBuilder::try_new(f)?
.with_projection(projection)
.build();

// Prepare writer
let writer: Box<dyn io::Write> = if let Some(output) = cli.output_file {
Box::new(File::create(output)?)
} else {
Box::new(io::stdout())
};

let reader = ArrowReaderBuilder::try_new(f)?.build();
let mut writer = csv::WriterBuilder::new()
.with_header(true)
.build(output_writer);
let mut output_writer = match cli.format {
FileFormat::Json => {
OutputWriter::Json(json::WriterBuilder::new().build::<_, LineDelimited>(writer))
}
_ => OutputWriter::Csv(csv::WriterBuilder::new().with_header(true).build(writer)),
};

// Convert data
let mut num_rows = cli.num_rows.unwrap_or(u64::MAX);
for mut batch in reader.flatten() {
// Restrict rows
if num_rows < batch.num_rows() as u64 {
batch = batch.slice(0, num_rows as usize);
}

for batch in reader.flatten() {
writer.write(&batch)?;
// Save
output_writer.write(&batch)?;

// Have we reached limit on the number of rows?
if num_rows > batch.num_rows() as u64 {
num_rows = num_rows - batch.num_rows() as u64;
} else {
break;
}
}

output_writer.finish()?;

Ok(())
}
Loading