Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit amount of transactions which clients verify in parallel #2970

Closed
wants to merge 9 commits into from
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ features = [ "sink" ]
[dependencies.indexmap]
version = "2.1"

[dependencies.lru]
version = "0.12.1"

[dependencies.metrics]
package = "snarkos-node-metrics"
path = "./metrics"
Expand Down
97 changes: 93 additions & 4 deletions node/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ mod router;

use crate::traits::NodeInterface;
use snarkos_account::Account;
use snarkos_node_bft::ledger_service::CoreLedgerService;
use snarkos_node_bft::{ledger_service::CoreLedgerService, MAX_TRANSMISSIONS_PER_BATCH};
use snarkos_node_rest::Rest;
use snarkos_node_router::{
messages::{Message, NodeType, UnconfirmedSolution},
messages::{Message, NodeType, UnconfirmedSolution, UnconfirmedTransaction},
Heartbeat,
Inbound,
Outbound,
Expand All @@ -39,17 +39,29 @@ use snarkvm::{
store::ConsensusStorage,
Ledger,
},
prelude::block::Transaction,
};

use anyhow::Result;
use core::future::Future;
use lru::LruCache;
use parking_lot::Mutex;
use std::{
net::SocketAddr,
sync::{atomic::AtomicBool, Arc},
num::NonZeroUsize,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering::Relaxed},
Arc,
},
};
use tokio::task::JoinHandle;

const VERIFICATION_CONCURRENCY_LIMIT: usize = 6; // 8 deployments of MAX_NUM_CONSTRAINTS will run out of memory.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be dynamic based on your machine?

How much memory was this estimation based on?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was based on the Clients: 16GB of memory requirement in the snarkOS README. Please confirm if you want me to make it a dynamic value from a cli flag.

Sidenote: given our syncing issues I understood the CPU/memory requirements need to go up anyway.


/// Transaction details needed for propagation.
/// We preserve the serialized transaction for faster propagation.
type TransactionContents<N> = (SocketAddr, UnconfirmedTransaction<N>, Transaction<N>);
ljedrz marked this conversation as resolved.
Show resolved Hide resolved

/// A client node is a full node, capable of querying with the network.
#[derive(Clone)]
pub struct Client<N: Network, C: ConsensusStorage<N>> {
Expand All @@ -65,6 +77,10 @@ pub struct Client<N: Network, C: ConsensusStorage<N>> {
genesis: Block<N>,
/// The coinbase puzzle.
coinbase_puzzle: CoinbasePuzzle<N>,
/// The unconfirmed transactions queue.
transaction_queue: Arc<Mutex<LruCache<N::TransactionID, TransactionContents<N>>>>,
/// The amount of transactions being verified concurrently.
verification_counter: Arc<AtomicUsize>,
/// The spawned handles.
handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
/// The shutdown signal.
Expand Down Expand Up @@ -129,6 +145,10 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
sync: Arc::new(sync),
genesis,
coinbase_puzzle,
transaction_queue: Arc::new(Mutex::new(LruCache::new(
NonZeroUsize::new(MAX_TRANSMISSIONS_PER_BATCH).unwrap(),
))),
verification_counter: Default::default(),
handles: Default::default(),
shutdown,
};
Expand All @@ -141,6 +161,8 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
node.initialize_routing().await;
// Initialize the sync module.
node.initialize_sync();
// Initialize transaction verification.
node.initialize_transaction_verification();
// Initialize the notification message loop.
node.handles.lock().push(crate::start_notification_message_loop());
// Pass the node to the signal handler.
Expand Down Expand Up @@ -168,7 +190,7 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
self.handles.lock().push(tokio::spawn(async move {
loop {
// If the Ctrl-C handler registered the signal, stop the node.
if node.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
if node.shutdown.load(Relaxed) {
info!("Shutting down block production");
break;
}
Expand All @@ -181,6 +203,73 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
}));
}

/// Initializes transaction verification.
fn initialize_transaction_verification(&self) {
// Start the transaction verification loop.
let node = self.clone();
self.handles.lock().push(tokio::spawn(async move {
loop {
// If the Ctrl-C handler registered the signal, stop the node.
if node.shutdown.load(Relaxed) {
info!("Shutting down transaction verification");
break;
}

// Sleep briefly to allow transactions to be validated.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
ljedrz marked this conversation as resolved.
Show resolved Hide resolved

// Determine the length of the queue.
let queue_len = node.transaction_queue.lock().len();

// If we don't have any transactions to verify, try again later.
if queue_len == 0 {
continue;
}

// Determine how many transactions we want to verify.
let num_transactions = VERIFICATION_CONCURRENCY_LIMIT.min(queue_len);

// Check if we have room to verify the transactions and update the counter accordingly.
let previous_verification_counter = node.verification_counter.fetch_update(Relaxed, Relaxed, |c| {
// If we are already verifying sufficient transactions, don't verify any more for now.
if c >= VERIFICATION_CONCURRENCY_LIMIT {
None
// If we have space to verify more transactions, verify as many as we can.
} else {
// Consider verifying the full transaction queue, but limit to the concurrency limit.
Some((c + queue_len).min(VERIFICATION_CONCURRENCY_LIMIT))
}
});

// Determine how many transactions we cán verify.
let num_transactions = match previous_verification_counter {
// Determine how many transactions we can verify.
Ok(previous_value) => num_transactions.saturating_sub(previous_value),
// If we are already verifying sufficient transactions, don't verify any more for now.
Err(_) => continue,
};

// For each transaction, spawn a task to verify it.
let mut tx_queue = node.transaction_queue.lock();
for _ in 0..num_transactions {
if let Some((_, (peer_ip, serialized, transaction))) = tx_queue.pop_lru() {
let _node = node.clone();
tokio::spawn(async move {
vicsn marked this conversation as resolved.
Show resolved Hide resolved
// Check the transaction.
if _node.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()).is_ok()
{
// Propagate the `UnconfirmedTransaction`.
_node.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);
}
// Reduce the verification counter.
_node.verification_counter.fetch_sub(1, Relaxed);
ljedrz marked this conversation as resolved.
Show resolved Hide resolved
});
}
}
}
}));
}

/// Spawns a task with the given future; it should only be used for long-running tasks.
pub fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
self.handles.lock().push(tokio::spawn(future));
Expand Down
12 changes: 7 additions & 5 deletions node/src/client/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,13 @@ impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Client<N, C> {
if transaction.is_fee() {
return true; // Maintain the connection.
}
// Check that the transaction is well-formed and unique.
if self.ledger.check_transaction_basic(&transaction, None, &mut rand::thread_rng()).is_ok() {
// Propagate the `UnconfirmedTransaction`.
self.propagate(Message::UnconfirmedTransaction(serialized), &[peer_ip]);

// Try to add the transaction to the verification queue, without changing LRU status of known txs.
let mut tx_queue = self.transaction_queue.lock();
if tx_queue.peek(&transaction.id()).is_none() {
ljedrz marked this conversation as resolved.
Show resolved Hide resolved
tx_queue.put(transaction.id(), (peer_ip, serialized, transaction));
}
true

true // Maintain the connection
}
}