Skip to content
Open
2 changes: 2 additions & 0 deletions protocols/relay/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
functionality using `Behaviour::set_status` to explicitly set `Status::{Enable,Disable}` to enable or disable
protocol advertisement.
See [PR 6154](https://github.com/libp2p/rust-libp2p/pull/6154).
- Expire external address when a relay listener is closed without a replacement reservation.
See [PR 6285](https://github.com/libp2p/rust-libp2p/pull/6285).

## 0.21.1
- reduce allocations by replacing `get_or_insert` with `get_or_insert_with`
Expand Down
49 changes: 47 additions & 2 deletions protocols/relay/src/priv_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,16 @@ use futures::{
ready,
stream::StreamExt,
};
use libp2p_core::{Endpoint, Multiaddr, multiaddr::Protocol, transport::PortUse};
use libp2p_core::{
Endpoint, Multiaddr,
multiaddr::Protocol,
transport::{ListenerId, PortUse},
};
use libp2p_identity::PeerId;
use libp2p_swarm::{
ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NetworkBehaviour,
NotifyHandler, Stream, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm},
behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm, ListenerClosed},
dial_opts::DialOpts,
dummy,
};
Expand Down Expand Up @@ -100,6 +104,12 @@ pub struct Behaviour {
/// `/p2p-circuit` address we reserved on it.
reservation_addresses: HashMap<ConnectionId, (Multiaddr, ReservationStatus)>,

/// Stores the [`ListenerId`] to the [`ConnectionId`] of the relay connection
/// that the listener's reservation is on. This allows us to expire external addresses
/// when a listener closes, but only if no other listener still holds a
/// reservation for the same address.
listener_id_to_connection_id: HashMap<ListenerId, ConnectionId>,

/// Queue of actions to return when polled.
queued_actions: VecDeque<ToSwarm<Event, Either<handler::In, Infallible>>>,

Expand All @@ -114,6 +124,7 @@ pub fn new(local_peer_id: PeerId) -> (Transport, Behaviour) {
from_transport,
directly_connected_peers: Default::default(),
reservation_addresses: Default::default(),
listener_id_to_connection_id: Default::default(),
queued_actions: Default::default(),
pending_handler_commands: Default::default(),
};
Expand Down Expand Up @@ -148,6 +159,8 @@ impl Behaviour {
unreachable!("`on_connection_closed` for unconnected peer.")
}
};
self.listener_id_to_connection_id
.retain(|_, cid| *cid != connection_id);
if let Some((addr, ReservationStatus::Confirmed)) =
self.reservation_addresses.remove(&connection_id)
{
Expand All @@ -156,6 +169,32 @@ impl Behaviour {
}
}
}

fn on_listener_closed(&mut self, ListenerClosed { listener_id, .. }: ListenerClosed) {
let Some(connection_id) = self.listener_id_to_connection_id.remove(&listener_id) else {
return;
};

// Only expire the external address if no other listener still holds
// a reservation on the same connection. During reservation replacement the new listener is
// registered before the old one closes, so there will still be another entry, so we should
// not emit the event to expire the address address.
let listenr_and_connection = self
.listener_id_to_connection_id
.values()
.any(|cid| *cid == connection_id);

if listenr_and_connection {
return;
}

if let Some((addr, ReservationStatus::Confirmed)) =
self.reservation_addresses.remove(&connection_id)
{
self.queued_actions
.push_back(ToSwarm::ExternalAddrExpired(addr));
}
}
}

