Merge pull request #2259 from joshuef/Feat-CloseInRange
leverage get_range more
joshuef authored Oct 18, 2024
2 parents 3249f53 + 44f1f87 commit 84324eb
Showing 3 changed files with 91 additions and 12 deletions.
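At a glance, the client-facing change is a rename plus a behavioural widening: callers no longer receive just the close group, but every known peer inside the SwarmDriver's current GetRange, with a close-group fallback. A minimal call-site migration sketch (assuming a `network: Network` handle and a `target: NetworkAddress` are in scope; error handling elided):

// Before this commit:
// let peers = network.client_get_closest_peers(&target).await?;

// After this commit: all peers within the current GetRange, or the
// CLOSE_GROUP_SIZE closest peers if the range admits too few.
let peers = network
    .client_get_all_close_peers_in_range_or_close_group(&target)
    .await?;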
8 changes: 4 additions & 4 deletions sn_networking/src/cmd.rs
@@ -62,7 +62,7 @@ pub enum LocalSwarmCmd {
sender: oneshot::Sender<Vec<PeerId>>,
},
/// Return the current GetRange as determined by the SwarmDriver
-GetCurrentRange {
+GetCurrentRequestRange {
sender: oneshot::Sender<KBucketDistance>,
},
/// Get a map where each key is the ilog2 distance of that Kbucket and each value is a vector of peers in that
@@ -247,7 +247,7 @@ impl Debug for LocalSwarmCmd {
LocalSwarmCmd::GetKBuckets { .. } => {
write!(f, "LocalSwarmCmd::GetKBuckets")
}
-LocalSwarmCmd::GetCurrentRange { .. } => {
+LocalSwarmCmd::GetCurrentRequestRange { .. } => {
write!(f, "SwarmCmd::GetCurrentRange")
}
LocalSwarmCmd::GetAllLocalPeersExcludingSelf { .. } => {
@@ -744,8 +744,8 @@ impl SwarmDriver {
.record_addresses();
let _ = sender.send(addresses);
}
-LocalSwarmCmd::GetCurrentRange { sender } => {
-cmd_string = "GetCurrentRange";
+LocalSwarmCmd::GetCurrentRequestRange { sender } => {
+cmd_string = "GetCurrentRequestRange";
let _ = sender.send(self.get_request_range());
}
LocalSwarmCmd::GetKBuckets { sender } => {
93 changes: 86 additions & 7 deletions sn_networking/src/lib.rs
@@ -260,7 +260,7 @@ impl Network {
/// Return the GetRange as determined by the internal SwarmDriver
pub async fn get_range(&self) -> Result<KBucketDistance> {
let (sender, receiver) = oneshot::channel();
-self.send_local_swarm_cmd(LocalSwarmCmd::GetCurrentRange { sender });
+self.send_local_swarm_cmd(LocalSwarmCmd::GetCurrentRequestRange { sender });
receiver.await.map_err(NetworkError::from)
}

@@ -398,8 +398,12 @@ impl Network {

/// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name.
/// Excludes the client's `PeerId` while calculating the closest peers.
-pub async fn client_get_closest_peers(&self, key: &NetworkAddress) -> Result<Vec<PeerId>> {
-self.get_closest_peers(key, true).await
+pub async fn client_get_all_close_peers_in_range_or_close_group(
+&self,
+key: &NetworkAddress,
+) -> Result<Vec<PeerId>> {
+self.get_all_close_peers_in_range_or_close_group(key, true)
+.await
}

/// Returns a map where each key is the ilog2 distance of that Kbucket and each value is a vector of peers in that
@@ -449,7 +453,9 @@
// Do not query the closest_peers during every retry attempt.
// The close_nodes don't change often and the previous set of close_nodes might be taking a while to write
// the Chunk, so query them again in case of a failure.
-close_nodes = self.get_closest_peers(&chunk_address, true).await?;
+close_nodes = self
+.client_get_all_close_peers_in_range_or_close_group(&chunk_address)
+.await?;
}
retry_attempts += 1;
info!(
@@ -514,7 +520,9 @@
) -> Result<PayeeQuote> {
// The requirement of having at least CLOSE_GROUP_SIZE
// close nodes will be checked internally automatically.
-let mut close_nodes = self.get_closest_peers(&record_address, true).await?;
+let mut close_nodes = self
+.client_get_all_close_peers_in_range_or_close_group(&record_address)
+.await?;
// Filter out results from the ignored peers.
close_nodes.retain(|peer_id| !ignore_peers.contains(peer_id));

@@ -597,7 +605,9 @@
let record_address = NetworkAddress::from_record_key(&key);
// The requirement of having at least CLOSE_GROUP_SIZE
// close nodes will be checked internally automatically.
-let close_nodes = self.get_closest_peers(&record_address, true).await?;
+let close_nodes = self
+.client_get_all_close_peers_in_range_or_close_group(&record_address)
+.await?;

let self_address = NetworkAddress::from_peer(self.peer_id());
let request = Request::Query(Query::GetRegisterRecord {
@@ -1069,7 +1079,7 @@ impl Network {

/// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name.
/// If `client` is false, then include `self` among the `closest_peers`
-pub async fn get_closest_peers(
+pub async fn get_close_group_closest_peers(
&self,
key: &NetworkAddress,
client: bool,
@@ -1111,6 +1121,75 @@
Ok(closest_peers.into_iter().cloned().collect())
}

+/// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name.
+/// If `client` is false, then include `self` among the `closest_peers`
+/// Returns all peers found inside the range
+///
+/// If fewer than CLOSE_GROUP_SIZE peers are found, it will return all the peers found up to the CLOSE_GROUP_SIZE
+pub async fn get_all_close_peers_in_range_or_close_group(
+&self,
+key: &NetworkAddress,
+client: bool,
+) -> Result<Vec<PeerId>> {
+let pretty_key = PrettyPrintKBucketKey(key.as_kbucket_key());
+debug!("Getting all the closest peers in range of {pretty_key:?}");
+let (sender, receiver) = oneshot::channel();
+self.send_network_swarm_cmd(NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork {
+key: key.clone(),
+sender,
+});
+
+let found_peers = receiver.await?;
+
+// Count self in if among the CLOSE_GROUP_SIZE closest and sort the result
+let result_len = found_peers.len();
+let mut closest_peers = found_peers;
+
+let expected_range = self.get_range().await?;
+
+// ensure we're not including self here
+if client {
+// remove our peer id from the calculations here:
+closest_peers.retain(|&x| x != self.peer_id());
+if result_len != closest_peers.len() {
+info!("Remove self client from the closest_peers");
+}
+}
+
+if tracing::level_enabled!(tracing::Level::DEBUG) {
+let close_peers_pretty_print: Vec<_> = closest_peers
+.iter()
+.map(|peer_id| {
+format!(
+"{peer_id:?}({:?})",
+PrettyPrintKBucketKey(NetworkAddress::from_peer(*peer_id).as_kbucket_key())
+)
+})
+.collect();
+
+debug!(
+"Network knowledge of closest peers to {pretty_key:?} are: {close_peers_pretty_print:?}"
+);
+}
+
+let mut restricted_closest_peers =
+sort_peers_by_address_and_limit_by_distance(&closest_peers, key, expected_range)?;
+
+if restricted_closest_peers.len() < CLOSE_GROUP_SIZE {
+warn!(
+"Getting close peers to {pretty_key:?}: current GetRange of {:?} is too strict, giving insufficient peers... Falling back to all peers found",
+expected_range.ilog2());
+
+restricted_closest_peers =
+sort_peers_by_address_and_limit(&closest_peers, key, CLOSE_GROUP_SIZE)?;
+}
+
+debug!(
+"Network knowledge of closest peers in range of {:?} to target {pretty_key:?} are: {:?}", expected_range.ilog2(), restricted_closest_peers.len()
+);
+Ok(restricted_closest_peers.into_iter().cloned().collect())
+}
+
/// Send a `Request` to the provided set of peers and wait for their responses concurrently.
/// If `get_all_responses` is true, we wait for the responses from all the peers.
/// NB TODO: Will return an error if the request times out.
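The heart of the lib.rs change is the range-or-close-group selection rule: keep every peer within the expected range of the target, and fall back to the CLOSE_GROUP_SIZE closest peers when the range admits too few. A self-contained sketch of that rule, assuming plain u64 values in place of PeerId and KBucketDistance, and a stand-in CLOSE_GROUP_SIZE of 5 (the real constant and the sort_peers_by_address_* helpers live elsewhere in sn_networking):

const CLOSE_GROUP_SIZE: usize = 5; // stand-in value for illustration only

/// Keep every peer whose distance to the target is within `range`;
/// fall back to the CLOSE_GROUP_SIZE closest peers when too few qualify.
fn peers_in_range_or_close_group(mut peers: Vec<(u64, u64)>, range: u64) -> Vec<u64> {
    // Sort by distance to the target, closest first.
    peers.sort_by_key(|&(_, dist)| dist);
    // All peers inside the range (mirrors sort_peers_by_address_and_limit_by_distance).
    let in_range: Vec<u64> = peers
        .iter()
        .take_while(|&&(_, dist)| dist <= range)
        .map(|&(id, _)| id)
        .collect();
    if in_range.len() >= CLOSE_GROUP_SIZE {
        in_range
    } else {
        // Range too strict: take the closest CLOSE_GROUP_SIZE peers instead
        // (mirrors sort_peers_by_address_and_limit).
        peers.iter().take(CLOSE_GROUP_SIZE).map(|&(id, _)| id).collect()
    }
}

fn main() {
    // (peer id, XOR distance to target) pairs.
    let peers = vec![(1, 3), (2, 9), (3, 15), (4, 20), (5, 27), (6, 40)];
    // A range of 16 admits only three peers, fewer than CLOSE_GROUP_SIZE,
    // so the fallback returns the five closest peers instead.
    assert_eq!(peers_in_range_or_close_group(peers, 16), vec![1, 2, 3, 4, 5]);
}

Run as-is, the example exercises the fallback branch, which corresponds to the warn! path in the diff above.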
2 changes: 1 addition & 1 deletion sn_node/src/node.rs
@@ -528,7 +528,7 @@ impl Node {
async fn close_nodes_shunning_peer(network: &Network, peer_id: PeerId) -> bool {
// using `client` to exclude self
let closest_peers = match network
-.client_get_closest_peers(&NetworkAddress::from_peer(peer_id))
+.client_get_all_close_peers_in_range_or_close_group(&NetworkAddress::from_peer(peer_id))
.await
{
Ok(peers) => peers,