Skip to content

Commit

Permalink
quic tpu: lower stream timeout from 10s to 1s
Browse files Browse the repository at this point in the history
Don't wait 10s for one piece of a transaction to come before timing out.
Lower the timeout to 1s, after which we assume a stream is dead. 1s is
enough round trips to account for non-catastrophic packet loss. If
packet loss is causing >1s stream latency, the connection is hosed and
the best thing we can do is save server resources for peers with better
connectivity.

This reduces the number of timeout futures we create in the worst case by
a factor of 10.
  • Loading branch information
alessandrod committed Oct 15, 2024
1 parent 0b71c7c commit 92e77a6
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 26 deletions.
38 changes: 14 additions & 24 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ use {
tokio_util::sync::CancellationToken,
};

pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(10);
pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(1);

pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";

Expand Down Expand Up @@ -1103,19 +1103,14 @@ async fn handle_connection(
let stream_load_ema = stream_load_ema.clone();
tokio::spawn(async move {
let mut maybe_batch = None;
// The min is to guard against a value too small which can wake up unnecessarily
// frequently and wasting CPU cycles. The max guard against waiting for too long
// which delay exit and cause some test failures when the timeout value is large.
// Within this value, the heuristic is to wake up 10 times to check for exit
// for the set timeout if there are no data.
let exit_check_interval = (wait_for_chunk_timeout / 10)
.clamp(Duration::from_millis(10), Duration::from_secs(1));
let mut start = Instant::now();
loop {
// Read the next chunk
// Read the next chunk, waiting up to `wait_for_chunk_timeout`. If we don't get a
// chunk before then, we assume the stream is dead and stop the stream task. This
// can only happen if there's severe packet loss or the peer stops sending for
// whatever reason.
let chunk = match tokio::select! {
chunk = tokio::time::timeout(
exit_check_interval,
wait_for_chunk_timeout,
stream.read_chunk(PACKET_DATA_SIZE, true)) => chunk,

// If the peer gets disconnected stop the task right away.
Expand All @@ -1133,15 +1128,11 @@ async fn handle_connection(
}
// timeout elapsed
Err(_) => {
if start.elapsed() >= wait_for_chunk_timeout {
debug!("Timeout in receiving on stream");
stats
.total_stream_read_timeouts
.fetch_add(1, Ordering::Relaxed);
break;
} else {
continue;
}
debug!("Timeout in receiving on stream");
stats
.total_stream_read_timeouts
.fetch_add(1, Ordering::Relaxed);
break;
}
};

Expand All @@ -1158,7 +1149,6 @@ async fn handle_connection(
last_update.store(timing::timestamp(), Ordering::Relaxed);
break;
}
start = Instant::now();
}

stats.total_streams.fetch_sub(1, Ordering::Relaxed);
Expand Down Expand Up @@ -1791,7 +1781,7 @@ pub mod test {
s1.write_all(&[0u8]).await.unwrap_or_default();

// Wait long enough for the stream to timeout in receiving chunks
let sleep_time = Duration::from_secs(3).min(WAIT_FOR_STREAM_TIMEOUT * 1000);
let sleep_time = DEFAULT_WAIT_FOR_CHUNK_TIMEOUT * 2;
sleep(sleep_time).await;

// Test that the stream was created, but timed out in read
Expand Down Expand Up @@ -1871,7 +1861,7 @@ pub mod test {
CONNECTION_CLOSE_REASON_DROPPED_ENTRY,
);
// Wait long enough for the stream to timeout in receiving chunks
let sleep_time = Duration::from_secs(1).min(WAIT_FOR_STREAM_TIMEOUT * 1000);
let sleep_time = DEFAULT_WAIT_FOR_CHUNK_TIMEOUT * 2;
sleep(sleep_time).await;

assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1);
Expand All @@ -1884,7 +1874,7 @@ pub mod test {
CONNECTION_CLOSE_REASON_DROPPED_ENTRY,
);
// Wait long enough for the stream to timeout in receiving chunks
let sleep_time = Duration::from_secs(1).min(WAIT_FOR_STREAM_TIMEOUT * 1000);
let sleep_time = DEFAULT_WAIT_FOR_CHUNK_TIMEOUT * 2;
sleep(sleep_time).await;

assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 2);
Expand Down
4 changes: 2 additions & 2 deletions streamer/src/nonblocking/testing_utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use {
super::quic::{
spawn_server_multi, SpawnNonBlockingServerResult, ALPN_TPU_PROTOCOL_ID,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
crate::{
quic::{StreamerStats, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
Expand All @@ -23,7 +24,6 @@ use {
std::{
net::{SocketAddr, UdpSocket},
sync::{atomic::AtomicBool, Arc, RwLock},
time::Duration,
},
tokio::task::JoinHandle,
};
Expand Down Expand Up @@ -201,7 +201,7 @@ pub fn setup_quic_server_with_sockets(
max_unstaked_connections,
max_streams_per_ms,
max_connections_per_ipaddr_per_minute,
Duration::from_secs(2),
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
.unwrap();
Expand Down

0 comments on commit 92e77a6

Please sign in to comment.