impl NetworkBehaviour for Behaviour {
Expand Down Expand Up @@ -222,6 +261,7 @@ impl NetworkBehaviour for Behaviour {
FromSwarm::ConnectionClosed(connection_closed) => {
self.on_connection_closed(connection_closed)
}
FromSwarm::ListenerClosed(listener_closed) => self.on_listener_closed(listener_closed),
FromSwarm::DialFailure(DialFailure { connection_id, .. }) => {
self.reservation_addresses.remove(&connection_id);
self.pending_handler_commands.remove(&connection_id);
Expand Down Expand Up @@ -285,6 +325,7 @@ impl NetworkBehaviour for Behaviour {

let action = match ready!(self.from_transport.poll_next_unpin(cx)) {
Some(transport::TransportToBehaviourMsg::ListenReq {
listener_id,
relay_peer_id,
relay_addr,
to_listener,
Expand All @@ -295,6 +336,8 @@ impl NetworkBehaviour for Behaviour {
.and_then(|cs| cs.first())
{
Some(connection_id) => {
self.listener_id_to_connection_id
.insert(listener_id, *connection_id);
self.reservation_addresses.insert(
*connection_id,
(
Expand All @@ -319,6 +362,8 @@ impl NetworkBehaviour for Behaviour {
.build();
let relayed_connection_id = opts.connection_id();

self.listener_id_to_connection_id
.insert(listener_id, relayed_connection_id);
self.reservation_addresses.insert(
relayed_connection_id,
(
Expand Down
2 changes: 2 additions & 0 deletions protocols/relay/src/priv_client/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ impl libp2p_core::Transport for Transport {
let (to_listener, from_behaviour) = mpsc::channel(0);
self.pending_to_behaviour
.push_back(TransportToBehaviourMsg::ListenReq {
listener_id,
relay_peer_id,
relay_addr,
to_listener,
Expand Down Expand Up @@ -459,6 +460,7 @@ pub(crate) enum TransportToBehaviourMsg {
},
/// Listen for incoming relayed connections via relay node.
ListenReq {
listener_id: ListenerId,
relay_peer_id: PeerId,
relay_addr: Multiaddr,
to_listener: mpsc::Sender<ToListenerMsg>,
Expand Down
71 changes: 71 additions & 0 deletions protocols/relay/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,77 @@ async fn new_reservation_to_same_relay_replaces_old() {
}
}

#[tokio::test]
async fn closing_relay_listener_expires_external_address() {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.try_init();

let relay_addr = Multiaddr::empty().with(Protocol::Memory(rand::random::<u64>()));
let mut relay = build_relay();
let relay_peer_id = *relay.local_peer_id();

relay.listen_on(relay_addr.clone()).unwrap();
relay.add_external_address(relay_addr.clone());
tokio::spawn(async move {
relay.collect::<Vec<_>>().await;
});

let mut client = build_client();
let client_peer_id = *client.local_peer_id();
let client_addr = relay_addr
.with(Protocol::P2p(relay_peer_id))
.with(Protocol::P2pCircuit);
let client_addr_with_peer_id = client_addr.clone().with(Protocol::P2p(client_peer_id));

let listener_id = client.listen_on(client_addr.clone()).unwrap();

// Wait for connection to relay.
assert!(wait_for_dial(&mut client, relay_peer_id).await);

// Wait for reservation to be accepted.
wait_for_reservation(
&mut client,
client_addr_with_peer_id.clone(),
relay_peer_id,
false, // No renewal.
)
.await;

// Now remove the relay listener.
assert!(client.remove_listener(listener_id));

// Expect the listener to close and the external address to expire.
let mut listener_closed = false;
let mut external_addr_expired = false;
loop {
match client.select_next_some().await {
SwarmEvent::ListenerClosed {
listener_id: closed_id,
addresses,
..
} => {
assert_eq!(closed_id, listener_id);
assert_eq!(addresses, vec![client_addr_with_peer_id.clone()]);
listener_closed = true;
}
SwarmEvent::ExternalAddrExpired { address } => {
assert_eq!(address, client_addr_with_peer_id);
external_addr_expired = true;
}
SwarmEvent::Behaviour(ClientEvent::Ping(_)) => {}
SwarmEvent::ExpiredListenAddr { .. } => {}
e => panic!("{e:?}"),
}
if listener_closed && external_addr_expired {
break;
}
}

assert!(listener_closed);
assert!(external_addr_expired);
}

#[tokio::test]
async fn connect() {
let _ = tracing_subscriber::fmt()
Expand Down
Loading