From 4906372193f5997708b69db3787fb398bd80bd2e Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Wed, 23 Apr 2025 16:25:47 +0200 Subject: [PATCH 1/8] peer-store: single event `Store::Event` --- misc/peer-store/src/behaviour.rs | 46 ++++-------------------- misc/peer-store/src/lib.rs | 2 +- misc/peer-store/src/memory_store.rs | 56 ++++++++++++++--------------- misc/peer-store/src/store.rs | 18 ++++------ 4 files changed, 41 insertions(+), 81 deletions(-) diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs index 36bbd11d2c1..2b88c2f8fa5 100644 --- a/misc/peer-store/src/behaviour.rs +++ b/misc/peer-store/src/behaviour.rs @@ -1,24 +1,8 @@ -use std::{collections::VecDeque, task::Poll}; - use libp2p_core::{Multiaddr, PeerId}; use libp2p_swarm::{dummy, NetworkBehaviour}; use crate::store::Store; -/// Events generated by [`Behaviour`] and emitted back to [`Swarm`](libp2p_swarm::Swarm). -#[derive(Debug, Clone)] -pub enum Event { - /// The peer's record has been updated. - /// Manually updating a record will always emit this event - /// even if it provides no new information. - RecordUpdated { - /// The peer that has an update. - peer: PeerId, - }, - /// Event from the internal store. - Store(T), -} - /// Behaviour that maintains a peer address book. /// /// Usage: @@ -39,8 +23,6 @@ pub enum Event { pub struct Behaviour { /// The internal store. store: S, - /// Pending Events to be emitted back to [`Swarm`](libp2p_swarm::Swarm). - pending_events: VecDeque>, } impl<'a, S> Behaviour @@ -49,10 +31,7 @@ where { /// Build a new [`Behaviour`] with the given store. pub fn new(store: S) -> Self { - Self { - store, - pending_events: VecDeque::new(), - } + Self { store } } /// Try to get all observed address of the given peer. @@ -73,24 +52,16 @@ where pub fn store_mut(&mut self) -> &mut S { &mut self.store } - - fn handle_store_event(&mut self, event: crate::store::Event<::FromStore>) { - use crate::store::Event::*; - match event { - RecordUpdated(peer) => self.pending_events.push_back(Event::RecordUpdated { peer }), - Store(ev) => self.pending_events.push_back(Event::Store(ev)), - } - } } impl NetworkBehaviour for Behaviour where S: Store + 'static, - ::FromStore: Send + Sync, + ::Event: Send + Sync, { type ConnectionHandler = dummy::ConnectionHandler; - type ToSwarm = Event; + type ToSwarm = S::Event; fn handle_established_inbound_connection( &mut self, @@ -149,13 +120,8 @@ where cx: &mut std::task::Context<'_>, ) -> std::task::Poll>> { - if let Some(ev) = self.store.poll(cx) { - self.handle_store_event(ev); - }; - - if let Some(ev) = self.pending_events.pop_front() { - return Poll::Ready(libp2p_swarm::ToSwarm::GenerateEvent(ev)); - } - Poll::Pending + self.store + .poll(cx) + .map(libp2p_swarm::ToSwarm::GenerateEvent) } } diff --git a/misc/peer-store/src/lib.rs b/misc/peer-store/src/lib.rs index e306b828f51..31f7eb6497c 100644 --- a/misc/peer-store/src/lib.rs +++ b/misc/peer-store/src/lib.rs @@ -21,5 +21,5 @@ mod behaviour; pub mod memory_store; mod store; -pub use behaviour::{Behaviour, Event}; +pub use behaviour::Behaviour; pub use store::Store; diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index 49e542a9e2f..f4d2361cf26 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -11,7 +11,7 @@ use std::{ collections::{HashMap, VecDeque}, num::NonZeroUsize, - task::Waker, + task::{Poll, Waker}, }; use libp2p_core::{Multiaddr, PeerId}; @@ -25,6 +25,8 @@ use super::Store; pub enum Event { /// Custom data of the peer has been updated. CustomDataUpdated(PeerId), + /// An address record has been updated. + RecordUpdated(PeerId), } /// A in-memory store that uses LRU cache for bounded storage of addresses @@ -34,7 +36,7 @@ pub struct MemoryStore { /// The internal store. records: HashMap>, /// Events to emit to [`Behaviour`](crate::Behaviour) and [`Swarm`](libp2p_swarm::Swarm). - pending_events: VecDeque>, + pending_events: VecDeque, /// Config of the store. config: Config, /// Waker for store events. @@ -63,7 +65,7 @@ impl MemoryStore { pub fn update_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { let is_updated = self.update_address_silent(peer, address, true); if is_updated { - self.push_event_and_wake(crate::store::Event::RecordUpdated(*peer)); + self.push_event_and_wake(Event::RecordUpdated(*peer)); } is_updated } @@ -92,7 +94,7 @@ impl MemoryStore { pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { let is_updated = self.remove_address_silent(peer, address, true); if is_updated { - self.push_event_and_wake(crate::store::Event::RecordUpdated(*peer)); + self.push_event_and_wake(Event::RecordUpdated(*peer)); } is_updated } @@ -132,7 +134,7 @@ impl MemoryStore { /// Insert the data and notify the swarm about the update, dropping the old data if it exists. pub fn insert_custom_data(&mut self, peer: &PeerId, custom_data: T) { self.insert_custom_data_silent(peer, custom_data); - self.push_event_and_wake(crate::store::Event::Store(Event::CustomDataUpdated(*peer))); + self.push_event_and_wake(Event::CustomDataUpdated(*peer)); } /// Insert the data without notifying the swarm. Old data will be dropped if it exists. @@ -154,7 +156,7 @@ impl MemoryStore { .get(peer) .is_some_and(|r| r.get_custom_data().is_some()) { - self.push_event_and_wake(crate::store::Event::Store(Event::CustomDataUpdated(*peer))); + self.push_event_and_wake(Event::CustomDataUpdated(*peer)); }; self.records @@ -173,7 +175,7 @@ impl MemoryStore { self.records.iter_mut() } - fn push_event_and_wake(&mut self, event: crate::store::Event) { + fn push_event_and_wake(&mut self, event: Event) { self.pending_events.push_back(event); if let Some(waker) = self.waker.take() { waker.wake(); // wake up because of update @@ -182,13 +184,13 @@ impl MemoryStore { } impl Store for MemoryStore { - type FromStore = Event; + type Event = Event; fn on_swarm_event(&mut self, swarm_event: &FromSwarm) { match swarm_event { FromSwarm::NewExternalAddrOfPeer(info) => { if self.update_address_silent(&info.peer_id, info.addr, false) { - self.push_event_and_wake(crate::store::Event::RecordUpdated(info.peer_id)); + self.push_event_and_wake(Event::RecordUpdated(info.peer_id)); } } FromSwarm::ConnectionEstablished(ConnectionEstablished { @@ -207,7 +209,7 @@ impl Store for MemoryStore { is_record_updated |= self.update_address_silent(peer_id, endpoint.get_remote_address(), false); if is_record_updated { - self.push_event_and_wake(crate::store::Event::RecordUpdated(*peer_id)); + self.push_event_and_wake(Event::RecordUpdated(*peer_id)); } } FromSwarm::DialFailure(info) => { @@ -224,17 +226,15 @@ impl Store for MemoryStore { DialError::LocalPeerId { .. } => { // The stored peer is the local peer. Remove peer fully. if self.records.remove(&peer).is_some() { - self.push_event_and_wake(crate::store::Event::RecordUpdated(peer)); + self.push_event_and_wake(Event::RecordUpdated(peer)); } } DialError::WrongPeerId { obtained, address } => { // The stored peer id is incorrect, remove incorrect and add correct one. if self.remove_address_silent(&peer, address, false) { - self.push_event_and_wake(crate::store::Event::RecordUpdated(peer)); + self.push_event_and_wake(Event::RecordUpdated(peer)); if self.update_address_silent(obtained, address, false) { - self.push_event_and_wake(crate::store::Event::RecordUpdated( - *obtained, - )); + self.push_event_and_wake(Event::RecordUpdated(*obtained)); } } } @@ -245,7 +245,7 @@ impl Store for MemoryStore { is_record_updated |= self.remove_address_silent(&peer, addr, false); } if is_record_updated { - self.push_event_and_wake(crate::store::Event::RecordUpdated(peer)); + self.push_event_and_wake(Event::RecordUpdated(peer)); } } _ => {} @@ -259,14 +259,14 @@ impl Store for MemoryStore { self.records.get(peer).map(|record| record.addresses()) } - fn poll( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> Option> { - if self.pending_events.is_empty() { - self.waker = Some(cx.waker().clone()); + fn poll(&mut self, cx: &mut std::task::Context<'_>) -> Poll { + match self.pending_events.pop_front() { + Some(ev) => Poll::Ready(ev), + None => { + self.waker = Some(cx.waker().clone()); + Poll::Pending + } } - self.pending_events.pop_front() } } @@ -396,7 +396,7 @@ mod test { use libp2p_swarm::{NetworkBehaviour, SwarmEvent}; use libp2p_swarm_test::SwarmExt; - use super::MemoryStore; + use super::{Event, MemoryStore}; use crate::Store; #[test] @@ -475,7 +475,7 @@ mod test { ) { swarm .wait(|ev| match ev { - SwarmEvent::Behaviour(crate::Event::RecordUpdated { peer }) => { + SwarmEvent::Behaviour(Event::RecordUpdated(peer)) => { (peer == expected_peer).then_some(()) } _ => None, @@ -563,9 +563,9 @@ mod test { async fn expect_record_update(swarm: &mut Swarm, expected_peer: PeerId) { swarm .wait(|ev| match ev { - SwarmEvent::Behaviour(BehaviourEvent::PeerStore( - crate::Event::RecordUpdated { peer }, - )) => (peer == expected_peer).then_some(()), + SwarmEvent::Behaviour(BehaviourEvent::PeerStore(Event::RecordUpdated( + peer, + ))) => (peer == expected_peer).then_some(()), _ => None, }) .await diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs index 1226c8f6071..adf5087b80d 100644 --- a/misc/peer-store/src/store.rs +++ b/misc/peer-store/src/store.rs @@ -1,4 +1,7 @@ -use std::{fmt::Debug, task::Context}; +use std::{ + fmt::Debug, + task::{Context, Poll}, +}; use libp2p_core::{Multiaddr, PeerId}; use libp2p_swarm::FromSwarm; @@ -7,7 +10,7 @@ use libp2p_swarm::FromSwarm; pub trait Store { /// Event generated by the store and emitted to [`Swarm`](libp2p_swarm::Swarm). /// [`Behaviour`](crate::Behaviour) cannot handle this event. - type FromStore: Debug + Send; + type Event: Debug + Send; /// How this store handles events from [`Swarm`](libp2p_swarm::Swarm). fn on_swarm_event(&mut self, event: &FromSwarm); @@ -18,14 +21,5 @@ pub trait Store { /// Polls for things that the store should do. /// The task should be waked up to emit events to [`Behaviour`](crate::Behaviour) and /// [`Swarm`](libp2p_swarm::Swarm). - fn poll(&mut self, cx: &mut Context<'_>) -> Option>; -} - -/// Event that will be handled by [`Behaviour`](crate::Behaviour). -pub enum Event { - /// An address record has been updated. - RecordUpdated(PeerId), - /// Event generated by the store. - /// [`Behaviour`](crate::Behaviour) can only forward the event to swarm. - Store(T), + fn poll(&mut self, cx: &mut Context<'_>) -> Poll; } From e676e4748317e6dbc1a44c2097a1a221a303dbbc Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Sun, 27 Apr 2025 01:09:30 +0200 Subject: [PATCH 2/8] peer-store/memory-store: remove `Event::CustomDataUpdated` No need to emit an event for a synchronous, user-triggered operation. --- misc/peer-store/src/memory_store.rs | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index f4d2361cf26..65fdadcabad 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -23,8 +23,6 @@ use super::Store; /// Event from the store and emitted to [`Swarm`](libp2p_swarm::Swarm). #[derive(Debug, Clone)] pub enum Event { - /// Custom data of the peer has been updated. - CustomDataUpdated(PeerId), /// An address record has been updated. RecordUpdated(PeerId), } @@ -131,14 +129,8 @@ impl MemoryStore { None } - /// Insert the data and notify the swarm about the update, dropping the old data if it exists. + /// Insert the data, dropping the old data if it exists. pub fn insert_custom_data(&mut self, peer: &PeerId, custom_data: T) { - self.insert_custom_data_silent(peer, custom_data); - self.push_event_and_wake(Event::CustomDataUpdated(*peer)); - } - - /// Insert the data without notifying the swarm. Old data will be dropped if it exists. - fn insert_custom_data_silent(&mut self, peer: &PeerId, custom_data: T) { if let Some(r) = self.records.get_mut(peer) { return r.insert_custom_data(custom_data); } @@ -147,18 +139,8 @@ impl MemoryStore { self.records.insert(*peer, new_record); } - /// Get a mutable reference to a peer's custom data. If it exists, the swarm is notified about - /// its modification, regardless of whether it will actually be modified. + /// Get a mutable reference to a peer's custom data. pub fn get_custom_data_mut(&mut self, peer: &PeerId) -> Option<&mut T> { - // We have to do this separately to avoid a double borrow. - if self - .records - .get(peer) - .is_some_and(|r| r.get_custom_data().is_some()) - { - self.push_event_and_wake(Event::CustomDataUpdated(*peer)); - }; - self.records .get_mut(peer) .and_then(|r| r.get_custom_data_mut()) From f6327f952611ad51562389c449ce3e2a1fb4a17a Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Sun, 27 Apr 2025 01:07:48 +0200 Subject: [PATCH 3/8] peer-store/memory-store: include more info in events --- misc/peer-store/src/memory_store.rs | 107 +++++++++++----------------- 1 file changed, 43 insertions(+), 64 deletions(-) diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index 65fdadcabad..f01333c7929 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -23,8 +23,10 @@ use super::Store; /// Event from the store and emitted to [`Swarm`](libp2p_swarm::Swarm). #[derive(Debug, Clone)] pub enum Event { - /// An address record has been updated. - RecordUpdated(PeerId), + /// A new peer address has been added to the store. + PeerAddressAdded { peer_id: PeerId, address: Multiaddr }, + /// A peer address has been removed from the store. + PeerAddressRemoved { peer_id: PeerId, address: Multiaddr }, } /// A in-memory store that uses LRU cache for bounded storage of addresses @@ -52,7 +54,7 @@ impl MemoryStore { } } - /// Update an address record and notify swarm if the address is new. + /// Add an address for a peer. /// /// The added address will NOT be removed from the store on dial failure. If the added address /// is supposed to be cleared from the store on dial failure, add it by emitting @@ -60,53 +62,49 @@ impl MemoryStore { /// [`Swarm::add_peer_address`](libp2p_swarm::Swarm::add_peer_address). /// /// Returns `true` if the address is new. - pub fn update_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { - let is_updated = self.update_address_silent(peer, address, true); - if is_updated { - self.push_event_and_wake(Event::RecordUpdated(*peer)); - } - is_updated + pub fn add_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { + self.add_address_inner(peer, address, true) } /// Update an address record without notifying swarm. /// /// Returns `true` if the address is new. - fn update_address_silent( - &mut self, - peer: &PeerId, - address: &Multiaddr, - permanent: bool, - ) -> bool { - if let Some(record) = self.records.get_mut(peer) { - return record.update_address(address, permanent); + fn add_address_inner(&mut self, peer: &PeerId, address: &Multiaddr, permanent: bool) -> bool { + let record = self + .records + .entry(*peer) + .or_insert(PeerRecord::new(self.config.record_capacity)); + let is_new = record.add_address(address, permanent); + if is_new { + self.push_event_and_wake(Event::PeerAddressAdded { + peer_id: *peer, + address: address.clone(), + }); } - let mut new_record = PeerRecord::new(self.config.record_capacity); - new_record.update_address(address, permanent); - self.records.insert(*peer, new_record); - true + is_new } /// Remove an address record. /// /// Returns `true` when the address existed. pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { - let is_updated = self.remove_address_silent(peer, address, true); - if is_updated { - self.push_event_and_wake(Event::RecordUpdated(*peer)); - } - is_updated + self.remove_address_inner(peer, address, true) } /// Remove an address record without notifying swarm. /// /// Returns `true` when the address is removed, `false` if the address didn't exist /// or the address is permanent and `force` false. - fn remove_address_silent(&mut self, peer: &PeerId, address: &Multiaddr, force: bool) -> bool { + fn remove_address_inner(&mut self, peer: &PeerId, address: &Multiaddr, force: bool) -> bool { if let Some(record) = self.records.get_mut(peer) { if record.remove_address(address, force) { - if record.addresses.is_empty() && record.custom_data.is_none() { + if record.addresses.is_empty() { self.records.remove(peer); } + self.push_event_and_wake(Event::PeerAddressRemoved { + peer_id: *peer, + address: address.clone(), + }); return true; } } @@ -152,7 +150,6 @@ impl MemoryStore { } /// Iterate over all internal records mutably. - /// Will not wake up the task. pub fn record_iter_mut(&mut self) -> impl Iterator)> { self.records.iter_mut() } @@ -171,9 +168,7 @@ impl Store for MemoryStore { fn on_swarm_event(&mut self, swarm_event: &FromSwarm) { match swarm_event { FromSwarm::NewExternalAddrOfPeer(info) => { - if self.update_address_silent(&info.peer_id, info.addr, false) { - self.push_event_and_wake(Event::RecordUpdated(info.peer_id)); - } + self.add_address_inner(&info.peer_id, info.addr, false); } FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, @@ -181,18 +176,12 @@ impl Store for MemoryStore { endpoint, .. }) if endpoint.is_dialer() => { - let mut is_record_updated = false; if self.config.remove_addr_on_dial_error { for failed_addr in *failed_addresses { - is_record_updated |= - self.remove_address_silent(peer_id, failed_addr, false); + self.remove_address_inner(peer_id, failed_addr, false); } } - is_record_updated |= - self.update_address_silent(peer_id, endpoint.get_remote_address(), false); - if is_record_updated { - self.push_event_and_wake(Event::RecordUpdated(*peer_id)); - } + self.add_address_inner(peer_id, endpoint.get_remote_address(), false); } FromSwarm::DialFailure(info) => { if !self.config.remove_addr_on_dial_error { @@ -205,29 +194,15 @@ impl Store for MemoryStore { }; match info.error { - DialError::LocalPeerId { .. } => { - // The stored peer is the local peer. Remove peer fully. - if self.records.remove(&peer).is_some() { - self.push_event_and_wake(Event::RecordUpdated(peer)); - } - } DialError::WrongPeerId { obtained, address } => { // The stored peer id is incorrect, remove incorrect and add correct one. - if self.remove_address_silent(&peer, address, false) { - self.push_event_and_wake(Event::RecordUpdated(peer)); - if self.update_address_silent(obtained, address, false) { - self.push_event_and_wake(Event::RecordUpdated(*obtained)); - } + if self.remove_address_inner(&peer, address, false) { + self.add_address_inner(obtained, address, false); } } DialError::Transport(errors) => { - // Remove all attempted addresses. - let mut is_record_updated = false; for (addr, _) in errors { - is_record_updated |= self.remove_address_silent(&peer, addr, false); - } - if is_record_updated { - self.push_event_and_wake(Event::RecordUpdated(peer)); + self.remove_address_inner(&peer, addr, false); } } _ => {} @@ -330,7 +305,7 @@ impl PeerRecord { /// insert it to the front if not. /// /// Returns true when the address is new. - pub fn update_address(&mut self, address: &Multiaddr, permanent: bool) -> bool { + pub fn add_address(&mut self, address: &Multiaddr, permanent: bool) -> bool { if let Some(was_permanent) = self.addresses.get(address) { if !*was_permanent && permanent { self.addresses.get_or_insert(address.clone(), || permanent); @@ -367,6 +342,10 @@ impl PeerRecord { pub fn insert_custom_data(&mut self, custom_data: T) { let _ = self.custom_data.insert(custom_data); } + + pub fn is_empty(&self) -> bool { + self.addresses.is_empty() && self.custom_data.is_none() + } } #[cfg(test)] @@ -388,9 +367,9 @@ mod test { let addr1 = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed"); let addr2 = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed"); let addr3 = Multiaddr::from_str("/ip4/127.0.0.3").expect("parsing to succeed"); - store.update_address(&peer, &addr1); - store.update_address(&peer, &addr2); - store.update_address(&peer, &addr3); + store.add_address(&peer, &addr1); + store.add_address(&peer, &addr2); + store.add_address(&peer, &addr3); assert!( store .records @@ -400,7 +379,7 @@ mod test { .collect::>() == vec![&addr3, &addr2, &addr1] ); - store.update_address(&peer, &addr1); + store.add_address(&peer, &addr1); assert!( store .records @@ -410,7 +389,7 @@ mod test { .collect::>() == vec![&addr1, &addr3, &addr2] ); - store.update_address(&peer, &addr3); + store.add_address(&peer, &addr3); assert!( store .records @@ -428,7 +407,7 @@ mod test { let peer = PeerId::random(); for i in 1..10 { let addr_string = format!("/ip4/127.0.0.{}", i); - store.update_address( + store.add_address( &peer, &Multiaddr::from_str(&addr_string).expect("parsing to succeed"), ); From 2418412f67ffe202d9407e065355be8c8db9b049 Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Sun, 27 Apr 2025 01:08:03 +0200 Subject: [PATCH 4/8] peer-store/memory-store: fix tests --- misc/peer-store/src/memory_store.rs | 154 +++++++++++++--------------- 1 file changed, 72 insertions(+), 82 deletions(-) diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index f01333c7929..ab0cb8aa634 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -352,9 +352,9 @@ impl PeerRecord { mod test { use std::{num::NonZero, str::FromStr}; - use libp2p::Swarm; - use libp2p_core::{Multiaddr, PeerId}; - use libp2p_swarm::{NetworkBehaviour, SwarmEvent}; + use libp2p::identify; + use libp2p_core::{multiaddr::Protocol, Multiaddr, PeerId}; + use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmEvent}; use libp2p_swarm_test::SwarmExt; use super::{Event, MemoryStore}; @@ -433,15 +433,15 @@ mod test { async fn expect_record_update( swarm: &mut Swarm>, expected_peer: PeerId, + expected_address: Option<&Multiaddr>, ) { - swarm - .wait(|ev| match ev { - SwarmEvent::Behaviour(Event::RecordUpdated(peer)) => { - (peer == expected_peer).then_some(()) - } - _ => None, - }) - .await + match swarm.next_behaviour_event().await { + Event::PeerAddressAdded { peer_id, address } => { + assert_eq!(peer_id, expected_peer); + assert!(expected_address.is_none_or(|a| *a == address)); + } + ev => panic!("Unexpected event {:?}.", ev), + } } let store1: MemoryStore<()> = MemoryStore::new( @@ -458,19 +458,13 @@ mod test { let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { - let (listen_addr, _) = swarm1.listen().with_memory_addr_external().await; + let (mut listen_addr, _) = swarm1.listen().with_memory_addr_external().await; let swarm1_peer_id = *swarm1.local_peer_id(); let swarm2_peer_id = *swarm2.local_peer_id(); - swarm2.dial(listen_addr.clone()).expect("dial to succeed"); - let handle = spawn_wait_conn_established(swarm1); - swarm2 - .wait(|ev| match ev { - SwarmEvent::ConnectionEstablished { .. } => Some(()), - _ => None, - }) - .await; - let mut swarm1 = handle.await.expect("future to complete"); - expect_record_update(&mut swarm2, swarm1_peer_id).await; + swarm2.connect(&mut swarm1).await; + + listen_addr.push(Protocol::P2p(swarm1_peer_id.into())); + expect_record_update(&mut swarm2, swarm1_peer_id, Some(&listen_addr)).await; assert!(swarm2 .behaviour() .address_of_peer(&swarm1_peer_id) @@ -481,8 +475,8 @@ mod test { .behaviour() .address_of_peer(&swarm2_peer_id) .is_none()); - let (new_listen_addr, _) = swarm1.listen().with_memory_addr_external().await; - let handle = spawn_wait_conn_established(swarm1); + let (mut new_listen_addr, _) = swarm1.listen().with_memory_addr_external().await; + tokio::spawn(swarm1.loop_on_next()); swarm2 .dial( libp2p_swarm::dial_opts::DialOpts::peer_id(swarm1_peer_id) @@ -497,19 +491,19 @@ mod test { _ => None, }) .await; - handle.await.expect("future to complete"); - expect_record_update(&mut swarm2, swarm1_peer_id).await; + new_listen_addr.push(Protocol::P2p(swarm1_peer_id.into())); + expect_record_update(&mut swarm2, swarm1_peer_id, Some(&new_listen_addr)).await; // The address in store will contain peer ID. let new_listen_addr = new_listen_addr .with_p2p(swarm1_peer_id) .expect("extend to succeed"); - assert!( + assert_eq!( swarm2 .behaviour() .address_of_peer(&swarm1_peer_id) .expect("peer to exist") - .collect::>() - == vec![&new_listen_addr, &listen_addr] + .collect::>(), + vec![&new_listen_addr, &listen_addr] ); }) } @@ -519,17 +513,24 @@ mod test { #[derive(NetworkBehaviour)] struct Behaviour { peer_store: crate::Behaviour, - identify: libp2p::identify::Behaviour, + identify: identify::Behaviour, } - async fn expect_record_update(swarm: &mut Swarm, expected_peer: PeerId) { - swarm - .wait(|ev| match ev { - SwarmEvent::Behaviour(BehaviourEvent::PeerStore(Event::RecordUpdated( - peer, - ))) => (peer == expected_peer).then_some(()), - _ => None, - }) - .await + async fn expect_record_update( + swarm: &mut Swarm, + expected_peer: PeerId, + expected_address: Option<&Multiaddr>, + ) { + loop { + match swarm.next_behaviour_event().await { + BehaviourEvent::PeerStore(Event::PeerAddressAdded { peer_id, address }) => { + assert_eq!(peer_id, expected_peer); + assert!(expected_address.is_none_or(|a| *a == address)); + break; + } + ev @ BehaviourEvent::PeerStore(_) => panic!("Unexpected event {:?}.", ev), + _ => {} + } + } } fn build_swarm() -> Swarm { Swarm::new_ephemeral_tokio(|kp| Behaviour { @@ -537,7 +538,7 @@ mod test { crate::memory_store::Config::default() .set_record_capacity(NonZero::new(4).expect("4 > 0")), )), - identify: libp2p::identify::Behaviour::new(libp2p::identify::Config::new( + identify: identify::Behaviour::new(identify::Config::new( "/TODO/0.0.1".to_string(), kp.public(), )), @@ -548,29 +549,32 @@ mod test { let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { - let (listen_addr, _) = swarm1.listen().with_memory_addr_external().await; + let (mut listen_addr, _) = swarm1.listen().with_memory_addr_external().await; let swarm1_peer_id = *swarm1.local_peer_id(); let swarm2_peer_id = *swarm2.local_peer_id(); - swarm2.dial(listen_addr.clone()).expect("dial to succeed"); - let handle = spawn_wait_conn_established(swarm1); - let mut swarm2 = spawn_wait_conn_established(swarm2) - .await - .expect("future to complete"); - let mut swarm1 = handle.await.expect("future to complete"); - expect_record_update(&mut swarm2, swarm1_peer_id).await; - assert!(swarm2 - .behaviour() - .peer_store - .address_of_peer(&swarm1_peer_id) - .expect("swarm should be connected and record about it should be created") - .any(|addr| *addr == listen_addr)); - assert!(swarm1 - .behaviour() - .peer_store - .address_of_peer(&swarm2_peer_id) - .is_none()); - swarm1.next_swarm_event().await; // skip `identify::Event::Sent` - swarm1.next_swarm_event().await; // skip `identify::Event::Received` + swarm2.connect(&mut swarm1).await; + + listen_addr.push(Protocol::P2p(swarm1_peer_id.into())); + expect_record_update(&mut swarm2, swarm1_peer_id, Some(&listen_addr)).await; + + assert_eq!( + swarm2 + .behaviour() + .peer_store + .address_of_peer(&swarm1_peer_id) + .expect("swarm should be connected and record about it should be created") + .collect::>(), + vec![&listen_addr] + ); + + assert!(matches!( + swarm1.next_behaviour_event().await, + BehaviourEvent::Identify(identify::Event::Sent { .. }) + )); + assert!(matches!( + swarm1.next_behaviour_event().await, + BehaviourEvent::Identify(identify::Event::Received { .. }) + )); let (new_listen_addr, _) = swarm1.listen().with_memory_addr_external().await; swarm1.behaviour_mut().identify.push([swarm2_peer_id]); tokio::spawn(swarm1.loop_on_next()); @@ -578,32 +582,18 @@ mod test { // 2 pair of mem and tcp address for two calls to `::listen()` // with one address already present through direct connection. // FLAKY: tcp addresses are not explicitly marked as external addresses. - expect_record_update(&mut swarm2, swarm1_peer_id).await; - expect_record_update(&mut swarm2, swarm1_peer_id).await; - expect_record_update(&mut swarm2, swarm1_peer_id).await; + expect_record_update(&mut swarm2, swarm1_peer_id, None).await; + expect_record_update(&mut swarm2, swarm1_peer_id, None).await; + expect_record_update(&mut swarm2, swarm1_peer_id, None).await; // The address in store won't contain peer ID because it is from Identify. - assert!(swarm2 + let known_listen_addresses = swarm2 .behaviour() .peer_store .address_of_peer(&swarm1_peer_id) .expect("peer to exist") - .any(|addr| *addr == new_listen_addr)); - }) - } - - fn spawn_wait_conn_established(mut swarm: Swarm) -> tokio::task::JoinHandle> - where - T: NetworkBehaviour + Send + Sync, - Swarm: SwarmExt, - { - tokio::spawn(async move { - swarm - .wait(|ev| match ev { - SwarmEvent::ConnectionEstablished { .. } => Some(()), - _ => None, - }) - .await; - swarm + .collect::>(); + assert!(known_listen_addresses.contains(&&new_listen_addr)); + assert_eq!(known_listen_addresses.len(), 4) }) } } From 36bc3593ded391993f907fe033a6ecc63677cf7d Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Tue, 29 Apr 2025 00:26:12 +0200 Subject: [PATCH 5/8] make clippy happy --- misc/peer-store/src/memory_store.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index ab0cb8aa634..019e0473c8e 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -463,7 +463,7 @@ mod test { let swarm2_peer_id = *swarm2.local_peer_id(); swarm2.connect(&mut swarm1).await; - listen_addr.push(Protocol::P2p(swarm1_peer_id.into())); + listen_addr.push(Protocol::P2p(swarm1_peer_id)); expect_record_update(&mut swarm2, swarm1_peer_id, Some(&listen_addr)).await; assert!(swarm2 .behaviour() @@ -491,7 +491,7 @@ mod test { _ => None, }) .await; - new_listen_addr.push(Protocol::P2p(swarm1_peer_id.into())); + new_listen_addr.push(Protocol::P2p(swarm1_peer_id)); expect_record_update(&mut swarm2, swarm1_peer_id, Some(&new_listen_addr)).await; // The address in store will contain peer ID. let new_listen_addr = new_listen_addr @@ -554,7 +554,7 @@ mod test { let swarm2_peer_id = *swarm2.local_peer_id(); swarm2.connect(&mut swarm1).await; - listen_addr.push(Protocol::P2p(swarm1_peer_id.into())); + listen_addr.push(Protocol::P2p(swarm1_peer_id)); expect_record_update(&mut swarm2, swarm1_peer_id, Some(&listen_addr)).await; assert_eq!( From 97203c6aa54c4b95e2b176e384a63c7f9995c7ce Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Tue, 29 Apr 2025 13:30:23 +0200 Subject: [PATCH 6/8] peer-store/memory-store: extend docs --- misc/peer-store/src/memory_store.rs | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index 019e0473c8e..6fbca180db4 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -20,13 +20,23 @@ use lru::LruCache; use super::Store; -/// Event from the store and emitted to [`Swarm`](libp2p_swarm::Swarm). +/// Event emitted from the [`MemoryStore`] to the [`Swarm`](libp2p_swarm::Swarm). #[derive(Debug, Clone)] pub enum Event { /// A new peer address has been added to the store. - PeerAddressAdded { peer_id: PeerId, address: Multiaddr }, + PeerAddressAdded { + /// ID of the peer for which the address was added. + peer_id: PeerId, + /// The added address. + address: Multiaddr, + }, /// A peer address has been removed from the store. - PeerAddressRemoved { peer_id: PeerId, address: Multiaddr }, + PeerAddressRemoved { + /// ID of the peer for which the address was removed. + peer_id: PeerId, + /// The removed address. + address: Multiaddr, + }, } /// A in-memory store that uses LRU cache for bounded storage of addresses @@ -66,7 +76,7 @@ impl MemoryStore { self.add_address_inner(peer, address, true) } - /// Update an address record without notifying swarm. + /// Update an address record and notify the swarm. /// /// Returns `true` if the address is new. fn add_address_inner(&mut self, peer: &PeerId, address: &Multiaddr, permanent: bool) -> bool { @@ -91,7 +101,7 @@ impl MemoryStore { self.remove_address_inner(peer, address, true) } - /// Remove an address record without notifying swarm. + /// Remove an address record and notify the swarm. /// /// Returns `true` when the address is removed, `false` if the address didn't exist /// or the address is permanent and `force` false. @@ -111,6 +121,7 @@ impl MemoryStore { false } + /// Get a reference to a peer's custom data. pub fn get_custom_data(&self, peer: &PeerId) -> Option<&T> { self.records.get(peer).and_then(|r| r.get_custom_data()) } @@ -149,7 +160,9 @@ impl MemoryStore { self.records.iter() } - /// Iterate over all internal records mutably. + /// Iterate over all internal records mutably. + /// + /// Changes to the records will not generate an event. pub fn record_iter_mut(&mut self) -> impl Iterator)> { self.records.iter_mut() } From 67abd879054e734bb97371b31880ef5aa6c39044 Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Tue, 29 Apr 2025 13:34:02 +0200 Subject: [PATCH 7/8] peer-store/memory-store: add `is_permanent` to `Event::PeerAddressAdded` --- misc/peer-store/src/memory_store.rs | 42 +++++++++++++++++++++++------ 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index 6fbca180db4..be1d3ce172b 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -29,6 +29,14 @@ pub enum Event { peer_id: PeerId, /// The added address. address: Multiaddr, + /// Whether the address will be kept in the store after a dial-failure. + /// + /// Set to `true` when an address was added explicitly through + /// [`MemoryStore::add_address`], `false` if the address was discovered through the + /// swarm or other behaviors. + /// + /// Only relevant when [`Config::is_remove_addr_on_dial_error`] is `true`. + is_permanent: bool, }, /// A peer address has been removed from the store. PeerAddressRemoved { @@ -79,16 +87,22 @@ impl MemoryStore { /// Update an address record and notify the swarm. /// /// Returns `true` if the address is new. - fn add_address_inner(&mut self, peer: &PeerId, address: &Multiaddr, permanent: bool) -> bool { + fn add_address_inner( + &mut self, + peer: &PeerId, + address: &Multiaddr, + is_permanent: bool, + ) -> bool { let record = self .records .entry(*peer) .or_insert(PeerRecord::new(self.config.record_capacity)); - let is_new = record.add_address(address, permanent); + let is_new = record.add_address(address, is_permanent); if is_new { self.push_event_and_wake(Event::PeerAddressAdded { peer_id: *peer, address: address.clone(), + is_permanent, }); } is_new @@ -318,14 +332,16 @@ impl PeerRecord { /// insert it to the front if not. /// /// Returns true when the address is new. - pub fn add_address(&mut self, address: &Multiaddr, permanent: bool) -> bool { + pub fn add_address(&mut self, address: &Multiaddr, is_permanent: bool) -> bool { if let Some(was_permanent) = self.addresses.get(address) { - if !*was_permanent && permanent { - self.addresses.get_or_insert(address.clone(), || permanent); + if !*was_permanent && is_permanent { + self.addresses + .get_or_insert(address.clone(), || is_permanent); } return false; } - self.addresses.get_or_insert(address.clone(), || permanent); + self.addresses + .get_or_insert(address.clone(), || is_permanent); true } @@ -449,9 +465,14 @@ mod test { expected_address: Option<&Multiaddr>, ) { match swarm.next_behaviour_event().await { - Event::PeerAddressAdded { peer_id, address } => { + Event::PeerAddressAdded { + peer_id, + address, + is_permanent, + } => { assert_eq!(peer_id, expected_peer); assert!(expected_address.is_none_or(|a| *a == address)); + assert!(!is_permanent) } ev => panic!("Unexpected event {:?}.", ev), } @@ -535,9 +556,14 @@ mod test { ) { loop { match swarm.next_behaviour_event().await { - BehaviourEvent::PeerStore(Event::PeerAddressAdded { peer_id, address }) => { + BehaviourEvent::PeerStore(Event::PeerAddressAdded { + peer_id, + address, + is_permanent, + }) => { assert_eq!(peer_id, expected_peer); assert!(expected_address.is_none_or(|a| *a == address)); + assert!(!is_permanent); break; } ev @ BehaviourEvent::PeerStore(_) => panic!("Unexpected event {:?}.", ev), From a2d8f73741b13b7b12d276a7b1af2e8093288e65 Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Wed, 30 Apr 2025 13:27:50 +0200 Subject: [PATCH 8/8] peer-store/memory-store: revert accidental change --- misc/peer-store/src/memory_store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs index be1d3ce172b..946b0a3aa4f 100644 --- a/misc/peer-store/src/memory_store.rs +++ b/misc/peer-store/src/memory_store.rs @@ -122,7 +122,7 @@ impl MemoryStore { fn remove_address_inner(&mut self, peer: &PeerId, address: &Multiaddr, force: bool) -> bool { if let Some(record) = self.records.get_mut(peer) { if record.remove_address(address, force) { - if record.addresses.is_empty() { + if record.addresses.is_empty() && record.custom_data.is_none() { self.records.remove(peer); } self.push_event_and_wake(Event::PeerAddressRemoved {