diff --git a/core/src/repair/quic_endpoint.rs b/core/src/repair/quic_endpoint.rs index 13197b39f9afe9..87183dd84ae628 100644 --- a/core/src/repair/quic_endpoint.rs +++ b/core/src/repair/quic_endpoint.rs @@ -4,11 +4,15 @@ use { futures::future::{TryJoin, TryJoin3}, log::error, quinn::{ + crypto::rustls::{QuicClientConfig, QuicServerConfig}, ClientConfig, ConnectError, Connecting, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendDatagramError, ServerConfig, TokioRuntime, TransportConfig, VarInt, }, - rustls::{Certificate, KeyLogFile, PrivateKey}, + rustls::{ + pki_types::{CertificateDer, PrivateKeyDer}, + CertificateError, KeyLogFile, + }, solana_gossip::contact_info::Protocol, solana_quic_client::nonblocking::quic_client::SkipServerVerification, solana_runtime::bank_forks::BankForks, @@ -55,9 +59,10 @@ use { // channeled to shred-fetch-stage using a Sender<(Pubkey, SocketAddr, Bytes)> // channel. // 3) ancestor_hashes_requests_quic: -// Outgoing RepairProtocol::AncestorHashes from the ancestor_hashes_service -// are received by the client through a AsyncReceiver<(SocketAddr, Bytes)> -// channel and sent to serve_repair_quic socket of the remote node. +// Outgoing RepairProtocol::AncestorHashes requests from the +// ancestor_hashes_service are received by the client through a +// AsyncReceiver<(SocketAddr, Bytes)> channel and sent to serve_repair_quic +// socket of the remote node. // Incoming AncestorHashesResponse are channeled back to // ancestor_hashes_service using a Sender<(Pubkey, SocketAddr, Bytes)> // channel. @@ -102,6 +107,8 @@ pub(crate) struct RemoteRequest { pub(crate) bytes: Bytes, } +// Async sender channel for directing outgoing packets from validator threads +// to QUIC clients. pub(crate) struct RepairQuicAsyncSenders { // Outgoing repair responses to remote repair requests from serve_repair. pub(crate) repair_response_quic_sender: AsyncSender<(SocketAddr, Bytes)>, @@ -113,9 +120,9 @@ pub(crate) struct RepairQuicAsyncSenders { } pub(crate) struct RepairQuicSockets { - // Socket receiving remote repair requests from the cluster, - // and sending back repair responses. - pub(crate) repair_service_quic_socket: UdpSocket, + // Socket receiving remote repair or ancestor hashes requests from the + // cluster, and sending back repair and ancestor hashes responses. + pub(crate) repair_server_quic_socket: UdpSocket, // Socket sending out local repair requests, // and receiving repair responses from the cluster. pub(crate) repair_client_quic_socket: UdpSocket, @@ -124,6 +131,8 @@ pub(crate) struct RepairQuicSockets { pub(crate) ancestor_hashes_quic_socket: UdpSocket, } +// Sender channel for directing incoming packets from QUIC servers to validator +// threads processing those packets. pub(crate) struct RepairQuicSenders { // Channel to send incoming repair requests from the cluster. pub(crate) repair_request_quic_sender: Sender, @@ -165,13 +174,13 @@ pub(crate) fn new_quic_endpoints( senders: RepairQuicSenders, bank_forks: Arc>, ) -> Result<([Endpoint; 3], RepairQuicAsyncSenders, AsyncTryJoinHandle), Error> { - let (repair_service_quic_endpoint, repair_response_quic_sender, repair_service_join_handle) = + let (repair_server_quic_endpoint, repair_response_quic_sender, repair_server_join_handle) = new_quic_endpoint( runtime, - "repair_service_quic_client", - "repair_service_quic_server", + "repair_server_quic_client", + "repair_server_quic_server", keypair, - sockets.repair_service_quic_socket, + sockets.repair_server_quic_socket, senders.repair_request_quic_sender, bank_forks.clone(), )?; @@ -200,7 +209,7 @@ pub(crate) fn new_quic_endpoints( )?; Ok(( [ - repair_service_quic_endpoint, + repair_server_quic_endpoint, repair_client_quic_endpoint, ancestor_hashes_quic_endpoint, ], @@ -210,7 +219,7 @@ pub(crate) fn new_quic_endpoints( ancestor_hashes_request_quic_sender, }, futures::future::try_join3( - repair_service_join_handle, + repair_server_join_handle, repair_client_join_handle, ancestor_hashes_join_handle, ), @@ -238,7 +247,7 @@ where T: 'static + From<(Pubkey, SocketAddr, Bytes)> + Send, { let (cert, key) = new_dummy_x509_certificate(keypair); - let server_config = new_server_config(cert.clone(), key.clone())?; + let server_config = new_server_config(cert.clone(), key.clone_key())?; let client_config = new_client_config(cert, key)?; let mut endpoint = { // Endpoint::new requires entering the runtime context, @@ -286,29 +295,38 @@ pub(crate) fn close_quic_endpoint(endpoint: &Endpoint) { ); } -fn new_server_config(cert: Certificate, key: PrivateKey) -> Result { +fn new_server_config( + cert: CertificateDer<'static>, + key: PrivateKeyDer<'static>, +) -> Result { let mut config = rustls::ServerConfig::builder() - .with_safe_defaults() - .with_client_cert_verifier(Arc::new(SkipClientVerification {})) + .with_client_cert_verifier(SkipClientVerification::new()) .with_single_cert(vec![cert], key)?; config.alpn_protocols = vec![ALPN_REPAIR_PROTOCOL_ID.to_vec()]; config.key_log = Arc::new(KeyLogFile::new()); + let Ok(config) = QuicServerConfig::try_from(config) else { + return Err(rustls::Error::InvalidCertificate( + CertificateError::BadSignature, + )); + }; let mut config = ServerConfig::with_crypto(Arc::new(config)); config .transport_config(Arc::new(new_transport_config())) - .use_retry(true) .migration(false); Ok(config) } -fn new_client_config(cert: Certificate, key: PrivateKey) -> Result { +fn new_client_config( + cert: CertificateDer<'static>, + key: PrivateKeyDer<'static>, +) -> Result { let mut config = rustls::ClientConfig::builder() - .with_safe_defaults() - .with_custom_certificate_verifier(Arc::new(SkipServerVerification {})) + .dangerous() + .with_custom_certificate_verifier(SkipServerVerification::new()) .with_client_auth_cert(vec![cert], key)?; config.enable_early_data = true; config.alpn_protocols = vec![ALPN_REPAIR_PROTOCOL_ID.to_vec()]; - let mut config = ClientConfig::new(Arc::new(config)); + let mut config = ClientConfig::new(Arc::new(QuicClientConfig::try_from(config).unwrap())); config.transport_config(Arc::new(new_transport_config())); Ok(config) } @@ -342,17 +360,26 @@ async fn run_server( { let stats = Arc::::default(); let report_metrics_task = tokio::task::spawn(report_metrics_task(server_name, stats.clone())); - while let Some(connecting) = endpoint.accept().await { - tokio::task::spawn(handle_connecting_task( - endpoint.clone(), - connecting, - sender.clone(), - bank_forks.clone(), - prune_cache_pending.clone(), - router.clone(), - cache.clone(), - stats.clone(), - )); + while let Some(incoming) = endpoint.accept().await { + let remote_addr: SocketAddr = incoming.remote_address(); + match incoming.accept() { + Ok(connecting) => { + tokio::task::spawn(handle_connecting_task( + endpoint.clone(), + connecting, + sender.clone(), + bank_forks.clone(), + prune_cache_pending.clone(), + router.clone(), + cache.clone(), + stats.clone(), + )); + } + Err(err) => { + debug!("Error while accepting incoming connection: {err:?} from {remote_addr}"); + record_error(&Error::from(err), &stats); + } + } } report_metrics_task.abort(); } @@ -404,6 +431,9 @@ async fn run_client( report_metrics_task.abort(); } +// Routes the payload to respective channel. +// Drops the payload if the channel is full. +// Bounces the payload back if the channel is closed or does not exist. fn try_route_bytes( remote_address: &SocketAddr, bytes: Bytes, @@ -810,10 +840,11 @@ impl From> for Error { #[derive(Default)] struct RepairQuicStats { + connect_error_cids_exhausted: AtomicU64, connect_error_invalid_remote_address: AtomicU64, connect_error_other: AtomicU64, - connect_error_too_many_connections: AtomicU64, connection_error_application_closed: AtomicU64, + connection_error_cids_exhausted: AtomicU64, connection_error_connection_closed: AtomicU64, connection_error_locally_closed: AtomicU64, connection_error_reset: AtomicU64, @@ -837,24 +868,27 @@ async fn report_metrics_task(name: &'static str, stats: Arc) { fn record_error(err: &Error, stats: &RepairQuicStats) { match err { Error::ChannelSendError => (), - Error::ConnectError(ConnectError::EndpointStopping) => { - add_metric!(stats.connect_error_other) - } - Error::ConnectError(ConnectError::TooManyConnections) => { - add_metric!(stats.connect_error_too_many_connections) + Error::ConnectError(ConnectError::CidsExhausted) => { + add_metric!(stats.connect_error_cids_exhausted) } - Error::ConnectError(ConnectError::InvalidDnsName(_)) => { + Error::ConnectError(ConnectError::EndpointStopping) => { add_metric!(stats.connect_error_other) } Error::ConnectError(ConnectError::InvalidRemoteAddress(_)) => { add_metric!(stats.connect_error_invalid_remote_address) } + Error::ConnectError(ConnectError::InvalidServerName(_)) => { + add_metric!(stats.connect_error_other) + } Error::ConnectError(ConnectError::NoDefaultClientConfig) => { add_metric!(stats.connect_error_other) } Error::ConnectError(ConnectError::UnsupportedVersion) => { add_metric!(stats.connect_error_other) } + Error::ConnectionError(ConnectionError::CidsExhausted) => { + add_metric!(stats.connection_error_cids_exhausted) + } Error::ConnectionError(ConnectionError::VersionMismatch) => { add_metric!(stats.connection_error_version_mismatch) } @@ -898,6 +932,11 @@ fn report_metrics(name: &'static str, stats: &RepairQuicStats) { } datapoint_info!( name, + ( + "connect_error_cids_exhausted", + reset_metric!(stats.connect_error_cids_exhausted), + i64 + ), ( "connect_error_invalid_remote_address", reset_metric!(stats.connect_error_invalid_remote_address), @@ -909,13 +948,13 @@ fn report_metrics(name: &'static str, stats: &RepairQuicStats) { i64 ), ( - "connect_error_too_many_connections", - reset_metric!(stats.connect_error_too_many_connections), + "connection_error_application_closed", + reset_metric!(stats.connection_error_application_closed), i64 ), ( - "connection_error_application_closed", - reset_metric!(stats.connection_error_application_closed), + "connection_error_cids_exhausted", + reset_metric!(stats.connection_error_cids_exhausted), i64 ), ( diff --git a/core/src/validator.rs b/core/src/validator.rs index fce0f5f83a0e39..815510611f7954 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1277,7 +1277,7 @@ impl Validator { (None, RepairQuicAsyncSenders::new_dummy(), None) } else { let repair_quic_sockets = RepairQuicSockets { - repair_service_quic_socket: node.sockets.serve_repair_quic, + repair_server_quic_socket: node.sockets.serve_repair_quic, repair_client_quic_socket: node.sockets.repair_quic, ancestor_hashes_quic_socket: node.sockets.ancestor_hashes_requests_quic, };