Commit

Update documentation
KirillLykov committed Oct 14, 2024
1 parent 4a20c16 commit eb1fc76
Showing 9 changed files with 168 additions and 100 deletions.
60 changes: 40 additions & 20 deletions tpu-client-next/src/connection_worker.rs
@@ -1,5 +1,6 @@
//! This module defines `ConnectionWorker` which encapsulates the functionality
//! This module defines [`ConnectionWorker`] which encapsulates the functionality
//! needed to handle one connection within the scope of task.

use {
super::SendTransactionStats,
crate::{
@@ -30,8 +31,9 @@ const RETRY_SLEEP_INTERVAL: Duration =
/// batches are dropped.
const MAX_PROCESSING_AGE_MS: u64 = MAX_PROCESSING_AGE as u64 * DEFAULT_MS_PER_SLOT;

/// [`ConnectionState`] represents the current state of a quic connection. This
/// enum tracks the lifecycle of connection from initial setup to closing phase.
/// [`ConnectionState`] represents the current state of a quic connection.
///
/// It tracks the lifecycle of connection from initial setup to closing phase.
/// The transition function between states is defined in `ConnectionWorker`
/// implementation.
enum ConnectionState {
@@ -42,21 +44,24 @@ enum ConnectionState {
}

impl Drop for ConnectionState {
/// When [`ConnectionState`] is dropped, underlying connection is closed
/// which means that there is no guarantee that the open streams will
/// finish.
fn drop(&mut self) {
if let Self::Active(connection) = self {
debug!(
"Close connection with {:?}, stats: {:?}. All pending streams will be dropped.",
connection.remote_address(),
connection.stats()
);
// no guarantee that all the streams will finish
connection.close(0u32.into(), b"done");
}
}
}

/// [`ConnectionWorker`] holds connection to the validator with address `peer`. If
/// connection has been closed, [`ConnectionWorker`] tries to reconnect
/// [`ConnectionWorker`] holds connection to the validator with address `peer`.
///
/// If connection has been closed, [`ConnectionWorker`] tries to reconnect
/// `max_reconnect_attempts` times. If connection is in `Active` state, it sends
/// transactions received from `transactions_receiver`. Additionally, it
/// accumulates statistics about connections and streams failures.
@@ -72,14 +77,23 @@ pub(crate) struct ConnectionWorker {
}

impl ConnectionWorker {
/// Constructs a [`ConnectionWorker`].
///
/// [`ConnectionWorker`] maintains a connection to a `peer` and processes
/// transactions from `transactions_receiver`. If
/// `skip_check_transaction_age` is set to `true`, the worker skips checking
/// for transaction blockhash expiration. The `max_reconnect_attempts`
/// parameter controls how many times the worker will attempt to reconnect
/// in case of connection failure. Returns the created `ConnectionWorker`
/// along with a cancellation token that can be used by the caller to stop
/// the worker.
pub fn new(
endpoint: Endpoint,
peer: SocketAddr,
transactions_receiver: mpsc::Receiver<TransactionBatch>,
skip_check_transaction_age: bool,
max_reconnect_attempts: usize,
) -> (Self, CancellationToken) {
// TODO(klykov): check if this token should be child of the scheduler token
let cancel = CancellationToken::new();

let this = Self {
@@ -96,10 +110,11 @@ impl ConnectionWorker {
(this, cancel)
}

/// Starts the main loop of the [`ConnectionWorker`]. This method manages the
/// connection to the peer and handles state transitions. It runs
/// indefinitely until the connection is closed or an unrecoverable error
/// occurs.
/// Starts the main loop of the [`ConnectionWorker`].
///
/// This method manages the connection to the peer and handles state
/// transitions. It runs indefinitely until the connection is closed or an
/// unrecoverable error occurs.
pub async fn run(&mut self) {
let cancel = self.cancel.clone();

@@ -145,12 +160,15 @@
&self.send_txs_stats
}

/// Sends a batch of transactions using the provided `connection`. Each
/// transaction in the batch is sent over the QUIC streams one at the time,
/// which prevents traffic fragmentation and shows better TPS in comparison
/// with multistream send. If the batch is determined to be outdated, it
/// will be dropped without being sent. In case of error, it doesn't retry
/// to send the same transactions again.
/// Sends a batch of transactions using the provided `connection`.
///
/// Each transaction in the batch is sent over the QUIC streams one at the
/// time, which prevents traffic fragmentation and shows better TPS in
/// comparison with multistream send. If the batch is determined to be
/// outdated and flag `skip_check_transaction_age` is unset, it will be
/// dropped without being sent.
///
/// In case of error, it doesn't retry to send the same transactions again.
async fn send_transactions(&mut self, connection: Connection, transactions: TransactionBatch) {
let now = timestamp();
if !self.skip_check_transaction_age
@@ -179,9 +197,11 @@
);
}

/// Attempts to create a new connection to the specified `peer` address. If
/// the connection is successful, the state is updated to `Active`. If an
/// error occurs, the state may transition to `Retry` or `Closing`,
/// Attempts to create a new connection to the specified `peer` address.
///
/// If the connection is successful, the state is updated to `Active`.
///
/// If an error occurs, the state may transition to `Retry` or `Closing`,
/// depending on the nature of the error.
async fn create_connection(&mut self, max_retries_attempt: usize) {
let connecting = self.endpoint.connect(self.peer, "connect");
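
Taken together, the reworked doc comments describe a simple lifecycle: construct the worker, spawn `run`, feed batches through the channel, and trigger the returned cancellation token when done. Below is a minimal crate-internal sketch of that lifecycle; it is not part of the commit, the module paths in the `use` block are assumed, and the setup of the QUIC `Endpoint` and the production of `TransactionBatch` values are elided.

// Hypothetical crate-internal usage; only the `ConnectionWorker::new` and
// `run` signatures are taken from this file, the rest is illustrative.
use {
    crate::{connection_worker::ConnectionWorker, transaction_batch::TransactionBatch},
    quinn::Endpoint, // assumed to be the Endpoint type used by this crate
    std::net::SocketAddr,
    tokio::sync::mpsc,
};

async fn run_single_worker(endpoint: Endpoint, peer: SocketAddr) {
    // Channel that delivers transaction batches to the worker.
    let (batch_tx, batch_rx) = mpsc::channel::<TransactionBatch>(2);

    // `false`: keep the blockhash-age check so outdated batches are dropped;
    // `4`: reconnect attempts before giving up on the peer.
    let (mut worker, cancel) = ConnectionWorker::new(endpoint, peer, batch_rx, false, 4);

    // Runs until the connection is closed, an unrecoverable error occurs,
    // or the cancellation token fires.
    let worker_task = tokio::spawn(async move { worker.run().await });

    // ... hand batches to the worker via `batch_tx.send(batch).await` ...
    drop(batch_tx);

    // Stop the worker; per the Drop impl above, pending streams on the
    // underlying connection are not guaranteed to finish.
    cancel.cancel();
    let _ = worker_task.await;
}
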
57 changes: 45 additions & 12 deletions tpu-client-next/src/connection_workers_scheduler.rs
@@ -1,5 +1,6 @@
//! This module defines [`ConnectionWorkersScheduler`] which creates and
//! orchestrates `ConnectionWorker` instances.
//! This module defines [`ConnectionWorkersScheduler`] which sends transactions
//! to the upcoming leaders.

use {
super::{leader_updater::LeaderUpdater, SendTransactionStatsPerAddr},
crate::{
@@ -19,10 +20,15 @@ use {
tokio_util::sync::CancellationToken,
};

/// [`ConnectionWorkersScheduler`] is responsible for managing and scheduling
/// connection workers that handle transactions over the network.
/// The [`ConnectionWorkersScheduler`] sends transactions from the provided
/// receiver channel to upcoming leaders. It obtains information about future
/// leaders from the implementation of the [`LeaderUpdater`] trait.
///
/// Internally, it enables the management and coordination of multiple network
/// connections, schedules and oversees connection workers.
pub struct ConnectionWorkersScheduler;

/// Errors that arise from running [`ConnectionWorkersSchedulerError`].
#[derive(Debug, Error, PartialEq)]
pub enum ConnectionWorkersSchedulerError {
#[error(transparent)]
@@ -33,23 +39,49 @@ pub enum ConnectionWorkersSchedulerError {
LeaderReceiverDropped,
}

/// Configuration for the [`ConnectionWorkersScheduler`].
///
/// This struct holds the necessary settings to initialize and manage connection
/// workers, including network binding, identity, connection limits, and
/// behavior related to transaction handling.
pub struct ConnectionWorkersSchedulerConfig {
/// The local address to bind the scheduler to.
pub bind: SocketAddr,

/// Optional stake identity keypair used in the endpoint certificate for
/// identifying the sender.
pub stake_identity: Option<Keypair>,

/// The number of connections to be maintained by the scheduler.
pub num_connections: usize,

/// Whether to skip checking the transaction blockhash expiration.
pub skip_check_transaction_age: bool,
/// Size of the channel to transmit transaction batches to the target workers.
pub worker_channel_size: usize, // = 2;
/// Maximum number of reconnection attempts in case the connection errors out.
pub max_reconnect_attempts: usize, // = 4;

/// The size of the channel used to transmit transaction batches to the
/// worker tasks.
pub worker_channel_size: usize,

/// The maximum number of reconnection attempts allowed in case of
/// connection failure.
pub max_reconnect_attempts: usize,

/// The number of slots to look ahead during the leader estimation
/// procedure. Determines how far into the future leaders are estimated,
/// allowing connections to be established with those leaders in advance.
pub lookahead_slots: u64,
}

impl ConnectionWorkersScheduler {
/// Starts the scheduler, which manages the distribution of transactions to
/// the network's upcoming leaders.
///
/// Runs the main loop that handles worker scheduling and management for
/// connections. Returns the error quic statistics per connection address or
/// an error if something goes wrong. Importantly, if some transactions were
/// not delivered due to network problems, they will not be retried when the
/// problem is resolved.
/// an error.
///
/// Importantly, if some transactions were not delivered due to network
/// problems, they will not be retried when the problem is resolved.
pub async fn run(
ConnectionWorkersSchedulerConfig {
bind,
@@ -58,6 +90,7 @@ impl ConnectionWorkersScheduler {
skip_check_transaction_age,
worker_channel_size,
max_reconnect_attempts,
lookahead_slots,
}: ConnectionWorkersSchedulerConfig,
mut leader_updater: Box<dyn LeaderUpdater>,
mut transaction_receiver: mpsc::Receiver<TransactionBatch>,
@@ -81,7 +114,7 @@ impl ConnectionWorkersScheduler {
break;
}
};
let updated_leaders = leader_updater.next_num_lookahead_slots_leaders();
let updated_leaders = leader_updater.next_leaders(lookahead_slots);
let new_leader = &updated_leaders[0];
let future_leaders = &updated_leaders[1..];
if !workers.contains(new_leader) {
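
The new field-by-field documentation makes the configuration straightforward to fill in. The sketch below builds the config with illustrative values: the former inline comments gave 2 for `worker_channel_size` and 4 for `max_reconnect_attempts`, while the other numbers here are arbitrary examples and the import paths are assumed. The resulting struct is passed to `ConnectionWorkersScheduler::run` together with a `LeaderUpdater` implementation and the transaction-batch receiver; the tail of `run`'s parameter list is truncated in this view, so the call itself is not shown.

// Illustrative values only; import paths are assumptions, not part of this diff.
use {
    solana_sdk::signature::Keypair,
    solana_tpu_client_next::connection_workers_scheduler::ConnectionWorkersSchedulerConfig,
    std::net::SocketAddr,
};

fn example_config(bind: SocketAddr, stake_identity: Option<Keypair>) -> ConnectionWorkersSchedulerConfig {
    ConnectionWorkersSchedulerConfig {
        bind,
        stake_identity,
        num_connections: 16,               // connections maintained by the scheduler (example value)
        skip_check_transaction_age: false, // false: check blockhash age and drop outdated batches
        worker_channel_size: 2,            // per-worker batch queue depth
        max_reconnect_attempts: 4,         // reconnects before declaring a peer unreachable
        lookahead_slots: 8,                // how far ahead leaders are estimated (example value)
    }
}
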
55 changes: 31 additions & 24 deletions tpu-client-next/src/leader_updater.rs
@@ -1,9 +1,13 @@
//! This module provides [`LeaderUpdater`] trait along with it's implementations
//! `LeaderUpdaterService` and `PinnedLeaderUpdater`. Currently, the main
//! purpose of [`LeaderUpdater`] is to abstract over leader updates, hiding the
//! details of how leaders are retrieved and which structures are used.
//! Specifically, `LeaderUpdaterService` keeps [`LeaderTpuService`] internal to
//! this module. Yet, it also allows to implement custom leader estimation.
//! This module provides [`LeaderUpdater`] trait along with
//! `create_leader_updater` function to create an instance of this trait.
//!
//! Currently, the main purpose of [`LeaderUpdater`] is to abstract over leader
//! updates, hiding the details of how leaders are retrieved and which
//! structures are used. It contains trait implementations
//! `LeaderUpdaterService` and `PinnedLeaderUpdater`, where
//! `LeaderUpdaterService` keeps [`LeaderTpuService`] internal to this module.
//! Yet, it also allows to implement custom leader estimation.

use {
async_trait::async_trait,
log::*,
@@ -21,14 +25,23 @@ use {
};

/// [`LeaderUpdater`] trait abstracts out functionality required for the
/// [`ConnectionWorkersScheduler`](crate::network::ConnectionWorkersScheduler)
/// to identify next leaders to send transactions to.
/// [`ConnectionWorkersScheduler`](crate::ConnectionWorkersScheduler) to
/// identify next leaders to send transactions to.
#[async_trait]
pub trait LeaderUpdater: Send {
fn next_num_lookahead_slots_leaders(&self) -> Vec<SocketAddr>;
/// Returns next unique leaders for the next `lookahead_slots` starting from
/// current estimated slot.
///
/// If the current leader estimation is incorrect and transactions are sent to
/// only one estimated leader, there is a risk of losing all the transactions,
/// depending on the forwarding policy.
fn next_leaders(&self, lookahead_slots: u64) -> Vec<SocketAddr>;

/// Stop [`LeaderUpdater`] and releases all associated resources.
async fn stop(&mut self);
}

/// Error type for [`LeaderUpdater`].
pub struct LeaderUpdaterError;

impl fmt::Display for LeaderUpdaterError {
@@ -44,15 +57,15 @@ impl fmt::Debug for LeaderUpdaterError {
}

/// Creates a [`LeaderUpdater`] based on the configuration provided by the
/// caller. If a pinned address is provided, it will return a
/// `PinnedLeaderUpdater` that always returns the provided address instead of
/// checking leader schedule. Otherwise, it creates a `LeaderUpdaterService`
/// which dynamically updates the leaders by connecting to the network via the
/// [`LeaderTpuService`].
/// caller.
///
/// If `pinned_address` is provided, it returns a `PinnedLeaderUpdater` that
/// always returns the provided address instead of checking leader schedule.
/// Otherwise, it creates a `LeaderUpdaterService` which dynamically updates the
/// leaders by connecting to the network via the [`LeaderTpuService`].
pub async fn create_leader_updater(
rpc_client: Arc<RpcClient>,
websocket_url: String,
lookahead_slots: u64,
pinned_address: Option<SocketAddr>,
) -> Result<Box<dyn LeaderUpdater>, LeaderUpdaterError> {
if let Some(pinned_address) = pinned_address {
@@ -72,7 +85,6 @@ pub async fn create_leader_updater(
Ok(Box::new(LeaderUpdaterService {
leader_tpu_service,
exit,
lookahead_slots,
}))
}

@@ -82,17 +94,12 @@ pub async fn create_leader_updater(
struct LeaderUpdaterService {
leader_tpu_service: LeaderTpuService,
exit: Arc<AtomicBool>,
/// Number of *estimated* next leaders. If the estimation for the current
/// leader is wrong and we have sent to only one leader, we may lose all the
/// txs (depends on the forwarding policy).
lookahead_slots: u64,
}

#[async_trait]
impl LeaderUpdater for LeaderUpdaterService {
fn next_num_lookahead_slots_leaders(&self) -> Vec<SocketAddr> {
self.leader_tpu_service
.leader_tpu_sockets(self.lookahead_slots)
fn next_leaders(&self, lookahead_slots: u64) -> Vec<SocketAddr> {
self.leader_tpu_service.leader_tpu_sockets(lookahead_slots)
}

async fn stop(&mut self) {
@@ -109,7 +116,7 @@ struct PinnedLeaderUpdater {

#[async_trait]
impl LeaderUpdater for PinnedLeaderUpdater {
fn next_num_lookahead_slots_leaders(&self) -> Vec<SocketAddr> {
fn next_leaders(&self, _lookahead_slots: u64) -> Vec<SocketAddr> {
self.address.clone()
}

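
Because `lookahead_slots` is now an argument of `next_leaders` rather than state captured at construction time, a custom `LeaderUpdater` only has to answer "which addresses for the next N slots". Below is a minimal sketch of a custom implementation that mirrors `PinnedLeaderUpdater` and always returns a fixed address list; the trait's import path is assumed.

use {
    async_trait::async_trait,
    solana_tpu_client_next::leader_updater::LeaderUpdater, // path assumed
    std::net::SocketAddr,
};

/// Always returns the same peers, ignoring the lookahead window.
struct StaticLeaderUpdater {
    addresses: Vec<SocketAddr>,
}

#[async_trait]
impl LeaderUpdater for StaticLeaderUpdater {
    fn next_leaders(&self, _lookahead_slots: u64) -> Vec<SocketAddr> {
        self.addresses.clone()
    }

    async fn stop(&mut self) {
        // Nothing to release for a static list.
    }
}
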
8 changes: 4 additions & 4 deletions tpu-client-next/src/lib.rs
@@ -1,12 +1,12 @@
pub mod connection_worker;
pub(crate) mod connection_worker;
pub mod connection_workers_scheduler;
pub mod send_transaction_stats;
pub mod workers_cache;
pub(crate) mod workers_cache;
pub use crate::{
connection_workers_scheduler::{ConnectionWorkersScheduler, ConnectionWorkersSchedulerError},
send_transaction_stats::{SendTransactionStats, SendTransactionStatsPerAddr},
};
pub mod quic_networking;
pub use crate::quic_networking::QuicError;
pub(crate) mod quic_networking;
pub(crate) use crate::quic_networking::QuicError;
pub mod leader_updater;
pub mod transaction_batch;
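
With `connection_worker`, `workers_cache`, and `quic_networking` made crate-private, downstream code interacts only with the re-exports and the remaining public modules. A sketch of what an external import could look like after this change; the crate name `solana_tpu_client_next` and the exact item locations are assumptions based on the module declarations above.

use solana_tpu_client_next::{
    connection_workers_scheduler::ConnectionWorkersSchedulerConfig,
    leader_updater::create_leader_updater,
    transaction_batch::TransactionBatch,
    ConnectionWorkersScheduler, ConnectionWorkersSchedulerError,
    SendTransactionStats, SendTransactionStatsPerAddr,
};
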
8 changes: 4 additions & 4 deletions tpu-client-next/src/quic_networking/error.rs
@@ -39,11 +39,11 @@ impl From<io::Error> for IoErrorWithPartialEq {
#[derive(Error, Debug, PartialEq)]
pub enum QuicError {
#[error(transparent)]
WriteError(#[from] WriteError),
StreamWrite(#[from] WriteError),
#[error(transparent)]
ConnectionError(#[from] ConnectionError),
Connection(#[from] ConnectionError),
#[error(transparent)]
ConnectError(#[from] ConnectError),
Connect(#[from] ConnectError),
#[error(transparent)]
EndpointError(#[from] IoErrorWithPartialEq),
Endpoint(#[from] IoErrorWithPartialEq),
}
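
The variant rename simply drops the redundant `Error` suffixes, so match arms at crate-internal call sites change accordingly. A small illustrative sketch follows; the helper function and its labels are hypothetical, not part of the commit.

// Crate-internal only, since `quic_networking` is no longer public.
fn quic_error_kind(err: &QuicError) -> &'static str {
    match err {
        QuicError::StreamWrite(_) => "stream-write",
        QuicError::Connection(_) => "connection",
        QuicError::Connect(_) => "connect",
        QuicError::Endpoint(_) => "endpoint",
    }
}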