From bd57e0e4e2ccb2cf25304b49f99672bffe74a20d Mon Sep 17 00:00:00 2001 From: hofer Date: Thu, 29 Aug 2024 15:27:39 -0400 Subject: [PATCH 1/5] Get rid of boundary edges in the database --- src/main.rs | 2 + src/models/block_group.rs | 169 ++++++++++++++++++++------------------ src/models/edge.rs | 2 +- src/models/path.rs | 6 +- src/models/sequence.rs | 4 +- 5 files changed, 95 insertions(+), 88 deletions(-) diff --git a/src/main.rs b/src/main.rs index 3c77e61..8049594 100644 --- a/src/main.rs +++ b/src/main.rs @@ -423,6 +423,7 @@ fn main() { sample, }) => { let mut conn = get_connection(db); + conn.execute("PRAGMA cache_size=50000;", []).unwrap(); conn.execute("BEGIN TRANSACTION", []).unwrap(); update_with_vcf( vcf, @@ -431,6 +432,7 @@ fn main() { sample.clone().unwrap_or("".to_string()), &mut conn, ); + conn.execute("END TRANSACTION", []).unwrap(); } None => {} diff --git a/src/models/block_group.rs b/src/models/block_group.rs index bd1fa4c..2c55da2 100644 --- a/src/models/block_group.rs +++ b/src/models/block_group.rs @@ -205,48 +205,89 @@ impl BlockGroup { new_bg_id.id } - pub fn blocks_from_edges(conn: &Connection, edges: Vec) -> Vec { - let mut sequence_hashes = HashSet::new(); - for edge in &edges { - if edge.source_hash != Edge::PATH_START_HASH { - sequence_hashes.insert(edge.source_hash.clone()); + pub fn get_block_boundaries( + source_edges: Option<&Vec<&Edge>>, + target_edges: Option<&Vec<&Edge>>, + sequence_length: i32, + ) -> Vec { + let mut block_boundary_coordinates = HashSet::new(); + if let Some(actual_source_edges) = source_edges { + for source_edge in actual_source_edges { + if source_edge.source_coordinate > 0 + && source_edge.source_coordinate < sequence_length + { + block_boundary_coordinates.insert(source_edge.source_coordinate); + } } - if edge.target_hash != Edge::PATH_END_HASH { - sequence_hashes.insert(edge.target_hash.clone()); + } + if let Some(actual_target_edges) = target_edges { + for target_edge in actual_target_edges { + if target_edge.target_coordinate > 0 + && target_edge.target_coordinate < sequence_length + { + block_boundary_coordinates.insert(target_edge.target_coordinate); + } } } - let mut boundary_edges_by_hash = HashMap::>::new(); + block_boundary_coordinates + .into_iter() + .sorted_by(|c1, c2| Ord::cmp(&c1, &c2)) + .collect::>() + } + + pub fn blocks_from_edges(conn: &Connection, edges: &Vec) -> (Vec, Vec) { + let mut sequence_hashes = HashSet::new(); + let mut edges_by_source_hash: HashMap<&str, Vec<&Edge>> = HashMap::new(); + let mut edges_by_target_hash: HashMap<&str, Vec<&Edge>> = HashMap::new(); for edge in edges { - if (edge.source_hash == edge.target_hash) - && (edge.target_coordinate == edge.source_coordinate) - { - boundary_edges_by_hash - .entry(edge.source_hash.clone()) - .and_modify(|current_edges| current_edges.push(edge.clone())) - .or_insert_with(|| vec![edge.clone()]); + if edge.source_hash != Edge::PATH_START_HASH { + sequence_hashes.insert(edge.source_hash.as_str()); + edges_by_source_hash + .entry(&edge.source_hash) + .and_modify(|edges| edges.push(edge)) + .or_default(); + } + if edge.target_hash != Edge::PATH_END_HASH { + sequence_hashes.insert(edge.target_hash.as_str()); + edges_by_target_hash + .entry(&edge.target_hash) + .and_modify(|edges| edges.push(edge)) + .or_default(); } } let sequences_by_hash = - Sequence::sequences_by_hash(conn, sequence_hashes.into_iter().collect::>()); + Sequence::sequences_by_hash(conn, sequence_hashes.into_iter().collect::>()); let mut blocks = vec![]; - let mut block_index = 0; + let mut boundary_edges = vec![]; for (hash, sequence) in sequences_by_hash.into_iter() { - let sequence_edges = boundary_edges_by_hash.get(&hash); - if sequence_edges.is_some() { - let sorted_sequence_edges: Vec = sequence_edges - .unwrap() - .iter() - .sorted_by(|edge1, edge2| { - Ord::cmp(&edge1.source_coordinate, &edge2.source_coordinate) - }) - .cloned() - .collect(); - let first_edge = sorted_sequence_edges[0].clone(); + let block_boundaries = BlockGroup::get_block_boundaries( + edges_by_source_hash.get(hash.as_str()), + edges_by_target_hash.get(hash.as_str()), + sequence.length, + ); + for block_boundary in &block_boundaries { + // NOTE: Most of this data is bogus, the Edge struct is just a convenient wrapper + // for the data we need to set up boundary edges in the block group graph + boundary_edges.push(Edge { + id: -1, + source_hash: hash.clone(), + source_coordinate: *block_boundary, + source_strand: String::from(""), + target_hash: hash.clone(), + target_coordinate: *block_boundary, + target_strand: String::from(""), + chromosome_index: 0, + phased: 0, + }); + } + + #[allow(clippy::len_zero)] + if block_boundaries.len() > 0 { let start = 0; - let end = first_edge.source_coordinate; + let end = block_boundaries[0]; let block_sequence = sequence.get_sequence(start, end).to_string(); let first_block = GroupBlock { id: block_index, @@ -257,9 +298,7 @@ impl BlockGroup { }; blocks.push(first_block); block_index += 1; - for (into, out_of) in sorted_sequence_edges.clone().into_iter().tuple_windows() { - let start = into.target_coordinate; - let end = out_of.source_coordinate; + for (start, end) in block_boundaries.clone().into_iter().tuple_windows() { let block_sequence = sequence.get_sequence(start, end).to_string(); let block = GroupBlock { id: block_index, @@ -271,8 +310,7 @@ impl BlockGroup { blocks.push(block); block_index += 1; } - let last_edge = &sorted_sequence_edges[sorted_sequence_edges.len() - 1]; - let start = last_edge.target_coordinate; + let start = block_boundaries[block_boundaries.len() - 1]; let end = sequence.length; let block_sequence = sequence.get_sequence(start, end).to_string(); let last_block = GroupBlock { @@ -295,12 +333,13 @@ impl BlockGroup { block_index += 1; } } - blocks + (blocks, boundary_edges) } pub fn get_all_sequences(conn: &Connection, block_group_id: i32) -> HashSet { - let edges = BlockGroupEdge::edges_for_block_group(conn, block_group_id); - let blocks = BlockGroup::blocks_from_edges(conn, edges.clone()); + let mut edges = BlockGroupEdge::edges_for_block_group(conn, block_group_id); + let (blocks, boundary_edges) = BlockGroup::blocks_from_edges(conn, &edges); + edges.extend(boundary_edges.clone()); let blocks_by_start = blocks .clone() @@ -491,40 +530,6 @@ impl BlockGroup { new_edges.push(new_end_edge); } - // NOTE: Add edges marking the existing part of the sequence that is being substituted out, - // so we can retrieve it as one node of the overall graph - if change.start < start_block.path_end { - let split_coordinate = - change.start - start_block.path_start + start_block.sequence_start; - let new_split_start_edge = EdgeData { - source_hash: start_block.sequence.hash.clone(), - source_coordinate: split_coordinate, - source_strand: "+".to_string(), - target_hash: start_block.sequence.hash.clone(), - target_coordinate: split_coordinate, - target_strand: "+".to_string(), - chromosome_index: change.chromosome_index, - phased: change.phased, - }; - new_edges.push(new_split_start_edge); - } - - if change.end > end_block.path_start { - let split_coordinate = change.end - end_block.path_start + end_block.sequence_start; - let new_split_end_edge = EdgeData { - source_hash: end_block.sequence.hash.clone(), - source_coordinate: split_coordinate, - source_strand: "+".to_string(), - target_hash: end_block.sequence.hash.clone(), - target_coordinate: split_coordinate, - target_strand: "+".to_string(), - chromosome_index: change.chromosome_index, - phased: change.phased, - }; - - new_edges.push(new_split_end_edge); - } - new_edges } } @@ -661,7 +666,7 @@ mod tests { } #[test] - fn insert_and_deletion_new_get_all() { + fn insert_and_deletion_get_all() { let conn = get_connection(); let (block_group_id, path) = setup_block_group(&conn); let insert_sequence_hash = Sequence::new() @@ -742,7 +747,7 @@ mod tests { } #[test] - fn simple_insert_new_get_all() { + fn simple_insert_get_all() { let conn = get_connection(); let (block_group_id, path) = setup_block_group(&conn); let insert_sequence_hash = Sequence::new() @@ -783,7 +788,7 @@ mod tests { } #[test] - fn insert_on_block_boundary_middle_new() { + fn insert_on_block_boundary_middle() { let conn = get_connection(); let (block_group_id, path) = setup_block_group(&conn); let insert_sequence_hash = Sequence::new() @@ -824,7 +829,7 @@ mod tests { } #[test] - fn insert_within_block_new() { + fn insert_within_block() { let conn = get_connection(); let (block_group_id, path) = setup_block_group(&conn); let insert_sequence_hash = Sequence::new() @@ -865,7 +870,7 @@ mod tests { } #[test] - fn insert_on_block_boundary_start_new() { + fn insert_on_block_boundary_start() { let conn = get_connection(); let (block_group_id, path) = setup_block_group(&conn); let insert_sequence_hash = Sequence::new() @@ -906,7 +911,7 @@ mod tests { } #[test] - fn insert_on_block_boundary_end_new() { + fn insert_on_block_boundary_end() { let conn = get_connection(); let (block_group_id, path) = setup_block_group(&conn); let insert_sequence_hash = Sequence::new() @@ -947,7 +952,7 @@ mod tests { } #[test] - fn insert_across_entire_block_boundary_new() { + fn insert_across_entire_block_boundary() { let conn = get_connection(); let (block_group_id, path) = setup_block_group(&conn); let insert_sequence_hash = Sequence::new() @@ -988,7 +993,7 @@ mod tests { } #[test] - fn insert_across_two_blocks_new() { + fn insert_across_two_blocks() { let conn = get_connection(); let (block_group_id, path) = setup_block_group(&conn); let insert_sequence_hash = Sequence::new() @@ -1029,7 +1034,7 @@ mod tests { } #[test] - fn insert_spanning_blocks_new() { + fn insert_spanning_blocks() { let conn = get_connection(); let (block_group_id, path) = setup_block_group(&conn); let insert_sequence_hash = Sequence::new() @@ -1070,7 +1075,7 @@ mod tests { } #[test] - fn simple_deletion_new() { + fn simple_deletion() { let conn = get_connection(); let (block_group_id, path) = setup_block_group(&conn); let deletion_sequence_hash = Sequence::new() @@ -1114,7 +1119,7 @@ mod tests { } #[test] - fn doesnt_apply_same_insert_twice_new() { + fn doesnt_apply_same_insert_twice() { let conn = get_connection(); let (block_group_id, path) = setup_block_group(&conn); let insert_sequence_hash = Sequence::new() diff --git a/src/models/edge.rs b/src/models/edge.rs index 98b8245..850ff04 100644 --- a/src/models/edge.rs +++ b/src/models/edge.rs @@ -200,7 +200,7 @@ impl Edge { let formatted_edge_rows_to_insert = edge_rows_to_insert.join(", "); - let insert_statement = format!("INSERT INTO edges (source_hash, source_coordinate, source_strand, target_hash, target_coordinate, target_strand, chromosome_index, phased) VALUES {0} RETURNING (id);", formatted_edge_rows_to_insert); + let insert_statement = format!("INSERT OR IGNORE INTO edges (source_hash, source_coordinate, source_strand, target_hash, target_coordinate, target_strand, chromosome_index, phased) VALUES {0} RETURNING (id);", formatted_edge_rows_to_insert); let mut stmt = conn.prepare(&insert_statement).unwrap(); let rows = stmt.query_map([], |row| row.get(0)).unwrap(); let mut edge_ids: Vec = vec![]; diff --git a/src/models/path.rs b/src/models/path.rs index 3630959..53c6b5b 100644 --- a/src/models/path.rs +++ b/src/models/path.rs @@ -183,14 +183,14 @@ impl Path { let mut sequence_hashes = HashSet::new(); for edge in &edges { if edge.source_hash != Edge::PATH_START_HASH { - sequence_hashes.insert(edge.source_hash.clone()); + sequence_hashes.insert(edge.source_hash.as_str()); } if edge.target_hash != Edge::PATH_END_HASH { - sequence_hashes.insert(edge.target_hash.clone()); + sequence_hashes.insert(edge.target_hash.as_str()); } } let sequences_by_hash = - Sequence::sequences_by_hash(conn, sequence_hashes.into_iter().collect()); + Sequence::sequences_by_hash(conn, sequence_hashes.into_iter().collect::>()); let mut blocks = vec![]; let mut path_length = 0; diff --git a/src/models/sequence.rs b/src/models/sequence.rs index 626bfa4..cad1475 100644 --- a/src/models/sequence.rs +++ b/src/models/sequence.rs @@ -259,7 +259,7 @@ impl Sequence { objs } - pub fn sequences_by_hash(conn: &Connection, hashes: Vec) -> HashMap { + pub fn sequences_by_hash(conn: &Connection, hashes: Vec<&str>) -> HashMap { let joined_hashes = &hashes .into_iter() .map(|hash| format!("\"{}\"", hash)) @@ -277,7 +277,7 @@ impl Sequence { } pub fn sequence_from_hash(conn: &Connection, hash: &str) -> Option { - let sequences_by_hash = Sequence::sequences_by_hash(conn, vec![hash.to_string()]); + let sequences_by_hash = Sequence::sequences_by_hash(conn, vec![hash]); sequences_by_hash.get(hash).cloned() } } From ff3ef5942de3fe8111c3547dfdb826d4f62b28d7 Mon Sep 17 00:00:00 2001 From: hofer Date: Thu, 29 Aug 2024 15:41:19 -0400 Subject: [PATCH 2/5] Batch up bulk edge inserts --- src/models/edge.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/models/edge.rs b/src/models/edge.rs index 850ff04..b92ab0f 100644 --- a/src/models/edge.rs +++ b/src/models/edge.rs @@ -198,17 +198,20 @@ impl Edge { return existing_edge_ids; } - let formatted_edge_rows_to_insert = edge_rows_to_insert.join(", "); + for chunk in edge_rows_to_insert.chunks(100000) { + let formatted_edge_rows_to_insert = chunk.join(", "); - let insert_statement = format!("INSERT OR IGNORE INTO edges (source_hash, source_coordinate, source_strand, target_hash, target_coordinate, target_strand, chromosome_index, phased) VALUES {0} RETURNING (id);", formatted_edge_rows_to_insert); - let mut stmt = conn.prepare(&insert_statement).unwrap(); - let rows = stmt.query_map([], |row| row.get(0)).unwrap(); - let mut edge_ids: Vec = vec![]; - for row in rows { - edge_ids.push(row.unwrap()); + let insert_statement = format!("INSERT OR IGNORE INTO edges (source_hash, source_coordinate, source_strand, target_hash, target_coordinate, target_strand, chromosome_index, phased) VALUES {0} RETURNING (id);", formatted_edge_rows_to_insert); + let mut stmt = conn.prepare(&insert_statement).unwrap(); + let rows = stmt.query_map([], |row| row.get(0)).unwrap(); + let mut edge_ids: Vec = vec![]; + for row in rows { + edge_ids.push(row.unwrap()); + } + + existing_edge_ids.extend(edge_ids); } - existing_edge_ids.extend(edge_ids); existing_edge_ids } From 08e71c91a07d906eebb2df83fe0ed27071dd69cb Mon Sep 17 00:00:00 2001 From: hofer Date: Thu, 29 Aug 2024 15:44:25 -0400 Subject: [PATCH 3/5] Also batch block group edge inserts --- src/models/block_group_edge.rs | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/src/models/block_group_edge.rs b/src/models/block_group_edge.rs index 3daf125..8a2fd7d 100644 --- a/src/models/block_group_edge.rs +++ b/src/models/block_group_edge.rs @@ -11,18 +11,21 @@ pub struct BlockGroupEdge { impl BlockGroupEdge { pub fn bulk_create(conn: &Connection, block_group_id: i32, edge_ids: Vec) { - let mut rows_to_insert = vec![]; - for edge_id in edge_ids { - let row = format!("({0}, {1})", block_group_id, edge_id); - rows_to_insert.push(row); - } - let formatted_rows_to_insert = rows_to_insert.join(", "); + for chunk in edge_ids.chunks(100000) { + let mut rows_to_insert = vec![]; + for edge_id in chunk { + let row = format!("({0}, {1})", block_group_id, edge_id); + rows_to_insert.push(row); + } - let insert_statement = format!( - "INSERT OR IGNORE INTO block_group_edges (block_group_id, edge_id) VALUES {0};", - formatted_rows_to_insert - ); - let _ = conn.execute(&insert_statement, ()); + let formatted_rows_to_insert = rows_to_insert.join(", "); + + let insert_statement = format!( + "INSERT OR IGNORE INTO block_group_edges (block_group_id, edge_id) VALUES {0};", + formatted_rows_to_insert + ); + let _ = conn.execute(&insert_statement, ()); + } } pub fn edges_for_block_group(conn: &Connection, block_group_id: i32) -> Vec { From 0853b49919371dd18214215a44ebb7fdad675308 Mon Sep 17 00:00:00 2001 From: hofer Date: Thu, 29 Aug 2024 15:57:33 -0400 Subject: [PATCH 4/5] INSERT OR IGNORE -> INSERT --- src/models/edge.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/models/edge.rs b/src/models/edge.rs index b92ab0f..33d9be2 100644 --- a/src/models/edge.rs +++ b/src/models/edge.rs @@ -201,7 +201,7 @@ impl Edge { for chunk in edge_rows_to_insert.chunks(100000) { let formatted_edge_rows_to_insert = chunk.join(", "); - let insert_statement = format!("INSERT OR IGNORE INTO edges (source_hash, source_coordinate, source_strand, target_hash, target_coordinate, target_strand, chromosome_index, phased) VALUES {0} RETURNING (id);", formatted_edge_rows_to_insert); + let insert_statement = format!("INSERT INTO edges (source_hash, source_coordinate, source_strand, target_hash, target_coordinate, target_strand, chromosome_index, phased) VALUES {0} RETURNING (id);", formatted_edge_rows_to_insert); let mut stmt = conn.prepare(&insert_statement).unwrap(); let rows = stmt.query_map([], |row| row.get(0)).unwrap(); let mut edge_ids: Vec = vec![]; From e89724dae3e3a3ed7bd6e48a34da114c6ee551b5 Mon Sep 17 00:00:00 2001 From: hofer Date: Wed, 4 Sep 2024 11:35:10 -0400 Subject: [PATCH 5/5] Use is_empty correctly --- src/models/block_group.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/models/block_group.rs b/src/models/block_group.rs index 2c55da2..bbc04d4 100644 --- a/src/models/block_group.rs +++ b/src/models/block_group.rs @@ -284,8 +284,7 @@ impl BlockGroup { }); } - #[allow(clippy::len_zero)] - if block_boundaries.len() > 0 { + if !block_boundaries.is_empty() { let start = 0; let end = block_boundaries[0]; let block_sequence = sequence.get_sequence(start, end).to_string();