diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..a3c19da --- /dev/null +++ b/src/main.rs @@ -0,0 +1,7 @@ +#[cfg(feature = "bin")] +pub use vcf2parquet_bin::*; + +#[cfg(feature = "bin")] +fn main() -> vcf2parquet_bin::error::Result<()> { + vcf2parquet_bin::main() +} diff --git a/vcf2parquet-bin/Cargo.toml b/vcf2parquet-bin/Cargo.toml index bc59f8f..0699158 100644 --- a/vcf2parquet-bin/Cargo.toml +++ b/vcf2parquet-bin/Cargo.toml @@ -3,8 +3,6 @@ name = "vcf2parquet-bin" version = "0.1.0" edition = "2021" -default-run = "vcf2parquet" - [dependencies] vcf2parquet-lib = { path = "../vcf2parquet-lib", version = "0.1.0" } niffler = { version = "2" } @@ -18,7 +16,3 @@ thiserror = { version = "1" } # cli management clap = { version = "3", features = ["derive"] } - -[[bin]] -name = "vcf2parquet" -path = "src/main.rs" \ No newline at end of file diff --git a/vcf2parquet-bin/src/main.rs b/vcf2parquet-bin/src/lib.rs similarity index 88% rename from vcf2parquet-bin/src/main.rs rename to vcf2parquet-bin/src/lib.rs index 14367e4..76a75c6 100644 --- a/vcf2parquet-bin/src/main.rs +++ b/vcf2parquet-bin/src/lib.rs @@ -9,10 +9,10 @@ use clap::Parser as _; use vcf2parquet_lib as lib; /* mod section */ -mod cli; -mod error; +pub mod cli; +pub mod error; -fn main() -> error::Result<()> { +pub fn main() -> error::Result<()> { let params = cli::Command::parse(); let mut reader = std::fs::File::open(params.input()) @@ -24,7 +24,7 @@ fn main() -> error::Result<()> { let mut output = std::fs::File::create(params.output()).map_err(error::mapping)?; - lib::noodles2arrow( + lib::vcf2parquet( &mut reader, &mut output, params.batch_size(), diff --git a/vcf2parquet-lib/src/lib.rs b/vcf2parquet-lib/src/lib.rs index 54ef0ef..b808736 100644 --- a/vcf2parquet-lib/src/lib.rs +++ b/vcf2parquet-lib/src/lib.rs @@ -13,7 +13,7 @@ pub mod record2chunk; pub mod schema; /// Read `input` vcf and write parquet in `output` -pub fn noodles2arrow( +pub fn vcf2parquet( input: &mut R, output: &mut W, batch_size: usize, @@ -67,6 +67,63 @@ where Ok(()) } +/// Read `input` vcf and write each row group in a parquet file match with template +pub fn vcf2multiparquet( + input: &mut R, + template: String, + batch_size: usize, + compression: arrow2::io::parquet::write::CompressionOptions, +) -> error::Result<()> +where + R: std::io::BufRead, +{ + // VCF section + let mut reader = noodles::vcf::Reader::new(input); + + let vcf_header: noodles::vcf::Header = reader + .read_header() + .map_err(error::mapping)? + .parse() + .map_err(error::mapping)?; + + // Parquet section + let schema = schema::from_header(&vcf_header)?; + + let chunk_iterator = record2chunk::Record2Chunk::new( + reader.records(&vcf_header), + batch_size, + vcf_header.clone(), + schema.clone(), + ); + + let options = arrow2::io::parquet::write::WriteOptions { + write_statistics: true, + compression, + version: arrow2::io::parquet::write::Version::V2, + }; + + let encodings = chunk_iterator.encodings(); + let row_groups = arrow2::io::parquet::write::RowGroupIterator::try_new( + chunk_iterator, + &schema, + options, + encodings, + )?; + + for (index, group) in row_groups.enumerate() { + let output = std::fs::File::create(template.replace("{}", &index.to_string())) + .map_err(error::mapping)?; + let mut writer = + arrow2::io::parquet::write::FileWriter::try_new(output, schema.clone(), options)?; + + writer.start()?; + writer.write(group?)?; + writer.end(None)?; + } + + Ok(()) +} + #[cfg(test)] mod tests { use super::*; @@ -252,7 +309,7 @@ mod tests { let mut input = std::io::BufReader::new(&*VCF_FILE); let mut output = Vec::new(); - noodles2arrow( + vcf2parquet( &mut input, &mut output, 1, @@ -268,7 +325,7 @@ mod tests { let mut input = std::io::BufReader::new(&raw_data[..]); let mut output = Vec::new(); - let result = noodles2arrow( + let result = vcf2parquet( &mut input, &mut output, 1,