Skip to content

Commit

Permalink
feat(mdns): emit ToSwarm::NewExternalAddrOfPeer on discovery
Browse files Browse the repository at this point in the history
fixes libp2p#5104 and superseeds libp2p#5179

Pull-Request: libp2p#5753.
  • Loading branch information
hopinheimer authored Dec 23, 2024
1 parent 503b136 commit bd710df
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 78 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ libp2p-gossipsub = { version = "0.48.0", path = "protocols/gossipsub" }
libp2p-identify = { version = "0.46.1", path = "protocols/identify" }
libp2p-identity = { version = "0.2.10" }
libp2p-kad = { version = "0.47.1", path = "protocols/kad" }
libp2p-mdns = { version = "0.46.1", path = "protocols/mdns" }
libp2p-mdns = { version = "0.46.2", path = "protocols/mdns" }
libp2p-memory-connection-limits = { version = "0.3.1", path = "misc/memory-connection-limits" }
libp2p-metrics = { version = "0.15.0", path = "misc/metrics" }
libp2p-mplex = { version = "0.42.0", path = "muxers/mplex" }
Expand Down
5 changes: 5 additions & 0 deletions protocols/mdns/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.46.2

- Emit `ToSwarm::NewExternalAddrOfPeer` on discovery.
See [PR 5753](https://github.com/libp2p/rust-libp2p/pull/5753)

## 0.46.1

- Upgrade `hickory-proto`.
Expand Down
2 changes: 1 addition & 1 deletion protocols/mdns/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "libp2p-mdns"
edition = "2021"
rust-version = { workspace = true }
version = "0.46.1"
version = "0.46.2"
description = "Implementation of the libp2p mDNS discovery method"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
Expand Down
178 changes: 103 additions & 75 deletions protocols/mdns/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ mod timer;

use std::{
cmp,
collections::hash_map::{Entry, HashMap},
collections::{
hash_map::{Entry, HashMap},
VecDeque,
},
convert::Infallible,
fmt,
future::Future,
io,
Expand Down Expand Up @@ -188,6 +192,9 @@ where
listen_addresses: Arc<RwLock<ListenAddresses>>,

local_peer_id: PeerId,

/// Pending behaviour events to be emitted.
pending_events: VecDeque<ToSwarm<Event, Infallible>>,
}

impl<P> Behaviour<P>
Expand All @@ -208,6 +215,7 @@ where
closest_expiration: Default::default(),
listen_addresses: Default::default(),
local_peer_id,
pending_events: Default::default(),
})
}

