diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index ad06aefa..ea2d14a3 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,11 +1,15 @@ ## 0.40.0 - unreleased +- Allow `ListenerId` to be user-controlled, i.e. to be provided on `Transport::listen_on`. + See [PR 3567]. + - Raise MSRV to 1.65. See [PR 3715]. - Remove deprecated symbols related to upgrades. See [PR 3867]. +[PR 3567]: https://github.com/libp2p/rust-libp2p/pull/3567 [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 [PR 3867]: https://github.com/libp2p/rust-libp2p/pull/3867 diff --git a/core/src/either.rs b/core/src/either.rs index 32e09ca6..3f79b2b3 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -154,14 +154,18 @@ where } } - fn listen_on(&mut self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { use TransportError::*; match self { - Either::Left(a) => a.listen_on(addr).map_err(|e| match e { + Either::Left(a) => a.listen_on(id, addr).map_err(|e| match e { MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr), Other(err) => Other(Either::Left(err)), }), - Either::Right(b) => b.listen_on(addr).map_err(|e| match e { + Either::Right(b) => b.listen_on(id, addr).map_err(|e| match e { MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr), Other(err) => Other(Either::Right(err)), }), diff --git a/core/src/transport.rs b/core/src/transport.rs index ca1796b3..c9861297 100644 --- a/core/src/transport.rs +++ b/core/src/transport.rs @@ -31,6 +31,7 @@ use std::{ error::Error, fmt, pin::Pin, + sync::atomic::{AtomicUsize, Ordering}, task::{Context, Poll}, }; @@ -55,6 +56,8 @@ pub use self::memory::MemoryTransport; pub use self::optional::OptionalTransport; pub use self::upgrade::Upgrade; +static NEXT_LISTENER_ID: AtomicUsize = AtomicUsize::new(1); + /// A transport provides connection-oriented communication between two peers /// through ordered streams of data (i.e. connections). /// @@ -109,8 +112,12 @@ pub trait Transport { /// obtained from [dialing](Transport::dial). type Dial: Future>; - /// Listens on the given [`Multiaddr`] for inbound connections. - fn listen_on(&mut self, addr: Multiaddr) -> Result>; + /// Listens on the given [`Multiaddr`] for inbound connections with a provided [`ListenerId`]. + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError>; /// Remove a listener. /// @@ -241,18 +248,25 @@ pub trait Transport { /// The ID of a single listener. #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] -pub struct ListenerId(u64); +pub struct ListenerId(usize); impl ListenerId { + #[deprecated(note = "Renamed to ` ListenerId::next`.")] + #[allow(clippy::new_without_default)] /// Creates a new `ListenerId`. pub fn new() -> Self { - ListenerId(rand::random()) + ListenerId::next() } -} -impl Default for ListenerId { - fn default() -> Self { - Self::new() + /// Creates a new `ListenerId`. + pub fn next() -> Self { + ListenerId(NEXT_LISTENER_ID.fetch_add(1, Ordering::SeqCst)) + } + + #[deprecated(note = "Use ` ListenerId::next` instead.")] + #[allow(clippy::should_implement_trait)] + pub fn default() -> Self { + Self::next() } } diff --git a/core/src/transport/and_then.rs b/core/src/transport/and_then.rs index fb528056..6e0c7e32 100644 --- a/core/src/transport/and_then.rs +++ b/core/src/transport/and_then.rs @@ -54,9 +54,13 @@ where type ListenerUpgrade = AndThenFuture; type Dial = AndThenFuture; - fn listen_on(&mut self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { self.transport - .listen_on(addr) + .listen_on(id, addr) .map_err(|err| err.map(Either::Left)) } diff --git a/core/src/transport/boxed.rs b/core/src/transport/boxed.rs index a55e4db8..8274d557 100644 --- a/core/src/transport/boxed.rs +++ b/core/src/transport/boxed.rs @@ -52,7 +52,11 @@ type Dial = Pin> + Send>>; type ListenerUpgrade = Pin> + Send>>; trait Abstract { - fn listen_on(&mut self, addr: Multiaddr) -> Result>; + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError>; fn remove_listener(&mut self, id: ListenerId) -> bool; fn dial(&mut self, addr: Multiaddr) -> Result, TransportError>; fn dial_as_listener(&mut self, addr: Multiaddr) -> Result, TransportError>; @@ -70,8 +74,12 @@ where T::Dial: Send + 'static, T::ListenerUpgrade: Send + 'static, { - fn listen_on(&mut self, addr: Multiaddr) -> Result> { - Transport::listen_on(self, addr).map_err(|e| e.map(box_err)) + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { + Transport::listen_on(self, id, addr).map_err(|e| e.map(box_err)) } fn remove_listener(&mut self, id: ListenerId) -> bool { @@ -123,8 +131,12 @@ impl Transport for Boxed { type ListenerUpgrade = ListenerUpgrade; type Dial = Dial; - fn listen_on(&mut self, addr: Multiaddr) -> Result> { - self.inner.listen_on(addr) + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { + self.inner.listen_on(id, addr) } fn remove_listener(&mut self, id: ListenerId) -> bool { diff --git a/core/src/transport/choice.rs b/core/src/transport/choice.rs index bb7d542d..b7eaacab 100644 --- a/core/src/transport/choice.rs +++ b/core/src/transport/choice.rs @@ -46,13 +46,17 @@ where type ListenerUpgrade = EitherFuture; type Dial = EitherFuture; - fn listen_on(&mut self, addr: Multiaddr) -> Result> { - let addr = match self.0.listen_on(addr) { + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { + let addr = match self.0.listen_on(id, addr) { Err(TransportError::MultiaddrNotSupported(addr)) => addr, res => return res.map_err(|err| err.map(Either::Left)), }; - let addr = match self.1.listen_on(addr) { + let addr = match self.1.listen_on(id, addr) { Err(TransportError::MultiaddrNotSupported(addr)) => addr, res => return res.map_err(|err| err.map(Either::Right)), }; diff --git a/core/src/transport/dummy.rs b/core/src/transport/dummy.rs index a7d1cab9..951d1039 100644 --- a/core/src/transport/dummy.rs +++ b/core/src/transport/dummy.rs @@ -59,7 +59,11 @@ impl Transport for DummyTransport { type ListenerUpgrade = futures::future::Pending>; type Dial = futures::future::Pending>; - fn listen_on(&mut self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + _id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { Err(TransportError::MultiaddrNotSupported(addr)) } diff --git a/core/src/transport/global_only.rs b/core/src/transport/global_only.rs index b0a12de0..4f1fe8ab 100644 --- a/core/src/transport/global_only.rs +++ b/core/src/transport/global_only.rs @@ -276,8 +276,12 @@ impl crate::Transport for Transport { type ListenerUpgrade = ::ListenerUpgrade; type Dial = ::Dial; - fn listen_on(&mut self, addr: Multiaddr) -> Result> { - self.inner.listen_on(addr) + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { + self.inner.listen_on(id, addr) } fn remove_listener(&mut self, id: ListenerId) -> bool { diff --git a/core/src/transport/map.rs b/core/src/transport/map.rs index 50f7b826..553f3e63 100644 --- a/core/src/transport/map.rs +++ b/core/src/transport/map.rs @@ -61,8 +61,12 @@ where type ListenerUpgrade = MapFuture; type Dial = MapFuture; - fn listen_on(&mut self, addr: Multiaddr) -> Result> { - self.transport.listen_on(addr) + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { + self.transport.listen_on(id, addr) } fn remove_listener(&mut self, id: ListenerId) -> bool { diff --git a/core/src/transport/map_err.rs b/core/src/transport/map_err.rs index 99f29124..56e1ebf2 100644 --- a/core/src/transport/map_err.rs +++ b/core/src/transport/map_err.rs @@ -50,9 +50,15 @@ where type ListenerUpgrade = MapErrListenerUpgrade; type Dial = MapErrDial; - fn listen_on(&mut self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { let map = self.map.clone(); - self.transport.listen_on(addr).map_err(|err| err.map(map)) + self.transport + .listen_on(id, addr) + .map_err(|err| err.map(map)) } fn remove_listener(&mut self, id: ListenerId) -> bool { diff --git a/core/src/transport/memory.rs b/core/src/transport/memory.rs index 7e079d07..4c30ee9b 100644 --- a/core/src/transport/memory.rs +++ b/core/src/transport/memory.rs @@ -179,7 +179,11 @@ impl Transport for MemoryTransport { type ListenerUpgrade = Ready>; type Dial = DialFuture; - fn listen_on(&mut self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { let port = if let Ok(port) = parse_memory_addr(&addr) { port } else { @@ -191,7 +195,6 @@ impl Transport for MemoryTransport { None => return Err(TransportError::Other(MemoryTransportError::Unreachable)), }; - let id = ListenerId::new(); let listener = Listener { id, port, @@ -201,7 +204,7 @@ impl Transport for MemoryTransport { }; self.listeners.push_back(Box::pin(listener)); - Ok(id) + Ok(()) } fn remove_listener(&mut self, id: ListenerId) -> bool { @@ -457,30 +460,40 @@ mod tests { let addr_1: Multiaddr = "/memory/1639174018481".parse().unwrap(); let addr_2: Multiaddr = "/memory/8459375923478".parse().unwrap(); - let listener_id_1 = transport.listen_on(addr_1.clone()).unwrap(); + let listener_id_1 = ListenerId::next(); + + transport.listen_on(listener_id_1, addr_1.clone()).unwrap(); assert!( transport.remove_listener(listener_id_1), "Listener doesn't exist." ); - let listener_id_2 = transport.listen_on(addr_1.clone()).unwrap(); - let listener_id_3 = transport.listen_on(addr_2.clone()).unwrap(); + let listener_id_2 = ListenerId::next(); + transport.listen_on(listener_id_2, addr_1.clone()).unwrap(); + let listener_id_3 = ListenerId::next(); + transport.listen_on(listener_id_3, addr_2.clone()).unwrap(); - assert!(transport.listen_on(addr_1.clone()).is_err()); - assert!(transport.listen_on(addr_2.clone()).is_err()); + assert!(transport + .listen_on(ListenerId::next(), addr_1.clone()) + .is_err()); + assert!(transport + .listen_on(ListenerId::next(), addr_2.clone()) + .is_err()); assert!( transport.remove_listener(listener_id_2), "Listener doesn't exist." ); - assert!(transport.listen_on(addr_1).is_ok()); - assert!(transport.listen_on(addr_2.clone()).is_err()); + assert!(transport.listen_on(ListenerId::next(), addr_1).is_ok()); + assert!(transport + .listen_on(ListenerId::next(), addr_2.clone()) + .is_err()); assert!( transport.remove_listener(listener_id_3), "Listener doesn't exist." ); - assert!(transport.listen_on(addr_2).is_ok()); + assert!(transport.listen_on(ListenerId::next(), addr_2).is_ok()); } #[test] @@ -489,8 +502,11 @@ mod tests { assert!(transport .dial("/memory/810172461024613".parse().unwrap()) .is_err()); - let _listener = transport - .listen_on("/memory/810172461024613".parse().unwrap()) + transport + .listen_on( + ListenerId::next(), + "/memory/810172461024613".parse().unwrap(), + ) .unwrap(); assert!(transport .dial("/memory/810172461024613".parse().unwrap()) @@ -504,7 +520,8 @@ mod tests { let mut transport = MemoryTransport::default().boxed(); futures::executor::block_on(async { - let listener_id = transport.listen_on(addr.clone()).unwrap(); + let listener_id = ListenerId::next(); + transport.listen_on(listener_id, addr.clone()).unwrap(); let reported_addr = transport .select_next_some() .await @@ -539,7 +556,7 @@ mod tests { let mut t1 = MemoryTransport::default().boxed(); let listener = async move { - t1.listen_on(t1_addr.clone()).unwrap(); + t1.listen_on(ListenerId::next(), t1_addr.clone()).unwrap(); let upgrade = loop { let event = t1.select_next_some().await; if let Some(upgrade) = event.into_incoming() { @@ -577,7 +594,9 @@ mod tests { let mut listener_transport = MemoryTransport::default().boxed(); let listener = async move { - listener_transport.listen_on(listener_addr.clone()).unwrap(); + listener_transport + .listen_on(ListenerId::next(), listener_addr.clone()) + .unwrap(); loop { if let TransportEvent::Incoming { send_back_addr, .. } = listener_transport.select_next_some().await @@ -614,7 +633,9 @@ mod tests { let mut listener_transport = MemoryTransport::default().boxed(); let listener = async move { - listener_transport.listen_on(listener_addr.clone()).unwrap(); + listener_transport + .listen_on(ListenerId::next(), listener_addr.clone()) + .unwrap(); loop { if let TransportEvent::Incoming { send_back_addr, .. } = listener_transport.select_next_some().await diff --git a/core/src/transport/optional.rs b/core/src/transport/optional.rs index 2d930776..839f55a4 100644 --- a/core/src/transport/optional.rs +++ b/core/src/transport/optional.rs @@ -60,9 +60,13 @@ where type ListenerUpgrade = T::ListenerUpgrade; type Dial = T::Dial; - fn listen_on(&mut self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { if let Some(inner) = self.0.as_mut() { - inner.listen_on(addr) + inner.listen_on(id, addr) } else { Err(TransportError::MultiaddrNotSupported(addr)) } diff --git a/core/src/transport/timeout.rs b/core/src/transport/timeout.rs index c796e6f0..0e8ab3f5 100644 --- a/core/src/transport/timeout.rs +++ b/core/src/transport/timeout.rs @@ -85,9 +85,13 @@ where type ListenerUpgrade = Timeout; type Dial = Timeout; - fn listen_on(&mut self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { self.inner - .listen_on(addr) + .listen_on(id, addr) .map_err(|err| err.map(TransportTimeoutError::Other)) } diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 9f6998d9..201918f2 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -350,8 +350,12 @@ where self.0.dial_as_listener(addr) } - fn listen_on(&mut self, addr: Multiaddr) -> Result> { - self.0.listen_on(addr) + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { + self.0.listen_on(id, addr) } fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { @@ -429,9 +433,13 @@ where }) } - fn listen_on(&mut self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { self.inner - .listen_on(addr) + .listen_on(id, addr) .map_err(|err| err.map(TransportUpgradeError::Transport)) } diff --git a/core/tests/transport_upgrade.rs b/core/tests/transport_upgrade.rs index ac724a64..193ee73c 100644 --- a/core/tests/transport_upgrade.rs +++ b/core/tests/transport_upgrade.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use futures::prelude::*; -use libp2p_core::transport::{MemoryTransport, Transport}; +use libp2p_core::transport::{ListenerId, MemoryTransport, Transport}; use libp2p_core::upgrade::{self, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_identity as identity; use libp2p_mplex::MplexConfig; @@ -102,7 +102,9 @@ fn upgrade_pipeline() { let listen_addr1 = Multiaddr::from(Protocol::Memory(random::())); let listen_addr2 = listen_addr1.clone(); - listener_transport.listen_on(listen_addr1).unwrap(); + listener_transport + .listen_on(ListenerId::next(), listen_addr1) + .unwrap(); let server = async move { loop { diff --git a/muxers/mplex/benches/split_send_size.rs b/muxers/mplex/benches/split_send_size.rs index 56bd934f..dfdc619a 100644 --- a/muxers/mplex/benches/split_send_size.rs +++ b/muxers/mplex/benches/split_send_size.rs @@ -27,6 +27,7 @@ use futures::future::poll_fn; use futures::prelude::*; use futures::{channel::oneshot, future::join}; use libp2p_core::muxing::StreamMuxerExt; +use libp2p_core::transport::ListenerId; use libp2p_core::{multiaddr::multiaddr, muxing, transport, upgrade, Multiaddr, Transport}; use libp2p_identity as identity; use libp2p_identity::PeerId; @@ -100,7 +101,9 @@ fn run( payload: &Vec, listen_addr: &Multiaddr, ) { - receiver_trans.listen_on(listen_addr.clone()).unwrap(); + receiver_trans + .listen_on(ListenerId::next(), listen_addr.clone()) + .unwrap(); let (addr_sender, addr_receiver) = oneshot::channel(); let mut addr_sender = Some(addr_sender); let payload_len = payload.len(); diff --git a/protocols/ping/src/protocol.rs b/protocols/ping/src/protocol.rs index 34f81652..59e583a8 100644 --- a/protocols/ping/src/protocol.rs +++ b/protocols/ping/src/protocol.rs @@ -88,7 +88,7 @@ mod tests { use futures::StreamExt; use libp2p_core::{ multiaddr::multiaddr, - transport::{memory::MemoryTransport, Transport}, + transport::{memory::MemoryTransport, ListenerId, Transport}, }; use rand::{thread_rng, Rng}; use std::time::Duration; @@ -97,7 +97,7 @@ mod tests { fn ping_pong() { let mem_addr = multiaddr![Memory(thread_rng().gen::())]; let mut transport = MemoryTransport::new().boxed(); - transport.listen_on(mem_addr).unwrap(); + transport.listen_on(ListenerId::next(), mem_addr).unwrap(); let listener_addr = transport .select_next_some() diff --git a/protocols/relay/src/priv_client/transport.rs b/protocols/relay/src/priv_client/transport.rs index 6dceefd8..23e0e34e 100644 --- a/protocols/relay/src/priv_client/transport.rs +++ b/protocols/relay/src/priv_client/transport.rs @@ -70,7 +70,7 @@ use thiserror::Error; /// 3. Listen for incoming relayed connections via specific relay. /// /// ``` -/// # use libp2p_core::{Multiaddr, multiaddr::{Protocol}, Transport, PeerId}; +/// # use libp2p_core::{Multiaddr, multiaddr::{Protocol}, transport::ListenerId, Transport, PeerId}; /// # use libp2p_core::transport::memory::MemoryTransport; /// # use libp2p_core::transport::choice::OrTransport; /// # use libp2p_relay as relay; @@ -85,7 +85,7 @@ use thiserror::Error; /// .with(Protocol::Memory(40)) // Relay address. /// .with(Protocol::P2p(relay_id.into())) // Relay peer id. /// .with(Protocol::P2pCircuit); // Signal to listen via remote relay node. -/// transport.listen_on(relay_addr).unwrap(); +/// transport.listen_on(ListenerId::next(), relay_addr).unwrap(); /// ``` pub struct Transport { to_behaviour: mpsc::Sender, @@ -111,7 +111,11 @@ impl libp2p_core::Transport for Transport { type ListenerUpgrade = Ready>; type Dial = BoxFuture<'static, Result>; - fn listen_on(&mut self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + listener_id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { let (relay_peer_id, relay_addr) = match parse_relayed_multiaddr(addr)? { RelayedMultiaddr { relay_peer_id: None, @@ -138,7 +142,6 @@ impl libp2p_core::Transport for Transport { to_listener, }); - let listener_id = ListenerId::new(); let listener = Listener { listener_id, queued_events: Default::default(), @@ -146,7 +149,7 @@ impl libp2p_core::Transport for Transport { is_closed: false, }; self.listeners.push(listener); - Ok(listener_id) + Ok(()) } fn remove_listener(&mut self, id: ListenerId) -> bool { diff --git a/swarm/src/behaviour/listen_addresses.rs b/swarm/src/behaviour/listen_addresses.rs index 2a8adb8f..8882db64 100644 --- a/swarm/src/behaviour/listen_addresses.rs +++ b/swarm/src/behaviour/listen_addresses.rs @@ -34,7 +34,7 @@ impl ListenAddresses { mod tests { use super::*; use crate::dummy; - use libp2p_core::multiaddr::Protocol; + use libp2p_core::{multiaddr::Protocol, transport::ListenerId}; use once_cell::sync::Lazy; #[test] @@ -62,14 +62,14 @@ mod tests { fn new_listen_addr() -> FromSwarm<'static, dummy::ConnectionHandler> { FromSwarm::NewListenAddr(NewListenAddr { - listener_id: Default::default(), + listener_id: ListenerId::next(), addr: &MEMORY_ADDR, }) } fn expired_listen_addr() -> FromSwarm<'static, dummy::ConnectionHandler> { FromSwarm::ExpiredListenAddr(ExpiredListenAddr { - listener_id: Default::default(), + listener_id: ListenerId::next(), addr: &MEMORY_ADDR, }) } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 0796a26f..3360a0bf 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -474,7 +474,8 @@ where /// Listeners report their new listening addresses as [`SwarmEvent::NewListenAddr`]. /// Depending on the underlying transport, one listener may have multiple listening addresses. pub fn listen_on(&mut self, addr: Multiaddr) -> Result> { - let id = self.transport.listen_on(addr)?; + let id = ListenerId::next(); + self.transport.listen_on(id, addr)?; self.behaviour .on_swarm_event(FromSwarm::NewListener(behaviour::NewListener { listener_id: id, @@ -2222,7 +2223,9 @@ mod tests { let mut transports = Vec::new(); for _ in 0..num_listen_addrs { let mut transport = transport::MemoryTransport::default().boxed(); - transport.listen_on("/memory/0".parse().unwrap()).unwrap(); + transport + .listen_on(ListenerId::next(), "/memory/0".parse().unwrap()) + .unwrap(); match transport.select_next_some().await { TransportEvent::NewAddress { listen_addr, .. } => { diff --git a/transports/dns/src/lib.rs b/transports/dns/src/lib.rs index 7de86ba3..c39b9ad0 100644 --- a/transports/dns/src/lib.rs +++ b/transports/dns/src/lib.rs @@ -198,10 +198,14 @@ where BoxFuture<'static, Result>, >; - fn listen_on(&mut self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { self.inner .lock() - .listen_on(addr) + .listen_on(id, addr) .map_err(|e| e.map(DnsErr::Transport)) } @@ -599,8 +603,9 @@ mod tests { fn listen_on( &mut self, + _: ListenerId, _: Multiaddr, - ) -> Result> { + ) -> Result<(), TransportError> { unreachable!() } diff --git a/transports/quic/src/lib.rs b/transports/quic/src/lib.rs index af6dd187..594ba0b6 100644 --- a/transports/quic/src/lib.rs +++ b/transports/quic/src/lib.rs @@ -32,7 +32,7 @@ //! # fn main() -> std::io::Result<()> { //! # //! use libp2p_quic as quic; -//! use libp2p_core::{Multiaddr, Transport}; +//! use libp2p_core::{Multiaddr, Transport, transport::ListenerId}; //! //! let keypair = libp2p_identity::Keypair::generate_ed25519(); //! let quic_config = quic::Config::new(&keypair); @@ -40,7 +40,7 @@ //! let mut quic_transport = quic::async_std::Transport::new(quic_config); //! //! let addr = "/ip4/127.0.0.1/udp/12345/quic-v1".parse().expect("address should be valid"); -//! quic_transport.listen_on(addr).expect("listen error."); +//! quic_transport.listen_on(ListenerId::next(), addr).expect("listen error."); //! # //! # Ok(()) //! # } diff --git a/transports/quic/src/transport.rs b/transports/quic/src/transport.rs index d68eb7f1..668034ed 100644 --- a/transports/quic/src/transport.rs +++ b/transports/quic/src/transport.rs @@ -98,10 +98,13 @@ impl Transport for GenTransport

