Skip to content

Commit

Permalink
Merge pull request #23 from ginkgobioworks/batch-insert-changes
Browse files Browse the repository at this point in the history
Cache block group, path, and interval tree for path, and batch inserts
  • Loading branch information
dkhofer authored Aug 28, 2024
2 parents 39060ba + 616193d commit 0ebe206
Show file tree
Hide file tree
Showing 3 changed files with 414 additions and 156 deletions.
192 changes: 119 additions & 73 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(warnings)]
use clap::{Parser, Subcommand};
use intervaltree::IntervalTree;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::path::PathBuf;
Expand All @@ -8,10 +9,10 @@ use std::{io, str};
use gen::migrations::run_migrations;
use gen::models::{
self,
block_group::BlockGroup,
block_group::{BlockGroup, BlockGroupData, PathCache, PathChange},
block_group_edge::BlockGroupEdge,
edge::Edge,
path::{NewBlock, Path},
path::{NewBlock, Path, PathData},
path_edge::PathEdge,
sequence::{NewSequence, Sequence},
};
Expand Down Expand Up @@ -135,9 +136,90 @@ fn import_fasta(fasta: &String, name: &str, shallow: bool, conn: &mut Connection
}
}

#[derive(Debug)]
struct BlockGroupCache<'a> {
pub cache: HashMap<BlockGroupData<'a>, i32>,
pub conn: &'a Connection,
}

impl<'a> BlockGroupCache<'_> {
pub fn new(conn: &Connection) -> BlockGroupCache {
BlockGroupCache {
cache: HashMap::<BlockGroupData, i32>::new(),
conn,
}
}

pub fn lookup(
block_group_cache: &mut BlockGroupCache<'a>,
collection_name: &'a str,
sample_name: &'a str,
name: String,
) -> i32 {
let block_group_key = BlockGroupData {
collection_name,
sample_name: Some(sample_name),
name: name.clone(),
};
let block_group_lookup = block_group_cache.cache.get(&block_group_key);
if let Some(block_group_id) = block_group_lookup {
*block_group_id
} else {
let new_block_group_id = BlockGroup::get_or_create_sample_block_group(
block_group_cache.conn,
collection_name,
sample_name,
name.clone(),
);
block_group_cache
.cache
.insert(block_group_key, new_block_group_id);
new_block_group_id
}
}
}

#[allow(clippy::too_many_arguments)]
fn prepare_change(
conn: &Connection,
sample_bg_id: i32,
sample_path: &Path,
alt_seq: &str,
ref_start: i32,
ref_end: i32,
chromosome_index: i32,
phased: i32,
) -> PathChange {
// TODO: new sequence may not be real and be <DEL> or some sort. Handle these.
let new_sequence_hash = Sequence::new()
.sequence_type("DNA")
.sequence(alt_seq)
.save(conn);
let sequence = Sequence::sequence_from_hash(conn, &new_sequence_hash).unwrap();
let new_block = NewBlock {
id: 0,
sequence: sequence.clone(),
block_sequence: alt_seq.to_string(),
sequence_start: 0,
sequence_end: alt_seq.len() as i32,
path_start: ref_start,
path_end: ref_end,
strand: "+".to_string(),
};
PathChange {
block_group_id: sample_bg_id,
path: sample_path.clone(),
start: ref_start,
end: ref_end,
block: new_block,
chromosome_index,
phased,
}
}

