Skip to content

Commit

Permalink
Merge pull request #28 from ginkgobioworks/remove-boundary-edges
Browse files Browse the repository at this point in the history
Get rid of boundary edges in the database
  • Loading branch information
dkhofer authored Sep 4, 2024
2 parents 61b1ec0 + e89724d commit 0e34168
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 106 deletions.
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ fn main() {
sample,
}) => {
let mut conn = get_connection(db);
conn.execute("PRAGMA cache_size=50000;", []).unwrap();
conn.execute("BEGIN TRANSACTION", []).unwrap();
update_with_vcf(
vcf,
Expand All @@ -431,6 +432,7 @@ fn main() {
sample.clone().unwrap_or("".to_string()),
&mut conn,
);

conn.execute("END TRANSACTION", []).unwrap();
}
None => {}
Expand Down
168 changes: 86 additions & 82 deletions src/models/block_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,48 +205,88 @@ impl BlockGroup {
new_bg_id.id
}

pub fn blocks_from_edges(conn: &Connection, edges: Vec<Edge>) -> Vec<GroupBlock> {
let mut sequence_hashes = HashSet::new();
for edge in &edges {
if edge.source_hash != Edge::PATH_START_HASH {
sequence_hashes.insert(edge.source_hash.clone());
pub fn get_block_boundaries(
source_edges: Option<&Vec<&Edge>>,
target_edges: Option<&Vec<&Edge>>,
sequence_length: i32,
) -> Vec<i32> {
let mut block_boundary_coordinates = HashSet::new();
if let Some(actual_source_edges) = source_edges {
for source_edge in actual_source_edges {
if source_edge.source_coordinate > 0
&& source_edge.source_coordinate < sequence_length
{
block_boundary_coordinates.insert(source_edge.source_coordinate);
}
}
if edge.target_hash != Edge::PATH_END_HASH {
sequence_hashes.insert(edge.target_hash.clone());
}
if let Some(actual_target_edges) = target_edges {
for target_edge in actual_target_edges {
if target_edge.target_coordinate > 0
&& target_edge.target_coordinate < sequence_length
{
block_boundary_coordinates.insert(target_edge.target_coordinate);
}
}
}

let mut boundary_edges_by_hash = HashMap::<String, Vec<Edge>>::new();
block_boundary_coordinates
.into_iter()
.sorted_by(|c1, c2| Ord::cmp(&c1, &c2))
.collect::<Vec<i32>>()
}

pub fn blocks_from_edges(conn: &Connection, edges: &Vec<Edge>) -> (Vec<GroupBlock>, Vec<Edge>) {
let mut sequence_hashes = HashSet::new();
let mut edges_by_source_hash: HashMap<&str, Vec<&Edge>> = HashMap::new();
let mut edges_by_target_hash: HashMap<&str, Vec<&Edge>> = HashMap::new();
for edge in edges {
if (edge.source_hash == edge.target_hash)
&& (edge.target_coordinate == edge.source_coordinate)
{
boundary_edges_by_hash
.entry(edge.source_hash.clone())
.and_modify(|current_edges| current_edges.push(edge.clone()))
.or_insert_with(|| vec![edge.clone()]);
if edge.source_hash != Edge::PATH_START_HASH {
sequence_hashes.insert(edge.source_hash.as_str());
edges_by_source_hash
.entry(&edge.source_hash)
.and_modify(|edges| edges.push(edge))
.or_default();
}
if edge.target_hash != Edge::PATH_END_HASH {
sequence_hashes.insert(edge.target_hash.as_str());
edges_by_target_hash
.entry(&edge.target_hash)
.and_modify(|edges| edges.push(edge))
.or_default();
}
}

let sequences_by_hash =
Sequence::sequences_by_hash(conn, sequence_hashes.into_iter().collect::<Vec<String>>());
Sequence::sequences_by_hash(conn, sequence_hashes.into_iter().collect::<Vec<&str>>());
let mut blocks = vec![];

let mut block_index = 0;
let mut boundary_edges = vec![];
for (hash, sequence) in sequences_by_hash.into_iter() {
let sequence_edges = boundary_edges_by_hash.get(&hash);
if sequence_edges.is_some() {
let sorted_sequence_edges: Vec<Edge> = sequence_edges
.unwrap()
.iter()
.sorted_by(|edge1, edge2| {
Ord::cmp(&edge1.source_coordinate, &edge2.source_coordinate)
})
.cloned()
.collect();
let first_edge = sorted_sequence_edges[0].clone();
let block_boundaries = BlockGroup::get_block_boundaries(
edges_by_source_hash.get(hash.as_str()),
edges_by_target_hash.get(hash.as_str()),
sequence.length,
);
for block_boundary in &block_boundaries {
// NOTE: Most of this data is bogus, the Edge struct is just a convenient wrapper
// for the data we need to set up boundary edges in the block group graph
boundary_edges.push(Edge {
id: -1,
source_hash: hash.clone(),
source_coordinate: *block_boundary,
source_strand: String::from(""),
target_hash: hash.clone(),
target_coordinate: *block_boundary,
target_strand: String::from(""),
chromosome_index: 0,
phased: 0,
});
}

if !block_boundaries.is_empty() {
let start = 0;
let end = first_edge.source_coordinate;
let end = block_boundaries[0];
let block_sequence = sequence.get_sequence(start, end).to_string();
let first_block = GroupBlock {
id: block_index,
Expand All @@ -257,9 +297,7 @@ impl BlockGroup {
};
blocks.push(first_block);
block_index += 1;
for (into, out_of) in sorted_sequence_edges.clone().into_iter().tuple_windows() {
let start = into.target_coordinate;
let end = out_of.source_coordinate;
for (start, end) in block_boundaries.clone().into_iter().tuple_windows() {
let block_sequence = sequence.get_sequence(start, end).to_string();
let block = GroupBlock {
id: block_index,
Expand All @@ -271,8 +309,7 @@ impl BlockGroup {
blocks.push(block);
block_index += 1;
}
let last_edge = &sorted_sequence_edges[sorted_sequence_edges.len() - 1];
let start = last_edge.target_coordinate;
let start = block_boundaries[block_boundaries.len() - 1];
let end = sequence.length;
let block_sequence = sequence.get_sequence(start, end).to_string();
let last_block = GroupBlock {
Expand All @@ -295,12 +332,13 @@ impl BlockGroup {
block_index += 1;
}
}
blocks
(blocks, boundary_edges)
}

pub fn get_all_sequences(conn: &Connection, block_group_id: i32) -> HashSet<String> {
let edges = BlockGroupEdge::edges_for_block_group(conn, block_group_id);
let blocks = BlockGroup::blocks_from_edges(conn, edges.clone());
let mut edges = BlockGroupEdge::edges_for_block_group(conn, block_group_id);
let (blocks, boundary_edges) = BlockGroup::blocks_from_edges(conn, &edges);
edges.extend(boundary_edges.clone());

let blocks_by_start = blocks
.clone()
Expand Down Expand Up @@ -491,40 +529,6 @@ impl BlockGroup {
new_edges.push(new_end_edge);
}

// NOTE: Add edges marking the existing part of the sequence that is being substituted out,
// so we can retrieve it as one node of the overall graph
if change.start < start_block.path_end {
let split_coordinate =
change.start - start_block.path_start + start_block.sequence_start;
let new_split_start_edge = EdgeData {
source_hash: start_block.sequence.hash.clone(),
source_coordinate: split_coordinate,
source_strand: "+".to_string(),
target_hash: start_block.sequence.hash.clone(),
target_coordinate: split_coordinate,
target_strand: "+".to_string(),
chromosome_index: change.chromosome_index,
phased: change.phased,
};
new_edges.push(new_split_start_edge);
}

if change.end > end_block.path_start {
let split_coordinate = change.end - end_block.path_start + end_block.sequence_start;
let new_split_end_edge = EdgeData {
source_hash: end_block.sequence.hash.clone(),
source_coordinate: split_coordinate,
source_strand: "+".to_string(),
target_hash: end_block.sequence.hash.clone(),
target_coordinate: split_coordinate,
target_strand: "+".to_string(),
chromosome_index: change.chromosome_index,
phased: change.phased,
};

new_edges.push(new_split_end_edge);
}

new_edges
}
}
Expand Down Expand Up @@ -661,7 +665,7 @@ mod tests {
}

#[test]
fn insert_and_deletion_new_get_all() {
fn insert_and_deletion_get_all() {
let conn = get_connection();
let (block_group_id, path) = setup_block_group(&conn);
let insert_sequence_hash = Sequence::new()
Expand Down Expand Up @@ -742,7 +746,7 @@ mod tests {
}

#[test]
fn simple_insert_new_get_all() {
fn simple_insert_get_all() {
let conn = get_connection();
let (block_group_id, path) = setup_block_group(&conn);
let insert_sequence_hash = Sequence::new()
Expand Down Expand Up @@ -783,7 +787,7 @@ mod tests {
}

#[test]
fn insert_on_block_boundary_middle_new() {
fn insert_on_block_boundary_middle() {
let conn = get_connection();
let (block_group_id, path) = setup_block_group(&conn);
let insert_sequence_hash = Sequence::new()
Expand Down Expand Up @@ -824,7 +828,7 @@ mod tests {
}

#[test]
fn insert_within_block_new() {
fn insert_within_block() {
let conn = get_connection();
let (block_group_id, path) = setup_block_group(&conn);
let insert_sequence_hash = Sequence::new()
Expand Down Expand Up @@ -865,7 +869,7 @@ mod tests {
}

#[test]
fn insert_on_block_boundary_start_new() {
fn insert_on_block_boundary_start() {
let conn = get_connection();
let (block_group_id, path) = setup_block_group(&conn);
let insert_sequence_hash = Sequence::new()
Expand Down Expand Up @@ -906,7 +910,7 @@ mod tests {
}

#[test]
fn insert_on_block_boundary_end_new() {
fn insert_on_block_boundary_end() {
let conn = get_connection();
let (block_group_id, path) = setup_block_group(&conn);
let insert_sequence_hash = Sequence::new()
Expand Down Expand Up @@ -947,7 +951,7 @@ mod tests {
}

#[test]
fn insert_across_entire_block_boundary_new() {
fn insert_across_entire_block_boundary() {
let conn = get_connection();
let (block_group_id, path) = setup_block_group(&conn);
let insert_sequence_hash = Sequence::new()
Expand Down Expand Up @@ -988,7 +992,7 @@ mod tests {
}

#[test]
fn insert_across_two_blocks_new() {
fn insert_across_two_blocks() {
let conn = get_connection();
let (block_group_id, path) = setup_block_group(&conn);
let insert_sequence_hash = Sequence::new()
Expand Down Expand Up @@ -1029,7 +1033,7 @@ mod tests {
}

#[test]
fn insert_spanning_blocks_new() {
fn insert_spanning_blocks() {
let conn = get_connection();
let (block_group_id, path) = setup_block_group(&conn);
let insert_sequence_hash = Sequence::new()
Expand Down Expand Up @@ -1070,7 +1074,7 @@ mod tests {
}

#[test]
fn simple_deletion_new() {
fn simple_deletion() {
let conn = get_connection();
let (block_group_id, path) = setup_block_group(&conn);
let deletion_sequence_hash = Sequence::new()
Expand Down Expand Up @@ -1114,7 +1118,7 @@ mod tests {
}

#[test]
fn doesnt_apply_same_insert_twice_new() {
fn doesnt_apply_same_insert_twice() {
let conn = get_connection();
let (block_group_id, path) = setup_block_group(&conn);
let insert_sequence_hash = Sequence::new()
Expand Down
25 changes: 14 additions & 11 deletions src/models/block_group_edge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,21 @@ pub struct BlockGroupEdge {

impl BlockGroupEdge {
pub fn bulk_create(conn: &Connection, block_group_id: i32, edge_ids: Vec<i32>) {
let mut rows_to_insert = vec![];
for edge_id in edge_ids {
let row = format!("({0}, {1})", block_group_id, edge_id);
rows_to_insert.push(row);
}
let formatted_rows_to_insert = rows_to_insert.join(", ");
for chunk in edge_ids.chunks(100000) {
let mut rows_to_insert = vec![];
for edge_id in chunk {
let row = format!("({0}, {1})", block_group_id, edge_id);
rows_to_insert.push(row);
}

let insert_statement = format!(
"INSERT OR IGNORE INTO block_group_edges (block_group_id, edge_id) VALUES {0};",
formatted_rows_to_insert
);
let _ = conn.execute(&insert_statement, ());
let formatted_rows_to_insert = rows_to_insert.join(", ");

let insert_statement = format!(
"INSERT OR IGNORE INTO block_group_edges (block_group_id, edge_id) VALUES {0};",
formatted_rows_to_insert
);
let _ = conn.execute(&insert_statement, ());
}
}

pub fn edges_for_block_group(conn: &Connection, block_group_id: i32) -> Vec<Edge> {
Expand Down
19 changes: 11 additions & 8 deletions src/models/edge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,17 +198,20 @@ impl Edge {
return existing_edge_ids;
}

let formatted_edge_rows_to_insert = edge_rows_to_insert.join(", ");
for chunk in edge_rows_to_insert.chunks(100000) {
let formatted_edge_rows_to_insert = chunk.join(", ");

let insert_statement = format!("INSERT INTO edges (source_hash, source_coordinate, source_strand, target_hash, target_coordinate, target_strand, chromosome_index, phased) VALUES {0} RETURNING (id);", formatted_edge_rows_to_insert);
let mut stmt = conn.prepare(&insert_statement).unwrap();
let rows = stmt.query_map([], |row| row.get(0)).unwrap();
let mut edge_ids: Vec<i32> = vec![];
for row in rows {
edge_ids.push(row.unwrap());
let insert_statement = format!("INSERT INTO edges (source_hash, source_coordinate, source_strand, target_hash, target_coordinate, target_strand, chromosome_index, phased) VALUES {0} RETURNING (id);", formatted_edge_rows_to_insert);
let mut stmt = conn.prepare(&insert_statement).unwrap();
let rows = stmt.query_map([], |row| row.get(0)).unwrap();
let mut edge_ids: Vec<i32> = vec![];
for row in rows {
edge_ids.push(row.unwrap());
}

existing_edge_ids.extend(edge_ids);
}

existing_edge_ids.extend(edge_ids);
existing_edge_ids
}

Expand Down
6 changes: 3 additions & 3 deletions src/models/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,14 @@ impl Path {
let mut sequence_hashes = HashSet::new();
for edge in &edges {
if edge.source_hash != Edge::PATH_START_HASH {
sequence_hashes.insert(edge.source_hash.clone());
sequence_hashes.insert(edge.source_hash.as_str());
}
if edge.target_hash != Edge::PATH_END_HASH {
sequence_hashes.insert(edge.target_hash.clone());
sequence_hashes.insert(edge.target_hash.as_str());
}
}
let sequences_by_hash =
Sequence::sequences_by_hash(conn, sequence_hashes.into_iter().collect());
Sequence::sequences_by_hash(conn, sequence_hashes.into_iter().collect::<Vec<&str>>());

let mut blocks = vec![];
let mut path_length = 0;
Expand Down
Loading

0 comments on commit 0e34168

Please sign in to comment.