diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index 2b792309416..e9c272a5333 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -6,6 +6,8 @@ functionality using `Behaviour::set_status` to explicitly set `Status::{Enable,Disable}` to enable or disable protocol advertisement. See [PR 6154](https://github.com/libp2p/rust-libp2p/pull/6154). +- Expire external address when a relay listener is closed without a replacement reservation. + See [PR 6285](https://github.com/libp2p/rust-libp2p/pull/6285). ## 0.21.1 - reduce allocations by replacing `get_or_insert` with `get_or_insert_with` diff --git a/protocols/relay/src/priv_client.rs b/protocols/relay/src/priv_client.rs index e8e987d43fb..bd3d2c08eee 100644 --- a/protocols/relay/src/priv_client.rs +++ b/protocols/relay/src/priv_client.rs @@ -40,12 +40,16 @@ use futures::{ ready, stream::StreamExt, }; -use libp2p_core::{Endpoint, Multiaddr, multiaddr::Protocol, transport::PortUse}; +use libp2p_core::{ + Endpoint, Multiaddr, + multiaddr::Protocol, + transport::{ListenerId, PortUse}, +}; use libp2p_identity::PeerId; use libp2p_swarm::{ ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NetworkBehaviour, NotifyHandler, Stream, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, - behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm}, + behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm, ListenerClosed}, dial_opts::DialOpts, dummy, }; @@ -100,6 +104,12 @@ pub struct Behaviour { /// `/p2p-circuit` address we reserved on it. reservation_addresses: HashMap, + /// Stores the [`ListenerId`] to the [`ConnectionId`] of the relay connection + /// that the listener's reservation is on. This allows us to expire external addresses + /// when a listener closes, but only if no other listener still holds a + /// reservation for the same address. + listener_id_to_connection_id: HashMap, + /// Queue of actions to return when polled. queued_actions: VecDeque>>, @@ -114,6 +124,7 @@ pub fn new(local_peer_id: PeerId) -> (Transport, Behaviour) { from_transport, directly_connected_peers: Default::default(), reservation_addresses: Default::default(), + listener_id_to_connection_id: Default::default(), queued_actions: Default::default(), pending_handler_commands: Default::default(), }; @@ -148,6 +159,8 @@ impl Behaviour { unreachable!("`on_connection_closed` for unconnected peer.") } }; + self.listener_id_to_connection_id + .retain(|_, cid| *cid != connection_id); if let Some((addr, ReservationStatus::Confirmed)) = self.reservation_addresses.remove(&connection_id) { @@ -156,6 +169,32 @@ impl Behaviour { } } } + + fn on_listener_closed(&mut self, ListenerClosed { listener_id, .. }: ListenerClosed) { + let Some(connection_id) = self.listener_id_to_connection_id.remove(&listener_id) else { + return; + }; + + // Only expire the external address if no other listener still holds + // a reservation on the same connection. During reservation replacement the new listener is + // registered before the old one closes, so there will still be another entry, so we should + // not emit the event to expire the address address. + let listenr_and_connection = self + .listener_id_to_connection_id + .values() + .any(|cid| *cid == connection_id); + + if listenr_and_connection { + return; + } + + if let Some((addr, ReservationStatus::Confirmed)) = + self.reservation_addresses.remove(&connection_id) + { + self.queued_actions + .push_back(ToSwarm::ExternalAddrExpired(addr)); + } + } } impl NetworkBehaviour for Behaviour { @@ -222,6 +261,7 @@ impl NetworkBehaviour for Behaviour { FromSwarm::ConnectionClosed(connection_closed) => { self.on_connection_closed(connection_closed) } + FromSwarm::ListenerClosed(listener_closed) => self.on_listener_closed(listener_closed), FromSwarm::DialFailure(DialFailure { connection_id, .. }) => { self.reservation_addresses.remove(&connection_id); self.pending_handler_commands.remove(&connection_id); @@ -285,6 +325,7 @@ impl NetworkBehaviour for Behaviour { let action = match ready!(self.from_transport.poll_next_unpin(cx)) { Some(transport::TransportToBehaviourMsg::ListenReq { + listener_id, relay_peer_id, relay_addr, to_listener, @@ -295,6 +336,8 @@ impl NetworkBehaviour for Behaviour { .and_then(|cs| cs.first()) { Some(connection_id) => { + self.listener_id_to_connection_id + .insert(listener_id, *connection_id); self.reservation_addresses.insert( *connection_id, ( @@ -319,6 +362,8 @@ impl NetworkBehaviour for Behaviour { .build(); let relayed_connection_id = opts.connection_id(); + self.listener_id_to_connection_id + .insert(listener_id, relayed_connection_id); self.reservation_addresses.insert( relayed_connection_id, ( diff --git a/protocols/relay/src/priv_client/transport.rs b/protocols/relay/src/priv_client/transport.rs index 6b523adedb5..f91f6560895 100644 --- a/protocols/relay/src/priv_client/transport.rs +++ b/protocols/relay/src/priv_client/transport.rs @@ -154,6 +154,7 @@ impl libp2p_core::Transport for Transport { let (to_listener, from_behaviour) = mpsc::channel(0); self.pending_to_behaviour .push_back(TransportToBehaviourMsg::ListenReq { + listener_id, relay_peer_id, relay_addr, to_listener, @@ -459,6 +460,7 @@ pub(crate) enum TransportToBehaviourMsg { }, /// Listen for incoming relayed connections via relay node. ListenReq { + listener_id: ListenerId, relay_peer_id: PeerId, relay_addr: Multiaddr, to_listener: mpsc::Sender, diff --git a/protocols/relay/tests/lib.rs b/protocols/relay/tests/lib.rs index 94994203964..35e276727e1 100644 --- a/protocols/relay/tests/lib.rs +++ b/protocols/relay/tests/lib.rs @@ -184,6 +184,77 @@ async fn new_reservation_to_same_relay_replaces_old() { } } +#[tokio::test] +async fn closing_relay_listener_expires_external_address() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + + let relay_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::())); + let mut relay = build_relay(); + let relay_peer_id = *relay.local_peer_id(); + + relay.listen_on(relay_addr.clone()).unwrap(); + relay.add_external_address(relay_addr.clone()); + tokio::spawn(async move { + relay.collect::>().await; + }); + + let mut client = build_client(); + let client_peer_id = *client.local_peer_id(); + let client_addr = relay_addr + .with(Protocol::P2p(relay_peer_id)) + .with(Protocol::P2pCircuit); + let client_addr_with_peer_id = client_addr.clone().with(Protocol::P2p(client_peer_id)); + + let listener_id = client.listen_on(client_addr.clone()).unwrap(); + + // Wait for connection to relay. + assert!(wait_for_dial(&mut client, relay_peer_id).await); + + // Wait for reservation to be accepted. + wait_for_reservation( + &mut client, + client_addr_with_peer_id.clone(), + relay_peer_id, + false, // No renewal. + ) + .await; + + // Now remove the relay listener. + assert!(client.remove_listener(listener_id)); + + // Expect the listener to close and the external address to expire. + let mut listener_closed = false; + let mut external_addr_expired = false; + loop { + match client.select_next_some().await { + SwarmEvent::ListenerClosed { + listener_id: closed_id, + addresses, + .. + } => { + assert_eq!(closed_id, listener_id); + assert_eq!(addresses, vec![client_addr_with_peer_id.clone()]); + listener_closed = true; + } + SwarmEvent::ExternalAddrExpired { address } => { + assert_eq!(address, client_addr_with_peer_id); + external_addr_expired = true; + } + SwarmEvent::Behaviour(ClientEvent::Ping(_)) => {} + SwarmEvent::ExpiredListenAddr { .. } => {} + e => panic!("{e:?}"), + } + if listener_closed && external_addr_expired { + break; + } + } + + assert!(listener_closed); + assert!(external_addr_expired); +} + #[tokio::test] async fn connect() { let _ = tracing_subscriber::fmt()