Skip to content

Commit

Permalink
feat(request-response): Add support for custom dial options when maki…
Browse files Browse the repository at this point in the history
…ng request to disconnected peer
  • Loading branch information
nazar-pc committed Nov 26, 2024
1 parent 00588a5 commit d8d075f
Show file tree
Hide file tree
Showing 10 changed files with 249 additions and 62 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ libp2p-pnet = { version = "0.25.0", path = "transports/pnet" }
libp2p-quic = { version = "0.11.2", path = "transports/quic" }
libp2p-relay = { version = "0.18.1", path = "protocols/relay" }
libp2p-rendezvous = { version = "0.15.0", path = "protocols/rendezvous" }
libp2p-request-response = { version = "0.27.1", path = "protocols/request-response" }
libp2p-request-response = { version = "0.28.0", path = "protocols/request-response" }
libp2p-server = { version = "0.12.8", path = "misc/server" }
libp2p-stream = { version = "0.2.0-alpha.1", path = "protocols/stream" }
libp2p-swarm = { version = "0.45.2", path = "swarm" }
Expand Down
21 changes: 15 additions & 6 deletions protocols/autonat/src/v1/behaviour/as_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,20 @@ impl HandleInnerEvent for AsClient<'_> {
error,
request_id,
} => {
tracing::debug!(
%peer,
"Outbound Failure {} when on dial-back request to peer.",
error,
);
if let Some(peer) = peer {
tracing::debug!(
%peer,
%request_id,
"Outbound Failure {} when on dial-back request to peer.",
error,
);
} else {
tracing::debug!(
%request_id,
"Outbound Failure {} when on dial-back request to peer.",
error,
);
}
let probe_id = self
.ongoing_outbound
.remove(&request_id)
Expand All @@ -169,7 +178,7 @@ impl HandleInnerEvent for AsClient<'_> {
VecDeque::from([ToSwarm::GenerateEvent(Event::OutboundProbe(
OutboundProbeEvent::Error {
probe_id,
peer: Some(peer),
peer,
error: OutboundProbeError::OutboundRequest(error),
},
))])
Expand Down
5 changes: 5 additions & 0 deletions protocols/request-response/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.28.0

- `send_request` Add support for custom dial options when making request to disconnected peer.
See [PR 5692](https://github.com/libp2p/rust-libp2p/pull/5692).

## 0.27.1

- Deprecate `void` crate.
Expand Down
4 changes: 2 additions & 2 deletions protocols/request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-request-response"
edition = "2021"
rust-version = { workspace = true }
description = "Generic Request/Response Protocols"
version = "0.27.1"
version = "0.28.0"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand All @@ -19,7 +19,7 @@ libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
libp2p-identity = { workspace = true }
rand = "0.8"
serde = { version = "1.0", optional = true}
serde = { version = "1.0", optional = true }
serde_json = { version = "1.0.117", optional = true }
smallvec = "1.13.2"
tracing = { workspace = true }
Expand Down
125 changes: 86 additions & 39 deletions protocols/request-response/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ pub enum Event<TRequest, TResponse, TChannelResponse = TResponse> {
/// An outbound request failed.
OutboundFailure {
/// The peer to whom the request was sent.
peer: PeerId,
peer: Option<PeerId>,
/// The (local) ID of the failed request.
request_id: OutboundRequestId,
/// The error that occurred.
Expand Down Expand Up @@ -333,6 +333,24 @@ impl Config {
}
}

#[derive(Debug, Eq, PartialEq, Hash)]
enum PendingOutgoingRequest {
PeerId(PeerId),
ConnectionId(ConnectionId),
}

impl From<PeerId> for PendingOutgoingRequest {
fn from(peer_id: PeerId) -> Self {
Self::PeerId(peer_id)
}
}

impl From<ConnectionId> for PendingOutgoingRequest {
fn from(connection_id: ConnectionId) -> Self {
Self::ConnectionId(connection_id)
}
}

/// A request/response protocol for some message codec.
pub struct Behaviour<TCodec>
where
Expand Down Expand Up @@ -360,7 +378,8 @@ where
addresses: PeerAddresses,
/// Requests that have not yet been sent and are waiting for a connection
/// to be established.
pending_outbound_requests: HashMap<PeerId, SmallVec<[OutboundMessage<TCodec>; 10]>>,
pending_outbound_requests:
HashMap<PendingOutgoingRequest, SmallVec<[OutboundMessage<TCodec>; 10]>>,
}

impl<TCodec> Behaviour<TCodec>
Expand Down Expand Up @@ -417,28 +436,45 @@ where
/// connection is established.
///
/// > **Note**: In order for such a dialing attempt to succeed,
/// > the `RequestResonse` protocol must either be embedded
/// > in another `NetworkBehaviour` that provides peer and
/// > address discovery, or known addresses of peers must be
/// > managed via [`Behaviour::add_address`] and
/// > the `peer` must be [`DialOpts`] with multiaddresses or
/// > in case of simple [`PeerId`] `RequestResponse` protocol
/// > must either be embedded in another `NetworkBehaviour`
/// > that provides peer and address discovery, or known addresses of
/// > peers must be managed via [`Behaviour::add_address`] and
/// > [`Behaviour::remove_address`].
pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId {
pub fn send_request<Peer>(&mut self, peer: Peer, request: TCodec::Request) -> OutboundRequestId
where
DialOpts: From<Peer>,
{
let request_id = self.next_outbound_request_id();
let request = OutboundMessage {
request_id,
request,
protocols: self.outbound_protocols.clone(),
};

if let Some(request) = self.try_send_request(peer, request) {
self.pending_events.push_back(ToSwarm::Dial {
opts: DialOpts::peer_id(*peer).build(),
});
self.pending_outbound_requests
.entry(*peer)
.or_default()
.push(request);
}
let opts = DialOpts::from(peer);
let maybe_peer_id = opts.get_peer_id();
let request = if let Some(peer_id) = &maybe_peer_id {
if let Some(request) = self.try_send_request(peer_id, request) {
request
} else {
// Sent successfully
return request_id;
}
} else {
request
};

self.pending_outbound_requests
.entry(if let Some(peer_id) = maybe_peer_id {
peer_id.into()
} else {
opts.connection_id().into()
})
.or_default()
.push(request);
self.pending_events.push_back(ToSwarm::Dial { opts });

request_id
}
Expand Down Expand Up @@ -506,7 +542,7 @@ where
// Check if request is still pending to be sent.
let pen_conn = self
.pending_outbound_requests
.get(peer)
.get(&PendingOutgoingRequest::from(*peer))
.map(|rps| rps.iter().any(|rp| rp.request_id == *request_id))
.unwrap_or(false);

Expand Down Expand Up @@ -665,30 +701,41 @@ where
for request_id in connection.pending_outbound_responses {
self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
peer: peer_id,
peer: Some(peer_id),
request_id,
error: OutboundFailure::ConnectionClosed,
}));
}
}

