Skip to content

Commit

Permalink
uses datagrams for repair over QUIC protocol
Browse files Browse the repository at this point in the history
Incoming packets can be either:
  RepairProtocol
  RepairResponse or Shred + repair Nonce
  AncestorHashesResponse
So, we need 3 QUIC endpoints on 3 separate sockets to correctly distinguish
between these packets and send them down the right channel.
1) serve_repair_quic:
  The server side receives incoming RepairProtocols from the cluster and
  channels them to serve_repair using a Sender<RemoteRequest> channel.
  The outgoing repair (or ancestor hashes) responses from serve_repair are
  sent back to the client side through a AsyncReceiver<(SocketAddr, Bytes)>
  channel and sent back to the remote node.
2) repair_quic:
  Outgoing repair requests from the repair_service are received by the
  client through a AsyncReceiver<(SocketAddr, Bytes)> channel and sent to
  serve_repair_quic socket of the remote node.
  Incoming repair responses (RepairResponse or Shred + repair Nonce) are
  channeled to shred-fetch-stage using a Sender<(Pubkey, SocketAddr, Bytes)>
  channel.
3) ancestor_hashes_requests_quic:
  Outgoing RepairProtocol::AncestorHashes from the ancestor_hashes_service
  are received by the client through a AsyncReceiver<(SocketAddr, Bytes)>
  channel and sent to serve_repair_quic socket of the remote node.
  Incoming AncestorHashesResponse are channeled back to
  ancestor_hashes_service using a Sender<(Pubkey, SocketAddr, Bytes)>
  channel.
  • Loading branch information
