
Add a new client implementation targeting TPU #2905

Merged on Oct 14, 2024 (11 commits)

Conversation

@KirillLykov commented Sep 12, 2024

Problem

Although tpu-client, the component currently used for sending transactions over TPU, is well suited to bulk transaction sending, it was not designed to handle a stream of transactions. Additionally, the call stack for sending transactions through tpu-client has grown too deep, making the code difficult to maintain. This motivated us to create a new client implementation that is optimized for handling transaction streams and is built on Tokio from the start.

Initially, this implementation was part of the transaction-bench crate in the Invalidator repository, a client application for sending transactions that execute a program with a specified number of accounts. We now plan to move the networking component of this implementation into the Agave repository, making it reusable across client applications and accessible to other parties.

The design was proposed by @alessandrod, who also used the transaction-bench tool for his testing activities. This component has been actively developed over the past several months and has undergone extensive review by @ilya-bobyr and @bw-solana. Documentation PRs were reviewed by @sam0x17.

Test results using transaction-bench will be added soon.

Summary of Changes

The main structure introduced in this PR is ConnectionWorkersScheduler.
Most of the other components are private.

Key Components:

  1. ConnectionWorker
  • A task that handles a single connection to a peer
  • Receives transactions from its own transaction receiver
  • If the connection is closed or an error occurs, attempts to reconnect up to a maximum number of retries
  • Tracks connection and transaction statistics, such as failed transaction attempts
  2. WorkerInfo (sketched below, after this list)
  • Stores information about a worker so that other tasks can communicate with its ConnectionWorker (including the transaction sender and the handle to the worker's task)
  3. ConnectionWorkersScheduler
  • Manages worker scheduling
  • Receives transactions from a channel and distributes them to workers
  • Runs in its own task, ensuring non-blocking operation
  4. WorkersCache
  • Manages the lifecycle of workers using an LRU (Least Recently Used) cache
  • Tracks transaction statistics for each worker
  • Handles worker creation and retrieval, ensuring workers are reused efficiently to reduce overhead
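
For orientation, here is a rough Rust sketch of the per-worker bookkeeping described above; the field names are illustrative rather than the PR's exact ones:

use tokio::{sync::mpsc, task::JoinHandle};

// Stand-in for the PR's TransactionBatch type: batches of wire-format bytes.
type TransactionBatch = Vec<Vec<u8>>;

/// Illustrative shape of WorkerInfo: everything the scheduler needs in order
/// to talk to a running ConnectionWorker task.
struct WorkerInfo {
    /// Sender half of the worker's private transaction channel.
    sender: mpsc::Sender<TransactionBatch>,
    /// Handle to the tokio task running the ConnectionWorker.
    handle: JoinHandle<()>,
}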

Workflow

  1. ConnectionWorkersScheduler receives transactions from its channel receiver (the full loop is sketched after this list).
  2. It fetches the upcoming leaders.
  3. It checks whether a task handling the connection to the next leader is already in the WorkersCache. If not, it creates a new worker.
  4. Each ConnectionWorker handles a connection to a peer and processes transactions received through its channel.
  5. Workers run in their own tasks, ensuring that each connection is managed asynchronously.
  6. The system is resilient to network failures, but it does not retry undelivered transactions once a connection is restored.
  7. Workers report statistics back to the scheduler, such as successful/failed transaction attempts and connection errors.
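
A condensed, self-contained toy version of this loop; all names and signatures are illustrative rather than quoted from the PR:

use std::{collections::HashMap, net::SocketAddr};
use tokio::sync::mpsc;

// Stand-in for the PR's TransactionBatch type.
type TransactionBatch = Vec<Vec<u8>>;

async fn scheduler_loop(
    mut batches: mpsc::Receiver<TransactionBatch>,
    next_leader: impl Fn() -> SocketAddr,
) {
    let mut workers: HashMap<SocketAddr, mpsc::Sender<TransactionBatch>> = HashMap::new();
    while let Some(batch) = batches.recv().await {
        let peer = next_leader();
        // Reuse the cached worker for this peer, or spawn a new one.
        let sender = workers.entry(peer).or_insert_with(|| {
            let (tx, mut rx) = mpsc::channel::<TransactionBatch>(2);
            // Stand-in for ConnectionWorker: owns one connection to `peer` and
            // sends every batch received over its private channel.
            tokio::spawn(async move {
                while let Some(batch) = rx.recv().await {
                    let _ = (peer, batch); // actual QUIC send elided
                }
            });
            tx
        });
        // Undelivered batches are dropped rather than retried (step 6 above).
        if sender.send(batch).await.is_err() {
            workers.remove(&peer);
        }
    }
}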

Future work

There are some features that are not included in this PR:

  1. Send transactions to the next several leaders (fanout). Experiments show that this increases TPS on both a private cluster and testnet. It will be added in a separate PR.
  2. Use several endpoints with different stake for the same node. The server caps the receive window per connection at 512 * PACKET_SIZE and the number of concurrent streams at 512. These limits prevent the client from reaching its full potential. This will be added in a separate PR.

@KirillLykov commented Sep 12, 2024

CI breaks on the tpu-client/src/nonblocking/tpu_client.rs checks (unrelated to this PR's changes). I addressed this in #2913.

@KirillLykov KirillLykov marked this pull request as ready for review September 12, 2024 13:09
@godmodegalactus left a comment

Overall the PR looks good. I would prefer a local cache (sorted by prioritization fees) for each connection worker, and back-pressure logic implemented in the connection worker, since we know we can send only N transactions on a connection at a time for a staked or unstaked connection, where N is calculated from the node's stake.

/// to send the same transactions again.
async fn send_transactions(&mut self, connection: Connection, transactions: TransactionBatch) {
    let now = timestamp();
    if now.saturating_sub(transactions.get_timestamp()) > MAX_PROCESSING_AGE_MS {


This logic could be done over block height; usually a transaction is valid for 300 blocks. Around an epoch boundary the first slot may take a few seconds to arrive, in which case the transactions will be dropped.


That would definitely be more precise, but I think there are 2 challenges:

  1. The transactions are already serialized into bytes at this point, so extracting blockhash/slot is tricky.
  2. Even if we're within the 150 block limit, by the time it gets sent/received/included in a block, we might be outside the window, so we would likely need to introduce some fudge factor anyways.


@KirillLykov Do you know if we're hitting this and dumping tx batches during normal operation? I'm assuming it depends on where the bottleneck is between tx generation/scheduling/sending and the backpressure mechanism

@KirillLykov (author) commented Oct 9, 2024

I discussed these concerns with @godmodegalactus and Groovie|mango, and I'm quite convinced that this must be implemented if we want to use this client for more than cluster benchmarking.
This filtering should probably be configurable because it is not always necessary (it might be implemented by the caller).
I would like to add these changes in a follow-up PR because this one is already heavy.


I agree that this should be in the caller somehow

@KirillLykov (author):

I don't see a simple way to move this check to the caller: the caller puts transactions into a channel and cannot reach past it to clean them up. I think a decent solution is to make the age check optional, controlled by a boolean flag, since there is only one correct way to check the age of a transaction and only two possible options: either we want this check or we don't.

        return;
    }
    let mut measure_send = Measure::start("send transaction batch");
    for data in transactions.into_iter() {


Track how many transactions have already been sent and are still being processed; for staked connections there is a way to calculate how many parallel streams a connection can have. Try to parallelize only that many transactions at once.


There is no need to parallelize anything. Parallelizing makes things slower by creating fragmentation.

@KirillLykov (author):

We count how many transactions have been sent in the SendTransactionStats::successfully_sent counter. As for sending in parallel using the number of streams, that is no longer needed because the server will not allow us to send more than it can accept from a client with the given stake.

@sam0x17 left a comment

Some feedback:

Regarding error granularity:

  • The current approach to error handling in the send_transactions and create_connection methods works, but it could benefit from more granular logging, especially in critical error paths. For example, instead of just logging Unexpected error has happened..., it would be nice to classify the types of unexpected errors so troubleshooting later is easier.
  • In send_transactions, when a batch fails, it should perhaps log more detail about which part of the stream failed, plus any available information about the connection state that could help with debugging.

It might be beneficial to make some of the constants configurable via an env var or another configuration mechanism.

It might also be possible to use the tokio-metrics crate for further optimization and debugging work; it could be helpful to have it turned on here.

@alessandrod:

back-pressure logic implemented in the connection worker, since we know we can send only N transactions on a connection at a time for a staked or unstaked connection, where N is calculated from the node's stake.

There is no need to do this: write_all(data).await will stop once the client has exhausted its stake quota.
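
For illustration, a minimal sketch of the send path under quinn v0.11 (the version this PR moved to); the function name echoes the one mentioned in this thread, but the body is an approximation, not the PR's code:

use quinn::Connection;

// QUIC flow control provides the backpressure: open_uni() waits once the
// server's concurrent-stream cap is reached, and write_all() waits while the
// stake-based receive window is exhausted, so no in-flight counting is needed.
async fn send_data_over_stream(
    connection: &Connection,
    data: &[u8],
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let mut stream = connection.open_uni().await?;
    stream.write_all(data).await?;
    // Signal stream completion; synchronous in quinn 0.11.
    stream.finish()?;
    Ok(())
}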

sam0x17 previously approved these changes Sep 24, 2024
tpu-client-next/Cargo.toml (outdated, resolved)
@bw-solana left a comment

Basic framework looks good to me. Left a few comments


#[async_trait]
impl LeaderUpdater for LeaderUpdaterService {
    fn get_leaders(&self) -> Vec<SocketAddr> {


It might be nice to call this something like get_next_num_lookahead_slots_leaders. That is a mouthful, so maybe we can come up with something more concise, but the reason I mention it is that when I see get_leaders, my assumption is that we're pulling down all of the leaders for the current epoch.

Another option is to add a function header comment making it clear we only return the next N leaders, based on our estimate of the current leader and the number of lookahead_slots defined when creating the leader updater.


nit: might be time to stop using Java conventions and use name and set_name for getters and setters :P

@KirillLykov (author):

@bw-solana I agree that get_leaders might be misleading; I changed it to next_num_lookahead_slots_leaders as proposed.

debug!("Client endpoint bind address: {:?}", endpoint.local_addr());
let mut workers = WorkersCache::new(num_connections);
loop {
    let Some(transaction_batch) = transaction_receiver.recv().await else {


Would there be performance benefit to pulling all transaction batches off the receiver?

@KirillLykov (author) commented Oct 8, 2024

Do you mean taking all the batches from transaction_receiver and then sending them all?
The idea is that a bounded channel is used here (of size 2 in the case of transaction-bench) so that we get back-pressure on the sender side. If we took all the batches off the receiver, the generator (which puts batches into the sender) would keep creating new transactions even when we are slow at sending (due to throttling on the server).
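
The mechanism in one runnable toy example (the channel size and the simulated delay are illustrative):

use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded channel of size 2: the generator suspends in send() as soon as
    // two batches are queued, so it can never run ahead of the sending side.
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(2);

    tokio::spawn(async move {
        for i in 0u64..100 {
            // Suspends here while the channel is full.
            tx.send(vec![i as u8]).await.expect("receiver alive");
        }
        // Dropping tx closes the channel and ends the receive loop below.
    });

    while let Some(batch) = rx.recv().await {
        // Simulate slow network sending; the generator is throttled to this pace.
        tokio::time::sleep(Duration::from_millis(10)).await;
        drop(batch);
    }
}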

};

/// Size of the channel to transmit transaction batches to the target workers.
const WORKER_CHANNEL_SIZE: usize = 2;


Is the assumption that we expect the tx generator/scheduler to be the bottleneck? Or do we ever expect to block on submitting tx batches to the workers?

@KirillLykov (author):

The bottleneck has always been the network-sending part, because the server limits the number of streams and the receive window to quite restrictive values. As far as I remember, the generator can easily produce 350k TPS, but the sender is limited to ~4k TPS in the best scenario (one connection, one stake, no fanout).


Followup, but this (and possibly others) needs to become an argument to some call. The client shouldn't make assumptions about the rate at which transactions are going to be produced.

@KirillLykov (author):

I will introduce a configuration structure carrying this and some other parameters.
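
A speculative sketch of what such a structure could look like; the field set here is guesswork and the follow-up may differ:

use std::net::SocketAddr;

pub struct ConnectionWorkersSchedulerConfig {
    /// Local address to bind the QUIC endpoint to.
    pub bind: SocketAddr,
    /// Maximum number of cached connection workers.
    pub num_connections: usize,
    /// Size of each worker's transaction-batch channel (replacing the
    /// hardcoded WORKER_CHANNEL_SIZE).
    pub worker_channel_size: usize,
    /// How many reconnect attempts a worker makes before giving up.
    pub max_reconnect_attempts: usize,
}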


@KirillLykov (author), replying to @sam0x17:

The current approach to error handling in the send_transactions and create_connection methods works, but it could benefit from more granular logging, especially in critical error paths. For example, instead of just logging Unexpected error has happened..., it would be nice to classify the types of unexpected errors so troubleshooting later is easier.

In the case of ConnectError handling, I first call record_error, which increments the error counters. After that, depending on the severity of the error, I log it at a different level (debug, warn, error):

record_error(connecting_error.clone().into(), &mut self.send_txs_stats);
match connecting_error {
    ConnectError::EndpointStopping => {
        debug!("Endpoint stopping, exit connection worker.");
        self.connection = ConnectionState::Closing;
    }
    ConnectError::InvalidRemoteAddress(_) => {
        warn!("Invalid remote address.");
        self.connection = ConnectionState::Closing;
    }
    e => {
        error!("Unexpected error has happened while trying to create connection: {e}");
        self.connection = ConnectionState::Closing;
    }
}

So this classification, in a way, happens in record_error. Or maybe I don't fully understand your comment.

In send_transactions, when a batch fails, it should perhaps log more detail about which part of the stream failed, plus any available information about the connection state that could help with debugging.

Do you mean saving the full error information returned by send_data_over_stream in the logs? I added this at trace level.

It might be beneficial to make some of the constants configurable via an env var or another configuration mechanism.

Not sure it is a good idea to add this as part of this PR because it is already quite heavy, but it might be worthwhile if this crate ends up being used by client code outside of agave. How is this usually done? Using features?

It might also be possible to use the tokio-metrics crate for further optimization and debugging work; it could be helpful to have it turned on here.

There are no metrics here, to keep the complexity of this PR down. I will probably add them in a follow-up PR.

@KirillLykov (author):

This PR was updated to use quinn v0.11 because agave has meanwhile also started using it. The update is in commit 1ed08b1, an adaptation of the changes @ilya-bobyr made for the agave fork.

@sam0x17 commented Oct 8, 2024

How is this usually done? Using features?

Nah, features don't really have an interface for specifying settings in Cargo.toml, though a few people have tried to make that work.

Usually you would use either std::env! (baked in at compile time) or std::env::var (read at runtime). The happy path is probably a static once_cell::Lazy that checks for the env var at runtime and falls back to a default value if it isn't present.

but not necessary here, just something that could/should be undertaken if people eventually want to customize these values
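
A sketch of that happy path; the env var name and default are hypothetical:

use once_cell::sync::Lazy;

// Read an override from the environment once, at first access, falling back
// to a compiled-in default when the variable is absent or malformed.
static WORKER_CHANNEL_SIZE: Lazy<usize> = Lazy::new(|| {
    std::env::var("TPU_CLIENT_WORKER_CHANNEL_SIZE")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(2)
});

fn main() {
    println!("worker channel size: {}", *WORKER_CHANNEL_SIZE);
}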

sam0x17
sam0x17 previously approved these changes Oct 8, 2024
alessandrod
alessandrod previously approved these changes Oct 10, 2024

impl Default for QuicClientCertificate {
    fn default() -> Self {
        QuicClientCertificate::new(&Keypair::new())


Is this necessary? I'm always wary of adding ::default() impls that are
convenient but do the wrong thing. What is calling this? Can this be moved to
the caller?

@KirillLykov (author):

This is used in only one place (setup_endpoint), so there is no problem moving it.

/// Implementation of [`ServerCertVerifier`] that ignores the server certificate. But still checks
/// the TLS signatures.
///
/// This is a duplicate of `solana_quic_client::nonblocking::quic_client::SkipServerVerification`.


This last line seems unnecessary; hopefully the old quic-client goes away soon.

/// to identify next leaders to send transactions to.
#[async_trait]
pub trait LeaderUpdater: Send {
    fn next_num_lookahead_slots_leaders(&self) -> Vec<SocketAddr>;


nit: strange name? I would have called it next_leaders

@KirillLykov (author):

I renamed it; it was previously get_leaders: #2905 (comment)


next_leaders(num: usize) ?

@KirillLykov (author):

While improving the documentation I decided it is better to move the number of lookahead slots into the trait, so I applied this change.
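
The reworked trait then looks roughly like this; the signatures approximate the merged code rather than quoting it:

use std::net::SocketAddr;
use async_trait::async_trait;

#[async_trait]
pub trait LeaderUpdater: Send {
    /// Returns TPU addresses of the estimated current leader and the leaders
    /// of the next `lookahead_slots` slots.
    fn next_leaders(&mut self, lookahead_slots: usize) -> Vec<SocketAddr>;

    /// Stops the background service that keeps the leader estimate fresh.
    async fn stop(&mut self);
}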

/// connection to the peer and handles state transitions. It runs
/// indefinitely until the connection is closed or an unrecoverable error
/// occurs.
pub async fn run(&mut self) {


This can be fixed in a followup since it's probably not a small change, but with
the API as it is now, it's impossible to tell whether ConnectionWorker::run()
succeeded or failed (other than looking at stats, which is not a good API).

I think that run() should probably return Result, and whenever there's a fatal
un-retriable error it should return the actual error.
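
A sketch of the suggested shape, with a stubbed-out worker and hypothetical error variants:

use std::net::SocketAddr;

pub struct ConnectionWorker; // stub; fields elided for illustration

#[derive(Debug)]
pub enum ConnectionWorkerError {
    InvalidRemoteAddress(SocketAddr),
    ReconnectAttemptsExhausted,
}

impl ConnectionWorker {
    /// A fatal, un-retriable failure becomes a returned error instead of
    /// only a bump in a statistics counter.
    pub async fn run(&mut self) -> Result<(), ConnectionWorkerError> {
        // ... connect/send loop elided ...
        Ok(())
    }
}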

}

/// Retrieves the statistics for transactions sent by this worker.
pub fn get_transaction_stats(&self) -> &SendTransactionStats {


nit: the convention in Rust for getters is value() and set_value() for setters.
Happy if you want to fix this everywhere in a followup.

let mut res = TransportConfig::default();

let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
res.max_idle_timeout(Some(timeout));


Having max idle only 2 * KEEP_ALIVE strikes me as wrong in case there's packet
loss. This is something we should measure.

macro_rules! display_send_transaction_stats_body {
    ($self:ident, $f:ident, $($field:ident),* $(,)?) => {
        write!(
            $f,


what's going on with the indentation in this macro? 😂

}
}

async fn send_txs(&self, txs_batch: TransactionBatch) -> Result<(), WorkersCacheError> {


nit: send_transactions. Be consistent with the names please; there are autistic people among us.


lru = { workspace = true }
quinn = { workspace = true }
rustls = { workspace = true }
solana-connection-cache = { workspace = true }


lol, it sucks to have to depend on this for

enum Protocol {
	TCP,
	UDP
}

we should do something about it


In general, I think we should trim down all these deps as much as possible.
Break the cycle of violence!

@KirillLykov (author):

It is needed for the leader estimation service. A separate task on my roadmap is to cover that component with tests, move it out of solana-connection-cache, and give it some love in general.

    peer: SocketAddr,
    transactions_receiver: mpsc::Receiver<TransactionBatch>,
) -> (Self, CancellationToken) {
    let cancel = CancellationToken::new();

@KirillLykov (author) commented Oct 12, 2024

We could have created a child token from the one received in the scheduler; that way we wouldn't need to cancel explicitly during shutdown. Not sure it's worth the effort.
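
For reference, the child-token variant would look roughly like this (tokio-util API; the surrounding setup is illustrative):

use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let scheduler_cancel = CancellationToken::new();

    // A child token is cancelled automatically when its parent is, so the
    // scheduler would not need to cancel each worker explicitly at shutdown.
    let worker_cancel = scheduler_cancel.child_token();

    let worker = tokio::spawn(async move {
        worker_cancel.cancelled().await;
        println!("worker observed shutdown");
    });

    scheduler_cancel.cancel(); // cancels the parent and, transitively, the child
    worker.await.unwrap();
}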

This commit contains changes from the fork of the agave repo used for testing. There, Illia updated tpu-client-next to use quinn v0.11 and added a cancellation token to gracefully cancel ongoing work without having to wait until recv is checked.
@KirillLykov (author):

@alessandrod I've addressed most of the "easy" comments and also fixed the docs formatting here: ca56c09

I will address the other comments in follow-ups.

@bw-solana left a comment

LGTM. The rest of the feedback can be addressed in follow-ups.

@KirillLykov KirillLykov merged commit f03bce5 into anza-xyz:master Oct 14, 2024
49 checks passed