fn on_dial_failure(&mut self, DialFailure { peer_id, .. }: DialFailure) {
if let Some(peer) = peer_id {
// If there are pending outgoing requests when a dial failure occurs,
// it is implied that we are not connected to the peer, since pending
// outgoing requests are drained when a connection is established and
// only created when a peer is not connected when a request is made.
// Thus these requests must be considered failed, even if there is
// another, concurrent dialing attempt ongoing.
if let Some(pending) = self.pending_outbound_requests.remove(&peer) {
for request in pending {
self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
peer,
request_id: request.request_id,
error: OutboundFailure::DialFailure,
}));
}
fn on_dial_failure(
&mut self,
DialFailure {
peer_id,
connection_id,
..
}: DialFailure,
) {
let key = if let Some(peer_id) = peer_id {
peer_id.into()
} else {
connection_id.into()
};

// If there are pending outgoing requests when a dial failure occurs,
// it is implied that we are not connected to the peer, since pending
// outgoing requests are drained when a connection is established and
// only created when a peer is not connected when a request is made.
// Thus, these requests must be considered failed, even if there is
// another, concurrent dialing attempt ongoing.
if let Some(pending) = self.pending_outbound_requests.remove(&key) {
for request in pending {
self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
peer: peer_id,
request_id: request.request_id,
error: OutboundFailure::DialFailure,
}));
}
}
}
Expand All @@ -703,7 +750,7 @@ where
) {
let mut connection = Connection::new(connection_id, remote_address);

if let Some(pending_requests) = self.pending_outbound_requests.remove(&peer) {
if let Some(pending_requests) = self.pending_outbound_requests.remove(&peer.into()) {
for request in pending_requests {
connection
.pending_outbound_responses
Expand Down Expand Up @@ -887,7 +934,7 @@ where

self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
peer,
peer: Some(peer),
request_id,
error: OutboundFailure::Timeout,
}));
Expand All @@ -901,7 +948,7 @@ where

self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
peer,
peer: Some(peer),
request_id,
error: OutboundFailure::UnsupportedProtocols,
}));
Expand All @@ -912,7 +959,7 @@ where