fn update_with_vcf(
vcf_path: &String,
collection_name: &String,
collection_name: &str,
fixed_genotype: String,
fixed_sample: String,
conn: &mut Connection,
Expand All @@ -160,9 +242,15 @@ fn update_with_vcf(
genotype = parse_genotype(&fixed_genotype);
}

// Cache a bunch of data ahead of making changes
let mut block_group_cache = BlockGroupCache::new(conn);
let mut path_cache = PathCache::new(conn);

let mut changes: Vec<PathChange> = vec![];

for result in reader.records() {
let record = result.unwrap();
let seq_name = record.reference_sequence_name().to_string();
let seq_name: String = record.reference_sequence_name().to_string();
let ref_allele = record.reference_bases();
// this converts the coordinates to be zero based, start inclusive, end exclusive
let ref_start = record.variant_start().unwrap().unwrap().get() - 1;
Expand All @@ -171,6 +259,14 @@ fn update_with_vcf(
let alt_alleles: Vec<_> = alt_bases.iter().collect::<io::Result<_>>().unwrap();
// TODO: fix this duplication of handling an insert
if !fixed_sample.is_empty() && !genotype.is_empty() {
let sample_bg_id = BlockGroupCache::lookup(
&mut block_group_cache,
collection_name,
&fixed_sample,
seq_name.clone(),
);
let sample_path = PathCache::lookup(&mut path_cache, sample_bg_id, seq_name.clone());

for (chromosome_index, genotype) in genotype.iter().enumerate() {
if let Some(gt) = genotype {
if gt.allele != 0 {
Expand All @@ -179,52 +275,31 @@ fn update_with_vcf(
Phasing::Phased => 1,
Phasing::Unphased => 0,
};
// TODO: new sequence may not be real and be <DEL> or some sort. Handle these.
let new_sequence_hash = Sequence::new()
.sequence_type("DNA")
.sequence(alt_seq)
.save(conn);
let sequence =
Sequence::sequence_from_hash(conn, &new_sequence_hash).unwrap();
let sample_bg_id = BlockGroup::get_or_create_sample_block_group(
conn,
collection_name,
&fixed_sample,
&seq_name,
);
let sample_paths = Path::get_paths(
conn,
"select * from path where block_group_id = ?1 AND name = ?2",
vec![
SQLValue::from(sample_bg_id),
SQLValue::from(seq_name.clone()),
],
);
let new_block = NewBlock {
id: 0,
sequence: sequence.clone(),
block_sequence: alt_seq.to_string(),
sequence_start: 0,
sequence_end: alt_seq.len() as i32,
path_start: ref_start as i32,
path_end: ref_end as i32,
strand: "+".to_string(),
};
BlockGroup::insert_change(
let change = prepare_change(
conn,
sample_bg_id,
&sample_paths[0],
&sample_path,
alt_seq,
ref_start as i32,
ref_end as i32,
&new_block,
chromosome_index as i32,
phased,
);
changes.push(change);
}
}
}
} else {
for (sample_index, sample) in record.samples().iter().enumerate() {
let sample_bg_id = BlockGroupCache::lookup(
&mut block_group_cache,
collection_name,
&sample_names[sample_index],
seq_name.clone(),
);
let sample_path =
PathCache::lookup(&mut path_cache, sample_bg_id, seq_name.clone());

let genotype = sample.get(&header, "GT");
if genotype.is_some() {
if let Value::Genotype(genotypes) = genotype.unwrap().unwrap().unwrap() {
Expand All @@ -238,48 +313,17 @@ fn update_with_vcf(
let allele = allele.unwrap();
if allele != 0 {
let alt_seq = alt_alleles[allele - 1];
// TODO: new sequence may not be real and be <DEL> or some sort. Handle these.
let new_sequence_hash = Sequence::new()
.sequence_type("DNA")
.sequence(alt_seq)
.save(conn);
let sequence =
Sequence::sequence_from_hash(conn, &new_sequence_hash)
.unwrap();
let sample_bg_id = BlockGroup::get_or_create_sample_block_group(
conn,
collection_name,
&sample_names[sample_index],
&seq_name,
);
let sample_paths = Path::get_paths(
conn,
"select * from path where block_group_id = ?1 AND name = ?2",
vec![
SQLValue::from(sample_bg_id),
SQLValue::from(seq_name.clone()),
],
);
let new_block = NewBlock {
id: 0,
sequence: sequence.clone(),
block_sequence: alt_seq.to_string(),
sequence_start: 0,
sequence_end: alt_seq.len() as i32,
path_start: ref_start as i32,
path_end: ref_end as i32,
strand: "+".to_string(),
};
BlockGroup::insert_change(
let change = prepare_change(
conn,
sample_bg_id,
&sample_paths[0],
&sample_path,
alt_seq,
ref_start as i32,
ref_end as i32,
&new_block,
chromosome_index as i32,
phased,
);
changes.push(change);
}
}
}
Expand All @@ -288,6 +332,8 @@ fn update_with_vcf(
}
}
}

BlockGroup::insert_changes(conn, &changes, &path_cache);
}

fn main() {
Expand Down
Loading

0 comments on commit 0ebe206

Please sign in to comment.