From bd710df61aa91c759885ae72d08303fc51896848 Mon Sep 17 00:00:00 2001 From: hopinheimer <48147533+hopinheimer@users.noreply.github.com> Date: Mon, 23 Dec 2024 02:32:04 -0500 Subject: [PATCH] feat(mdns): emit `ToSwarm::NewExternalAddrOfPeer` on discovery fixes #5104 and superseeds #5179 Pull-Request: #5753. --- Cargo.lock | 2 +- Cargo.toml | 2 +- protocols/mdns/CHANGELOG.md | 5 + protocols/mdns/Cargo.toml | 2 +- protocols/mdns/src/behaviour.rs | 178 ++++++++++++++++++-------------- 5 files changed, 111 insertions(+), 78 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 31df58e8ec4..43bcd4c8689 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2985,7 +2985,7 @@ dependencies = [ [[package]] name = "libp2p-mdns" -version = "0.46.1" +version = "0.46.2" dependencies = [ "async-io", "async-std", diff --git a/Cargo.toml b/Cargo.toml index c77768db311..7a116949bcc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,7 +84,7 @@ libp2p-gossipsub = { version = "0.48.0", path = "protocols/gossipsub" } libp2p-identify = { version = "0.46.1", path = "protocols/identify" } libp2p-identity = { version = "0.2.10" } libp2p-kad = { version = "0.47.1", path = "protocols/kad" } -libp2p-mdns = { version = "0.46.1", path = "protocols/mdns" } +libp2p-mdns = { version = "0.46.2", path = "protocols/mdns" } libp2p-memory-connection-limits = { version = "0.3.1", path = "misc/memory-connection-limits" } libp2p-metrics = { version = "0.15.0", path = "misc/metrics" } libp2p-mplex = { version = "0.42.0", path = "muxers/mplex" } diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 61290703c34..98dc3d55454 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.46.2 + +- Emit `ToSwarm::NewExternalAddrOfPeer` on discovery. + See [PR 5753](https://github.com/libp2p/rust-libp2p/pull/5753) + ## 0.46.1 - Upgrade `hickory-proto`. diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index 16436848efe..618d41e9b9d 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-mdns" edition = "2021" rust-version = { workspace = true } -version = "0.46.1" +version = "0.46.2" description = "Implementation of the libp2p mDNS discovery method" authors = ["Parity Technologies "] license = "MIT" diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index b6dde8f4487..68e28cf3d63 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -24,7 +24,11 @@ mod timer; use std::{ cmp, - collections::hash_map::{Entry, HashMap}, + collections::{ + hash_map::{Entry, HashMap}, + VecDeque, + }, + convert::Infallible, fmt, future::Future, io, @@ -188,6 +192,9 @@ where listen_addresses: Arc>, local_peer_id: PeerId, + + /// Pending behaviour events to be emitted. + pending_events: VecDeque>, } impl

Behaviour

