Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dcutr): handle empty holepunch_candidates #5583

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ libp2p-allow-block-list = { version = "0.4.1", path = "misc/allow-block-list" }
libp2p-autonat = { version = "0.13.1", path = "protocols/autonat" }
libp2p-connection-limits = { version = "0.4.0", path = "misc/connection-limits" }
libp2p-core = { version = "0.42.0", path = "core" }
libp2p-dcutr = { version = "0.12.0", path = "protocols/dcutr" }
libp2p-dcutr = { version = "0.12.1", path = "protocols/dcutr" }
libp2p-dns = { version = "0.42.0", path = "transports/dns" }
libp2p-floodsub = { version = "0.45.0", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.47.1", path = "protocols/gossipsub" }
Expand Down
5 changes: 5 additions & 0 deletions protocols/dcutr/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.12.1

- Handle empty hole-punch candidates at relayed connection startup.
See [PR 5583](https://github.com/libp2p/rust-libp2p/pull/5583).

## 0.12.0

<!-- Update to libp2p-swarm v0.45.0 -->
Expand Down
2 changes: 1 addition & 1 deletion protocols/dcutr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-dcutr"
edition = "2021"
rust-version = { workspace = true }
description = "Direct connection upgrade through relay"
version = "0.12.0"
version = "0.12.1"
authors = ["Max Inden <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
70 changes: 43 additions & 27 deletions protocols/dcutr/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ pub struct Behaviour {
/// Queue of actions to return when polled.
queued_events: VecDeque<ToSwarm<Event, Either<handler::relayed::Command, Void>>>,

/// All direct (non-relayed) connections.
direct_connections: HashMap<PeerId, HashSet<ConnectionId>>,
/// All relayed connections.
relayed_connections: HashMap<PeerId, HashSet<ConnectionId>>,

address_candidates: Candidates,

Expand All @@ -86,15 +86,15 @@ impl Behaviour {
pub fn new(local_peer_id: PeerId) -> Self {
Behaviour {
queued_events: Default::default(),
direct_connections: Default::default(),
relayed_connections: Default::default(),
address_candidates: Candidates::new(local_peer_id),
direct_to_relayed_connections: Default::default(),
outgoing_direct_connection_attempts: Default::default(),
}
}

fn observed_addresses(&self) -> Vec<Multiaddr> {
self.address_candidates.iter().cloned().collect()
fn observed_addresses(&self) -> LruCache<Multiaddr, ()> {
self.address_candidates.inner.clone()
}

fn on_dial_failure(
Expand Down Expand Up @@ -148,17 +148,17 @@ impl Behaviour {
..
}: ConnectionClosed,
) {
if !connected_point.is_relayed() {
if connected_point.is_relayed() {
let connections = self
.direct_connections
.relayed_connections
.get_mut(&peer_id)
.expect("Peer of direct connection to be tracked.");
.expect("Peer of relayed connection to be tracked.");
connections
.remove(&connection_id)
.then_some(())
.expect("Direct connection to be tracked.");
.expect("Relayed connection to be tracked.");
if connections.is_empty() {
self.direct_connections.remove(&peer_id);
self.relayed_connections.remove(&peer_id);
}
}
}
Expand All @@ -176,6 +176,11 @@ impl NetworkBehaviour for Behaviour {
remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
if is_relayed(local_addr) {
self.relayed_connections
.entry(peer)
.or_default()
.insert(connection_id);

let connected_point = ConnectedPoint::Listener {
local_addr: local_addr.clone(),
send_back_addr: remote_addr.clone(),
Expand All @@ -186,10 +191,6 @@ impl NetworkBehaviour for Behaviour {

return Ok(Either::Left(handler)); // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound.
}
self.direct_connections
.entry(peer)
.or_default()
.insert(connection_id);

assert!(
!self
Expand All @@ -210,6 +211,11 @@ impl NetworkBehaviour for Behaviour {
port_use: PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
if is_relayed(addr) {
self.relayed_connections
.entry(peer)
.or_default()
.insert(connection_id);

return Ok(Either::Left(handler::relayed::Handler::new(
ConnectedPoint::Dialer {
address: addr.clone(),
Expand All @@ -220,11 +226,6 @@ impl NetworkBehaviour for Behaviour {
))); // TODO: We could make two `handler::relayed::Handler` here, one inbound one outbound.
}

self.direct_connections
.entry(peer)
.or_default()
.insert(connection_id);

// Whether this is a connection requested by this behaviour.
if let Some(&relayed_connection_id) = self.direct_to_relayed_connections.get(&connection_id)
{
Expand Down Expand Up @@ -336,7 +337,21 @@ impl NetworkBehaviour for Behaviour {
}
FromSwarm::DialFailure(dial_failure) => self.on_dial_failure(dial_failure),
FromSwarm::NewExternalAddrCandidate(NewExternalAddrCandidate { addr }) => {
self.address_candidates.add(addr.clone());
if let Some(address) = self.address_candidates.add(addr.clone()) {
for (peer, connections) in &self.relayed_connections {
for connection in connections {
self.queued_events.push_back(ToSwarm::NotifyHandler {
handler: NotifyHandler::One(*connection),
peer_id: *peer,
event: Either::Left(
handler::relayed::Command::NewExternalAddrCandidate(
address.clone(),
),
),
})
}
}
}
}
_ => {}
}
Expand All @@ -362,20 +377,21 @@ impl Candidates {
}
}

fn add(&mut self, mut address: Multiaddr) {
/// Format the address and push it into the LruCache if it is not relayed.
///
/// Returns the provided address formatted if it was pushed.
/// Returns `None` otherwise.
fn add(&mut self, mut address: Multiaddr) -> Option<Multiaddr> {
if is_relayed(&address) {
return;
return None;
}

if address.iter().last() != Some(Protocol::P2p(self.me)) {
address.push(Protocol::P2p(self.me));
}

self.inner.push(address, ());
}

fn iter(&self) -> impl Iterator<Item = &Multiaddr> {
self.inner.iter().map(|(a, _)| a)
self.inner.push(address.clone(), ());
Some(address)
}
}

Expand Down
114 changes: 84 additions & 30 deletions protocols/dcutr/src/handler/relayed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,21 @@ use libp2p_swarm::handler::{
ListenUpgradeError,
};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError,
ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError,
SubstreamProtocol,
};
use lru::LruCache;
use protocol::{inbound, outbound};
use std::collections::VecDeque;
use std::io;
use std::task::{Context, Poll};
use std::time::Duration;
use tracing::{debug, warn};

#[derive(Debug)]
pub enum Command {
Connect,
NewExternalAddrCandidate(Multiaddr),
}

#[derive(Debug)]
Expand All @@ -67,28 +70,76 @@ pub struct Handler {

// Inbound DCUtR handshakes
inbound_stream: futures_bounded::FuturesSet<Result<Vec<Multiaddr>, inbound::Error>>,
pending_inbound_stream: Option<Stream>,

// Outbound DCUtR handshake.
outbound_stream: futures_bounded::FuturesSet<Result<Vec<Multiaddr>, outbound::Error>>,

/// The addresses we will send to the other party for hole-punching attempts.
holepunch_candidates: Vec<Multiaddr>,
holepunch_candidates: LruCache<Multiaddr, ()>,
pending_outbound_stream: Option<Stream>,

attempts: u8,
}

impl Handler {
pub fn new(endpoint: ConnectedPoint, holepunch_candidates: Vec<Multiaddr>) -> Self {
pub fn new(endpoint: ConnectedPoint, holepunch_candidates: LruCache<Multiaddr, ()>) -> Self {
Self {
endpoint,
queued_events: Default::default(),
inbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1),
pending_inbound_stream: None,
outbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1),
pending_outbound_stream: None,
holepunch_candidates,
attempts: 0,
}
}

fn set_stream(&mut self, stream_type: StreamType, stream: Stream) {
if self.holepunch_candidates.is_empty() {
debug!(
"New {stream_type} connect stream but holepunch_candidates is empty. Buffering it."
);
let has_replaced_old_stream = match stream_type {
StreamType::Inbound => self.pending_inbound_stream.replace(stream).is_some(),
StreamType::Outbound => self.pending_outbound_stream.replace(stream).is_some(),
};
if has_replaced_old_stream {
warn!("New {stream_type} connect stream while still buffering previous one. Replacing previous with new.");
}
} else {
let holepunch_candidates = self
.holepunch_candidates
.iter()
.map(|(a, _)| a)
.cloned()
.collect();

let has_replaced_old_stream = match stream_type {
StreamType::Inbound => {
// TODO: when upstreaming this fix, ask libp2p about merging the `attempts` incrementation.
self.attempts += 1;

// FIXME: actually does not replace !!
self.inbound_stream
.try_push(inbound::handshake(stream, holepunch_candidates))
.is_err()
}
StreamType::Outbound => {
// FIXME: actually does not replace !!
self.outbound_stream
.try_push(outbound::handshake(stream, holepunch_candidates))
.is_err()
}
};

if has_replaced_old_stream {
warn!("New {stream_type} connect stream while still upgrading previous one. Replacing previous with new.");
}
}
}

fn on_fully_negotiated_inbound(
&mut self,
FullyNegotiatedInbound {
Expand All @@ -99,21 +150,7 @@ impl Handler {
>,
) {
match output {
future::Either::Left(stream) => {
if self
.inbound_stream
.try_push(inbound::handshake(
stream,
self.holepunch_candidates.clone(),
))
.is_err()
{
tracing::warn!(
"New inbound connect stream while still upgrading previous one. Replacing previous with new.",
);
}
self.attempts += 1;
}
future::Either::Left(stream) => self.set_stream(StreamType::Inbound, stream),
// A connection listener denies all incoming substreams, thus none can ever be fully negotiated.
// TODO: remove when Rust 1.82 is MSRV
#[allow(unreachable_patterns)]
Expand All @@ -134,18 +171,7 @@ impl Handler {
self.endpoint.is_listener(),
"A connection dialer never initiates a connection upgrade."
);
if self
.outbound_stream
.try_push(outbound::handshake(
stream,
self.holepunch_candidates.clone(),
))
.is_err()
{
tracing::warn!(
"New outbound connect stream while still upgrading previous one. Replacing previous with new.",
);
}
self.set_stream(StreamType::Outbound, stream);
}

fn on_listen_upgrade_error(
Expand Down Expand Up @@ -214,8 +240,22 @@ impl ConnectionHandler for Handler {
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()),
});

// TODO: when upstreaming this fix, ask libp2p about merging the `attempts` incrementation.
// Indeed, even if the `queued_event` above will be trigger on stream opening, it might not start
self.attempts += 1;
}
Command::NewExternalAddrCandidate(address) => {
self.holepunch_candidates.push(address, ());

if let Some(stream) = self.pending_inbound_stream.take() {
self.set_stream(StreamType::Inbound, stream);
}

if let Some(stream) = self.pending_outbound_stream.take() {
self.set_stream(StreamType::Outbound, stream);
}
}
}
}

Expand Down Expand Up @@ -316,3 +356,17 @@ impl ConnectionHandler for Handler {
}
}
}

enum StreamType {
Inbound,
Outbound,
}

impl std::fmt::Display for StreamType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Inbound => write!(f, "inbound"),
Self::Outbound => write!(f, "outbound"),
}
}
}
Loading