From 6dc22b1891d318a47e7541834e90c9de1940331a Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Thu, 20 Mar 2025 10:16:45 +0200 Subject: [PATCH 1/3] feat(request-response): add `Behaviour::send_request_with_addresses()` --- protocols/request-response/CHANGELOG.md | 3 + protocols/request-response/src/lib.rs | 24 ++++- protocols/request-response/tests/ping.rs | 125 ++++++++++++++++++++++- 3 files changed, 145 insertions(+), 7 deletions(-) diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 54ed5eb3c50..8d978e88104 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -3,6 +3,9 @@ - fix: public cbor/json codec module See [PR 5830](https://github.com/libp2p/rust-libp2p/pull/5830). +- feat: add `Behaviour::send_request_with_addresses()` + See [PR 5938](https://github.com/libp2p/rust-libp2p/issues/5938). + ## 0.28.0 - Deprecate `void` crate. diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 7fba089b04e..b9cd23970ae 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -431,9 +431,24 @@ where /// > in another `NetworkBehaviour` that provides peer and /// > address discovery, or known addresses of peers must be /// > managed via [`libp2p_swarm::Swarm::add_peer_address`]. - /// > Addresses are automatically removed when dial attempts - /// > to them fail. + /// > Alternatively, [`Behaviour::send_request_with_addresses`] + /// > can be used. Addresses are automatically removed when + /// > dial attempts to them fail. pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId { + self.send_request_with_addresses(peer, request, Vec::new()) + } + + /// Initiates sending a request, using provided addresses if a connection needs to be + /// established. + /// + /// This is very similar to [`Behaviour::send_request`], but uses provided addresses when + /// dialing currently not connected peer. + pub fn send_request_with_addresses( + &mut self, + peer: &PeerId, + request: TCodec::Request, + addresses: Vec, + ) -> OutboundRequestId { let request_id = self.next_outbound_request_id(); let request = OutboundMessage { request_id, @@ -443,7 +458,10 @@ where if let Some(request) = self.try_send_request(peer, request) { self.pending_events.push_back(ToSwarm::Dial { - opts: DialOpts::peer_id(*peer).build(), + opts: DialOpts::peer_id(*peer) + .addresses(addresses) + .extend_addresses_through_behaviour() + .build(), }); self.pending_outbound_requests .entry(*peer) diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 94adedac2d7..1abd08023b3 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -43,10 +43,7 @@ async fn is_response_outbound() { let mut swarm1 = Swarm::new_ephemeral(|_| { request_response::cbor::Behaviour::::new( - [( - StreamProtocol::new("/ping/1"), - request_response::ProtocolSupport::Full, - )], + [(StreamProtocol::new("/ping/1"), ProtocolSupport::Full)], request_response::Config::default(), ) }); @@ -180,6 +177,126 @@ async fn ping_protocol() { peer2.await; } +/// Exercises a simple ping protocol where peers are not connected prior to request sending. +#[async_std::test] +#[cfg(feature = "cbor")] +async fn ping_protocol_explicit_address() { + let ping = Ping("ping".to_string().into_bytes()); + let pong = Pong("pong".to_string().into_bytes()); + + let protocols = iter::once((StreamProtocol::new("/ping/1"), ProtocolSupport::Full)); + let cfg = request_response::Config::default(); + + let mut swarm1 = Swarm::new_ephemeral(|_| { + request_response::cbor::Behaviour::::new(protocols.clone(), cfg.clone()) + }); + let peer1_id = *swarm1.local_peer_id(); + let mut swarm2 = Swarm::new_ephemeral(|_| { + request_response::cbor::Behaviour::::new(protocols, cfg) + }); + let peer2_id = *swarm2.local_peer_id(); + + let (peer1_listen_addr, _) = swarm1.listen().with_memory_addr_external().await; + + let expected_ping = ping.clone(); + let expected_pong = pong.clone(); + + let peer1 = async move { + loop { + match swarm1.next_swarm_event().await.try_into_behaviour_event() { + Ok(request_response::Event::Message { + peer, + message: + request_response::Message::Request { + request, channel, .. + }, + .. + }) => { + assert_eq!(&request, &expected_ping); + assert_eq!(&peer, &peer2_id); + swarm1 + .behaviour_mut() + .send_response(channel, pong.clone()) + .unwrap(); + } + Ok(request_response::Event::ResponseSent { peer, .. }) => { + assert_eq!(&peer, &peer2_id); + } + Ok(e) => { + panic!("Peer1: Unexpected event: {e:?}") + } + Err(..) => {} + } + } + }; + + let peer2 = async { + let req_id = swarm2.behaviour_mut().send_request(&peer1_id, ping.clone()); + assert!(swarm2.behaviour().is_pending_outbound(&peer1_id, &req_id)); + + // Can't dial to unknown peer + match swarm2 + .next_swarm_event() + .await + .try_into_behaviour_event() + .unwrap() + { + request_response::Event::OutboundFailure { + peer, request_id, .. + } => { + assert_eq!(&peer, &peer1_id); + assert_eq!(req_id, request_id); + } + e => panic!("Peer2: Unexpected event: {e:?}"), + } + + let req_id = swarm2.behaviour_mut().send_request_with_addresses( + &peer1_id, + ping.clone(), + vec![peer1_listen_addr], + ); + assert!(swarm2.behaviour().is_pending_outbound(&peer1_id, &req_id)); + + // Dial to peer with explicit address succeeds + match swarm2.select_next_some().await { + SwarmEvent::Dialing { peer_id, .. } => { + assert_eq!(&peer_id, &Some(peer1_id)); + } + e => panic!("Peer2: Unexpected event: {e:?}"), + } + match swarm2.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + assert_eq!(&peer_id, &peer1_id); + } + e => panic!("Peer2: Unexpected event: {e:?}"), + } + match swarm2 + .next_swarm_event() + .await + .try_into_behaviour_event() + .unwrap() + { + request_response::Event::Message { + peer, + message: + request_response::Message::Response { + request_id, + response, + }, + .. + } => { + assert_eq!(&response, &expected_pong); + assert_eq!(&peer, &peer1_id); + assert_eq!(req_id, request_id); + } + e => panic!("Peer2: Unexpected event: {e:?}"), + } + }; + + async_std::task::spawn(Box::pin(peer1)); + peer2.await; +} + #[async_std::test] #[cfg(feature = "cbor")] async fn emits_inbound_connection_closed_failure() { From e4f38c86280d9e4cb889c39510e70385f799a23b Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 15 Apr 2025 14:37:55 +0300 Subject: [PATCH 2/3] Apply suggestions from code review Co-authored-by: Elena Frank --- protocols/request-response/src/lib.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index b9cd23970ae..bf925504407 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -431,18 +431,16 @@ where /// > in another `NetworkBehaviour` that provides peer and /// > address discovery, or known addresses of peers must be /// > managed via [`libp2p_swarm::Swarm::add_peer_address`]. + /// > Addresses are automatically removed when dial attempts + /// > to them fail. /// > Alternatively, [`Behaviour::send_request_with_addresses`] - /// > can be used. Addresses are automatically removed when - /// > dial attempts to them fail. + /// > can be used. pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId { self.send_request_with_addresses(peer, request, Vec::new()) } - /// Initiates sending a request, using provided addresses if a connection needs to be - /// established. - /// - /// This is very similar to [`Behaviour::send_request`], but uses provided addresses when - /// dialing currently not connected peer. + /// Like [`Behaviour::send_request`], but additionally using the provided addresses + /// if a connection needs to be established. pub fn send_request_with_addresses( &mut self, peer: &PeerId, From 1115a57bc20a91499a700d706a5fb81b62a4fdd1 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 15 Apr 2025 14:40:25 +0300 Subject: [PATCH 3/3] Fix formatting --- protocols/request-response/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index bf925504407..0e9afdc7ae0 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -431,15 +431,15 @@ where /// > in another `NetworkBehaviour` that provides peer and /// > address discovery, or known addresses of peers must be /// > managed via [`libp2p_swarm::Swarm::add_peer_address`]. - /// > Addresses are automatically removed when dial attempts + /// > Addresses are automatically removed when dial attempts /// > to them fail. /// > Alternatively, [`Behaviour::send_request_with_addresses`] - /// > can be used. + /// > can be used. pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId { self.send_request_with_addresses(peer, request, Vec::new()) } - /// Like [`Behaviour::send_request`], but additionally using the provided addresses + /// Like [`Behaviour::send_request`], but additionally using the provided addresses /// if a connection needs to be established. pub fn send_request_with_addresses( &mut self,