Skip to content

Commit

Permalink
feat(networking): use range based gets for close_peers calls
Browse files Browse the repository at this point in the history
This should allow us to return more peers for various network operations, and leverage
range based gets more frequently
  • Loading branch information
joshuef committed Oct 18, 2024
1 parent 97bfbe3 commit 44f1f87
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 11 deletions.
8 changes: 4 additions & 4 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub enum LocalSwarmCmd {
sender: oneshot::Sender<Vec<PeerId>>,
},
/// Return the current GetRange as determined by the SwarmDriver
GetCurrentRange {
GetCurrentRequestRange {
sender: oneshot::Sender<KBucketDistance>,
},
/// Get a map where each key is the ilog2 distance of that Kbucket and each value is a vector of peers in that
Expand Down Expand Up @@ -247,7 +247,7 @@ impl Debug for LocalSwarmCmd {
LocalSwarmCmd::GetKBuckets { .. } => {
write!(f, "LocalSwarmCmd::GetKBuckets")
}
LocalSwarmCmd::GetCurrentRange { .. } => {
LocalSwarmCmd::GetCurrentRequestRange { .. } => {
write!(f, "SwarmCmd::GetCurrentRange")
}
LocalSwarmCmd::GetAllLocalPeersExcludingSelf { .. } => {
Expand Down Expand Up @@ -744,8 +744,8 @@ impl SwarmDriver {
.record_addresses();
let _ = sender.send(addresses);
}
LocalSwarmCmd::GetCurrentRange { sender } => {
cmd_string = "GetCurrentRange";
LocalSwarmCmd::GetCurrentRequestRange { sender } => {
cmd_string = "GetCurrentRequestRange";
let _ = sender.send(self.get_request_range());
}
LocalSwarmCmd::GetKBuckets { sender } => {
Expand Down
85 changes: 79 additions & 6 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ impl Network {
/// Return the GetRange as determined by the internal SwarmDriver
pub async fn get_range(&self) -> Result<KBucketDistance> {
let (sender, receiver) = oneshot::channel();
self.send_local_swarm_cmd(LocalSwarmCmd::GetCurrentRange { sender });
self.send_local_swarm_cmd(LocalSwarmCmd::GetCurrentRequestRange { sender });
receiver.await.map_err(NetworkError::from)
}

Expand Down Expand Up @@ -398,8 +398,12 @@ impl Network {

/// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name.
/// Excludes the client's `PeerId` while calculating the closest peers.
pub async fn client_get_closest_peers(&self, key: &NetworkAddress) -> Result<Vec<PeerId>> {
self.get_close_group_closest_peers(key, true).await
pub async fn client_get_all_close_peers_in_range_or_close_group(
&self,
key: &NetworkAddress,
) -> Result<Vec<PeerId>> {
self.get_all_close_peers_in_range_or_close_group(key, true)
.await
}

/// Returns a map where each key is the ilog2 distance of that Kbucket and each value is a vector of peers in that
Expand Down Expand Up @@ -450,7 +454,7 @@ impl Network {
// The close_nodes don't change often and the previous set of close_nodes might be taking a while to write
// the Chunk, so query them again incase of a failure.
close_nodes = self
.get_close_group_closest_peers(&chunk_address, true)
.client_get_all_close_peers_in_range_or_close_group(&chunk_address)
.await?;
}
retry_attempts += 1;
Expand Down Expand Up @@ -517,7 +521,7 @@ impl Network {
// The requirement of having at least CLOSE_GROUP_SIZE
// close nodes will be checked internally automatically.
let mut close_nodes = self
.get_close_group_closest_peers(&record_address, true)
.client_get_all_close_peers_in_range_or_close_group(&record_address)
.await?;
// Filter out results from the ignored peers.
close_nodes.retain(|peer_id| !ignore_peers.contains(peer_id));
Expand Down Expand Up @@ -602,7 +606,7 @@ impl Network {
// The requirement of having at least CLOSE_GROUP_SIZE
// close nodes will be checked internally automatically.
let close_nodes = self
.get_close_group_closest_peers(&record_address, true)
.client_get_all_close_peers_in_range_or_close_group(&record_address)
.await?;

let self_address = NetworkAddress::from_peer(self.peer_id());
Expand Down Expand Up @@ -1117,6 +1121,75 @@ impl Network {
Ok(closest_peers.into_iter().cloned().collect())
}

/// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name.
/// If `client` is false, then include `self` among the `closest_peers`
/// Returns all peers found inside the range
///
/// If less than CLOSE_GROUP_SIZE peers are found, it will return all the peers found up to the CLOSE_GROUP_SIZE
pub async fn get_all_close_peers_in_range_or_close_group(
&self,
key: &NetworkAddress,
client: bool,
) -> Result<Vec<PeerId>> {
let pretty_key = PrettyPrintKBucketKey(key.as_kbucket_key());
debug!("Getting the all closest peers in range of {pretty_key:?}");
let (sender, receiver) = oneshot::channel();
self.send_network_swarm_cmd(NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork {
key: key.clone(),
sender,
});

let found_peers = receiver.await?;

// Count self in if among the CLOSE_GROUP_SIZE closest and sort the result
let result_len = found_peers.len();
let mut closest_peers = found_peers;

let expected_range = self.get_range().await?;

// ensure we're not including self here
if client {
// remove our peer id from the calculations here:
closest_peers.retain(|&x| x != self.peer_id());
if result_len != closest_peers.len() {
info!("Remove self client from the closest_peers");
}
}

if tracing::level_enabled!(tracing::Level::DEBUG) {
let close_peers_pretty_print: Vec<_> = closest_peers
.iter()
.map(|peer_id| {
format!(
"{peer_id:?}({:?})",
PrettyPrintKBucketKey(NetworkAddress::from_peer(*peer_id).as_kbucket_key())
)
})
.collect();

debug!(
"Network knowledge of closest peers to {pretty_key:?} are: {close_peers_pretty_print:?}"
);
}

let mut restricted_closest_peers =
sort_peers_by_address_and_limit_by_distance(&closest_peers, key, expected_range)?;

if restricted_closest_peers.len() < CLOSE_GROUP_SIZE {
warn!(
"Getting close peers to {pretty_key:?} current GetRange of {:?} too strict giving insufficient peers... Falling back to all peers found"
, expected_range.ilog2());

restricted_closest_peers =
sort_peers_by_address_and_limit(&closest_peers, key, CLOSE_GROUP_SIZE)?;
}

debug!(
"Network knowledge of closest peers in range of {:?} to target {pretty_key:?} are: {:?}", expected_range.ilog2(), restricted_closest_peers.len()
);
Ok(restricted_closest_peers.into_iter().cloned().collect())
}

/// Send a `Request` to the provided set of peers and wait for their responses concurrently.
/// If `get_all_responses` is true, we wait for the responses from all the peers.
/// NB TODO: Will return an error if the request timeouts.
Expand Down
2 changes: 1 addition & 1 deletion sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ impl Node {
async fn close_nodes_shunning_peer(network: &Network, peer_id: PeerId) -> bool {
// using `client` to exclude self
let closest_peers = match network
.client_get_closest_peers(&NetworkAddress::from_peer(peer_id))
.client_get_all_close_peers_in_range_or_close_group(&NetworkAddress::from_peer(peer_id))
.await
{
Ok(peers) => peers,
Expand Down

0 comments on commit 44f1f87

Please sign in to comment.