Skip to content

Commit

Permalink
Bin is produced by main crate; add vcf2multiparquet function
Browse files Browse the repository at this point in the history
  • Loading branch information
natir committed Jun 28, 2022
1 parent 6f6fdd5 commit 827c3fa
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 13 deletions.
7 changes: 7 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#[cfg(feature = "bin")]
pub use vcf2parquet_bin::*;

/// Binary entry point, compiled only when the `bin` feature is enabled.
///
/// Forwards to `vcf2parquet_bin::main` and returns its result unchanged.
#[cfg(feature = "bin")]
fn main() -> vcf2parquet_bin::error::Result<()> {
vcf2parquet_bin::main()
}
6 changes: 0 additions & 6 deletions vcf2parquet-bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ name = "vcf2parquet-bin"
version = "0.1.0"
edition = "2021"

default-run = "vcf2parquet"

[dependencies]
vcf2parquet-lib = { path = "../vcf2parquet-lib", version = "0.1.0" }
niffler = { version = "2" }
Expand All @@ -18,7 +16,3 @@ thiserror = { version = "1" }

# cli management
clap = { version = "3", features = ["derive"] }

[[bin]]
name = "vcf2parquet"
path = "src/main.rs"
8 changes: 4 additions & 4 deletions vcf2parquet-bin/src/main.rs → vcf2parquet-bin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use clap::Parser as _;
use vcf2parquet_lib as lib;

/* mod section */
mod cli;
mod error;
pub mod cli;
pub mod error;

fn main() -> error::Result<()> {
pub fn main() -> error::Result<()> {
let params = cli::Command::parse();

let mut reader = std::fs::File::open(params.input())
Expand All @@ -24,7 +24,7 @@ fn main() -> error::Result<()> {

let mut output = std::fs::File::create(params.output()).map_err(error::mapping)?;

lib::noodles2arrow(
lib::vcf2parquet(
&mut reader,
&mut output,
params.batch_size(),
Expand Down
63 changes: 60 additions & 3 deletions vcf2parquet-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub mod record2chunk;
pub mod schema;

/// Read `input` vcf and write parquet in `output`
pub fn noodles2arrow<R, W>(
pub fn vcf2parquet<R, W>(
input: &mut R,
output: &mut W,
batch_size: usize,
Expand Down Expand Up @@ -67,6 +67,63 @@ where
Ok(())
}

/// Read `input` vcf and write each row group in a parquet file match with template
pub fn vcf2multiparquet<R, W>(
input: &mut R,
template: String,
batch_size: usize,
compression: arrow2::io::parquet::write::CompressionOptions,
) -> error::Result<()>
where
R: std::io::BufRead,
{
// VCF section
let mut reader = noodles::vcf::Reader::new(input);

let vcf_header: noodles::vcf::Header = reader
.read_header()
.map_err(error::mapping)?
.parse()
.map_err(error::mapping)?;

// Parquet section
let schema = schema::from_header(&vcf_header)?;

let chunk_iterator = record2chunk::Record2Chunk::new(
reader.records(&vcf_header),
batch_size,
vcf_header.clone(),
schema.clone(),
);

let options = arrow2::io::parquet::write::WriteOptions {
write_statistics: true,
compression,
version: arrow2::io::parquet::write::Version::V2,
};

let encodings = chunk_iterator.encodings();
let row_groups = arrow2::io::parquet::write::RowGroupIterator::try_new(
chunk_iterator,
&schema,
options,
encodings,
)?;

for (index, group) in row_groups.enumerate() {
let output = std::fs::File::create(template.replace("{}", &index.to_string()))
.map_err(error::mapping)?;
let mut writer =
arrow2::io::parquet::write::FileWriter::try_new(output, schema.clone(), options)?;

writer.start()?;
writer.write(group?)?;
writer.end(None)?;
}

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -252,7 +309,7 @@ mod tests {
let mut input = std::io::BufReader::new(&*VCF_FILE);
let mut output = Vec::new();

noodles2arrow(
vcf2parquet(
&mut input,
&mut output,
1,
Expand All @@ -268,7 +325,7 @@ mod tests {
let mut input = std::io::BufReader::new(&raw_data[..]);
let mut output = Vec::new();

let result = noodles2arrow(
let result = vcf2parquet(
&mut input,
&mut output,
1,
Expand Down

0 comments on commit 827c3fa

Please sign in to comment.