@@ -208,6 +215,7 @@ where closest_expiration: Default::default(), listen_addresses: Default::default(), local_peer_id, + pending_events: Default::default(), }) } @@ -304,93 +312,113 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll>> { - // Poll ifwatch. - while let Poll::Ready(Some(event)) = Pin::new(&mut self.if_watch).poll_next(cx) { - match event { - Ok(IfEvent::Up(inet)) => { - let addr = inet.addr(); - if addr.is_loopback() { - continue; - } - if addr.is_ipv4() && self.config.enable_ipv6 - || addr.is_ipv6() && !self.config.enable_ipv6 - { - continue; - } - if let Entry::Vacant(e) = self.if_tasks.entry(addr) { - match InterfaceState::::new( - addr, - self.config.clone(), - self.local_peer_id, - self.listen_addresses.clone(), - self.query_response_sender.clone(), - ) { - Ok(iface_state) => { - e.insert(P::spawn(iface_state)); - } - Err(err) => { - tracing::error!("failed to create `InterfaceState`: {}", err) + loop { + // Check for pending events and emit them. + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(event); + } + + // Poll ifwatch. + while let Poll::Ready(Some(event)) = Pin::new(&mut self.if_watch).poll_next(cx) { + match event { + Ok(IfEvent::Up(inet)) => { + let addr = inet.addr(); + if addr.is_loopback() { + continue; + } + if addr.is_ipv4() && self.config.enable_ipv6 + || addr.is_ipv6() && !self.config.enable_ipv6 + { + continue; + } + if let Entry::Vacant(e) = self.if_tasks.entry(addr) { + match InterfaceState::::new( + addr, + self.config.clone(), + self.local_peer_id, + self.listen_addresses.clone(), + self.query_response_sender.clone(), + ) { + Ok(iface_state) => { + e.insert(P::spawn(iface_state)); + } + Err(err) => { + tracing::error!("failed to create `InterfaceState`: {}", err) + } } } } - } - Ok(IfEvent::Down(inet)) => { - if let Some(handle) = self.if_tasks.remove(&inet.addr()) { - tracing::info!(instance=%inet.addr(), "dropping instance"); + Ok(IfEvent::Down(inet)) => { + if let Some(handle) = self.if_tasks.remove(&inet.addr()) { + tracing::info!(instance=%inet.addr(), "dropping instance"); - handle.abort(); + handle.abort(); + } } + Err(err) => tracing::error!("if watch returned an error: {}", err), } - Err(err) => tracing::error!("if watch returned an error: {}", err), } - } - // Emit discovered event. - let mut discovered = Vec::new(); - - while let Poll::Ready(Some((peer, addr, expiration))) = - self.query_response_receiver.poll_next_unpin(cx) - { - if let Some((_, _, cur_expires)) = self - .discovered_nodes - .iter_mut() - .find(|(p, a, _)| *p == peer && *a == addr) + // Emit discovered event. + let mut discovered = Vec::new(); + + while let Poll::Ready(Some((peer, addr, expiration))) = + self.query_response_receiver.poll_next_unpin(cx) { - *cur_expires = cmp::max(*cur_expires, expiration); - } else { - tracing::info!(%peer, address=%addr, "discovered peer on address"); - self.discovered_nodes.push((peer, addr.clone(), expiration)); - discovered.push((peer, addr)); + if let Some((_, _, cur_expires)) = self + .discovered_nodes + .iter_mut() + .find(|(p, a, _)| *p == peer && *a == addr) + { + *cur_expires = cmp::max(*cur_expires, expiration); + } else { + tracing::info!(%peer, address=%addr, "discovered peer on address"); + self.discovered_nodes.push((peer, addr.clone(), expiration)); + discovered.push((peer, addr.clone())); + + self.pending_events + .push_back(ToSwarm::NewExternalAddrOfPeer { + peer_id: peer, + address: addr, + }); + } } - } - if !discovered.is_empty() { - let event = Event::Discovered(discovered); - return Poll::Ready(ToSwarm::GenerateEvent(event)); - } - // Emit expired event. - let now = Instant::now(); - let mut closest_expiration = None; - let mut expired = Vec::new(); - self.discovered_nodes.retain(|(peer, addr, expiration)| { - if *expiration <= now { - tracing::info!(%peer, address=%addr, "expired peer on address"); - expired.push((*peer, addr.clone())); - return false; + if !discovered.is_empty() { + let event = Event::Discovered(discovered); + // Push to the front of the queue so that the behavior event is reported before + // the individual discovered addresses. + self.pending_events + .push_front(ToSwarm::GenerateEvent(event)); + continue; + } + // Emit expired event. + let now = Instant::now(); + let mut closest_expiration = None; + let mut expired = Vec::new(); + self.discovered_nodes.retain(|(peer, addr, expiration)| { + if *expiration <= now { + tracing::info!(%peer, address=%addr, "expired peer on address"); + expired.push((*peer, addr.clone())); + return false; + } + closest_expiration = + Some(closest_expiration.unwrap_or(*expiration).min(*expiration)); + true + }); + if !expired.is_empty() { + let event = Event::Expired(expired); + self.pending_events.push_back(ToSwarm::GenerateEvent(event)); + continue; + } + if let Some(closest_expiration) = closest_expiration { + let mut timer = P::Timer::at(closest_expiration); + let _ = Pin::new(&mut timer).poll_next(cx); + + self.closest_expiration = Some(timer); } - closest_expiration = Some(closest_expiration.unwrap_or(*expiration).min(*expiration)); - true - }); - if !expired.is_empty() { - let event = Event::Expired(expired); - return Poll::Ready(ToSwarm::GenerateEvent(event)); - } - if let Some(closest_expiration) = closest_expiration { - let mut timer = P::Timer::at(closest_expiration); - let _ = Pin::new(&mut timer).poll_next(cx); - self.closest_expiration = Some(timer); + return Poll::Pending; } - Poll::Pending } }