Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache transmissions for faster processing #3395

Open
wants to merge 6 commits into
base: mainnet-staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions node/bft/storage-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ workspace = true
version = "2.1"
features = [ "serde", "rayon" ]

[dependencies.lru]
version = "0.12.1"
Copy link
Collaborator

@niklaslong niklaslong Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to change it, but just as a note to reviewers: the latest version (which will be used by cargo when building) is 0.12.4.


[dependencies.parking_lot]
version = "0.12"
optional = true
Expand Down
93 changes: 58 additions & 35 deletions node/bft/storage-service/src/persistent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use crate::StorageService;
use snarkvm::{
ledger::{
committee::Committee,
narwhal::{BatchHeader, Transmission, TransmissionID},
store::{
cow_to_cloned,
Expand All @@ -33,9 +34,12 @@ use snarkvm::{

use aleo_std::StorageMode;
use indexmap::{indexset, IndexSet};
use lru::LruCache;
use parking_lot::Mutex;
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
num::NonZeroUsize,
};
use tracing::error;

Expand All @@ -46,24 +50,40 @@ pub struct BFTPersistentStorage<N: Network> {
transmissions: DataMap<TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>)>,
/// The map of `aborted transmission ID` to `certificate IDs` entries.
aborted_transmission_ids: DataMap<TransmissionID<N>, IndexSet<Field<N>>>,
/// The LRU cache for `transmission ID` to `(transmission, certificate IDs)` entries that are part of the persistent storage.
cache_transmissions: Mutex<LruCache<TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>)>>,
/// The LRU cache for `aborted transmission ID` to `certificate IDs` entries that are part of the persistent storage.
cache_aborted_transmission_ids: Mutex<LruCache<TransmissionID<N>, IndexSet<Field<N>>>>,
}

impl<N: Network> BFTPersistentStorage<N> {
/// Initializes a new BFT persistent storage service.
///
/// Opens the two RocksDB-backed maps (transmissions and aborted transmission IDs)
/// and layers an empty LRU cache over each to avoid repeated disk reads.
pub fn open(storage_mode: StorageMode) -> Result<Self> {
    // Size each LRU cache to hold two committees' worth of batch transmissions:
    // (max committee size) * (max transmissions per batch) * 2.
    // Both factors are positive compile-time constants, so the product is non-zero.
    let capacity = NonZeroUsize::new(
        (Committee::<N>::MAX_COMMITTEE_SIZE as usize) * (BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH) * 2,
    )
    .expect("LRU cache capacity must be non-zero");

    Ok(Self {
        // The persistent map of `transmission ID` to `(transmission, certificate IDs)`.
        transmissions: internal::RocksDB::open_map(N::ID, storage_mode.clone(), MapID::BFT(BFTMap::Transmissions))?,
        // The persistent map of `aborted transmission ID` to `certificate IDs`.
        aborted_transmission_ids: internal::RocksDB::open_map(
            N::ID,
            storage_mode,
            MapID::BFT(BFTMap::AbortedTransmissionIDs),
        )?,
        // Fresh (empty) caches; entries are populated lazily on insert/lookup.
        cache_transmissions: Mutex::new(LruCache::new(capacity)),
        cache_aborted_transmission_ids: Mutex::new(LruCache::new(capacity)),
    })
}

/// Initializes a new BFT persistent storage service.
/// Initializes a new BFT persistent storage service for testing.
#[cfg(any(test, feature = "test"))]
pub fn open_testing(temp_dir: std::path::PathBuf, dev: Option<u16>) -> Result<Self> {
let capacity = NonZeroUsize::new(
(Committee::<N>::MAX_COMMITTEE_SIZE as usize) * (BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH) * 2,
)
.unwrap();

Ok(Self {
transmissions: internal::RocksDB::open_map_testing(
temp_dir.clone(),
Expand All @@ -75,6 +95,8 @@ impl<N: Network> BFTPersistentStorage<N> {
dev,
MapID::BFT(BFTMap::AbortedTransmissionIDs),
)?,
cache_transmissions: Mutex::new(LruCache::new(capacity)),
cache_aborted_transmission_ids: Mutex::new(LruCache::new(capacity)),
})
}
}
Expand All @@ -101,7 +123,12 @@ impl<N: Network> StorageService<N> for BFTPersistentStorage<N> {
/// Returns the transmission for the given `transmission ID`.
/// If the transmission ID does not exist in storage, `None` is returned.
fn get_transmission(&self, transmission_id: TransmissionID<N>) -> Option<Transmission<N>> {
// Get the transmission.
// Try to get the transmission from the cache first.
if let Some((transmission, _)) = self.cache_transmissions.lock().get_mut(&transmission_id) {
return Some(transmission.clone());
}

// If not found in cache, check persistent storage.
match self.transmissions.get_confirmed(&transmission_id) {
Ok(Some(Cow::Owned((transmission, _)))) => Some(transmission),
Ok(Some(Cow::Borrowed((transmission, _)))) => Some(transmission.clone()),
Expand Down Expand Up @@ -152,24 +179,19 @@ impl<N: Network> StorageService<N> for BFTPersistentStorage<N> {
aborted_transmission_ids: HashSet<TransmissionID<N>>,
mut missing_transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
) {
// Inserts the following:
// - Inserts **only the missing** transmissions from storage.
// - Inserts the certificate ID into the corresponding set for **all** transmissions.
// First, handle the non-aborted transmissions.
'outer: for transmission_id in transmission_ids {
// Retrieve the transmission entry.
match self.transmissions.get_confirmed(&transmission_id) {
// Try to fetch from the persistent storage.
let (transmission, certificate_ids) = match self.transmissions.get_confirmed(&transmission_id) {
Ok(Some(entry)) => {
// The transmission exists in storage; update its certificate IDs.
let (transmission, mut certificate_ids) = cow_to_cloned!(entry);
// Insert the certificate ID into the set.
certificate_ids.insert(certificate_id);
// Update the transmission entry.
if let Err(e) = self.transmissions.insert(transmission_id, (transmission, certificate_ids)) {
error!("Failed to insert transmission {transmission_id} into storage - {e}");
continue 'outer;
}
(transmission, certificate_ids)
}
Ok(None) => {
// Retrieve the missing transmission.
// The transmission is missing from persistent storage.
// Check if it exists in the `missing_transmissions` map provided.
let Some(transmission) = missing_transmissions.remove(&transmission_id) else {
if !aborted_transmission_ids.contains(&transmission_id)
&& !self.contains_transmission(transmission_id)
vicsn marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -180,45 +202,46 @@ impl<N: Network> StorageService<N> for BFTPersistentStorage<N> {
};
// Prepare the set of certificate IDs.
let certificate_ids = indexset! { certificate_id };
// Insert the transmission and a new set with the certificate ID.
if let Err(e) = self.transmissions.insert(transmission_id, (transmission, certificate_ids)) {
error!("Failed to insert transmission {transmission_id} into storage - {e}");
continue 'outer;
}
(transmission, certificate_ids)
}
Err(e) => {
// Handle any errors during the retrieval.
error!("Failed to process the 'insert' for transmission {transmission_id} into storage - {e}");
continue 'outer;
continue;
}
};
// Insert the transmission into persistent storage.
if let Err(e) = self.transmissions.insert(transmission_id, (transmission.clone(), certificate_ids.clone()))
{
error!("Failed to insert transmission {transmission_id} into storage - {e}");
}
// Insert the transmission into the cache.
self.cache_transmissions.lock().put(transmission_id, (transmission, certificate_ids));
}
// Inserts the aborted transmission IDs.

// Next, handle the aborted transmission IDs.
for aborted_transmission_id in aborted_transmission_ids {
// Retrieve the transmission entry.
match self.aborted_transmission_ids.get_confirmed(&aborted_transmission_id) {
let certificate_ids = match self.aborted_transmission_ids.get_confirmed(&aborted_transmission_id) {
Ok(Some(entry)) => {
let mut certificate_ids = cow_to_cloned!(entry);
// Insert the certificate ID into the set.
certificate_ids.insert(certificate_id);
// Update the transmission entry.
if let Err(e) = self.aborted_transmission_ids.insert(aborted_transmission_id, certificate_ids) {
error!("Failed to insert aborted transmission ID {aborted_transmission_id} into storage - {e}");
}
}
Ok(None) => {
// Prepare the set of certificate IDs.
let certificate_ids = indexset! { certificate_id };
// Insert the transmission and a new set with the certificate ID.
if let Err(e) = self.aborted_transmission_ids.insert(aborted_transmission_id, certificate_ids) {
error!("Failed to insert aborted transmission ID {aborted_transmission_id} into storage - {e}");
}
certificate_ids
}
Ok(None) => indexset! { certificate_id },
Err(e) => {
error!(
"Failed to process the 'insert' for aborted transmission ID {aborted_transmission_id} into storage - {e}"
);
continue;
}
};
// Insert the certificate IDs into the persistent storage.
if let Err(e) = self.aborted_transmission_ids.insert(aborted_transmission_id, certificate_ids.clone()) {
error!("Failed to insert aborted transmission ID {aborted_transmission_id} into storage - {e}");
}
// Insert the certificate IDs into the cache.
self.cache_aborted_transmission_ids.lock().put(aborted_transmission_id, certificate_ids);
}
}

Expand Down
27 changes: 19 additions & 8 deletions node/router/tests/disconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use common::*;
use snarkos_node_tcp::{protocols::Handshake, P2P};

use core::time::Duration;
use deadline::deadline;

#[tokio::test]
async fn test_disconnect_without_handshake() {
Expand All @@ -33,8 +34,12 @@ async fn test_disconnect_without_handshake() {

// Connect node0 to node1.
node0.connect(node1.local_ip());
// Sleep briefly.
tokio::time::sleep(Duration::from_millis(200)).await;
// Await both nodes being connected.
let node0_ = node0.clone();
let node1_ = node1.clone();
deadline!(Duration::from_secs(5), move || {
niklaslong marked this conversation as resolved.
Show resolved Hide resolved
node0_.tcp().num_connected() == 1 && node1_.tcp().num_connected() == 1
});

print_tcp!(node0);
print_tcp!(node1);
Expand All @@ -49,8 +54,9 @@ async fn test_disconnect_without_handshake() {
// collection of connected peers is only altered during the handshake,
// as well as the address resolver needed for the higher-level calls
node0.tcp().disconnect(node1.local_ip()).await;
// Sleep briefly.
tokio::time::sleep(Duration::from_millis(100)).await;
// Await disconnection.
let node0_ = node0.clone();
deadline!(Duration::from_secs(5), move || { node0_.tcp().num_connected() == 0 });

print_tcp!(node0);
print_tcp!(node1);
Expand Down Expand Up @@ -79,8 +85,12 @@ async fn test_disconnect_with_handshake() {

// Connect node0 to node1.
node0.connect(node1.local_ip());
// Sleep briefly.
tokio::time::sleep(Duration::from_millis(200)).await;
// Await the nodes being connected.
let node0_ = node0.clone();
let node1_ = node1.clone();
deadline!(Duration::from_secs(5), move || {
node0_.tcp().num_connected() == 1 && node1_.tcp().num_connected() == 1
});

print_tcp!(node0);
print_tcp!(node1);
Expand All @@ -97,8 +107,9 @@ async fn test_disconnect_with_handshake() {

// Disconnect node0 from node1.
node0.disconnect(node1.local_ip());
// Sleep briefly.
tokio::time::sleep(Duration::from_millis(100)).await;
// Await nodes being disconnected.
let node0_ = node0.clone();
deadline!(Duration::from_secs(5), move || { node0_.tcp().num_connected() == 0 });

print_tcp!(node0);
print_tcp!(node1);
Expand Down