Skip to content

Commit

Permalink
Add subcommand to perform conversion and split of file
Browse files Browse the repository at this point in the history
  • Loading branch information
natir committed Jun 28, 2022
1 parent 827c3fa commit 40131d2
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 26 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,9 @@ lib = ["vcf2parquet-lib"]
bin = ["vcf2parquet-bin"]


[[bin]]
name = "vcf2parquet"
required-features = ["bin"]

[package.metadata.docs.rs]
all-features = true
107 changes: 86 additions & 21 deletions vcf2parquet-bin/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,52 @@ pub struct Command {
#[clap(short = 'i', long = "input")]
input: std::path::PathBuf,

/// Output path
#[clap(short = 'o', long = "output")]
output: std::path::PathBuf,

/// Batch size (default 100,000)
#[clap(short = 'b', long = "batch-size")]
batch_size: Option<usize>,

/// Compression method (default snappy)
#[clap(arg_enum, short = 'c', long = "compression")]
compression: Option<Compression>,

#[clap(subcommand)]
subcommand: SubCommand,
}

#[derive(clap::Parser, std::fmt::Debug, Clone)]
pub enum SubCommand {
Convert(Convert),
Split(Split),
}

/// Convert a vcf in a parquet
#[derive(clap::Parser, std::fmt::Debug, Clone)]
pub struct Convert {
/// Output path
#[clap(short = 'o', long = "output")]
output: std::path::PathBuf,
}

/// Convert a vcf in multiple parquet file each file contains `batch_size` record
#[derive(clap::Parser, std::fmt::Debug, Clone)]
pub struct Split {
/// Output format string, first {} are replace by number
#[clap(short = 'f', long = "output-format")]
format: String,
}

