From 1febd006e0438a299f4954d30b8ed8de078f32f9 Mon Sep 17 00:00:00 2001 From: kpandl <16907747+kpandl@users.noreply.github.com> Date: Mon, 2 Sep 2024 18:00:42 +0200 Subject: [PATCH 1/6] use tx cache minimize diffs fix test code fix fmt don't update cache cache access speedup --- Cargo.lock | 1 + node/bft/storage-service/Cargo.toml | 3 + node/bft/storage-service/src/persistent.rs | 107 +++++++++++++++------ 3 files changed, 80 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 500ec0acf3..24bfba4ed1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3257,6 +3257,7 @@ version = "2.2.7" dependencies = [ "aleo-std", "indexmap 2.2.6", + "lru", "parking_lot", "snarkvm", "tracing", diff --git a/node/bft/storage-service/Cargo.toml b/node/bft/storage-service/Cargo.toml index 73098dd1cd..04d4c0fa04 100644 --- a/node/bft/storage-service/Cargo.toml +++ b/node/bft/storage-service/Cargo.toml @@ -29,6 +29,9 @@ workspace = true version = "2.1" features = [ "serde", "rayon" ] +[dependencies.lru] +version = "0.12.1" + [dependencies.parking_lot] version = "0.12" optional = true diff --git a/node/bft/storage-service/src/persistent.rs b/node/bft/storage-service/src/persistent.rs index 96a9d0c8e5..fcff4257a4 100644 --- a/node/bft/storage-service/src/persistent.rs +++ b/node/bft/storage-service/src/persistent.rs @@ -15,6 +15,7 @@ use crate::StorageService; use snarkvm::{ ledger::{ + committee::Committee, narwhal::{BatchHeader, Transmission, TransmissionID}, store::{ cow_to_cloned, @@ -33,9 +34,12 @@ use snarkvm::{ use aleo_std::StorageMode; use indexmap::{indexset, IndexSet}; +use lru::LruCache; +use parking_lot::RwLock; use std::{ borrow::Cow, collections::{HashMap, HashSet}, + num::NonZeroUsize, }; use tracing::error; @@ -46,11 +50,20 @@ pub struct BFTPersistentStorage { transmissions: DataMap, (Transmission, IndexSet>)>, /// The map of `aborted transmission ID` to `certificate IDs` entries. aborted_transmission_ids: DataMap, IndexSet>>, + /// The LRU cache for `transmission ID` to `(transmission, certificate IDs)` entries. + cache_transmissions: RwLock, (Transmission, IndexSet>)>>, + /// The LRU cache for `aborted transmission ID` to `certificate IDs` entries. + cache_aborted_transmission_ids: RwLock, IndexSet>>>, } impl BFTPersistentStorage { /// Initializes a new BFT persistent storage service. pub fn open(storage_mode: StorageMode) -> Result { + let capacity = NonZeroUsize::new( + (Committee::::MAX_COMMITTEE_SIZE as usize) * (BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH) * 2, + ) + .unwrap(); + Ok(Self { transmissions: internal::RocksDB::open_map(N::ID, storage_mode.clone(), MapID::BFT(BFTMap::Transmissions))?, aborted_transmission_ids: internal::RocksDB::open_map( @@ -58,12 +71,19 @@ impl BFTPersistentStorage { storage_mode, MapID::BFT(BFTMap::AbortedTransmissionIDs), )?, + cache_transmissions: RwLock::new(LruCache::new(capacity)), + cache_aborted_transmission_ids: RwLock::new(LruCache::new(capacity)), }) } - /// Initializes a new BFT persistent storage service. + /// Initializes a new BFT persistent storage service for testing. 
#[cfg(any(test, feature = "test"))] pub fn open_testing(temp_dir: std::path::PathBuf, dev: Option) -> Result { + let capacity = NonZeroUsize::new( + (Committee::::MAX_COMMITTEE_SIZE as usize) * (BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH) * 2, + ) + .unwrap(); + Ok(Self { transmissions: internal::RocksDB::open_map_testing( temp_dir.clone(), @@ -75,6 +95,8 @@ impl BFTPersistentStorage { dev, MapID::BFT(BFTMap::AbortedTransmissionIDs), )?, + cache_transmissions: RwLock::new(LruCache::new(capacity)), + cache_aborted_transmission_ids: RwLock::new(LruCache::new(capacity)), }) } } @@ -101,7 +123,12 @@ impl StorageService for BFTPersistentStorage { /// Returns the transmission for the given `transmission ID`. /// If the transmission ID does not exist in storage, `None` is returned. fn get_transmission(&self, transmission_id: TransmissionID) -> Option> { - // Get the transmission. + // Try to get the transmission from the cache first. + if let Some((transmission, _)) = self.cache_transmissions.write().get_mut(&transmission_id) { + return Some(transmission.clone()); + } + + // If not found in cache, check persistent storage. match self.transmissions.get_confirmed(&transmission_id) { Ok(Some(Cow::Owned((transmission, _)))) => Some(transmission), Ok(Some(Cow::Borrowed((transmission, _)))) => Some(transmission.clone()), @@ -152,66 +179,84 @@ impl StorageService for BFTPersistentStorage { aborted_transmission_ids: HashSet>, mut missing_transmissions: HashMap, Transmission>, ) { - // Inserts the following: - // - Inserts **only the missing** transmissions from storage. - // - Inserts the certificate ID into the corresponding set for **all** transmissions. - 'outer: for transmission_id in transmission_ids { - // Retrieve the transmission entry. + // First, handle the non-aborted transmissions. + for transmission_id in transmission_ids { + // Try to fetch from the persistent storage. match self.transmissions.get_confirmed(&transmission_id) { Ok(Some(entry)) => { + // The transmission exists in storage; update its certificate IDs. let (transmission, mut certificate_ids) = cow_to_cloned!(entry); - // Insert the certificate ID into the set. certificate_ids.insert(certificate_id); - // Update the transmission entry. - if let Err(e) = self.transmissions.insert(transmission_id, (transmission, certificate_ids)) { + + // Update the persistent storage. + if let Err(e) = + self.transmissions.insert(transmission_id, (transmission.clone(), certificate_ids.clone())) + { error!("Failed to insert transmission {transmission_id} into storage - {e}"); - continue 'outer; } + + // Also, update the cache. + self.cache_transmissions.write().put(transmission_id, (transmission, certificate_ids)); } Ok(None) => { - // Retrieve the missing transmission. - let Some(transmission) = missing_transmissions.remove(&transmission_id) else { - if !aborted_transmission_ids.contains(&transmission_id) - && !self.contains_transmission(transmission_id) + // The transmission is missing from persistent storage. + // Check if it exists in the `missing_transmissions` map provided. + if let Some(transmission) = missing_transmissions.remove(&transmission_id) { + let certificate_ids = indexset! { certificate_id }; + + // Insert into persistent storage. 
+ if let Err(e) = + self.transmissions.insert(transmission_id, (transmission.clone(), certificate_ids.clone())) { - error!("Failed to provide a missing transmission {transmission_id}"); + error!("Failed to insert transmission {transmission_id} into storage - {e}"); } - continue 'outer; - }; - // Prepare the set of certificate IDs. - let certificate_ids = indexset! { certificate_id }; - // Insert the transmission and a new set with the certificate ID. - if let Err(e) = self.transmissions.insert(transmission_id, (transmission, certificate_ids)) { - error!("Failed to insert transmission {transmission_id} into storage - {e}"); - continue 'outer; + + // Also, insert into the cache. + self.cache_transmissions.write().put(transmission_id, (transmission, certificate_ids)); + } else if !aborted_transmission_ids.contains(&transmission_id) { + // If the transmission is not found in either storage or the missing map, + // and it's not an aborted transmission, log an error. + error!("Failed to provide a missing transmission {transmission_id}"); } } Err(e) => { + // Handle any errors during the retrieval. error!("Failed to process the 'insert' for transmission {transmission_id} into storage - {e}"); - continue 'outer; } } } - // Inserts the aborted transmission IDs. + + // Next, handle the aborted transmission IDs. for aborted_transmission_id in aborted_transmission_ids { - // Retrieve the transmission entry. match self.aborted_transmission_ids.get_confirmed(&aborted_transmission_id) { Ok(Some(entry)) => { let mut certificate_ids = cow_to_cloned!(entry); // Insert the certificate ID into the set. certificate_ids.insert(certificate_id); - // Update the transmission entry. - if let Err(e) = self.aborted_transmission_ids.insert(aborted_transmission_id, certificate_ids) { + + // Update the persistent storage. + if let Err(e) = + self.aborted_transmission_ids.insert(aborted_transmission_id, certificate_ids.clone()) + { error!("Failed to insert aborted transmission ID {aborted_transmission_id} into storage - {e}"); } + + // Update the cache. + self.cache_aborted_transmission_ids.write().put(aborted_transmission_id, certificate_ids); } Ok(None) => { // Prepare the set of certificate IDs. let certificate_ids = indexset! { certificate_id }; - // Insert the transmission and a new set with the certificate ID. - if let Err(e) = self.aborted_transmission_ids.insert(aborted_transmission_id, certificate_ids) { + + // Insert the transmission and a new set with the certificate ID into the persistent storage. + if let Err(e) = + self.aborted_transmission_ids.insert(aborted_transmission_id, certificate_ids.clone()) + { error!("Failed to insert aborted transmission ID {aborted_transmission_id} into storage - {e}"); } + + // Insert the transmission and a new set with the certificate ID into the cache. 
+ self.cache_aborted_transmission_ids.write().put(aborted_transmission_id, certificate_ids); } Err(e) => { error!( From f3060c7df0393bf5fd4683a347ff9f8470d8d06e Mon Sep 17 00:00:00 2001 From: Konstantin Pandl <16907747+kpandl@users.noreply.github.com> Date: Tue, 10 Sep 2024 13:52:15 +0200 Subject: [PATCH 2/6] update docs, mutex instead of rwlock, reduce duplicate code --- node/bft/storage-service/src/persistent.rs | 59 ++++++++-------------- 1 file changed, 22 insertions(+), 37 deletions(-) diff --git a/node/bft/storage-service/src/persistent.rs b/node/bft/storage-service/src/persistent.rs index fcff4257a4..ea57355b91 100644 --- a/node/bft/storage-service/src/persistent.rs +++ b/node/bft/storage-service/src/persistent.rs @@ -35,7 +35,7 @@ use snarkvm::{ use aleo_std::StorageMode; use indexmap::{indexset, IndexSet}; use lru::LruCache; -use parking_lot::RwLock; +use parking_lot::Mutex; use std::{ borrow::Cow, collections::{HashMap, HashSet}, @@ -50,10 +50,10 @@ pub struct BFTPersistentStorage { transmissions: DataMap, (Transmission, IndexSet>)>, /// The map of `aborted transmission ID` to `certificate IDs` entries. aborted_transmission_ids: DataMap, IndexSet>>, - /// The LRU cache for `transmission ID` to `(transmission, certificate IDs)` entries. - cache_transmissions: RwLock, (Transmission, IndexSet>)>>, - /// The LRU cache for `aborted transmission ID` to `certificate IDs` entries. - cache_aborted_transmission_ids: RwLock, IndexSet>>>, + /// The LRU cache for `transmission ID` to `(transmission, certificate IDs)` entries that are part of the persistent storage. + cache_transmissions: Mutex, (Transmission, IndexSet>)>>, + /// The LRU cache for `aborted transmission ID` to `certificate IDs` entries that are part of the persistent storage. + cache_aborted_transmission_ids: Mutex, IndexSet>>>, } impl BFTPersistentStorage { @@ -71,8 +71,8 @@ impl BFTPersistentStorage { storage_mode, MapID::BFT(BFTMap::AbortedTransmissionIDs), )?, - cache_transmissions: RwLock::new(LruCache::new(capacity)), - cache_aborted_transmission_ids: RwLock::new(LruCache::new(capacity)), + cache_transmissions: Mutex::new(LruCache::new(capacity)), + cache_aborted_transmission_ids: Mutex::new(LruCache::new(capacity)), }) } @@ -95,8 +95,8 @@ impl BFTPersistentStorage { dev, MapID::BFT(BFTMap::AbortedTransmissionIDs), )?, - cache_transmissions: RwLock::new(LruCache::new(capacity)), - cache_aborted_transmission_ids: RwLock::new(LruCache::new(capacity)), + cache_transmissions: Mutex::new(LruCache::new(capacity)), + cache_aborted_transmission_ids: Mutex::new(LruCache::new(capacity)), }) } } @@ -124,7 +124,7 @@ impl StorageService for BFTPersistentStorage { /// If the transmission ID does not exist in storage, `None` is returned. fn get_transmission(&self, transmission_id: TransmissionID) -> Option> { // Try to get the transmission from the cache first. - if let Some((transmission, _)) = self.cache_transmissions.write().get_mut(&transmission_id) { + if let Some((transmission, _)) = self.cache_transmissions.lock().get_mut(&transmission_id) { return Some(transmission.clone()); } @@ -196,7 +196,7 @@ impl StorageService for BFTPersistentStorage { } // Also, update the cache. - self.cache_transmissions.write().put(transmission_id, (transmission, certificate_ids)); + self.cache_transmissions.lock().put(transmission_id, (transmission, certificate_ids)); } Ok(None) => { // The transmission is missing from persistent storage. @@ -212,7 +212,7 @@ impl StorageService for BFTPersistentStorage { } // Also, insert into the cache. 
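                            // A short note on the locking choice, sketched under the assumption of the
                            // `lru` 0.12 API: `LruCache` updates its recency list on every access, so
                            // `get`, `get_mut`, and `put` all take `&mut self`. A `RwLock` read guard is
                            // therefore never sufficient, and a plain `Mutex` (as used below) is the
                            // simpler fit, e.g. in `get_transmission`:
                            //     if let Some((transmission, _)) = self.cache_transmissions.lock().get_mut(&transmission_id) {
                            //         return Some(transmission.clone());
                            //     }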
- self.cache_transmissions.write().put(transmission_id, (transmission, certificate_ids)); + self.cache_transmissions.lock().put(transmission_id, (transmission, certificate_ids)); } else if !aborted_transmission_ids.contains(&transmission_id) { // If the transmission is not found in either storage or the missing map, // and it's not an aborted transmission, log an error. @@ -228,42 +228,27 @@ impl StorageService for BFTPersistentStorage { // Next, handle the aborted transmission IDs. for aborted_transmission_id in aborted_transmission_ids { - match self.aborted_transmission_ids.get_confirmed(&aborted_transmission_id) { + let certificate_ids = match self.aborted_transmission_ids.get_confirmed(&aborted_transmission_id) { Ok(Some(entry)) => { let mut certificate_ids = cow_to_cloned!(entry); // Insert the certificate ID into the set. certificate_ids.insert(certificate_id); - - // Update the persistent storage. - if let Err(e) = - self.aborted_transmission_ids.insert(aborted_transmission_id, certificate_ids.clone()) - { - error!("Failed to insert aborted transmission ID {aborted_transmission_id} into storage - {e}"); - } - - // Update the cache. - self.cache_aborted_transmission_ids.write().put(aborted_transmission_id, certificate_ids); - } - Ok(None) => { - // Prepare the set of certificate IDs. - let certificate_ids = indexset! { certificate_id }; - - // Insert the transmission and a new set with the certificate ID into the persistent storage. - if let Err(e) = - self.aborted_transmission_ids.insert(aborted_transmission_id, certificate_ids.clone()) - { - error!("Failed to insert aborted transmission ID {aborted_transmission_id} into storage - {e}"); - } - - // Insert the transmission and a new set with the certificate ID into the cache. - self.cache_aborted_transmission_ids.write().put(aborted_transmission_id, certificate_ids); + certificate_ids } + Ok(None) => indexset! { certificate_id }, Err(e) => { error!( "Failed to process the 'insert' for aborted transmission ID {aborted_transmission_id} into storage - {e}" ); + continue; } + }; + // Insert the certificate IDs into the persistent storage. + if let Err(e) = self.aborted_transmission_ids.insert(aborted_transmission_id, certificate_ids.clone()) { + error!("Failed to insert aborted transmission ID {aborted_transmission_id} into storage - {e}"); } + // Insert the certificate IDs into the cache. + self.cache_aborted_transmission_ids.lock().put(aborted_transmission_id, certificate_ids); } } From 500a1e0700622a64b90998007b9d2cc0eb9c1e87 Mon Sep 17 00:00:00 2001 From: Konstantin Pandl <16907747+kpandl@users.noreply.github.com> Date: Wed, 11 Sep 2024 01:26:02 +0200 Subject: [PATCH 3/6] use deadline instead of sleep in node-router tests --- node/router/tests/disconnect.rs | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/node/router/tests/disconnect.rs b/node/router/tests/disconnect.rs index df9eee1016..b13f373092 100644 --- a/node/router/tests/disconnect.rs +++ b/node/router/tests/disconnect.rs @@ -18,6 +18,7 @@ use common::*; use snarkos_node_tcp::{protocols::Handshake, P2P}; use core::time::Duration; +use deadline::deadline; #[tokio::test] async fn test_disconnect_without_handshake() { @@ -33,8 +34,12 @@ async fn test_disconnect_without_handshake() { // Connect node0 to node1. node0.connect(node1.local_ip()); - // Sleep briefly. - tokio::time::sleep(Duration::from_millis(200)).await; + // Await both nodes being connected. 
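    // (A sketch of the waiting pattern, assuming the `deadline` crate's semantics: the macro
    // re-evaluates the closure until it returns `true` or the given duration elapses, failing
    // the test if the condition is never met. Unlike a fixed `sleep`, the wait ends as soon as
    // the nodes report the expected connection count; the handles are cloned because the `move`
    // closure must own what it inspects.)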
+ let node0_ = node0.clone(); + let node1_ = node1.clone(); + deadline!(Duration::from_secs(5), move || { + node0_.tcp().num_connected() == 1 && node1_.tcp().num_connected() == 1 + }); print_tcp!(node0); print_tcp!(node1); @@ -49,8 +54,9 @@ async fn test_disconnect_without_handshake() { // collection of connected peers is only altered during the handshake, // as well as the address resolver needed for the higher-level calls node0.tcp().disconnect(node1.local_ip()).await; - // Sleep briefly. - tokio::time::sleep(Duration::from_millis(100)).await; + // Await disconnection. + let node0_ = node0.clone(); + deadline!(Duration::from_secs(5), move || { node0_.tcp().num_connected() == 0 }); print_tcp!(node0); print_tcp!(node1); @@ -79,8 +85,12 @@ async fn test_disconnect_with_handshake() { // Connect node0 to node1. node0.connect(node1.local_ip()); - // Sleep briefly. - tokio::time::sleep(Duration::from_millis(200)).await; + // Await for the nodes to be connected. + let node0_ = node0.clone(); + let node1_ = node1.clone(); + deadline!(Duration::from_secs(5), move || { + node0_.tcp().num_connected() == 1 && node1_.tcp().num_connected() == 1 + }); print_tcp!(node0); print_tcp!(node1); @@ -97,8 +107,9 @@ async fn test_disconnect_with_handshake() { // Disconnect node0 from node1. node0.disconnect(node1.local_ip()); - // Sleep briefly. - tokio::time::sleep(Duration::from_millis(100)).await; + // Await nodes being disconnected. + let node0_ = node0.clone(); + deadline!(Duration::from_secs(5), move || { node0_.tcp().num_connected() == 0 }); print_tcp!(node0); print_tcp!(node1); From bbd10c136468fa0d884537319bca8bd3244dd332 Mon Sep 17 00:00:00 2001 From: Konstantin Pandl <16907747+kpandl@users.noreply.github.com> Date: Thu, 12 Sep 2024 12:35:40 +0200 Subject: [PATCH 4/6] remove duplicate code, bring back contains_transmission --- node/bft/storage-service/src/persistent.rs | 47 +++++++++------------- 1 file changed, 20 insertions(+), 27 deletions(-) diff --git a/node/bft/storage-service/src/persistent.rs b/node/bft/storage-service/src/persistent.rs index ea57355b91..9e3a56e01f 100644 --- a/node/bft/storage-service/src/persistent.rs +++ b/node/bft/storage-service/src/persistent.rs @@ -180,50 +180,43 @@ impl StorageService for BFTPersistentStorage { mut missing_transmissions: HashMap, Transmission>, ) { // First, handle the non-aborted transmissions. - for transmission_id in transmission_ids { + 'outer: for transmission_id in transmission_ids { // Try to fetch from the persistent storage. - match self.transmissions.get_confirmed(&transmission_id) { + let (transmission, certificate_ids) = match self.transmissions.get_confirmed(&transmission_id) { Ok(Some(entry)) => { // The transmission exists in storage; update its certificate IDs. let (transmission, mut certificate_ids) = cow_to_cloned!(entry); certificate_ids.insert(certificate_id); - - // Update the persistent storage. - if let Err(e) = - self.transmissions.insert(transmission_id, (transmission.clone(), certificate_ids.clone())) - { - error!("Failed to insert transmission {transmission_id} into storage - {e}"); - } - - // Also, update the cache. - self.cache_transmissions.lock().put(transmission_id, (transmission, certificate_ids)); + (transmission, certificate_ids) } Ok(None) => { // The transmission is missing from persistent storage. // Check if it exists in the `missing_transmissions` map provided. - if let Some(transmission) = missing_transmissions.remove(&transmission_id) { - let certificate_ids = indexset! 
{ certificate_id }; - - // Insert into persistent storage. - if let Err(e) = - self.transmissions.insert(transmission_id, (transmission.clone(), certificate_ids.clone())) + let Some(transmission) = missing_transmissions.remove(&transmission_id) else { + if !aborted_transmission_ids.contains(&transmission_id) + && !self.contains_transmission(transmission_id) { - error!("Failed to insert transmission {transmission_id} into storage - {e}"); + error!("Failed to provide a missing transmission {transmission_id}"); } - - // Also, insert into the cache. - self.cache_transmissions.lock().put(transmission_id, (transmission, certificate_ids)); - } else if !aborted_transmission_ids.contains(&transmission_id) { - // If the transmission is not found in either storage or the missing map, - // and it's not an aborted transmission, log an error. - error!("Failed to provide a missing transmission {transmission_id}"); - } + continue 'outer; + }; + // Prepare the set of certificate IDs. + let certificate_ids = indexset! { certificate_id }; + (transmission, certificate_ids) } Err(e) => { // Handle any errors during the retrieval. error!("Failed to process the 'insert' for transmission {transmission_id} into storage - {e}"); + continue; } + }; + // Insert the transmission into persistent storage. + if let Err(e) = self.transmissions.insert(transmission_id, (transmission.clone(), certificate_ids.clone())) + { + error!("Failed to insert transmission {transmission_id} into storage - {e}"); } + // Insert the transmission into the cache. + self.cache_transmissions.lock().put(transmission_id, (transmission, certificate_ids)); } // Next, handle the aborted transmission IDs. From bdc713e6b2b34516c283575dc0b4d3e09dd4610b Mon Sep 17 00:00:00 2001 From: Konstantin Pandl <16907747+kpandl@users.noreply.github.com> Date: Thu, 12 Sep 2024 15:32:08 +0200 Subject: [PATCH 5/6] remove test deadline duration to 1 second --- node/router/tests/disconnect.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/node/router/tests/disconnect.rs b/node/router/tests/disconnect.rs index b13f373092..ac184c940c 100644 --- a/node/router/tests/disconnect.rs +++ b/node/router/tests/disconnect.rs @@ -37,7 +37,7 @@ async fn test_disconnect_without_handshake() { // Await both nodes being connected. let node0_ = node0.clone(); let node1_ = node1.clone(); - deadline!(Duration::from_secs(5), move || { + deadline!(Duration::from_secs(1), move || { node0_.tcp().num_connected() == 1 && node1_.tcp().num_connected() == 1 }); @@ -56,7 +56,7 @@ async fn test_disconnect_without_handshake() { node0.tcp().disconnect(node1.local_ip()).await; // Await disconnection. let node0_ = node0.clone(); - deadline!(Duration::from_secs(5), move || { node0_.tcp().num_connected() == 0 }); + deadline!(Duration::from_secs(1), move || { node0_.tcp().num_connected() == 0 }); print_tcp!(node0); print_tcp!(node1); @@ -88,7 +88,7 @@ async fn test_disconnect_with_handshake() { // Await for the nodes to be connected. let node0_ = node0.clone(); let node1_ = node1.clone(); - deadline!(Duration::from_secs(5), move || { + deadline!(Duration::from_secs(1), move || { node0_.tcp().num_connected() == 1 && node1_.tcp().num_connected() == 1 }); @@ -109,7 +109,7 @@ async fn test_disconnect_with_handshake() { node0.disconnect(node1.local_ip()); // Await nodes being disconnected. 
let node0_ = node0.clone(); - deadline!(Duration::from_secs(5), move || { node0_.tcp().num_connected() == 0 }); + deadline!(Duration::from_secs(1), move || { node0_.tcp().num_connected() == 0 }); print_tcp!(node0); print_tcp!(node1); From 1ea3358813e950e6dd86e35a47e5da78db22c2e3 Mon Sep 17 00:00:00 2001 From: Konstantin Pandl Date: Mon, 21 Oct 2024 12:40:44 +0200 Subject: [PATCH 6/6] handle could not construct NonZeroUsize case --- Cargo.lock | 1 + node/bft/storage-service/Cargo.toml | 3 +++ node/bft/storage-service/src/persistent.rs | 5 +++-- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1d2f413586..39474e892c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3344,6 +3344,7 @@ name = "snarkos-node-bft-storage-service" version = "2.2.7" dependencies = [ "aleo-std", + "anyhow", "indexmap 2.5.0", "lru", "parking_lot", diff --git a/node/bft/storage-service/Cargo.toml b/node/bft/storage-service/Cargo.toml index 4aa60eab29..5cc8ef1578 100644 --- a/node/bft/storage-service/Cargo.toml +++ b/node/bft/storage-service/Cargo.toml @@ -25,6 +25,9 @@ test = [ "memory" ] [dependencies.aleo-std] workspace = true +[dependencies.anyhow] +version = "1.0.79" + [dependencies.indexmap] version = "2.1" features = [ "serde", "rayon" ] diff --git a/node/bft/storage-service/src/persistent.rs b/node/bft/storage-service/src/persistent.rs index 7ccc6c578a..7c4f1abd55 100644 --- a/node/bft/storage-service/src/persistent.rs +++ b/node/bft/storage-service/src/persistent.rs @@ -34,6 +34,7 @@ use snarkvm::{ }; use aleo_std::StorageMode; +use anyhow::anyhow; use indexmap::{indexset, IndexSet}; use lru::LruCache; use parking_lot::Mutex; @@ -63,7 +64,7 @@ impl BFTPersistentStorage { let capacity = NonZeroUsize::new( (Committee::::MAX_COMMITTEE_SIZE as usize) * (BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH) * 2, ) - .unwrap(); + .ok_or_else(|| anyhow!("Could not construct NonZeroUsize"))?; Ok(Self { transmissions: internal::RocksDB::open_map(N::ID, storage_mode.clone(), MapID::BFT(BFTMap::Transmissions))?, @@ -83,7 +84,7 @@ impl BFTPersistentStorage { let capacity = NonZeroUsize::new( (Committee::::MAX_COMMITTEE_SIZE as usize) * (BatchHeader::::MAX_TRANSMISSIONS_PER_BATCH) * 2, ) - .unwrap(); + .ok_or_else(|| anyhow!("Could not construct NonZeroUsize"))?; Ok(Self { transmissions: internal::RocksDB::open_map_testing(
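
Taken together, the caching layer added in these patches follows a cache-first read with a
write-through insert. A minimal, self-contained sketch of that shape, assuming only the `lru`,
`parking_lot`, and `anyhow` crates added above (the `u64`/`String` types and the `CachedMap` name
are illustrative stand-ins for `TransmissionID<N>`, `Transmission<N>`, and the RocksDB-backed
`DataMap`):

use std::{collections::HashMap, num::NonZeroUsize};

use anyhow::anyhow;
use lru::LruCache;
use parking_lot::Mutex;

/// Illustrative stand-in for a persistent map fronted by an LRU cache.
struct CachedMap {
    backing: Mutex<HashMap<u64, String>>,
    cache: Mutex<LruCache<u64, String>>,
}

impl CachedMap {
    fn new(capacity: usize) -> anyhow::Result<Self> {
        // Return an error rather than panicking if the computed capacity is zero,
        // mirroring the `ok_or_else` handling in the final patch.
        let capacity = NonZeroUsize::new(capacity).ok_or_else(|| anyhow!("capacity must be non-zero"))?;
        Ok(Self { backing: Mutex::new(HashMap::new()), cache: Mutex::new(LruCache::new(capacity)) })
    }

    /// Cache-first read; falls back to the backing store on a miss.
    /// `LruCache::get` takes `&mut self` (it updates recency), hence the `Mutex`.
    fn get(&self, key: u64) -> Option<String> {
        if let Some(value) = self.cache.lock().get(&key) {
            return Some(value.clone());
        }
        self.backing.lock().get(&key).cloned()
    }

    /// Write-through insert: persist first, then populate the cache.
    fn insert(&self, key: u64, value: String) {
        self.backing.lock().insert(key, value.clone());
        self.cache.lock().put(key, value);
    }
}

In the real storage service the capacity is computed as MAX_COMMITTEE_SIZE * MAX_TRANSMISSIONS_PER_BATCH * 2,
which presumably bounds each cache at roughly two rounds' worth of worst-case batch contents.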