Skip to content

Commit

Permalink
rebase & update
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri committed Oct 1, 2024
1 parent 4513f49 commit 1482de4
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 45 deletions.
127 changes: 83 additions & 44 deletions core/src/repair/quic_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ use {
futures::future::{TryJoin, TryJoin3},
log::error,
quinn::{
crypto::rustls::{QuicClientConfig, QuicServerConfig},
ClientConfig, ConnectError, Connecting, Connection, ConnectionError, Endpoint,
EndpointConfig, IdleTimeout, SendDatagramError, ServerConfig, TokioRuntime,
TransportConfig, VarInt,
},
rustls::{Certificate, KeyLogFile, PrivateKey},
rustls::{
pki_types::{CertificateDer, PrivateKeyDer},
CertificateError, KeyLogFile,
},
solana_gossip::contact_info::Protocol,
solana_quic_client::nonblocking::quic_client::SkipServerVerification,
solana_runtime::bank_forks::BankForks,
Expand Down Expand Up @@ -55,9 +59,10 @@ use {
// channeled to shred-fetch-stage using a Sender<(Pubkey, SocketAddr, Bytes)>
// channel.
// 3) ancestor_hashes_requests_quic:
// Outgoing RepairProtocol::AncestorHashes from the ancestor_hashes_service
// are received by the client through a AsyncReceiver<(SocketAddr, Bytes)>
// channel and sent to serve_repair_quic socket of the remote node.
// Outgoing RepairProtocol::AncestorHashes requests from the
// ancestor_hashes_service are received by the client through a
// AsyncReceiver<(SocketAddr, Bytes)> channel and sent to serve_repair_quic
// socket of the remote node.
// Incoming AncestorHashesResponse are channeled back to
// ancestor_hashes_service using a Sender<(Pubkey, SocketAddr, Bytes)>
// channel.
Expand Down Expand Up @@ -102,6 +107,8 @@ pub(crate) struct RemoteRequest {
pub(crate) bytes: Bytes,
}

// Async sender channel for directing outgoing packets from validator threads
// to QUIC clients.
pub(crate) struct RepairQuicAsyncSenders {
// Outgoing repair responses to remote repair requests from serve_repair.
pub(crate) repair_response_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
Expand All @@ -113,9 +120,9 @@ pub(crate) struct RepairQuicAsyncSenders {
}

pub(crate) struct RepairQuicSockets {
// Socket receiving remote repair requests from the cluster,
// and sending back repair responses.
pub(crate) repair_service_quic_socket: UdpSocket,
// Socket receiving remote repair or ancestor hashes requests from the
// cluster, and sending back repair and ancestor hashes responses.
pub(crate) repair_server_quic_socket: UdpSocket,
// Socket sending out local repair requests,
// and receiving repair responses from the cluster.
pub(crate) repair_client_quic_socket: UdpSocket,
Expand All @@ -124,6 +131,8 @@ pub(crate) struct RepairQuicSockets {
pub(crate) ancestor_hashes_quic_socket: UdpSocket,
}

// Sender channel for directing incoming packets from QUIC servers to validator
// threads processing those packets.
pub(crate) struct RepairQuicSenders {
// Channel to send incoming repair requests from the cluster.
pub(crate) repair_request_quic_sender: Sender<RemoteRequest>,
Expand Down Expand Up @@ -165,13 +174,13 @@ pub(crate) fn new_quic_endpoints(
senders: RepairQuicSenders,
bank_forks: Arc<RwLock<BankForks>>,
) -> Result<([Endpoint; 3], RepairQuicAsyncSenders, AsyncTryJoinHandle), Error> {
let (repair_service_quic_endpoint, repair_response_quic_sender, repair_service_join_handle) =
let (repair_server_quic_endpoint, repair_response_quic_sender, repair_server_join_handle) =
new_quic_endpoint(
runtime,
"repair_service_quic_client",
"repair_service_quic_server",
"repair_server_quic_client",
"repair_server_quic_server",
keypair,
sockets.repair_service_quic_socket,
sockets.repair_server_quic_socket,
senders.repair_request_quic_sender,
bank_forks.clone(),
)?;
Expand Down Expand Up @@ -200,7 +209,7 @@ pub(crate) fn new_quic_endpoints(
)?;
Ok((
[
repair_service_quic_endpoint,
repair_server_quic_endpoint,
repair_client_quic_endpoint,
ancestor_hashes_quic_endpoint,
],
Expand All @@ -210,7 +219,7 @@ pub(crate) fn new_quic_endpoints(
ancestor_hashes_request_quic_sender,
},
futures::future::try_join3(
repair_service_join_handle,
repair_server_join_handle,
repair_client_join_handle,
ancestor_hashes_join_handle,
),
Expand Down Expand Up @@ -238,7 +247,7 @@ where
T: 'static + From<(Pubkey, SocketAddr, Bytes)> + Send,
{
let (cert, key) = new_dummy_x509_certificate(keypair);
let server_config = new_server_config(cert.clone(), key.clone())?;
let server_config = new_server_config(cert.clone(), key.clone_key())?;
let client_config = new_client_config(cert, key)?;
let mut endpoint = {
// Endpoint::new requires entering the runtime context,
Expand Down Expand Up @@ -286,29 +295,38 @@ pub(crate) fn close_quic_endpoint(endpoint: &Endpoint) {
);
}

fn new_server_config(cert: Certificate, key: PrivateKey) -> Result<ServerConfig, rustls::Error> {
fn new_server_config(
cert: CertificateDer<'static>,
key: PrivateKeyDer<'static>,
) -> Result<ServerConfig, rustls::Error> {
let mut config = rustls::ServerConfig::builder()
.with_safe_defaults()
.with_client_cert_verifier(Arc::new(SkipClientVerification {}))
.with_client_cert_verifier(SkipClientVerification::new())
.with_single_cert(vec![cert], key)?;
config.alpn_protocols = vec![ALPN_REPAIR_PROTOCOL_ID.to_vec()];
config.key_log = Arc::new(KeyLogFile::new());
let Ok(config) = QuicServerConfig::try_from(config) else {
return Err(rustls::Error::InvalidCertificate(
CertificateError::BadSignature,
));
};
let mut config = ServerConfig::with_crypto(Arc::new(config));
config
.transport_config(Arc::new(new_transport_config()))
.use_retry(true)
.migration(false);
Ok(config)
}

fn new_client_config(cert: Certificate, key: PrivateKey) -> Result<ClientConfig, rustls::Error> {
fn new_client_config(
cert: CertificateDer<'static>,
key: PrivateKeyDer<'static>,
) -> Result<ClientConfig, rustls::Error> {
let mut config = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_custom_certificate_verifier(Arc::new(SkipServerVerification {}))
.dangerous()
.with_custom_certificate_verifier(SkipServerVerification::new())
.with_client_auth_cert(vec![cert], key)?;
config.enable_early_data = true;
config.alpn_protocols = vec![ALPN_REPAIR_PROTOCOL_ID.to_vec()];
let mut config = ClientConfig::new(Arc::new(config));
let mut config = ClientConfig::new(Arc::new(QuicClientConfig::try_from(config).unwrap()));
config.transport_config(Arc::new(new_transport_config()));
Ok(config)
}
Expand Down Expand Up @@ -342,17 +360,26 @@ async fn run_server<T>(
{
let stats = Arc::<RepairQuicStats>::default();
let report_metrics_task = tokio::task::spawn(report_metrics_task(server_name, stats.clone()));
while let Some(connecting) = endpoint.accept().await {
tokio::task::spawn(handle_connecting_task(
endpoint.clone(),
connecting,
sender.clone(),
bank_forks.clone(),
prune_cache_pending.clone(),
router.clone(),
cache.clone(),
stats.clone(),
));
while let Some(incoming) = endpoint.accept().await {
let remote_addr: SocketAddr = incoming.remote_address();
match incoming.accept() {
Ok(connecting) => {
tokio::task::spawn(handle_connecting_task(
endpoint.clone(),
connecting,
sender.clone(),
bank_forks.clone(),
prune_cache_pending.clone(),
router.clone(),
cache.clone(),
stats.clone(),
));
}
Err(err) => {
debug!("Error while accepting incoming connection: {err:?} from {remote_addr}");
record_error(&Error::from(err), &stats);
}
}
}
report_metrics_task.abort();
}
Expand Down Expand Up @@ -404,6 +431,9 @@ async fn run_client<T>(
report_metrics_task.abort();
}

// Routes the payload to respective channel.
// Drops the payload if the channel is full.
// Bounces the payload back if the channel is closed or does not exist.
fn try_route_bytes(
remote_address: &SocketAddr,
bytes: Bytes,
Expand Down Expand Up @@ -810,10 +840,11 @@ impl<T> From<crossbeam_channel::SendError<T>> for Error {

#[derive(Default)]
struct RepairQuicStats {
connect_error_cids_exhausted: AtomicU64,
connect_error_invalid_remote_address: AtomicU64,
connect_error_other: AtomicU64,
connect_error_too_many_connections: AtomicU64,
connection_error_application_closed: AtomicU64,
connection_error_cids_exhausted: AtomicU64,
connection_error_connection_closed: AtomicU64,
connection_error_locally_closed: AtomicU64,
connection_error_reset: AtomicU64,
Expand All @@ -837,24 +868,27 @@ async fn report_metrics_task(name: &'static str, stats: Arc<RepairQuicStats>) {
fn record_error(err: &Error, stats: &RepairQuicStats) {
match err {
Error::ChannelSendError => (),
Error::ConnectError(ConnectError::EndpointStopping) => {
add_metric!(stats.connect_error_other)
}
Error::ConnectError(ConnectError::TooManyConnections) => {
add_metric!(stats.connect_error_too_many_connections)
Error::ConnectError(ConnectError::CidsExhausted) => {
add_metric!(stats.connect_error_cids_exhausted)
}
Error::ConnectError(ConnectError::InvalidDnsName(_)) => {
Error::ConnectError(ConnectError::EndpointStopping) => {
add_metric!(stats.connect_error_other)
}
Error::ConnectError(ConnectError::InvalidRemoteAddress(_)) => {
add_metric!(stats.connect_error_invalid_remote_address)
}
Error::ConnectError(ConnectError::InvalidServerName(_)) => {
add_metric!(stats.connect_error_other)
}
Error::ConnectError(ConnectError::NoDefaultClientConfig) => {
add_metric!(stats.connect_error_other)
}
Error::ConnectError(ConnectError::UnsupportedVersion) => {
add_metric!(stats.connect_error_other)
}
Error::ConnectionError(ConnectionError::CidsExhausted) => {
add_metric!(stats.connection_error_cids_exhausted)
}
Error::ConnectionError(ConnectionError::VersionMismatch) => {
add_metric!(stats.connection_error_version_mismatch)
}
Expand Down Expand Up @@ -898,6 +932,11 @@ fn report_metrics(name: &'static str, stats: &RepairQuicStats) {
}
datapoint_info!(
name,
(
"connect_error_cids_exhausted",
reset_metric!(stats.connect_error_cids_exhausted),
i64
),
(
"connect_error_invalid_remote_address",
reset_metric!(stats.connect_error_invalid_remote_address),
Expand All @@ -909,13 +948,13 @@ fn report_metrics(name: &'static str, stats: &RepairQuicStats) {
i64
),
(
"connect_error_too_many_connections",
reset_metric!(stats.connect_error_too_many_connections),
"connection_error_application_closed",
reset_metric!(stats.connection_error_application_closed),
i64
),
(
"connection_error_application_closed",
reset_metric!(stats.connection_error_application_closed),
"connection_error_cids_exhausted",
reset_metric!(stats.connection_error_cids_exhausted),
i64
),
(
Expand Down
2 changes: 1 addition & 1 deletion core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1277,7 +1277,7 @@ impl Validator {
(None, RepairQuicAsyncSenders::new_dummy(), None)
} else {
let repair_quic_sockets = RepairQuicSockets {
repair_service_quic_socket: node.sockets.serve_repair_quic,
repair_server_quic_socket: node.sockets.serve_repair_quic,
repair_client_quic_socket: node.sockets.repair_quic,
ancestor_hashes_quic_socket: node.sockets.ancestor_hashes_requests_quic,
};
Expand Down

0 comments on commit 1482de4

Please sign in to comment.