behzadnouri committed Oct 1, 2024
1 parent 6f26b65 commit 4513f49
Show file tree
Hide file tree
Showing 11 changed files with 604 additions and 746 deletions.
100 changes: 37 additions & 63 deletions core/src/repair/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,22 @@ use {
},
outstanding_requests::OutstandingRequests,
packet_threshold::DynamicPacketToProcessThreshold,
quic_endpoint::LocalRequest,
repair_service::{AncestorDuplicateSlotsSender, RepairInfo, RepairStatsGroup},
request_response::RequestResponse,
serve_repair::{
self, AncestorHashesRepairType, AncestorHashesResponse, RepairProtocol, ServeRepair,
},
},
replay_stage::DUPLICATE_THRESHOLD,
shred_fetch_stage::receive_repair_quic_packets,
shred_fetch_stage::receive_quic_datagrams,
},
bincode::serialize,
bytes::Bytes,
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
dashmap::{mapref::entry::Entry::Occupied, DashMap},
solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol, ping_pong::Pong},
solana_ledger::blockstore::Blockstore,
solana_perf::{
packet::{deserialize_from_with_limit, Packet, PacketBatch},
packet::{deserialize_from_with_limit, Packet, PacketBatch, PacketFlags},
recycler::Recycler,
},
solana_runtime::bank::Bank,
Expand Down Expand Up @@ -153,7 +152,8 @@ impl AncestorHashesService {
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>,
ancestor_hashes_request_socket: Arc<UdpSocket>,
quic_endpoint_sender: AsyncSender<LocalRequest>,
ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
ancestor_hashes_response_quic_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
repair_info: RepairInfo,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
) -> Self {
Expand All @@ -171,17 +171,17 @@ impl AncestorHashesService {
Duration::from_millis(1), // coalesce
false, // use_pinned_memory
None, // in_vote_only_mode
false, // is_staked_service
false, // is_staked_service
);

let (quic_endpoint_response_sender, quic_endpoint_response_receiver) = unbounded();
let t_receiver_quic = {
let exit = exit.clone();
Builder::new()
.name(String::from("solAncHashQuic"))
.spawn(|| {
receive_repair_quic_packets(
quic_endpoint_response_receiver,
receive_quic_datagrams(
ancestor_hashes_response_quic_receiver,
PacketFlags::REPAIR,
response_sender,
Recycler::default(),
exit,
Expand Down Expand Up @@ -210,8 +210,7 @@ impl AncestorHashesService {
let t_ancestor_requests = Self::run_manage_ancestor_requests(
ancestor_hashes_request_statuses,
ancestor_hashes_request_socket,
quic_endpoint_sender,
quic_endpoint_response_sender,
ancestor_hashes_request_quic_sender,
repair_info,
outstanding_requests,
exit,
Expand Down Expand Up @@ -586,8 +585,7 @@ impl AncestorHashesService {
fn run_manage_ancestor_requests(
ancestor_hashes_request_statuses: Arc<DashMap<Slot, AncestorRequestStatus>>,
ancestor_hashes_request_socket: Arc<UdpSocket>,
quic_endpoint_sender: AsyncSender<LocalRequest>,
quic_endpoint_response_sender: Sender<(SocketAddr, Vec<u8>)>,
ancestor_hashes_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
repair_info: RepairInfo,
outstanding_requests: Arc<RwLock<OutstandingAncestorHashesRepairs>>,
exit: Arc<AtomicBool>,
Expand Down Expand Up @@ -627,8 +625,7 @@ impl AncestorHashesService {
Self::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&ancestor_hashes_request_quic_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
Expand All @@ -650,8 +647,7 @@ impl AncestorHashesService {
fn manage_ancestor_requests(
ancestor_hashes_request_statuses: &DashMap<Slot, AncestorRequestStatus>,
ancestor_hashes_request_socket: &UdpSocket,
quic_endpoint_sender: &AsyncSender<LocalRequest>,
quic_endpoint_response_sender: &Sender<(SocketAddr, Vec<u8>)>,
ancestor_hashes_request_quic_sender: &AsyncSender<(SocketAddr, Bytes)>,
repair_info: &RepairInfo,
outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
ancestor_hashes_replay_update_receiver: &AncestorHashesReplayUpdateReceiver,
Expand Down Expand Up @@ -750,8 +746,7 @@ impl AncestorHashesService {
if Self::initiate_ancestor_hashes_requests_for_duplicate_slot(
ancestor_hashes_request_statuses,
ancestor_hashes_request_socket,
quic_endpoint_sender,
quic_endpoint_response_sender,
ancestor_hashes_request_quic_sender,
&repair_info.cluster_slots,
serve_repair,
&repair_info.repair_validators,
Expand Down Expand Up @@ -829,8 +824,7 @@ impl AncestorHashesService {
fn initiate_ancestor_hashes_requests_for_duplicate_slot(
ancestor_hashes_request_statuses: &DashMap<Slot, AncestorRequestStatus>,
ancestor_hashes_request_socket: &UdpSocket,
quic_endpoint_sender: &AsyncSender<LocalRequest>,
quic_endpoint_response_sender: &Sender<(SocketAddr, Vec<u8>)>,
ancestor_hashes_request_quic_sender: &AsyncSender<(SocketAddr, Bytes)>,
cluster_slots: &ClusterSlots,
serve_repair: &ServeRepair,
repair_validators: &Option<HashSet<Pubkey>>,
Expand Down Expand Up @@ -873,16 +867,10 @@ impl AncestorHashesService {
let _ = ancestor_hashes_request_socket.send_to(&request_bytes, socket_addr);
}
Protocol::QUIC => {
let num_expected_responses =
usize::try_from(ancestor_hashes_repair_type.num_expected_responses())
.unwrap();
let request = LocalRequest {
remote_address: *socket_addr,
bytes: request_bytes,
num_expected_responses,
response_sender: quic_endpoint_response_sender.clone(),
};
if quic_endpoint_sender.blocking_send(request).is_err() {
if ancestor_hashes_request_quic_sender
.blocking_send((*socket_addr, Bytes::from(request_bytes)))
.is_err()
{
// The receiver end of the channel is disconnected.
break;
}
Expand Down Expand Up @@ -1316,10 +1304,12 @@ mod test {
let t_packet_adapter = Builder::new()
.spawn(|| adapt_repair_requests_packets(requests_receiver, remote_request_sender))
.unwrap();
let (repair_response_quic_sender, _) = tokio::sync::mpsc::channel(/*buffer:*/ 128);
let t_listen = responder_serve_repair.listen(
blockstore,
remote_request_receiver,
response_sender,
repair_response_quic_sender,
exit.clone(),
);

Expand Down Expand Up @@ -1511,14 +1501,12 @@ mod test {
repair_validators,
..
} = repair_info;
let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = unbounded();
let (quic_endpoint_sender, _quic_endpoint_sender) =
let (ancestor_hashes_request_quic_sender, _) =
tokio::sync::mpsc::channel(/*buffer:*/ 128);
AncestorHashesService::initiate_ancestor_hashes_requests_for_duplicate_slot(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&ancestor_hashes_request_quic_sender,
&cluster_slots,
&requester_serve_repair,
&repair_validators,
Expand Down Expand Up @@ -1568,8 +1556,7 @@ mod test {
AncestorHashesService::initiate_ancestor_hashes_requests_for_duplicate_slot(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&ancestor_hashes_request_quic_sender,
&cluster_slots,
&requester_serve_repair,
&repair_validators,
Expand Down Expand Up @@ -1631,8 +1618,7 @@ mod test {
AncestorHashesService::initiate_ancestor_hashes_requests_for_duplicate_slot(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&ancestor_hashes_request_quic_sender,
&cluster_slots,
&requester_serve_repair,
&repair_validators,
Expand Down Expand Up @@ -1718,15 +1704,13 @@ mod test {
} = repair_info;
cluster_info.insert_info(responder_node.info);
bank_forks.read().unwrap().root_bank().epoch_schedule();
let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = unbounded();
let (quic_endpoint_sender, _quic_endpoint_sender) =
let (ancestor_hashes_request_quic_sender, _) =
tokio::sync::mpsc::channel(/*buffer:*/ 128);
// 1) No signals from ReplayStage, no requests should be made
AncestorHashesService::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&ancestor_hashes_request_quic_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
Expand Down Expand Up @@ -1769,8 +1753,7 @@ mod test {
AncestorHashesService::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&ancestor_hashes_request_quic_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
Expand Down Expand Up @@ -1810,8 +1793,7 @@ mod test {
AncestorHashesService::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&ancestor_hashes_request_quic_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
Expand Down Expand Up @@ -1843,8 +1825,7 @@ mod test {
AncestorHashesService::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&ancestor_hashes_request_quic_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
Expand Down Expand Up @@ -1882,8 +1863,7 @@ mod test {
AncestorHashesService::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&ancestor_hashes_request_quic_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
Expand Down Expand Up @@ -1926,8 +1906,7 @@ mod test {
AncestorHashesService::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&ancestor_hashes_request_quic_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
Expand Down Expand Up @@ -2084,15 +2063,13 @@ mod test {
&leader_schedule_cache,
);

let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = unbounded();
let (quic_endpoint_sender, _quic_endpoint_sender) =
let (ancestor_hashes_request_quic_sender, _) =
tokio::sync::mpsc::channel(/*buffer:*/ 128);
// Simulate making a request
AncestorHashesService::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&ancestor_hashes_request_quic_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
Expand Down Expand Up @@ -2188,8 +2165,7 @@ mod test {
&repair_info.ancestor_duplicate_slots_sender,
&retryable_slots_sender,
);
let (quic_endpoint_response_sender, _quic_endpoint_response_receiver) = unbounded();
let (quic_endpoint_sender, _quic_endpoint_sender) =
let (ancestor_hashes_request_quic_sender, _) =
tokio::sync::mpsc::channel(/*buffer:*/ 128);

// Simulate ancestor request thread getting the retry signal
Expand All @@ -2199,8 +2175,7 @@ mod test {
AncestorHashesService::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&ancestor_hashes_request_quic_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
Expand Down Expand Up @@ -2239,8 +2214,7 @@ mod test {
AncestorHashesService::manage_ancestor_requests(
&ancestor_hashes_request_statuses,
&ancestor_hashes_request_socket,
&quic_endpoint_sender,
&quic_endpoint_response_sender,
&ancestor_hashes_request_quic_sender,
&repair_info,
&outstanding_requests,
&ancestor_hashes_replay_update_receiver,
Expand Down
Loading

0 comments on commit 4513f49

Please sign in to comment.