impl Command {
/// Get input
pub fn input(&self) -> &std::path::PathBuf {
&self.input
}

pub fn output(&self) -> &std::path::PathBuf {
&self.output
}

/// Get batch_size set by user or default value
pub fn batch_size(&self) -> usize {
self.batch_size.unwrap_or(100_000)
}

/// Get compression set by user or default value
pub fn compression(&self) -> arrow2::io::parquet::write::CompressionOptions {
match self.compression {
Some(Compression::Uncompressed) => {
Expand All @@ -63,6 +83,25 @@ impl Command {
None => arrow2::io::parquet::write::CompressionOptions::Snappy,
}
}

/// Get subcommand
pub fn subcommand(&self) -> &SubCommand {
&self.subcommand
}
}

impl Convert {
/// Get output
pub fn output(&self) -> &std::path::PathBuf {
&self.output
}
}

impl Split {
/// Get output format
pub fn format(&self) -> &str {
&self.format
}
}

#[cfg(test)]
Expand All @@ -73,40 +112,54 @@ mod tests {
fn basic_value() {
let mut params = Command {
input: std::path::Path::new("test/input.vcf").to_path_buf(),
output: std::path::Path::new("test/output.parquet").to_path_buf(),
batch_size: None,
compression: Some(Compression::Snappy),
subcommand: SubCommand::Convert(Convert {
output: std::path::Path::new("test/output.parquet").to_path_buf(),
}),
};

assert_eq!(
params.input(),
&std::path::Path::new("test/input.vcf").to_path_buf()
);

assert_eq!(
params.output(),
&std::path::Path::new("test/output.parquet").to_path_buf()
);
match params.subcommand.clone() {
SubCommand::Convert(c) => assert_eq!(
c.output(),
&std::path::Path::new("test/output.parquet").to_path_buf()
),
_ => unreachable!(),
}

assert_eq!(params.batch_size(), 100_000);

params = Command {
input: std::path::Path::new("test/input.vcf").to_path_buf(),
output: std::path::Path::new("test/output.parquet").to_path_buf(),
batch_size: Some(100),
compression: Some(Compression::Snappy),
subcommand: SubCommand::Split(Split {
format: "test_{}.parquet".to_string(),
}),
};

assert_eq!(params.batch_size(), 100);

match params.subcommand.clone() {
SubCommand::Split(s) => assert_eq!(s.format(), "test_{}.parquet"),
_ => unreachable!(),
}
}

#[test]
fn compression() {
let mut params = Command {
input: std::path::Path::new("test/input.vcf").to_path_buf(),
output: std::path::Path::new("test/output.parquet").to_path_buf(),
batch_size: None,
compression: None,
subcommand: SubCommand::Convert(Convert {
output: std::path::Path::new("test/output.parquet").to_path_buf(),
}),
};

assert_eq!(
Expand All @@ -116,9 +169,11 @@ mod tests {

params = Command {
input: std::path::Path::new("test/input.vcf").to_path_buf(),
output: std::path::Path::new("test/output.parquet").to_path_buf(),
batch_size: None,
compression: Some(Compression::Uncompressed),
subcommand: SubCommand::Convert(Convert {
output: std::path::Path::new("test/output.parquet").to_path_buf(),
}),
};

assert_eq!(
Expand All @@ -128,9 +183,11 @@ mod tests {

params = Command {
input: std::path::Path::new("test/input.vcf").to_path_buf(),
output: std::path::Path::new("test/output.parquet").to_path_buf(),
batch_size: None,
compression: Some(Compression::Snappy),
subcommand: SubCommand::Convert(Convert {
output: std::path::Path::new("test/output.parquet").to_path_buf(),
}),
};

assert_eq!(
Expand All @@ -140,9 +197,11 @@ mod tests {

params = Command {
input: std::path::Path::new("test/input.vcf").to_path_buf(),
output: std::path::Path::new("test/output.parquet").to_path_buf(),
batch_size: None,
compression: Some(Compression::Gzip),
subcommand: SubCommand::Convert(Convert {
output: std::path::Path::new("test/output.parquet").to_path_buf(),
}),
};

assert_eq!(
Expand All @@ -152,9 +211,11 @@ mod tests {

params = Command {
input: std::path::Path::new("test/input.vcf").to_path_buf(),
output: std::path::Path::new("test/output.parquet").to_path_buf(),
batch_size: None,
compression: Some(Compression::Lzo),
subcommand: SubCommand::Convert(Convert {
output: std::path::Path::new("test/output.parquet").to_path_buf(),
}),
};

assert_eq!(
Expand All @@ -164,9 +225,11 @@ mod tests {

params = Command {
input: std::path::Path::new("test/input.vcf").to_path_buf(),
output: std::path::Path::new("test/output.parquet").to_path_buf(),
batch_size: None,
compression: Some(Compression::Brotli),
subcommand: SubCommand::Convert(Convert {
output: std::path::Path::new("test/output.parquet").to_path_buf(),
}),
};

assert_eq!(
Expand All @@ -176,9 +239,11 @@ mod tests {

params = Command {
input: std::path::Path::new("test/input.vcf").to_path_buf(),
output: std::path::Path::new("test/output.parquet").to_path_buf(),
batch_size: None,
compression: Some(Compression::Lz4),
subcommand: SubCommand::Convert(Convert {
output: std::path::Path::new("test/output.parquet").to_path_buf(),
}),
};

assert_eq!(
Expand Down
28 changes: 27 additions & 1 deletion vcf2parquet-bin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,21 @@ pub mod error;
pub fn main() -> error::Result<()> {
let params = cli::Command::parse();

match params.subcommand() {
cli::SubCommand::Convert(subparams) => convert(&params, &subparams),
cli::SubCommand::Split(subparams) => split(&params, &subparams),
}
}

fn convert(params: &cli::Command, subparams: &cli::Convert) -> error::Result<()> {
let mut reader = std::fs::File::open(params.input())
.map_err(error::mapping)
.map(Box::new)
.map(|x| niffler::get_reader(x))
.map_err(error::mapping)?
.map(|(file, _)| std::io::BufReader::new(file))?;

let mut output = std::fs::File::create(params.output()).map_err(error::mapping)?;
let mut output = std::fs::File::create(subparams.output()).map_err(error::mapping)?;

lib::vcf2parquet(
&mut reader,
Expand All @@ -34,3 +41,22 @@ pub fn main() -> error::Result<()> {

Ok(())
}

fn split(params: &cli::Command, subparams: &cli::Split) -> error::Result<()> {
let mut reader = std::fs::File::open(params.input())
.map_err(error::mapping)
.map(Box::new)
.map(|x| niffler::get_reader(x))
.map_err(error::mapping)?
.map(|(file, _)| std::io::BufReader::new(file))?;

lib::vcf2multiparquet(
&mut reader,
subparams.format(),
params.batch_size(),
params.compression(),
)
.map_err(error::mapping)?;

Ok(())
}
3 changes: 2 additions & 1 deletion vcf2parquet-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ log = { version = "0.4" }
thiserror = { version = "1" }

[dev-dependencies]
lazy_static = { version = "1" }
lazy_static = { version = "1" }
tempfile = { version = "3" }
29 changes: 26 additions & 3 deletions vcf2parquet-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ where
}

/// Read `input` vcf and write each row group in a parquet file match with template
pub fn vcf2multiparquet<R, W>(
pub fn vcf2multiparquet<R>(
input: &mut R,
template: String,
template: &str,
batch_size: usize,
compression: arrow2::io::parquet::write::CompressionOptions,
) -> error::Result<()>
Expand Down Expand Up @@ -305,7 +305,7 @@ mod tests {
];

#[test]
fn positives() {
fn convert_positives() {
let mut input = std::io::BufReader::new(&*VCF_FILE);
let mut output = Vec::new();

Expand Down Expand Up @@ -334,4 +334,27 @@ mod tests {

assert!(result.is_err());
}

#[test]
fn multi_positives() {
let mut input = std::io::BufReader::new(&*VCF_FILE);
let dir = tempfile::tempdir().unwrap();

let format = dir
.path()
.join("test_{}.parquet")
.as_os_str()
.to_str()
.unwrap()
.to_string();
println!("{}", format);

vcf2multiparquet(
&mut input,
&format,
1,
arrow2::io::parquet::write::CompressionOptions::Gzip,
)
.unwrap();
}
}

0 comments on commit 40131d2

Please sign in to comment.