self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
peer,
peer: Some(peer),
request_id,
error: OutboundFailure::Io(error),
}))
Expand Down
16 changes: 8 additions & 8 deletions protocols/request-response/tests/error_reporting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn report_outbound_failure_on_read_response() {
.send_request(&peer1_id, Action::FailOnReadResponse);

let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(peer, Some(peer1_id));
assert_eq!(req_id_done, req_id);

let error = match error {
Expand Down Expand Up @@ -94,7 +94,7 @@ async fn report_outbound_failure_on_write_request() {
.send_request(&peer1_id, Action::FailOnWriteRequest);

let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(peer, Some(peer1_id));
assert_eq!(req_id_done, req_id);

let error = match error {
Expand Down Expand Up @@ -151,7 +151,7 @@ async fn report_outbound_timeout_on_read_response() {
.send_request(&peer1_id, Action::TimeoutOnReadResponse);

let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(peer, Some(peer1_id));
assert_eq!(req_id_done, req_id);
assert!(matches!(error, OutboundFailure::Timeout));
};
Expand Down Expand Up @@ -203,7 +203,7 @@ async fn report_outbound_failure_on_max_streams() {
.send_request(&peer1_id, Action::FailOnMaxStreams);

let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(peer, Some(peer1_id));
assert_eq!(req_id_done, outbound_req_id);
assert!(matches!(error, OutboundFailure::Io(_)));
};
Expand Down Expand Up @@ -236,7 +236,7 @@ async fn report_inbound_failure_on_read_request() {
.send_request(&peer1_id, Action::FailOnReadRequest);

let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(peer, Some(peer1_id));
assert_eq!(req_id_done, req_id);

match error {
Expand Down Expand Up @@ -295,7 +295,7 @@ async fn report_inbound_failure_on_write_response() {
.send_request(&peer1_id, Action::FailOnWriteResponse);

let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(peer, Some(peer1_id));
assert_eq!(req_id_done, req_id);

match error {
Expand Down Expand Up @@ -352,7 +352,7 @@ async fn report_inbound_timeout_on_write_response() {
.send_request(&peer1_id, Action::TimeoutOnWriteResponse);

let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(peer, Some(peer1_id));
assert_eq!(req_id_done, req_id);

match error {
Expand Down Expand Up @@ -612,7 +612,7 @@ async fn wait_inbound_failure(

async fn wait_outbound_failure(
swarm: &mut Swarm<request_response::Behaviour<TestCodec>>,
) -> Result<(PeerId, OutboundRequestId, OutboundFailure)> {
) -> Result<(Option<PeerId>, OutboundRequestId, OutboundFailure)> {
loop {
match swarm.select_next_some().await.try_into_behaviour_event() {
Ok(request_response::Event::OutboundFailure {
Expand Down
Loading

0 comments on commit d8d075f

Please sign in to comment.