Expand Down Expand Up @@ -304,93 +312,113 @@ where
&mut self,
cx: &mut Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
// Poll ifwatch.
while let Poll::Ready(Some(event)) = Pin::new(&mut self.if_watch).poll_next(cx) {
match event {
Ok(IfEvent::Up(inet)) => {
let addr = inet.addr();
if addr.is_loopback() {
continue;
}
if addr.is_ipv4() && self.config.enable_ipv6
|| addr.is_ipv6() && !self.config.enable_ipv6
{
continue;
}
if let Entry::Vacant(e) = self.if_tasks.entry(addr) {
match InterfaceState::<P::Socket, P::Timer>::new(
addr,
self.config.clone(),
self.local_peer_id,
self.listen_addresses.clone(),
self.query_response_sender.clone(),
) {
Ok(iface_state) => {
e.insert(P::spawn(iface_state));
}
Err(err) => {
tracing::error!("failed to create `InterfaceState`: {}", err)
loop {
// Check for pending events and emit them.
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(event);
}

// Poll ifwatch.
while let Poll::Ready(Some(event)) = Pin::new(&mut self.if_watch).poll_next(cx) {
match event {
Ok(IfEvent::Up(inet)) => {
let addr = inet.addr();
if addr.is_loopback() {
continue;
}
if addr.is_ipv4() && self.config.enable_ipv6
|| addr.is_ipv6() && !self.config.enable_ipv6
{
continue;
}
if let Entry::Vacant(e) = self.if_tasks.entry(addr) {
match InterfaceState::<P::Socket, P::Timer>::new(
addr,
self.config.clone(),
self.local_peer_id,
self.listen_addresses.clone(),
self.query_response_sender.clone(),
) {
Ok(iface_state) => {
e.insert(P::spawn(iface_state));
}
Err(err) => {
tracing::error!("failed to create `InterfaceState`: {}", err)
}
}
}
}
}
Ok(IfEvent::Down(inet)) => {
if let Some(handle) = self.if_tasks.remove(&inet.addr()) {
tracing::info!(instance=%inet.addr(), "dropping instance");
Ok(IfEvent::Down(inet)) => {
if let Some(handle) = self.if_tasks.remove(&inet.addr()) {
tracing::info!(instance=%inet.addr(), "dropping instance");

handle.abort();
handle.abort();
}
}
Err(err) => tracing::error!("if watch returned an error: {}", err),
}
Err(err) => tracing::error!("if watch returned an error: {}", err),
}
}
// Emit discovered event.
let mut discovered = Vec::new();

while let Poll::Ready(Some((peer, addr, expiration))) =
self.query_response_receiver.poll_next_unpin(cx)
{
if let Some((_, _, cur_expires)) = self
.discovered_nodes
.iter_mut()
.find(|(p, a, _)| *p == peer && *a == addr)
// Emit discovered event.
let mut discovered = Vec::new();

while let Poll::Ready(Some((peer, addr, expiration))) =
self.query_response_receiver.poll_next_unpin(cx)
{
*cur_expires = cmp::max(*cur_expires, expiration);
} else {
tracing::info!(%peer, address=%addr, "discovered peer on address");
self.discovered_nodes.push((peer, addr.clone(), expiration));
discovered.push((peer, addr));
if let Some((_, _, cur_expires)) = self
.discovered_nodes
.iter_mut()
.find(|(p, a, _)| *p == peer && *a == addr)
{
*cur_expires = cmp::max(*cur_expires, expiration);
} else {
tracing::info!(%peer, address=%addr, "discovered peer on address");
self.discovered_nodes.push((peer, addr.clone(), expiration));
discovered.push((peer, addr.clone()));

self.pending_events
.push_back(ToSwarm::NewExternalAddrOfPeer {
peer_id: peer,
address: addr,
});
}
}
}

if !discovered.is_empty() {
let event = Event::Discovered(discovered);
return Poll::Ready(ToSwarm::GenerateEvent(event));
}
// Emit expired event.
let now = Instant::now();
let mut closest_expiration = None;
let mut expired = Vec::new();
self.discovered_nodes.retain(|(peer, addr, expiration)| {
if *expiration <= now {
tracing::info!(%peer, address=%addr, "expired peer on address");
expired.push((*peer, addr.clone()));
return false;
if !discovered.is_empty() {
let event = Event::Discovered(discovered);
// Push to the front of the queue so that the behavior event is reported before
// the individual discovered addresses.
self.pending_events
.push_front(ToSwarm::GenerateEvent(event));
continue;
}
// Emit expired event.
let now = Instant::now();
let mut closest_expiration = None;
let mut expired = Vec::new();
self.discovered_nodes.retain(|(peer, addr, expiration)| {
if *expiration <= now {
tracing::info!(%peer, address=%addr, "expired peer on address");
expired.push((*peer, addr.clone()));
return false;
}
closest_expiration =
Some(closest_expiration.unwrap_or(*expiration).min(*expiration));
true
});
if !expired.is_empty() {
let event = Event::Expired(expired);
self.pending_events.push_back(ToSwarm::GenerateEvent(event));
continue;
}
if let Some(closest_expiration) = closest_expiration {
let mut timer = P::Timer::at(closest_expiration);
let _ = Pin::new(&mut timer).poll_next(cx);

self.closest_expiration = Some(timer);
}
closest_expiration = Some(closest_expiration.unwrap_or(*expiration).min(*expiration));
true
});
if !expired.is_empty() {
let event = Event::Expired(expired);
return Poll::Ready(ToSwarm::GenerateEvent(event));
}
if let Some(closest_expiration) = closest_expiration {
let mut timer = P::Timer::at(closest_expiration);
let _ = Pin::new(&mut timer).poll_next(cx);

self.closest_expiration = Some(timer);
return Poll::Pending;
}
Poll::Pending
}
}

Expand Down

0 comments on commit bd710df

Please sign in to comment.