{ type ListenerUpgrade = Connecting; type Dial = BoxFuture<'static, Result>; - fn listen_on(&mut self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + listener_id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { let (socket_addr, version) = multiaddr_to_socketaddr(&addr, self.support_draft_29) .ok_or(TransportError::MultiaddrNotSupported(addr))?; - let listener_id = ListenerId::new(); let listener = Listener::new( listener_id, socket_addr, @@ -120,7 +123,7 @@ impl Transport for GenTransport

{ // New outbound connections will use the bidirectional (listener) endpoint. self.dialer.remove(&socket_addr.ip().into()); - Ok(listener_id) + Ok(()) } fn remove_listener(&mut self, id: ListenerId) -> bool { @@ -772,8 +775,9 @@ mod test { // Run test twice to check that there is no unexpected behaviour if `Transport.listener` // is temporarily empty. for _ in 0..2 { - let id = transport - .listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap()) + let id = ListenerId::next(); + transport + .listen_on(id, "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap()) .unwrap(); // Copy channel to use it later. @@ -866,8 +870,11 @@ mod test { .await; // Start listening so that the dialer and driver are dropped. - let _ = transport - .listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap()) + transport + .listen_on( + ListenerId::next(), + "/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap(), + ) .unwrap(); assert!(!transport.dialer.contains_key(&SocketFamily::Ipv4)); diff --git a/transports/quic/tests/smoke.rs b/transports/quic/tests/smoke.rs index a576d3c9..93bb78e2 100644 --- a/transports/quic/tests/smoke.rs +++ b/transports/quic/tests/smoke.rs @@ -115,9 +115,10 @@ async fn wrapped_with_delay() { fn listen_on( &mut self, + id: ListenerId, addr: Multiaddr, - ) -> Result> { - self.0.lock().unwrap().listen_on(addr) + ) -> Result<(), TransportError> { + self.0.lock().unwrap().listen_on(id, addr) } fn remove_listener(&mut self, id: ListenerId) -> bool { @@ -327,7 +328,10 @@ async fn draft_29_support() { let (_, mut d_transport) = create_transport::(|cfg| cfg.support_draft_29 = false); assert!(matches!( - d_transport.listen_on("/ip4/127.0.0.1/udp/0/quic".parse().unwrap()), + d_transport.listen_on( + ListenerId::next(), + "/ip4/127.0.0.1/udp/0/quic".parse().unwrap() + ), Err(TransportError::MultiaddrNotSupported(_)) )); let d_quic_v1_addr = start_listening(&mut d_transport, "/ip4/127.0.0.1/udp/0/quic-v1").await; @@ -509,7 +513,9 @@ fn create_transport( } async fn start_listening(transport: &mut Boxed<(PeerId, StreamMuxerBox)>, addr: &str) -> Multiaddr { - transport.listen_on(addr.parse().unwrap()).unwrap(); + transport + .listen_on(ListenerId::next(), addr.parse().unwrap()) + .unwrap(); match transport.next().await { Some(TransportEvent::NewAddress { listen_addr, .. }) => listen_addr, e => panic!("{e:?}"), diff --git a/transports/quic/tests/stream_compliance.rs b/transports/quic/tests/stream_compliance.rs index ec4c3121..0eff0584 100644 --- a/transports/quic/tests/stream_compliance.rs +++ b/transports/quic/tests/stream_compliance.rs @@ -1,5 +1,6 @@ use futures::channel::oneshot; use futures::StreamExt; +use libp2p_core::transport::ListenerId; use libp2p_core::Transport; use libp2p_quic as quic; use std::time::Duration; @@ -23,7 +24,10 @@ async fn connected_peers() -> (quic::Connection, quic::Connection) { let mut listener = new_transport().boxed(); listener - .listen_on("/ip4/127.0.0.1/udp/0/quic-v1".parse().unwrap()) + .listen_on( + ListenerId::next(), + "/ip4/127.0.0.1/udp/0/quic-v1".parse().unwrap(), + ) .unwrap(); let listen_address = listener.next().await.unwrap().into_new_address().unwrap(); diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 78f6c3f4..e4576137 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -248,7 +248,7 @@ impl Config { /// let listen_addr2: Multiaddr = "/ip4/127.0.0.1/tcp/9002".parse().unwrap(); /// /// let mut tcp1 = libp2p_tcp::async_io::Transport::new(libp2p_tcp::Config::new().port_reuse(true)).boxed(); - /// tcp1.listen_on( listen_addr1.clone()).expect("listener"); + /// tcp1.listen_on(ListenerId::next(), listen_addr1.clone()).expect("listener"); /// match tcp1.select_next_some().await { /// TransportEvent::NewAddress { listen_addr, .. } => { /// println!("Listening on {:?}", listen_addr); @@ -259,7 +259,7 @@ impl Config { /// } /// /// let mut tcp2 = libp2p_tcp::async_io::Transport::new(libp2p_tcp::Config::new().port_reuse(true)).boxed(); - /// tcp2.listen_on( listen_addr2).expect("listener"); + /// tcp2.listen_on(ListenerId::next(), listen_addr2).expect("listener"); /// match tcp2.select_next_some().await { /// TransportEvent::NewAddress { listen_addr, .. } => { /// println!("Listening on {:?}", listen_addr); @@ -437,19 +437,22 @@ where type Dial = Pin> + Send>>; type ListenerUpgrade = Ready>; - fn listen_on(&mut self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { let socket_addr = if let Ok(sa) = multiaddr_to_socketaddr(addr.clone()) { sa } else { return Err(TransportError::MultiaddrNotSupported(addr)); }; - let id = ListenerId::new(); log::debug!("listening on {}", socket_addr); let listener = self .do_listen(id, socket_addr) .map_err(TransportError::Other)?; self.listeners.push(listener); - Ok(id) + Ok(()) } fn remove_listener(&mut self, id: ListenerId) -> bool { @@ -916,7 +919,7 @@ mod tests { async fn listener(addr: Multiaddr, mut ready_tx: mpsc::Sender) { let mut tcp = Transport::::default().boxed(); - tcp.listen_on(addr).unwrap(); + tcp.listen_on(ListenerId::next(), addr).unwrap(); loop { match tcp.select_next_some().await { TransportEvent::NewAddress { listen_addr, .. } => { @@ -985,7 +988,7 @@ mod tests { async fn listener(addr: Multiaddr, mut ready_tx: mpsc::Sender) { let mut tcp = Transport::::default().boxed(); - tcp.listen_on(addr).unwrap(); + tcp.listen_on(ListenerId::next(), addr).unwrap(); loop { match tcp.select_next_some().await { @@ -1058,7 +1061,7 @@ mod tests { port_reuse_rx: oneshot::Receiver>, ) { let mut tcp = Transport::::new(Config::new()).boxed(); - tcp.listen_on(addr).unwrap(); + tcp.listen_on(ListenerId::next(), addr).unwrap(); loop { match tcp.select_next_some().await { TransportEvent::NewAddress { listen_addr, .. } => { @@ -1093,7 +1096,7 @@ mod tests { ) { let dest_addr = ready_rx.next().await.unwrap(); let mut tcp = Transport::::new(Config::new().port_reuse(true)); - tcp.listen_on(addr).unwrap(); + tcp.listen_on(ListenerId::next(), addr).unwrap(); match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await { TransportEvent::NewAddress { .. } => { // Check that tcp and listener share the same port reuse SocketAddr @@ -1161,7 +1164,7 @@ mod tests { async fn listen_twice(addr: Multiaddr) { let mut tcp = Transport::::new(Config::new().port_reuse(true)); - tcp.listen_on(addr).unwrap(); + tcp.listen_on(ListenerId::next(), addr).unwrap(); match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await { TransportEvent::NewAddress { listen_addr: addr1, .. @@ -1176,7 +1179,7 @@ mod tests { assert_eq!(port_reuse_tcp, port_reuse_listener1); // Listen on the same address a second time. - tcp.listen_on(addr1.clone()).unwrap(); + tcp.listen_on(ListenerId::next(), addr1.clone()).unwrap(); match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await { TransportEvent::NewAddress { listen_addr: addr2, .. @@ -1215,7 +1218,7 @@ mod tests { async fn listen(addr: Multiaddr) -> Multiaddr { let mut tcp = Transport::::default().boxed(); - tcp.listen_on(addr).unwrap(); + tcp.listen_on(ListenerId::next(), addr).unwrap(); tcp.select_next_some() .await .into_new_address() @@ -1252,13 +1255,13 @@ mod tests { #[cfg(feature = "async-io")] { let mut tcp = async_io::Transport::default(); - assert!(tcp.listen_on(addr.clone()).is_err()); + assert!(tcp.listen_on(ListenerId::next(), addr.clone()).is_err()); } #[cfg(feature = "tokio")] { let mut tcp = tokio::Transport::default(); - assert!(tcp.listen_on(addr).is_err()); + assert!(tcp.listen_on(ListenerId::next(), addr).is_err()); } } @@ -1320,8 +1323,8 @@ mod tests { async fn cycle_listeners() -> bool { let mut tcp = Transport::::default().boxed(); - let listener_id = tcp - .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) + let listener_id = ListenerId::next(); + tcp.listen_on(listener_id, "/ip4/127.0.0.1/tcp/0".parse().unwrap()) .unwrap(); tcp.remove_listener(listener_id) } diff --git a/transports/tcp/src/provider/async_io.rs b/transports/tcp/src/provider/async_io.rs index 590f109d..9f43ed23 100644 --- a/transports/tcp/src/provider/async_io.rs +++ b/transports/tcp/src/provider/async_io.rs @@ -32,14 +32,15 @@ use std::task::{Context, Poll}; /// /// ```rust /// # use libp2p_tcp as tcp; -/// # use libp2p_core::Transport; +/// # use libp2p_core::{Transport, transport::ListenerId}; /// # use futures::future; /// # use std::pin::Pin; /// # /// # #[async_std::main] /// # async fn main() { /// let mut transport = tcp::async_io::Transport::new(tcp::Config::default()); -/// let id = transport.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); +/// let id = ListenerId::next(); +/// transport.listen_on(id, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); /// /// let addr = future::poll_fn(|cx| Pin::new(&mut transport).poll(cx)).await.into_new_address().unwrap(); /// diff --git a/transports/tcp/src/provider/tokio.rs b/transports/tcp/src/provider/tokio.rs index e4b75c8d..b991c6bd 100644 --- a/transports/tcp/src/provider/tokio.rs +++ b/transports/tcp/src/provider/tokio.rs @@ -36,14 +36,14 @@ use std::task::{Context, Poll}; /// /// ```rust /// # use libp2p_tcp as tcp; -/// # use libp2p_core::Transport; +/// # use libp2p_core::{Transport, transport::ListenerId}; /// # use futures::future; /// # use std::pin::Pin; /// # /// # #[tokio::main] /// # async fn main() { /// let mut transport = tcp::tokio::Transport::new(tcp::Config::default()); -/// let id = transport.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); +/// let id = transport.listen_on(ListenerId::next(), "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); /// /// let addr = future::poll_fn(|cx| Pin::new(&mut transport).poll(cx)).await.into_new_address().unwrap(); /// diff --git a/transports/uds/src/lib.rs b/transports/uds/src/lib.rs index 5f3f9ab7..78a6b0f0 100644 --- a/transports/uds/src/lib.rs +++ b/transports/uds/src/lib.rs @@ -93,10 +93,10 @@ macro_rules! codegen { fn listen_on( &mut self, + id: ListenerId, addr: Multiaddr, - ) -> Result> { + ) -> Result<(), TransportError> { if let Ok(path) = multiaddr_to_path(&addr) { - let id = ListenerId::new(); let listener = $build_listener(path) .map_err(Err) .map_ok(move |listener| { @@ -138,7 +138,7 @@ macro_rules! codegen { .try_flatten_stream() .boxed(); self.listeners.push_back((id, listener)); - Ok(id) + Ok(()) } else { Err(TransportError::MultiaddrNotSupported(addr)) } @@ -260,6 +260,7 @@ mod tests { use futures::{channel::oneshot, prelude::*}; use libp2p_core::{ multiaddr::{Multiaddr, Protocol}, + transport::ListenerId, Transport, }; use std::{self, borrow::Cow, path::Path}; @@ -292,7 +293,7 @@ mod tests { async_std::task::spawn(async move { let mut transport = UdsConfig::new().boxed(); - transport.listen_on(addr).unwrap(); + transport.listen_on(ListenerId::next(), addr).unwrap(); let listen_addr = transport .select_next_some() @@ -328,7 +329,7 @@ mod tests { let mut uds = UdsConfig::new(); let addr = "/unix//foo/bar".parse::().unwrap(); - assert!(uds.listen_on(addr).is_err()); + assert!(uds.listen_on(ListenerId::next(), addr).is_err()); } #[test] diff --git a/transports/wasm-ext/src/lib.rs b/transports/wasm-ext/src/lib.rs index 164cbff4..91236ca8 100644 --- a/transports/wasm-ext/src/lib.rs +++ b/transports/wasm-ext/src/lib.rs @@ -198,7 +198,11 @@ impl Transport for ExtTransport { type ListenerUpgrade = Ready>; type Dial = Dial; - fn listen_on(&mut self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + listener_id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { let iter = self.inner.listen_on(&addr.to_string()).map_err(|err| { if is_not_supported_error(&err) { TransportError::MultiaddrNotSupported(addr) @@ -206,7 +210,6 @@ impl Transport for ExtTransport { TransportError::Other(JsErr::from(err)) } })?; - let listener_id = ListenerId::new(); let listen = Listen { listener_id, iterator: SendWrapper::new(iter), @@ -215,7 +218,7 @@ impl Transport for ExtTransport { is_closed: false, }; self.listeners.push(listen); - Ok(listener_id) + Ok(()) } fn remove_listener(&mut self, id: ListenerId) -> bool { diff --git a/transports/webrtc/src/tokio/transport.rs b/transports/webrtc/src/tokio/transport.rs index 904da61c..21f465ba 100644 --- a/transports/webrtc/src/tokio/transport.rs +++ b/transports/webrtc/src/tokio/transport.rs @@ -80,9 +80,11 @@ impl libp2p_core::Transport for Transport { type ListenerUpgrade = BoxFuture<'static, Result>; type Dial = BoxFuture<'static, Result>; - fn listen_on(&mut self, addr: Multiaddr) -> Result> { - let id = ListenerId::new(); - + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { let socket_addr = parse_webrtc_listen_addr(&addr).ok_or(TransportError::MultiaddrNotSupported(addr))?; let udp_mux = UDPMuxNewAddr::listen_on(socket_addr) @@ -93,7 +95,7 @@ impl libp2p_core::Transport for Transport { .map_err(|e| TransportError::Other(Error::Io(e)))?, ); - Ok(id) + Ok(()) } fn remove_listener(&mut self, id: ListenerId) -> bool { @@ -596,8 +598,12 @@ mod tests { // Run test twice to check that there is no unexpected behaviour if `QuicTransport.listener` // is temporarily empty. for _ in 0..2 { - let listener = transport - .listen_on("/ip4/0.0.0.0/udp/0/webrtc-direct".parse().unwrap()) + let listener = ListenerId::next(); + transport + .listen_on( + listener, + "/ip4/0.0.0.0/udp/0/webrtc-direct".parse().unwrap(), + ) .unwrap(); match poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)).await { TransportEvent::NewAddress { diff --git a/transports/webrtc/tests/smoke.rs b/transports/webrtc/tests/smoke.rs index bca159d7..8e56b997 100644 --- a/transports/webrtc/tests/smoke.rs +++ b/transports/webrtc/tests/smoke.rs @@ -23,7 +23,7 @@ use futures::future::{BoxFuture, Either}; use futures::stream::StreamExt; use futures::{future, ready, AsyncReadExt, AsyncWriteExt, FutureExt, SinkExt}; use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt}; -use libp2p_core::transport::{Boxed, TransportEvent}; +use libp2p_core::transport::{Boxed, ListenerId, TransportEvent}; use libp2p_core::{Multiaddr, Transport}; use libp2p_identity::PeerId; use libp2p_webrtc as webrtc; @@ -81,7 +81,9 @@ fn create_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox)>) { } async fn start_listening(transport: &mut Boxed<(PeerId, StreamMuxerBox)>, addr: &str) -> Multiaddr { - transport.listen_on(addr.parse().unwrap()).unwrap(); + transport + .listen_on(ListenerId::next(), addr.parse().unwrap()) + .unwrap(); match transport.next().await { Some(TransportEvent::NewAddress { listen_addr, .. }) => listen_addr, e => panic!("{e:?}"), diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index 318090d1..a81f6d45 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -122,7 +122,11 @@ where type ListenerUpgrade = BoxFuture<'static, Result>; type Dial = BoxFuture<'static, Result>; - fn listen_on(&mut self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { let mut inner_addr = addr.clone(); let proto = match inner_addr.pop() { Some(p @ Protocol::Wss(_)) => { @@ -139,10 +143,10 @@ where return Err(TransportError::MultiaddrNotSupported(addr)); } }; - match self.transport.lock().listen_on(inner_addr) { - Ok(id) => { + match self.transport.lock().listen_on(id, inner_addr) { + Ok(()) => { self.listener_protos.insert(id, proto); - Ok(id) + Ok(()) } Err(e) => Err(e.map(Error::Transport)), } diff --git a/transports/websocket/src/lib.rs b/transports/websocket/src/lib.rs index 369e1d61..fa122322 100644 --- a/transports/websocket/src/lib.rs +++ b/transports/websocket/src/lib.rs @@ -64,7 +64,7 @@ use std::{ /// /// ``` /// # use futures::future; -/// # use libp2p_core::Transport; +/// # use libp2p_core::{transport::ListenerId, Transport}; /// # use libp2p_dns as dns; /// # use libp2p_tcp as tcp; /// # use libp2p_websocket as websocket; @@ -83,7 +83,7 @@ use std::{ /// let cert = websocket::tls::Certificate::new(rcgen_cert.serialize_der().unwrap()); /// transport.set_tls_config(websocket::tls::Config::new(priv_key, vec![cert]).unwrap()); /// -/// let id = transport.listen_on("/ip4/127.0.0.1/tcp/0/wss".parse().unwrap()).unwrap(); +/// let id = transport.listen_on(ListenerId::next(), "/ip4/127.0.0.1/tcp/0/wss".parse().unwrap()).unwrap(); /// /// let addr = future::poll_fn(|cx| Pin::new(&mut transport).poll(cx)).await.into_new_address().unwrap(); /// println!("Listening on {addr}"); @@ -95,7 +95,7 @@ use std::{ /// /// ``` /// # use futures::future; -/// # use libp2p_core::Transport; +/// # use libp2p_core::{transport::ListenerId, Transport}; /// # use libp2p_dns as dns; /// # use libp2p_tcp as tcp; /// # use libp2p_websocket as websocket; @@ -108,7 +108,7 @@ use std::{ /// tcp::async_io::Transport::new(tcp::Config::default()), /// ); /// -/// let id = transport.listen_on("/ip4/127.0.0.1/tcp/0/ws".parse().unwrap()).unwrap(); +/// let id = transport.listen_on(ListenerId::next(), "/ip4/127.0.0.1/tcp/0/ws".parse().unwrap()).unwrap(); /// /// let addr = future::poll_fn(|cx| Pin::new(&mut transport).poll(cx)).await.into_new_address().unwrap(); /// println!("Listening on {addr}"); @@ -195,8 +195,12 @@ where type ListenerUpgrade = MapFuture, WrapperFn>; type Dial = MapFuture, WrapperFn>; - fn listen_on(&mut self, addr: Multiaddr) -> Result> { - self.transport.listen_on(addr) + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { + self.transport.listen_on(id, addr) } fn remove_listener(&mut self, id: ListenerId) -> bool { @@ -293,7 +297,7 @@ where mod tests { use super::WsConfig; use futures::prelude::*; - use libp2p_core::{multiaddr::Protocol, Multiaddr, Transport}; + use libp2p_core::{multiaddr::Protocol, transport::ListenerId, Multiaddr, Transport}; use libp2p_identity::PeerId; use libp2p_tcp as tcp; @@ -315,7 +319,9 @@ mod tests { async fn connect(listen_addr: Multiaddr) { let mut ws_config = new_ws_config().boxed(); - ws_config.listen_on(listen_addr).expect("listener"); + ws_config + .listen_on(ListenerId::next(), listen_addr) + .expect("listener"); let addr = ws_config .next()