Skip to content

Commit

Permalink
Merge pull request #3126 from autonomys/get-pieces-in-batches
Browse files Browse the repository at this point in the history
Get pieces in batches
  • Loading branch information
nazar-pc authored Oct 14, 2024
2 parents 1f65ca4 + 81751cf commit f1428ed
Show file tree
Hide file tree
Showing 12 changed files with 641 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/subspace-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primiti
subspace-metrics = { version = "0.1.0", path = "../../shared/subspace-metrics" }
thiserror = "1.0.64"
tokio = { version = "1.40.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
tokio-stream = "0.1.16"
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
unsigned-varint = { version = "0.8.0", features = ["futures", "asynchronous_codec"] }
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-networking/examples/random-walker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ async fn request_sample_piece(
let request_result = node
.send_generic_request(
peer_id,
Vec::new(),
PieceByIndexRequest {
piece_index: sample_piece_index,
cached_pieces: Arc::default(),
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-networking/examples/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ async fn main() {

tokio::spawn(async move {
let resp = node_2
.send_generic_request(node_1.id(), ExampleRequest)
.send_generic_request(node_1.id(), Vec::new(), ExampleRequest)
.await
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-networking/src/behavior/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ async fn test_async_handler_works_with_pending_internal_future() {
});

let resp = node_2
.send_generic_request(node_1.id(), ExampleRequest)
.send_generic_request(node_1.id(), Vec::new(), ExampleRequest)
.await
.unwrap();

Expand Down
1 change: 1 addition & 0 deletions crates/subspace-networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

//! Networking functionality of Subspace Network, primarily used for DSN (Distributed Storage
//! Network).

#![feature(const_option, impl_trait_in_assoc_type, ip, try_blocks)]
#![warn(missing_docs)]

Expand Down
12 changes: 10 additions & 2 deletions crates/subspace-networking/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ impl Node {
async fn send_generic_request_internal<Request>(
&self,
peer_id: PeerId,
addresses: Vec<Multiaddr>,
request: Request,
acquire_permit: bool,
) -> Result<Request::Response, SendRequestError>
Expand All @@ -387,6 +388,7 @@ impl Node {
let (result_sender, result_receiver) = oneshot::channel();
let command = Command::GenericRequest {
peer_id,
addresses,
protocol_name: Request::PROTOCOL_NAME,
request: request.encode(),
result_sender,
Expand All @@ -400,15 +402,18 @@ impl Node {
}

/// Sends the generic request to the peer and awaits the result.
///
/// Optional addresses will be used for dialing if connection to peer isn't established yet.
pub async fn send_generic_request<Request>(
&self,
peer_id: PeerId,
addresses: Vec<Multiaddr>,
request: Request,
) -> Result<Request::Response, SendRequestError>
where
Request: GenericRequest,
{
self.send_generic_request_internal(peer_id, request, true)
self.send_generic_request_internal(peer_id, addresses, request, true)
.await
}

Expand Down Expand Up @@ -675,16 +680,19 @@ impl NodeRequestsBatchHandle {
self.node.get_closest_peers_internal(key, false).await
}
/// Sends the generic request to the peer and awaits the result.
///
/// Optional addresses will be used for dialing if connection to peer isn't established yet.
pub async fn send_generic_request<Request>(
&mut self,
peer_id: PeerId,
addresses: Vec<Multiaddr>,
request: Request,
) -> Result<Request::Response, SendRequestError>
where
Request: GenericRequest,
{
self.node
.send_generic_request_internal(peer_id, request, false)
.send_generic_request_internal(peer_id, addresses, request, false)
.await
}
}
18 changes: 17 additions & 1 deletion crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use libp2p::kad::{
};
use libp2p::metrics::{Metrics, Recorder};
use libp2p::multiaddr::Protocol;
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
use libp2p::swarm::{DialError, SwarmEvent};
use libp2p::{Multiaddr, PeerId, Swarm, TransportError};
use nohash_hasher::IntMap;
Expand Down Expand Up @@ -1430,10 +1430,26 @@ where
}
Command::GenericRequest {
peer_id,
addresses,
protocol_name,
request,
result_sender,
} => {
if !addresses.is_empty()
&& !self
.swarm
.connected_peers()
.any(|candidate| candidate == &peer_id)
{
if let Err(error) = self.swarm.dial(
DialOpts::peer_id(peer_id)
.addresses(addresses)
.condition(PeerCondition::DisconnectedAndNotDialing)
.build(),
) {
warn!(%error, "Failed to dial disconnected peer on generic request");
}
}
self.swarm.behaviour_mut().request_response.send_request(
&peer_id,
protocol_name,
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-networking/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pub(crate) enum Command {
},
GenericRequest {
peer_id: PeerId,
addresses: Vec<Multiaddr>,
protocol_name: &'static str,
request: Vec<u8>,
result_sender: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
Expand Down
Loading

0 comments on commit f1428ed

Please sign in to comment.