diff --git a/tpu-client-next/src/connection_worker.rs b/tpu-client-next/src/connection_worker.rs
index c19c3bd3c5a2ed..7d77bc3f6ed2a2 100644
--- a/tpu-client-next/src/connection_worker.rs
+++ b/tpu-client-next/src/connection_worker.rs
@@ -1,5 +1,6 @@
-//! This module defines `ConnectionWorker` which encapsulates the functionality
+//! This module defines [`ConnectionWorker`] which encapsulates the functionality
 //! needed to handle one connection within the scope of task.
+
 use {
     super::SendTransactionStats,
     crate::{
@@ -30,8 +31,9 @@ const RETRY_SLEEP_INTERVAL: Duration =
 /// batches are dropped.
 const MAX_PROCESSING_AGE_MS: u64 = MAX_PROCESSING_AGE as u64 * DEFAULT_MS_PER_SLOT;
 
-/// [`ConnectionState`] represents the current state of a quic connection. This
-/// enum tracks the lifecycle of connection from initial setup to closing phase.
+/// [`ConnectionState`] represents the current state of a QUIC connection.
+///
+/// It tracks the lifecycle of a connection from initial setup to closing phase.
 /// The transition function between states is defined in `ConnectionWorker`
 /// implementation.
 enum ConnectionState {
@@ -42,6 +44,9 @@ enum ConnectionState {
 }
 
 impl Drop for ConnectionState {
+    /// When [`ConnectionState`] is dropped, the underlying connection is
+    /// closed, which means there is no guarantee that the open streams will
+    /// finish.
     fn drop(&mut self) {
         if let Self::Active(connection) = self {
             debug!(
@@ -49,14 +54,14 @@ impl Drop for ConnectionState {
                 connection.remote_address(),
                 connection.stats()
             );
-            // no guarantee that all the streams will finish
             connection.close(0u32.into(), b"done");
         }
     }
 }
 
-/// [`ConnectionWorker`] holds connection to the validator with address `peer`. If
-/// connection has been closed, [`ConnectionWorker`] tries to reconnect
+/// [`ConnectionWorker`] holds a connection to the validator with address `peer`.
+///
+/// If the connection has been closed, [`ConnectionWorker`] tries to reconnect
 /// `max_reconnect_attempts` times. If connection is in `Active` state, it sends
 /// transactions received from `transactions_receiver`. Additionally, it
 /// accumulates statistics about connections and streams failures.
@@ -72,6 +77,16 @@ pub(crate) struct ConnectionWorker {
 }
 
 impl ConnectionWorker {
+    /// Constructs a [`ConnectionWorker`].
+    ///
+    /// [`ConnectionWorker`] maintains a connection to a `peer` and processes
+    /// transactions from `transactions_receiver`. If
+    /// `skip_check_transaction_age` is set to `true`, the worker skips checking
+    /// for transaction blockhash expiration. The `max_reconnect_attempts`
+    /// parameter controls how many times the worker will attempt to reconnect
+    /// in case of connection failure. Returns the created `ConnectionWorker`
+    /// along with a cancellation token that can be used by the caller to stop
+    /// the worker.
     pub fn new(
         endpoint: Endpoint,
         peer: SocketAddr,
@@ -79,7 +94,6 @@ impl ConnectionWorker {
         skip_check_transaction_age: bool,
         max_reconnect_attempts: usize,
     ) -> (Self, CancellationToken) {
-        // TODO(klykov): check if this token should be child of the scheduler token
         let cancel = CancellationToken::new();
 
         let this = Self {
@@ -96,10 +110,11 @@ impl ConnectionWorker {
         (this, cancel)
     }
 
-    /// Starts the main loop of the [`ConnectionWorker`]. This method manages the
-    /// connection to the peer and handles state transitions. It runs
-    /// indefinitely until the connection is closed or an unrecoverable error
-    /// occurs.
+    /// Starts the main loop of the [`ConnectionWorker`].
+    ///
+    /// This method manages the connection to the peer and handles state
+    /// transitions. It runs indefinitely until the connection is closed or an
+    /// unrecoverable error occurs.
     pub async fn run(&mut self) {
         let cancel = self.cancel.clone();
 
@@ -145,12 +160,15 @@ impl ConnectionWorker {
         &self.send_txs_stats
     }
 
-    /// Sends a batch of transactions using the provided `connection`. Each
-    /// transaction in the batch is sent over the QUIC streams one at the time,
-    /// which prevents traffic fragmentation and shows better TPS in comparison
-    /// with multistream send. If the batch is determined to be outdated, it
-    /// will be dropped without being sent. In case of error, it doesn't retry
-    /// to send the same transactions again.
+    /// Sends a batch of transactions using the provided `connection`.
+    ///
+    /// Each transaction in the batch is sent over the QUIC streams one at a
+    /// time, which prevents traffic fragmentation and shows better TPS in
+    /// comparison with multistream send. If the batch is determined to be
+    /// outdated and the `skip_check_transaction_age` flag is unset, it will be
+    /// dropped without being sent.
+    ///
+    /// In case of error, it doesn't retry to send the same transactions again.
     async fn send_transactions(&mut self, connection: Connection, transactions: TransactionBatch) {
         let now = timestamp();
         if !self.skip_check_transaction_age
@@ -179,9 +197,11 @@ impl ConnectionWorker {
         );
     }
 
-    /// Attempts to create a new connection to the specified `peer` address. If
-    /// the connection is successful, the state is updated to `Active`. If an
-    /// error occurs, the state may transition to `Retry` or `Closing`,
+    /// Attempts to create a new connection to the specified `peer` address.
+    ///
+    /// If the connection is successful, the state is updated to `Active`.
+    ///
+    /// If an error occurs, the state may transition to `Retry` or `Closing`,
     /// depending on the nature of the error.
     async fn create_connection(&mut self, max_retries_attempt: usize) {
         let connecting = self.endpoint.connect(self.peer, "connect");
diff --git a/tpu-client-next/src/connection_workers_scheduler.rs b/tpu-client-next/src/connection_workers_scheduler.rs
index 18c649611aecc6..82b038827b48eb 100644
--- a/tpu-client-next/src/connection_workers_scheduler.rs
+++ b/tpu-client-next/src/connection_workers_scheduler.rs
@@ -1,5 +1,6 @@
-//! This module defines [`ConnectionWorkersScheduler`] which creates and
-//! orchestrates `ConnectionWorker` instances.
+//! This module defines [`ConnectionWorkersScheduler`] which sends transactions
+//! to the upcoming leaders.
+
 use {
     super::{leader_updater::LeaderUpdater, SendTransactionStatsPerAddr},
     crate::{
@@ -19,10 +20,15 @@ use {
     tokio_util::sync::CancellationToken,
 };
 
-/// [`ConnectionWorkersScheduler`] is responsible for managing and scheduling
-/// connection workers that handle transactions over the network.
+/// The [`ConnectionWorkersScheduler`] sends transactions from the provided
+/// receiver channel to upcoming leaders. It obtains information about future
+/// leaders from the implementation of the [`LeaderUpdater`] trait.
+///
+/// Internally, it manages and coordinates multiple network connections,
+/// scheduling and overseeing the connection workers.
 pub struct ConnectionWorkersScheduler;
 
+/// Errors that arise from running [`ConnectionWorkersScheduler`].
 #[derive(Debug, Error, PartialEq)]
 pub enum ConnectionWorkersSchedulerError {
     #[error(transparent)]
@@ -33,23 +39,49 @@ pub enum ConnectionWorkersSchedulerError {
     LeaderReceiverDropped,
 }
 
+/// Configuration for the [`ConnectionWorkersScheduler`].
+///
+/// This struct holds the necessary settings to initialize and manage connection
+/// workers, including network binding, identity, connection limits, and
+/// behavior related to transaction handling.
 pub struct ConnectionWorkersSchedulerConfig {
+    /// The local address to bind the scheduler to.
     pub bind: SocketAddr,
+
+    /// Optional stake identity keypair used in the endpoint certificate for
+    /// identifying the sender.
     pub stake_identity: Option<Keypair>,
+
+    /// The number of connections to be maintained by the scheduler.
     pub num_connections: usize,
+
+    /// Whether to skip checking the transaction blockhash expiration.
     pub skip_check_transaction_age: bool,
-    /// Size of the channel to transmit transaction batches to the target workers.
-    pub worker_channel_size: usize, // = 2;
-    /// Maximum number of reconnection attempts in case the connection errors out.
-    pub max_reconnect_attempts: usize, // = 4;
+
+    /// The size of the channel used to transmit transaction batches to the
+    /// worker tasks.
+    pub worker_channel_size: usize,
+
+    /// The maximum number of reconnection attempts allowed in case of
+    /// connection failure.
+    pub max_reconnect_attempts: usize,
+
+    /// The number of slots to look ahead during the leader estimation
+    /// procedure. Determines how far into the future leaders are estimated,
+    /// allowing connections to be established with those leaders in advance.
+    pub lookahead_slots: u64,
 }
 
 impl ConnectionWorkersScheduler {
+    /// Starts the scheduler, which manages the distribution of transactions to
+    /// the network's upcoming leaders.
+    ///
     /// Runs the main loop that handles worker scheduling and management for
     /// connections. Returns the error quic statistics per connection address or
-    /// an error if something goes wrong. Importantly, if some transactions were
-    /// not delivered due to network problems, they will not be retried when the
-    /// problem is resolved.
+    /// an error.
+    ///
+    /// Importantly, if some transactions were not delivered due to network
+    /// problems, they will not be retried when the problem is resolved.
     pub async fn run(
         ConnectionWorkersSchedulerConfig {
             bind,
@@ -58,6 +90,7 @@ impl ConnectionWorkersScheduler {
             skip_check_transaction_age,
             worker_channel_size,
             max_reconnect_attempts,
+            lookahead_slots,
         }: ConnectionWorkersSchedulerConfig,
         mut leader_updater: Box<dyn LeaderUpdater>,
         mut transaction_receiver: mpsc::Receiver<TransactionBatch>,
@@ -81,7 +114,7 @@ impl ConnectionWorkersScheduler {
                     break;
                 }
             };
-            let updated_leaders = leader_updater.next_num_lookahead_slots_leaders();
+            let updated_leaders = leader_updater.next_leaders(lookahead_slots);
             let new_leader = &updated_leaders[0];
             let future_leaders = &updated_leaders[1..];
             if !workers.contains(new_leader) {
diff --git a/tpu-client-next/src/leader_updater.rs b/tpu-client-next/src/leader_updater.rs
index 94a3fff1025212..5e07b9b0bfe612 100644
--- a/tpu-client-next/src/leader_updater.rs
+++ b/tpu-client-next/src/leader_updater.rs
@@ -1,9 +1,13 @@
-//! This module provides [`LeaderUpdater`] trait along with it's implementations
-//! `LeaderUpdaterService` and `PinnedLeaderUpdater`. Currently, the main
-//! purpose of [`LeaderUpdater`] is to abstract over leader updates, hiding the
-//! details of how leaders are retrieved and which structures are used.
-//! Specifically, `LeaderUpdaterService` keeps [`LeaderTpuService`] internal to
-//! this module. Yet, it also allows to implement custom leader estimation.
+//! This module provides the [`LeaderUpdater`] trait along with the
+//! `create_leader_updater` function to create an instance of this trait.
+//!
+//! Currently, the main purpose of [`LeaderUpdater`] is to abstract over leader
+//! updates, hiding the details of how leaders are retrieved and which
+//! structures are used. It contains trait implementations
+//! `LeaderUpdaterService` and `PinnedLeaderUpdater`, where
+//! `LeaderUpdaterService` keeps [`LeaderTpuService`] internal to this module.
+//! Yet, it also allows implementing custom leader estimation.
+
 use {
     async_trait::async_trait,
     log::*,
@@ -21,14 +25,23 @@ use {
 };
 
 /// [`LeaderUpdater`] trait abstracts out functionality required for the
-/// [`ConnectionWorkersScheduler`](crate::network::ConnectionWorkersScheduler)
-/// to identify next leaders to send transactions to.
+/// [`ConnectionWorkersScheduler`](crate::ConnectionWorkersScheduler) to
+/// identify next leaders to send transactions to.
 #[async_trait]
 pub trait LeaderUpdater: Send {
-    fn next_num_lookahead_slots_leaders(&self) -> Vec<SocketAddr>;
+    /// Returns the next unique leaders for the next `lookahead_slots` starting
+    /// from the current estimated slot.
+    ///
+    /// If the current leader estimation is incorrect and transactions are sent to
+    /// only one estimated leader, there is a risk of losing all the transactions,
+    /// depending on the forwarding policy.
+    fn next_leaders(&self, lookahead_slots: u64) -> Vec<SocketAddr>;
+
+    /// Stops the [`LeaderUpdater`] and releases all associated resources.
     async fn stop(&mut self);
 }
 
+/// Error type for [`LeaderUpdater`].
 pub struct LeaderUpdaterError;
 
 impl fmt::Display for LeaderUpdaterError {
@@ -44,15 +57,15 @@ impl fmt::Debug for LeaderUpdaterError {
 }
 
 /// Creates a [`LeaderUpdater`] based on the configuration provided by the
-/// caller. If a pinned address is provided, it will return a
-/// `PinnedLeaderUpdater` that always returns the provided address instead of
-/// checking leader schedule. Otherwise, it creates a `LeaderUpdaterService`
-/// which dynamically updates the leaders by connecting to the network via the
-/// [`LeaderTpuService`].
+/// caller.
+///
+/// If `pinned_address` is provided, it returns a `PinnedLeaderUpdater` that
+/// always returns the provided address instead of checking the leader schedule.
+/// Otherwise, it creates a `LeaderUpdaterService` which dynamically updates the
+/// leaders by connecting to the network via the [`LeaderTpuService`].
 pub async fn create_leader_updater(
     rpc_client: Arc<RpcClient>,
     websocket_url: String,
-    lookahead_slots: u64,
     pinned_address: Option<SocketAddr>,
 ) -> Result<Box<dyn LeaderUpdater>, LeaderUpdaterError> {
     if let Some(pinned_address) = pinned_address {
@@ -72,7 +85,6 @@ pub async fn create_leader_updater(
     Ok(Box::new(LeaderUpdaterService {
         leader_tpu_service,
         exit,
-        lookahead_slots,
     }))
 }
 
@@ -82,17 +94,12 @@ pub async fn create_leader_updater(
 struct LeaderUpdaterService {
     leader_tpu_service: LeaderTpuService,
     exit: Arc<AtomicBool>,
-    /// Number of *estimated* next leaders. If the estimation for the current
-    /// leader is wrong and we have sent to only one leader, we may lose all the
-    /// txs (depends on the forwarding policy).
-    lookahead_slots: u64,
 }
 
 #[async_trait]
 impl LeaderUpdater for LeaderUpdaterService {
-    fn next_num_lookahead_slots_leaders(&self) -> Vec<SocketAddr> {
-        self.leader_tpu_service
-            .leader_tpu_sockets(self.lookahead_slots)
+    fn next_leaders(&self, lookahead_slots: u64) -> Vec<SocketAddr> {
+        self.leader_tpu_service.leader_tpu_sockets(lookahead_slots)
     }
 
     async fn stop(&mut self) {
@@ -109,7 +116,7 @@ struct PinnedLeaderUpdater {
 
 #[async_trait]
 impl LeaderUpdater for PinnedLeaderUpdater {
-    fn next_num_lookahead_slots_leaders(&self) -> Vec<SocketAddr> {
+    fn next_leaders(&self, _lookahead_slots: u64) -> Vec<SocketAddr> {
         self.address.clone()
     }
 
diff --git a/tpu-client-next/src/lib.rs b/tpu-client-next/src/lib.rs
index a74da8c7d5eef0..720b3876b47cb4 100644
--- a/tpu-client-next/src/lib.rs
+++ b/tpu-client-next/src/lib.rs
@@ -1,12 +1,12 @@
-pub mod connection_worker;
+pub(crate) mod connection_worker;
 pub mod connection_workers_scheduler;
 pub mod send_transaction_stats;
-pub mod workers_cache;
+pub(crate) mod workers_cache;
 pub use crate::{
     connection_workers_scheduler::{ConnectionWorkersScheduler, ConnectionWorkersSchedulerError},
     send_transaction_stats::{SendTransactionStats, SendTransactionStatsPerAddr},
 };
-pub mod quic_networking;
-pub use crate::quic_networking::QuicError;
+pub(crate) mod quic_networking;
+pub(crate) use crate::quic_networking::QuicError;
 pub mod leader_updater;
 pub mod transaction_batch;
diff --git a/tpu-client-next/src/quic_networking/error.rs b/tpu-client-next/src/quic_networking/error.rs
index efc2e1d12a96a0..8fa79265cb69a4 100644
--- a/tpu-client-next/src/quic_networking/error.rs
+++ b/tpu-client-next/src/quic_networking/error.rs
@@ -39,11 +39,11 @@ impl From for IoErrorWithPartialEq {
 #[derive(Error, Debug, PartialEq)]
 pub enum QuicError {
     #[error(transparent)]
-    WriteError(#[from] WriteError),
+    StreamWrite(#[from] WriteError),
     #[error(transparent)]
-    ConnectionError(#[from] ConnectionError),
+    Connection(#[from] ConnectionError),
     #[error(transparent)]
-    ConnectError(#[from] ConnectError),
+    Connect(#[from] ConnectError),
     #[error(transparent)]
-    EndpointError(#[from] IoErrorWithPartialEq),
+    Endpoint(#[from] IoErrorWithPartialEq),
 }
diff --git a/tpu-client-next/src/send_transaction_stats.rs b/tpu-client-next/src/send_transaction_stats.rs
index 275c4a582605ea..abe68b8bf60213 100644
--- a/tpu-client-next/src/send_transaction_stats.rs
+++ b/tpu-client-next/src/send_transaction_stats.rs
@@ -31,63 +31,63 @@ pub struct SendTransactionStats {
 #[allow(clippy::arithmetic_side_effects)]
 pub fn record_error(err: QuicError, stats: &mut SendTransactionStats) {
     match err {
-        QuicError::ConnectError(ConnectError::EndpointStopping) => {
+        QuicError::Connect(ConnectError::EndpointStopping) => {
             stats.connect_error_other += 1;
         }
-        QuicError::ConnectError(ConnectError::CidsExhausted) => {
+        QuicError::Connect(ConnectError::CidsExhausted) => {
             stats.connect_error_cids_exhausted += 1;
         }
-        QuicError::ConnectError(ConnectError::InvalidServerName(_)) => {
+        QuicError::Connect(ConnectError::InvalidServerName(_)) => {
             stats.connect_error_other += 1;
         }
-        QuicError::ConnectError(ConnectError::InvalidRemoteAddress(_)) => {
+        QuicError::Connect(ConnectError::InvalidRemoteAddress(_)) => {
             stats.connect_error_invalid_remote_address += 1;
         }
-        QuicError::ConnectError(ConnectError::NoDefaultClientConfig) => {
+        QuicError::Connect(ConnectError::NoDefaultClientConfig) => {
             stats.connect_error_other += 1;
         }
-        QuicError::ConnectError(ConnectError::UnsupportedVersion) => {
+        QuicError::Connect(ConnectError::UnsupportedVersion) => {
             stats.connect_error_other += 1;
         }
-        QuicError::ConnectionError(ConnectionError::VersionMismatch) => {
+        QuicError::Connection(ConnectionError::VersionMismatch) => {
             stats.connection_error_version_mismatch += 1;
         }
-        QuicError::ConnectionError(ConnectionError::TransportError(_)) => {
+        QuicError::Connection(ConnectionError::TransportError(_)) => {
             stats.connection_error_transport_error += 1;
         }
-        QuicError::ConnectionError(ConnectionError::ConnectionClosed(_)) => {
+        QuicError::Connection(ConnectionError::ConnectionClosed(_)) => {
            stats.connection_error_connection_closed += 1;
         }
-        QuicError::ConnectionError(ConnectionError::ApplicationClosed(_)) => {
+        QuicError::Connection(ConnectionError::ApplicationClosed(_)) => {
             stats.connection_error_application_closed += 1;
         }
-        QuicError::ConnectionError(ConnectionError::Reset) => {
+        QuicError::Connection(ConnectionError::Reset) => {
             stats.connection_error_reset += 1;
         }
-        QuicError::ConnectionError(ConnectionError::TimedOut) => {
+        QuicError::Connection(ConnectionError::TimedOut) => {
             stats.connection_error_timed_out += 1;
         }
-        QuicError::ConnectionError(ConnectionError::LocallyClosed) => {
+        QuicError::Connection(ConnectionError::LocallyClosed) => {
             stats.connection_error_locally_closed += 1;
         }
-        QuicError::ConnectionError(ConnectionError::CidsExhausted) => {
+        QuicError::Connection(ConnectionError::CidsExhausted) => {
             stats.connection_error_cids_exhausted += 1;
         }
-        QuicError::WriteError(WriteError::Stopped(_)) => {
+        QuicError::StreamWrite(WriteError::Stopped(_)) => {
             stats.write_error_stopped += 1;
         }
-        QuicError::WriteError(WriteError::ConnectionLost(_)) => {
+        QuicError::StreamWrite(WriteError::ConnectionLost(_)) => {
             stats.write_error_connection_lost += 1;
         }
-        QuicError::WriteError(WriteError::ClosedStream) => {
+        QuicError::StreamWrite(WriteError::ClosedStream) => {
             stats.write_error_closed_stream += 1;
         }
-        QuicError::WriteError(WriteError::ZeroRttRejected) => {
+        QuicError::StreamWrite(WriteError::ZeroRttRejected) => {
             stats.write_error_zero_rtt_rejected += 1;
         }
         // Endpoint is created on the scheduler level and handled separately
         // No counters are used for this case.
-        QuicError::EndpointError(_) => (),
+        QuicError::Endpoint(_) => (),
     }
 }
diff --git a/tpu-client-next/src/transaction_batch.rs b/tpu-client-next/src/transaction_batch.rs
index f42c9a993f3aeb..a3c2d92fc8386e 100644
--- a/tpu-client-next/src/transaction_batch.rs
+++ b/tpu-client-next/src/transaction_batch.rs
@@ -1,9 +1,9 @@
 //! This module holds [`TransactionBatch`] structure.
+
 use solana_sdk::timing::timestamp;
 
-/// Batch of generated transactions
-/// timestamp is used to discard batches which are too old
-/// to have valid blockhash.
+/// Batch of generated transactions. The timestamp is used to discard batches
+/// which are too old to have a valid blockhash.
 #[derive(Clone, PartialEq)]
 pub struct TransactionBatch {
     wired_transactions: Vec<WiredTransaction>,
diff --git a/tpu-client-next/src/workers_cache.rs b/tpu-client-next/src/workers_cache.rs
index 9fb29e9ef1fe6a..90d2954b669d7f 100644
--- a/tpu-client-next/src/workers_cache.rs
+++ b/tpu-client-next/src/workers_cache.rs
@@ -37,7 +37,10 @@ impl WorkerInfo {
         }
     }
 
-    async fn send_txs(&self, txs_batch: TransactionBatch) -> Result<(), WorkersCacheError> {
+    async fn send_transactions(
+        &self,
+        txs_batch: TransactionBatch,
+    ) -> Result<(), WorkersCacheError> {
         self.sender
             .send(txs_batch)
             .await
@@ -45,7 +48,8 @@ impl WorkerInfo {
         Ok(())
     }
 
-    /// Closes the worker by dropping the sender and awaiting the worker's statistics.
+    /// Closes the worker by dropping the sender and awaiting the worker's
+    /// statistics.
     async fn shutdown(self) -> Result<SendTransactionStats, WorkersCacheError> {
         self.cancel.cancel();
         drop(self.sender);
@@ -121,7 +125,7 @@ impl WorkersCache {
             "Failed to fetch worker for peer {peer}.\n\
              Peer existence must be checked before this call using `contains` method.",
         );
-        let send_res = current_worker.send_txs(txs_batch).await;
+        let send_res = current_worker.send_transactions(txs_batch).await;
 
         if let Err(WorkersCacheError::ReceiverDropped) = send_res {
             // Remove the worker from the cache, if the peer has disconnected.
diff --git a/tpu-client-next/tests/connection_workers_scheduler_test.rs b/tpu-client-next/tests/connection_workers_scheduler_test.rs
index 7e9e7780d5236b..0ffabb6640f7a3 100644
--- a/tpu-client-next/tests/connection_workers_scheduler_test.rs
+++ b/tpu-client-next/tests/connection_workers_scheduler_test.rs
@@ -23,7 +23,7 @@ use {
     },
     std::{
         collections::HashMap,
-        net::{IpAddr, Ipv6Addr, SocketAddr},
+        net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
         num::Saturating,
         str::FromStr,
         sync::{atomic::Ordering, Arc},
@@ -40,6 +40,18 @@ use {
     tokio_util::sync::CancellationToken,
 };
 
+fn test_config(validator_identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
+    ConnectionWorkersSchedulerConfig {
+        bind: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0),
+        stake_identity: validator_identity,
+        num_connections: 1,
+        skip_check_transaction_age: false,
+        worker_channel_size: 2,
+        max_reconnect_attempts: 4,
+        lookahead_slots: 1,
+    }
+}
+
 async fn setup_connection_worker_scheduler(
     tpu_address: SocketAddr,
     transaction_receiver: Receiver<TransactionBatch>,
@@ -57,20 +69,12 @@
     ));
 
     // Setup sending txs
-    let leader_updater = create_leader_updater(rpc_client, websocket_url, 1, Some(tpu_address))
+    let leader_updater = create_leader_updater(rpc_client, websocket_url, Some(tpu_address))
         .await
         .expect("Leader updates was successfully created");
 
-    let bind: SocketAddr = "127.0.0.1:0".parse().unwrap();
     let cancel = CancellationToken::new();
-    let config = ConnectionWorkersSchedulerConfig {
-        bind,
-        stake_identity: validator_identity,
-        num_connections: 1,
-        skip_check_transaction_age: false,
-        worker_channel_size: 2,
-        max_reconnect_attempts: 4,
-    };
+    let config = test_config(validator_identity);
     let scheduler = tokio::spawn(ConnectionWorkersScheduler::run(
         config,
         leader_updater,
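
The sketch below (not part of the patch) shows how a caller would wire the reworked API together: the leader updater no longer owns a lookahead value, and the scheduler receives it through the new `lookahead_slots` config field. The crate and module paths, the `RpcClient` construction, and the exact argument order of `ConnectionWorkersScheduler::run` (config, leader updater, transaction receiver, cancellation token) are assumptions inferred from the test changes above rather than facts established by this diff.

// Minimal usage sketch under the assumptions stated above.
use {
    solana_rpc_client::nonblocking::rpc_client::RpcClient,
    solana_tpu_client_next::{
        connection_workers_scheduler::ConnectionWorkersSchedulerConfig,
        leader_updater::create_leader_updater, transaction_batch::TransactionBatch,
        ConnectionWorkersScheduler,
    },
    std::{
        net::{Ipv4Addr, SocketAddr},
        sync::Arc,
    },
    tokio::sync::mpsc,
    tokio_util::sync::CancellationToken,
};

async fn send_batches(
    rpc_url: String,
    websocket_url: String,
    batches: Vec<TransactionBatch>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Leaders are now resolved per call via `next_leaders(lookahead_slots)`,
    // so `create_leader_updater` no longer takes a lookahead argument.
    // Passing `None` uses the dynamic `LeaderUpdaterService`.
    let rpc_client = Arc::new(RpcClient::new(rpc_url));
    let leader_updater = create_leader_updater(rpc_client, websocket_url, None)
        .await
        .map_err(|e| e.to_string())?;

    let (transaction_sender, transaction_receiver) = mpsc::channel(16);
    let cancel = CancellationToken::new();

    let config = ConnectionWorkersSchedulerConfig {
        // Bind to an ephemeral local port for outgoing QUIC connections.
        bind: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0),
        stake_identity: None,
        num_connections: 1,
        skip_check_transaction_age: false,
        worker_channel_size: 2,
        max_reconnect_attempts: 4,
        // New field introduced by this patch: how many future slots the
        // scheduler covers when picking leaders to connect to.
        lookahead_slots: 1,
    };

    // Assumed argument order, mirroring the test setup in this diff.
    let scheduler = tokio::spawn(ConnectionWorkersScheduler::run(
        config,
        leader_updater,
        transaction_receiver,
        cancel.clone(),
    ));

    for batch in batches {
        transaction_sender
            .send(batch)
            .await
            .map_err(|_| "scheduler receiver dropped")?;
    }

    // Stop the scheduler and collect the per-address send statistics.
    cancel.cancel();
    let _stats_per_addr = scheduler.await?;
    Ok(())
}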