From 27f5d5519d592ce6bc16d110cc2e74358b765737 Mon Sep 17 00:00:00 2001 From: Victor Sint Nicolaas Date: Wed, 3 Jan 2024 22:15:18 +0100 Subject: [PATCH 1/8] Limit amount of transactions which can be verified in parallel --- Cargo.lock | 1 + node/Cargo.toml | 3 ++ node/src/client/mod.rs | 94 +++++++++++++++++++++++++++++++++++++-- node/src/client/router.rs | 11 +++-- 4 files changed, 99 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f1b375f5e2..c38ee21d60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3269,6 +3269,7 @@ dependencies = [ "deadline", "futures-util", "indexmap 2.1.0", + "lru", "num_cpus", "once_cell", "parking_lot", diff --git a/node/Cargo.toml b/node/Cargo.toml index 0a34cdaea4..6bf9cec190 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -48,6 +48,9 @@ features = [ "sink" ] [dependencies.indexmap] version = "2.1" +[dependencies.lru] +version = "0.12.1" + [dependencies.metrics] package = "snarkos-node-metrics" path = "./metrics" diff --git a/node/src/client/mod.rs b/node/src/client/mod.rs index 1878deaeaa..f0fdc933e7 100644 --- a/node/src/client/mod.rs +++ b/node/src/client/mod.rs @@ -16,10 +16,10 @@ mod router; use crate::traits::NodeInterface; use snarkos_account::Account; -use snarkos_node_bft::ledger_service::CoreLedgerService; +use snarkos_node_bft::{ledger_service::CoreLedgerService, MAX_TRANSMISSIONS_PER_BATCH}; use snarkos_node_rest::Rest; use snarkos_node_router::{ - messages::{Message, NodeType, UnconfirmedSolution}, + messages::{Message, NodeType, UnconfirmedSolution, UnconfirmedTransaction}, Heartbeat, Inbound, Outbound, @@ -39,17 +39,27 @@ use snarkvm::{ store::ConsensusStorage, Ledger, }, + prelude::block::Transaction, }; use anyhow::Result; use core::future::Future; +use lru::LruCache; use parking_lot::Mutex; use std::{ net::SocketAddr, - sync::{atomic::AtomicBool, Arc}, + num::NonZeroUsize, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering::Relaxed}, + Arc, + }, }; use tokio::task::JoinHandle; +const VERIFICATION_CONCURRENCY_LIMIT: usize = 6; // 8 deployments of MAX_NUM_CONSTRAINTS will run out of memory. + +type TransactionContents = (SocketAddr, UnconfirmedTransaction, Transaction); + /// A client node is a full node, capable of querying with the network. #[derive(Clone)] pub struct Client> { @@ -65,6 +75,10 @@ pub struct Client> { genesis: Block, /// The coinbase puzzle. coinbase_puzzle: CoinbasePuzzle, + /// The unconfirmed transactions queue. + transaction_queue: Arc>>>, + /// The amount of transactions being verified concurrently. + verification_counter: Arc, /// The spawned handles. handles: Arc>>>, /// The shutdown signal. @@ -129,6 +143,10 @@ impl> Client { sync: Arc::new(sync), genesis, coinbase_puzzle, + transaction_queue: Arc::new(Mutex::new(LruCache::new( + NonZeroUsize::new(MAX_TRANSMISSIONS_PER_BATCH).unwrap(), + ))), + verification_counter: Default::default(), handles: Default::default(), shutdown, }; @@ -141,6 +159,8 @@ impl> Client { node.initialize_routing().await; // Initialize the sync module. node.initialize_sync(); + // Initialize transaction verification. + node.initialize_transaction_verification(); // Initialize the notification message loop. node.handles.lock().push(crate::start_notification_message_loop()); // Pass the node to the signal handler. @@ -168,7 +188,7 @@ impl> Client { self.handles.lock().push(tokio::spawn(async move { loop { // If the Ctrl-C handler registered the signal, stop the node. - if node.shutdown.load(std::sync::atomic::Ordering::Relaxed) { + if node.shutdown.load(Relaxed) { info!("Shutting down block production"); break; } @@ -181,6 +201,72 @@ impl> Client { })); } + /// Initializes transaction verification. + fn initialize_transaction_verification(&self) { + // Start the transaction verification loop. + let node = self.clone(); + self.handles.lock().push(tokio::spawn(async move { + loop { + // If the Ctrl-C handler registered the signal, stop the node. + if node.shutdown.load(Relaxed) { + info!("Shutting down transaction verification"); + break; + } + + // Sleep briefly to allow transactions to be validated. + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // Determine the length of the queue. + let queue_len = node.transaction_queue.lock().len(); + + // If we don't have any transactions to verify, try again later. + if queue_len == 0 { + continue; + } + + // Determine how many transactions we want to verify. + let num_transactions = VERIFICATION_CONCURRENCY_LIMIT.min(queue_len); + + // Check if we have room to verify the transactions and update the counter accordingly. + let previous_verification_counter = node.verification_counter.fetch_update(Relaxed, Relaxed, |c| { + // If we are already verifying sufficient transactions, don't verify any more for now. + if c >= VERIFICATION_CONCURRENCY_LIMIT { + None + // If we have space to verify more transactions, verify as many as we can. + } else { + // Consider verifying the full transaction queue, but limit to the concurrency limit. + Some((c + queue_len).min(VERIFICATION_CONCURRENCY_LIMIT)) + } + }); + + // Determine how many transactions we cán verify. + let num_transactions = match previous_verification_counter { + // Determine how many transactions we can verify. + Ok(previous_value) => num_transactions.saturating_sub(previous_value), + // If we are already verifying sufficient transactions, don't verify any more for now. + Err(_) => continue, + }; + + // For each transaction, spawn a task to verify it. + for _ in 0..num_transactions { + if let Some((_, (peer_ip, serialized, transaction))) = node.transaction_queue.lock().pop_lru() { + let _node = node.clone(); + tokio::spawn(async move { + // Check the transaction. + if _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()).is_ok() + { + // Propagate the `UnconfirmedTransaction`. + _node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]); + } + // Reduce the verification counter. + _node.verification_counter.fetch_sub(1, Relaxed); + }); + } + } + } + })); + } + /// Spawns a task with the given future; it should only be used for long-running tasks. pub fn spawn + Send + 'static>(&self, future: T) { self.handles.lock().push(tokio::spawn(future)); diff --git a/node/src/client/router.rs b/node/src/client/router.rs index 6123f75e32..b02df5cbb4 100644 --- a/node/src/client/router.rs +++ b/node/src/client/router.rs @@ -303,11 +303,10 @@ impl> Inbound for Client { if transaction.is_fee() { return true; // Maintain the connection. } - // Check that the transaction is well-formed and unique. - if self.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()).is_ok() { - // Propagate the `UnconfirmedTransaction`. - self.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]); - } - true + + // Try to add the transaction to the verification queue, ignore if it succeeds. + self.transaction_queue.lock().put(transaction.id(), (peer_ip, serialized, transaction)); + + true // Maintain the connection } } From 989e741eebb8369be2f575fe470e1f37b8ff754d Mon Sep 17 00:00:00 2001 From: Victor Sint Nicolaas Date: Thu, 4 Jan 2024 12:36:07 +0100 Subject: [PATCH 2/8] Document TransactionContents --- node/src/client/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/node/src/client/mod.rs b/node/src/client/mod.rs index f0fdc933e7..551a028b95 100644 --- a/node/src/client/mod.rs +++ b/node/src/client/mod.rs @@ -58,6 +58,8 @@ use tokio::task::JoinHandle; const VERIFICATION_CONCURRENCY_LIMIT: usize = 6; // 8 deployments of MAX_NUM_CONSTRAINTS will run out of memory. +/// Transaction details needed for propagation. +/// We preserve the serialized transaction for faster propagation. type TransactionContents = (SocketAddr, UnconfirmedTransaction, Transaction); /// A client node is a full node, capable of querying with the network. From 321cd023e5056edab8225d85801b8591dae5ed40 Mon Sep 17 00:00:00 2001 From: Victor Sint Nicolaas Date: Thu, 4 Jan 2024 12:37:03 +0100 Subject: [PATCH 3/8] Lock queue for duration of verification spawns --- node/src/client/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/node/src/client/mod.rs b/node/src/client/mod.rs index 551a028b95..bbbede827e 100644 --- a/node/src/client/mod.rs +++ b/node/src/client/mod.rs @@ -250,8 +250,9 @@ impl> Client { }; // For each transaction, spawn a task to verify it. + let mut tx_queue = node.transaction_queue.lock(); for _ in 0..num_transactions { - if let Some((_, (peer_ip, serialized, transaction))) = node.transaction_queue.lock().pop_lru() { + if let Some((_, (peer_ip, serialized, transaction))) = tx_queue.pop_lru() { let _node = node.clone(); tokio::spawn(async move { // Check the transaction. From 0d383e525435c0e441f6b231897333c3e7e979ff Mon Sep 17 00:00:00 2001 From: Victor Sint Nicolaas Date: Thu, 4 Jan 2024 12:48:11 +0100 Subject: [PATCH 4/8] Ensure LRU cache is FIFO --- node/src/client/router.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/node/src/client/router.rs b/node/src/client/router.rs index b02df5cbb4..ce43c6efaa 100644 --- a/node/src/client/router.rs +++ b/node/src/client/router.rs @@ -304,8 +304,11 @@ impl> Inbound for Client { return true; // Maintain the connection. } - // Try to add the transaction to the verification queue, ignore if it succeeds. - self.transaction_queue.lock().put(transaction.id(), (peer_ip, serialized, transaction)); + // Try to add the transaction to the verification queue, without changing LRU status of known txs. + let mut tx_queue = self.transaction_queue.lock(); + if tx_queue.peek(&transaction.id()).is_none() { + tx_queue.put(transaction.id(), (peer_ip, serialized, transaction)); + } true // Maintain the connection } From d2ba9838404bb7edbb33adc85af5137a27b9fb11 Mon Sep 17 00:00:00 2001 From: Victor Sint Nicolaas Date: Thu, 4 Jan 2024 13:48:35 +0100 Subject: [PATCH 5/8] Make the wait time dependent on queue_len and counter --- node/src/client/mod.rs | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/node/src/client/mod.rs b/node/src/client/mod.rs index bbbede827e..a0bd701746 100644 --- a/node/src/client/mod.rs +++ b/node/src/client/mod.rs @@ -53,8 +53,9 @@ use std::{ atomic::{AtomicBool, AtomicUsize, Ordering::Relaxed}, Arc, }, + time::Duration, }; -use tokio::task::JoinHandle; +use tokio::{task::JoinHandle, time::sleep}; const VERIFICATION_CONCURRENCY_LIMIT: usize = 6; // 8 deployments of MAX_NUM_CONSTRAINTS will run out of memory. @@ -215,23 +216,30 @@ impl> Client { break; } - // Sleep briefly to allow transactions to be validated. - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - // Determine the length of the queue. + // Determine the current length of the queue and counter. let queue_len = node.transaction_queue.lock().len(); - - // If we don't have any transactions to verify, try again later. - if queue_len == 0 { - continue; - } + let counter = node.verification_counter.load(Relaxed); + + // Sleep to allow the queue to be filled and transactions to be validated. + match (queue_len, counter) { + // If the queue is empty, sleep longer and retry. + (0, _) => { + sleep(Duration::from_millis(500)).await; + continue; + } + // If the counter is 0, we can verify transactions immediately. + (_, 0) => {} + // Otherwise, sleep briefly to let transactions be verified. + _ => sleep(Duration::from_millis(50)).await, + }; // Determine how many transactions we want to verify. + let queue_len = node.transaction_queue.lock().len(); let num_transactions = VERIFICATION_CONCURRENCY_LIMIT.min(queue_len); // Check if we have room to verify the transactions and update the counter accordingly. let previous_verification_counter = node.verification_counter.fetch_update(Relaxed, Relaxed, |c| { - // If we are already verifying sufficient transactions, don't verify any more for now. + // If we are still verifying sufficient transactions, don't verify any more for now. if c >= VERIFICATION_CONCURRENCY_LIMIT { None // If we have space to verify more transactions, verify as many as we can. From 7e12d015d8a967594d79057b3e29b41ee943ce2e Mon Sep 17 00:00:00 2001 From: Victor Sint Nicolaas Date: Thu, 4 Jan 2024 13:52:30 +0100 Subject: [PATCH 6/8] Cleaner check if tx_queue contains tx id --- node/src/client/router.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/client/router.rs b/node/src/client/router.rs index ce43c6efaa..67ee9ee76e 100644 --- a/node/src/client/router.rs +++ b/node/src/client/router.rs @@ -306,7 +306,7 @@ impl> Inbound for Client { // Try to add the transaction to the verification queue, without changing LRU status of known txs. let mut tx_queue = self.transaction_queue.lock(); - if tx_queue.peek(&transaction.id()).is_none() { + if !tx_queue.contains(&transaction.id()) { tx_queue.put(transaction.id(), (peer_ip, serialized, transaction)); } From 61e2a1d18f6680ac95f7748c545f5c7a604d87da Mon Sep 17 00:00:00 2001 From: Victor Sint Nicolaas Date: Thu, 4 Jan 2024 13:53:29 +0100 Subject: [PATCH 7/8] Clarify meaning of verification_counter --- node/src/client/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/client/mod.rs b/node/src/client/mod.rs index a0bd701746..adc2eaf8e1 100644 --- a/node/src/client/mod.rs +++ b/node/src/client/mod.rs @@ -80,7 +80,7 @@ pub struct Client> { coinbase_puzzle: CoinbasePuzzle, /// The unconfirmed transactions queue. transaction_queue: Arc>>>, - /// The amount of transactions being verified concurrently. + /// The amount of transactions currently being verified. verification_counter: Arc, /// The spawned handles. handles: Arc>>>, From f6debd6f455e8eb26b604054116e035b8e5438cc Mon Sep 17 00:00:00 2001 From: Victor Sint Nicolaas Date: Thu, 4 Jan 2024 16:07:55 +0100 Subject: [PATCH 8/8] Simplify and speed up verification --- node/src/client/mod.rs | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/node/src/client/mod.rs b/node/src/client/mod.rs index adc2eaf8e1..12ebd7438c 100644 --- a/node/src/client/mod.rs +++ b/node/src/client/mod.rs @@ -216,26 +216,20 @@ impl> Client { break; } - // Determine the current length of the queue and counter. - let queue_len = node.transaction_queue.lock().len(); - let counter = node.verification_counter.load(Relaxed); + // Determine if the queue contains txs to verify. + let queue_is_empty = node.transaction_queue.lock().is_empty(); + // Determine if our verification counter has space to verify new txs. + let counter_is_full = node.verification_counter.load(Relaxed) >= VERIFICATION_CONCURRENCY_LIMIT; - // Sleep to allow the queue to be filled and transactions to be validated. - match (queue_len, counter) { - // If the queue is empty, sleep longer and retry. - (0, _) => { - sleep(Duration::from_millis(500)).await; - continue; - } - // If the counter is 0, we can verify transactions immediately. - (_, 0) => {} - // Otherwise, sleep briefly to let transactions be verified. - _ => sleep(Duration::from_millis(50)).await, - }; + // Sleep to allow the queue to be filled or transactions to be validated. + if queue_is_empty || counter_is_full { + sleep(Duration::from_millis(50)).await; + continue; + } // Determine how many transactions we want to verify. let queue_len = node.transaction_queue.lock().len(); - let num_transactions = VERIFICATION_CONCURRENCY_LIMIT.min(queue_len); + let num_transactions = queue_len.min(VERIFICATION_CONCURRENCY_LIMIT); // Check if we have room to verify the transactions and update the counter accordingly. let previous_verification_counter = node.verification_counter.fetch_update(Relaxed, Relaxed, |c| { @@ -244,8 +238,8 @@ impl> Client { None // If we have space to verify more transactions, verify as many as we can. } else { - // Consider verifying the full transaction queue, but limit to the concurrency limit. - Some((c + queue_len).min(VERIFICATION_CONCURRENCY_LIMIT)) + // Consider verifying all desired txs, but limit to the concurrency limit. + Some((c + num_transactions).min(VERIFICATION_CONCURRENCY_LIMIT)) } });