From 40131d2ef14dd52e54f8e38757552fb5d21632da Mon Sep 17 00:00:00 2001 From: Pierre Marijon Date: Tue, 28 Jun 2022 17:47:28 +0200 Subject: [PATCH] Add subcommand to perform conversion and split of file --- Cargo.toml | 4 ++ vcf2parquet-bin/src/cli.rs | 107 +++++++++++++++++++++++++++++-------- vcf2parquet-bin/src/lib.rs | 28 +++++++++- vcf2parquet-lib/Cargo.toml | 3 +- vcf2parquet-lib/src/lib.rs | 29 ++++++++-- 5 files changed, 145 insertions(+), 26 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7301a11..08ae4b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,5 +28,9 @@ lib = ["vcf2parquet-lib"] bin = ["vcf2parquet-bin"] +[[bin]] +name = "vcf2parquet" +required-features = ["bin"] + [package.metadata.docs.rs] all-features = true \ No newline at end of file diff --git a/vcf2parquet-bin/src/cli.rs b/vcf2parquet-bin/src/cli.rs index 4d271f5..b3e4058 100644 --- a/vcf2parquet-bin/src/cli.rs +++ b/vcf2parquet-bin/src/cli.rs @@ -23,10 +23,6 @@ pub struct Command { #[clap(short = 'i', long = "input")] input: std::path::PathBuf, - /// Output path - #[clap(short = 'o', long = "output")] - output: std::path::PathBuf, - /// Batch size (default 100,000) #[clap(short = 'b', long = "batch-size")] batch_size: Option, @@ -34,21 +30,45 @@ pub struct Command { /// Compression method (default snappy) #[clap(arg_enum, short = 'c', long = "compression")] compression: Option, + + #[clap(subcommand)] + subcommand: SubCommand, +} + +#[derive(clap::Parser, std::fmt::Debug, Clone)] +pub enum SubCommand { + Convert(Convert), + Split(Split), +} + +/// Convert a vcf in a parquet +#[derive(clap::Parser, std::fmt::Debug, Clone)] +pub struct Convert { + /// Output path + #[clap(short = 'o', long = "output")] + output: std::path::PathBuf, +} + +/// Convert a vcf in multiple parquet file each file contains `batch_size` record +#[derive(clap::Parser, std::fmt::Debug, Clone)] +pub struct Split { + /// Output format string, first {} are replace by number + #[clap(short = 'f', long = "output-format")] + format: String, } impl Command { + /// Get input pub fn input(&self) -> &std::path::PathBuf { &self.input } - pub fn output(&self) -> &std::path::PathBuf { - &self.output - } - + /// Get batch_size set by user or default value pub fn batch_size(&self) -> usize { self.batch_size.unwrap_or(100_000) } + /// Get compression set by user or default value pub fn compression(&self) -> arrow2::io::parquet::write::CompressionOptions { match self.compression { Some(Compression::Uncompressed) => { @@ -63,6 +83,25 @@ impl Command { None => arrow2::io::parquet::write::CompressionOptions::Snappy, } } + + /// Get subcommand + pub fn subcommand(&self) -> &SubCommand { + &self.subcommand + } +} + +impl Convert { + /// Get output + pub fn output(&self) -> &std::path::PathBuf { + &self.output + } +} + +impl Split { + /// Get output format + pub fn format(&self) -> &str { + &self.format + } } #[cfg(test)] @@ -73,9 +112,11 @@ mod tests { fn basic_value() { let mut params = Command { input: std::path::Path::new("test/input.vcf").to_path_buf(), - output: std::path::Path::new("test/output.parquet").to_path_buf(), batch_size: None, compression: Some(Compression::Snappy), + subcommand: SubCommand::Convert(Convert { + output: std::path::Path::new("test/output.parquet").to_path_buf(), + }), }; assert_eq!( @@ -83,30 +124,42 @@ mod tests { &std::path::Path::new("test/input.vcf").to_path_buf() ); - assert_eq!( - params.output(), - &std::path::Path::new("test/output.parquet").to_path_buf() - ); + match params.subcommand.clone() { + SubCommand::Convert(c) => assert_eq!( + c.output(), + &std::path::Path::new("test/output.parquet").to_path_buf() + ), + _ => unreachable!(), + } assert_eq!(params.batch_size(), 100_000); params = Command { input: std::path::Path::new("test/input.vcf").to_path_buf(), - output: std::path::Path::new("test/output.parquet").to_path_buf(), batch_size: Some(100), compression: Some(Compression::Snappy), + subcommand: SubCommand::Split(Split { + format: "test_{}.parquet".to_string(), + }), }; assert_eq!(params.batch_size(), 100); + + match params.subcommand.clone() { + SubCommand::Split(s) => assert_eq!(s.format(), "test_{}.parquet"), + _ => unreachable!(), + } } #[test] fn compression() { let mut params = Command { input: std::path::Path::new("test/input.vcf").to_path_buf(), - output: std::path::Path::new("test/output.parquet").to_path_buf(), batch_size: None, compression: None, + subcommand: SubCommand::Convert(Convert { + output: std::path::Path::new("test/output.parquet").to_path_buf(), + }), }; assert_eq!( @@ -116,9 +169,11 @@ mod tests { params = Command { input: std::path::Path::new("test/input.vcf").to_path_buf(), - output: std::path::Path::new("test/output.parquet").to_path_buf(), batch_size: None, compression: Some(Compression::Uncompressed), + subcommand: SubCommand::Convert(Convert { + output: std::path::Path::new("test/output.parquet").to_path_buf(), + }), }; assert_eq!( @@ -128,9 +183,11 @@ mod tests { params = Command { input: std::path::Path::new("test/input.vcf").to_path_buf(), - output: std::path::Path::new("test/output.parquet").to_path_buf(), batch_size: None, compression: Some(Compression::Snappy), + subcommand: SubCommand::Convert(Convert { + output: std::path::Path::new("test/output.parquet").to_path_buf(), + }), }; assert_eq!( @@ -140,9 +197,11 @@ mod tests { params = Command { input: std::path::Path::new("test/input.vcf").to_path_buf(), - output: std::path::Path::new("test/output.parquet").to_path_buf(), batch_size: None, compression: Some(Compression::Gzip), + subcommand: SubCommand::Convert(Convert { + output: std::path::Path::new("test/output.parquet").to_path_buf(), + }), }; assert_eq!( @@ -152,9 +211,11 @@ mod tests { params = Command { input: std::path::Path::new("test/input.vcf").to_path_buf(), - output: std::path::Path::new("test/output.parquet").to_path_buf(), batch_size: None, compression: Some(Compression::Lzo), + subcommand: SubCommand::Convert(Convert { + output: std::path::Path::new("test/output.parquet").to_path_buf(), + }), }; assert_eq!( @@ -164,9 +225,11 @@ mod tests { params = Command { input: std::path::Path::new("test/input.vcf").to_path_buf(), - output: std::path::Path::new("test/output.parquet").to_path_buf(), batch_size: None, compression: Some(Compression::Brotli), + subcommand: SubCommand::Convert(Convert { + output: std::path::Path::new("test/output.parquet").to_path_buf(), + }), }; assert_eq!( @@ -176,9 +239,11 @@ mod tests { params = Command { input: std::path::Path::new("test/input.vcf").to_path_buf(), - output: std::path::Path::new("test/output.parquet").to_path_buf(), batch_size: None, compression: Some(Compression::Lz4), + subcommand: SubCommand::Convert(Convert { + output: std::path::Path::new("test/output.parquet").to_path_buf(), + }), }; assert_eq!( diff --git a/vcf2parquet-bin/src/lib.rs b/vcf2parquet-bin/src/lib.rs index 76a75c6..37e4732 100644 --- a/vcf2parquet-bin/src/lib.rs +++ b/vcf2parquet-bin/src/lib.rs @@ -15,6 +15,13 @@ pub mod error; pub fn main() -> error::Result<()> { let params = cli::Command::parse(); + match params.subcommand() { + cli::SubCommand::Convert(subparams) => convert(¶ms, &subparams), + cli::SubCommand::Split(subparams) => split(¶ms, &subparams), + } +} + +fn convert(params: &cli::Command, subparams: &cli::Convert) -> error::Result<()> { let mut reader = std::fs::File::open(params.input()) .map_err(error::mapping) .map(Box::new) @@ -22,7 +29,7 @@ pub fn main() -> error::Result<()> { .map_err(error::mapping)? .map(|(file, _)| std::io::BufReader::new(file))?; - let mut output = std::fs::File::create(params.output()).map_err(error::mapping)?; + let mut output = std::fs::File::create(subparams.output()).map_err(error::mapping)?; lib::vcf2parquet( &mut reader, @@ -34,3 +41,22 @@ pub fn main() -> error::Result<()> { Ok(()) } + +fn split(params: &cli::Command, subparams: &cli::Split) -> error::Result<()> { + let mut reader = std::fs::File::open(params.input()) + .map_err(error::mapping) + .map(Box::new) + .map(|x| niffler::get_reader(x)) + .map_err(error::mapping)? + .map(|(file, _)| std::io::BufReader::new(file))?; + + lib::vcf2multiparquet( + &mut reader, + subparams.format(), + params.batch_size(), + params.compression(), + ) + .map_err(error::mapping)?; + + Ok(()) +} diff --git a/vcf2parquet-lib/Cargo.toml b/vcf2parquet-lib/Cargo.toml index 38cb3eb..3c545d5 100644 --- a/vcf2parquet-lib/Cargo.toml +++ b/vcf2parquet-lib/Cargo.toml @@ -14,4 +14,5 @@ log = { version = "0.4" } thiserror = { version = "1" } [dev-dependencies] -lazy_static = { version = "1" } \ No newline at end of file +lazy_static = { version = "1" } +tempfile = { version = "3" } \ No newline at end of file diff --git a/vcf2parquet-lib/src/lib.rs b/vcf2parquet-lib/src/lib.rs index b808736..cb55728 100644 --- a/vcf2parquet-lib/src/lib.rs +++ b/vcf2parquet-lib/src/lib.rs @@ -68,9 +68,9 @@ where } /// Read `input` vcf and write each row group in a parquet file match with template -pub fn vcf2multiparquet( +pub fn vcf2multiparquet( input: &mut R, - template: String, + template: &str, batch_size: usize, compression: arrow2::io::parquet::write::CompressionOptions, ) -> error::Result<()> @@ -305,7 +305,7 @@ mod tests { ]; #[test] - fn positives() { + fn convert_positives() { let mut input = std::io::BufReader::new(&*VCF_FILE); let mut output = Vec::new(); @@ -334,4 +334,27 @@ mod tests { assert!(result.is_err()); } + + #[test] + fn multi_positives() { + let mut input = std::io::BufReader::new(&*VCF_FILE); + let dir = tempfile::tempdir().unwrap(); + + let format = dir + .path() + .join("test_{}.parquet") + .as_os_str() + .to_str() + .unwrap() + .to_string(); + println!("{}", format); + + vcf2multiparquet( + &mut input, + &format, + 1, + arrow2::io::parquet::write::CompressionOptions::Gzip, + ) + .unwrap(); + } }