// Copyright 2020 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

mod error;
pub(crate) mod pool;
mod supported_protocols;

pub use error::ConnectionError;
pub(crate) use error::{
    PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError,
};
pub use supported_protocols::SupportedProtocols;

use crate::handler::{
    AddressChange, ConnectionEvent, ConnectionHandler, DialUpgradeError, FullyNegotiatedInbound,
    FullyNegotiatedOutbound, ListenUpgradeError, ProtocolSupport, ProtocolsAdded, ProtocolsChange,
    UpgradeInfoSend,
};
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend};
use crate::{
    ConnectionHandlerEvent, KeepAlive, Stream, StreamProtocol, StreamUpgradeError,
    SubstreamProtocol,
};
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::StreamExt;
use futures_timer::Delay;
use instant::Instant;
use libp2p_core::connection::ConnectedPoint;
use libp2p_core::multiaddr::Multiaddr;
use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerEvent, StreamMuxerExt, SubstreamBox};
use libp2p_core::upgrade;
use libp2p_core::upgrade::{NegotiationError, ProtocolError};
use libp2p_core::Endpoint;
use libp2p_identity::PeerId;
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Waker;
use std::time::Duration;
use std::{fmt, io, mem, pin::Pin, task::Context, task::Poll};

static NEXT_CONNECTION_ID: AtomicUsize = AtomicUsize::new(1);

/// Connection identifier.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ConnectionId(usize);

impl ConnectionId {
    /// Creates an _unchecked_ [`ConnectionId`].
    ///
    /// [`Swarm`](crate::Swarm) enforces that [`ConnectionId`]s are unique and not reused.
    /// This constructor does not, hence the _unchecked_.
    ///
    /// It is primarily meant for allowing manual tests of [`NetworkBehaviour`](crate::NetworkBehaviour)s.
    pub fn new_unchecked(id: usize) -> Self {
        Self(id)
    }

    /// Returns the next available [`ConnectionId`].
    pub(crate) fn next() -> Self {
        Self(NEXT_CONNECTION_ID.fetch_add(1, Ordering::SeqCst))
    }
}

impl Display for ConnectionId {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}
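// Illustrative sketch (not part of this module): `ConnectionId::new_unchecked` exists so that
// `NetworkBehaviour` tests can fabricate deterministic IDs without driving a real `Swarm`.
// The surrounding test code is hypothetical:
//
//     let first = ConnectionId::new_unchecked(1);
//     let second = ConnectionId::new_unchecked(2);
//     assert_ne!(first, second);
//     // `first` and `second` can now be fed into the behaviour under test.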
/// Information about a successfully established connection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Connected {
    /// The connected endpoint, including network address information.
    pub(crate) endpoint: ConnectedPoint,
    /// Information obtained from the transport.
    pub(crate) peer_id: PeerId,
}

/// Event generated by a [`Connection`].
#[derive(Debug, Clone)]
pub(crate) enum Event<T> {
    /// Event generated by the [`ConnectionHandler`].
    Handler(T),
    /// Address of the remote has changed.
    AddressChange(Multiaddr),
}

/// A multiplexed connection to a peer with an associated [`ConnectionHandler`].
pub(crate) struct Connection<THandler>
where
    THandler: ConnectionHandler,
{
    /// Node that handles the muxing.
    muxing: StreamMuxerBox,
    /// The underlying handler.
    handler: THandler,
    /// Futures that upgrade incoming substreams.
    negotiating_in: FuturesUnordered<
        StreamUpgrade<
            THandler::InboundOpenInfo,
            <THandler::InboundProtocol as InboundUpgradeSend>::Output,
            <THandler::InboundProtocol as InboundUpgradeSend>::Error,
        >,
    >,
    /// Futures that upgrade outgoing substreams.
    negotiating_out: FuturesUnordered<
        StreamUpgrade<
            THandler::OutboundOpenInfo,
            <THandler::OutboundProtocol as OutboundUpgradeSend>::Output,
            <THandler::OutboundProtocol as OutboundUpgradeSend>::Error,
        >,
    >,
    /// The currently planned connection & handler shutdown.
    shutdown: Shutdown,
    /// The substream upgrade protocol override, if any.
    substream_upgrade_protocol_override: Option<upgrade::Version>,
    /// The maximum number of inbound streams concurrently negotiating on a
    /// connection. New inbound streams exceeding the limit are dropped and thus
    /// reset.
    ///
    /// Note: This only enforces a limit on the number of concurrently
    /// negotiating inbound streams. The total number of inbound streams on a
    /// connection is the sum of negotiating and negotiated streams. A limit on
    /// the total number of streams can be enforced at the
    /// [`StreamMuxerBox`](libp2p_core::muxing::StreamMuxerBox) level.
    max_negotiating_inbound_streams: usize,
    /// Contains all upgrades that are waiting for a new outbound substream.
    ///
    /// The upgrade timeout is already ticking here so this may fail in case the remote is not
    /// quick enough in providing us with a new stream.
    requested_substreams: FuturesUnordered<
        SubstreamRequested<THandler::OutboundOpenInfo, THandler::OutboundProtocol>,
    >,

    local_supported_protocols: HashSet<StreamProtocol>,
    remote_supported_protocols: HashSet<StreamProtocol>,
}

impl<THandler> fmt::Debug for Connection<THandler>
where
    THandler: ConnectionHandler + fmt::Debug,
    THandler::OutboundOpenInfo: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Connection")
            .field("handler", &self.handler)
            .finish()
    }
}

impl<THandler> Unpin for Connection<THandler> where THandler: ConnectionHandler {}

impl<THandler> Connection<THandler>
where
    THandler: ConnectionHandler,
{
    /// Builds a new `Connection` from the given substream multiplexer
    /// and connection handler.
    pub(crate) fn new(
        muxer: StreamMuxerBox,
        mut handler: THandler,
        substream_upgrade_protocol_override: Option<upgrade::Version>,
        max_negotiating_inbound_streams: usize,
    ) -> Self {
        let initial_protocols = gather_supported_protocols(&handler);

        if !initial_protocols.is_empty() {
            handler.on_connection_event(ConnectionEvent::LocalProtocolsChange(
                ProtocolsChange::Added(ProtocolsAdded::from_set(&initial_protocols)),
            ));
        }

        Connection {
            muxing: muxer,
            handler,
            negotiating_in: Default::default(),
            negotiating_out: Default::default(),
            shutdown: Shutdown::None,
            substream_upgrade_protocol_override,
            max_negotiating_inbound_streams,
            requested_substreams: Default::default(),
            local_supported_protocols: initial_protocols,
            remote_supported_protocols: Default::default(),
        }
    }

    /// Notifies the connection handler of an event.
    pub(crate) fn on_behaviour_event(&mut self, event: THandler::FromBehaviour) {
        self.handler.on_behaviour_event(event);
    }
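    // Illustrative sketch only (the real driver lives in the `pool` task code): a `Connection`
    // is built from an upgraded muxer plus a handler and then polled in a loop. The loop below
    // is a hypothetical stand-in for that driver, not an API of this module.
    //
    //     let mut connection = Connection::new(muxer, handler, None, 128);
    //     loop {
    //         match futures::future::poll_fn(|cx| Pin::new(&mut connection).poll(cx)).await {
    //             Ok(Event::Handler(event)) => { /* forward to the behaviour */ }
    //             Ok(Event::AddressChange(addr)) => { /* update the connected point */ }
    //             Err(_error) => break, // connection closed
    //         }
    //     }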
    /// Begins an orderly shutdown of the connection, returning the connection
    /// handler and a `Future` that resolves when connection shutdown is complete.
    pub(crate) fn close(self) -> (THandler, impl Future<Output = io::Result<()>>) {
        (self.handler, self.muxing.close())
    }

    /// Polls the handler and the substream, forwarding events from the former to the latter and
    /// vice versa.
    pub(crate) fn poll(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<Event<THandler::ToBehaviour>, ConnectionError<THandler::Error>>> {
        let Self {
            requested_substreams,
            muxing,
            handler,
            negotiating_out,
            negotiating_in,
            shutdown,
            max_negotiating_inbound_streams,
            substream_upgrade_protocol_override,
            local_supported_protocols: supported_protocols,
            remote_supported_protocols,
        } = self.get_mut();

        loop {
            match requested_substreams.poll_next_unpin(cx) {
                Poll::Ready(Some(Ok(()))) => continue,
                Poll::Ready(Some(Err(info))) => {
                    handler.on_connection_event(ConnectionEvent::DialUpgradeError(
                        DialUpgradeError {
                            info,
                            error: StreamUpgradeError::Timeout,
                        },
                    ));
                    continue;
                }
                Poll::Ready(None) | Poll::Pending => {}
            }

            // Poll the [`ConnectionHandler`].
            match handler.poll(cx) {
                Poll::Pending => {}
                Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => {
                    let timeout = *protocol.timeout();
                    let (upgrade, user_data) = protocol.into_upgrade();

                    requested_substreams.push(SubstreamRequested::new(user_data, timeout, upgrade));
                    continue; // Poll handler until exhausted.
                }
                Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)) => {
                    return Poll::Ready(Ok(Event::Handler(event)));
                }
                Poll::Ready(ConnectionHandlerEvent::Close(err)) => {
                    return Poll::Ready(Err(ConnectionError::Handler(err)));
                }
                Poll::Ready(ConnectionHandlerEvent::ReportRemoteProtocols(
                    ProtocolSupport::Added(protocols),
                )) => {
                    if let Some(added) =
                        ProtocolsChange::add(remote_supported_protocols, &protocols)
                    {
                        handler.on_connection_event(ConnectionEvent::RemoteProtocolsChange(added));
                        remote_supported_protocols.extend(protocols);
                    }

                    continue;
                }
                Poll::Ready(ConnectionHandlerEvent::ReportRemoteProtocols(
                    ProtocolSupport::Removed(protocols),
                )) => {
                    if let Some(removed) =
                        ProtocolsChange::remove(remote_supported_protocols, &protocols)
                    {
                        handler
                            .on_connection_event(ConnectionEvent::RemoteProtocolsChange(removed));
                        remote_supported_protocols.retain(|p| !protocols.contains(p));
                    }

                    continue;
                }
            }

            // In case the [`ConnectionHandler`] can not make any more progress, poll the
            // negotiating outbound streams.
            match negotiating_out.poll_next_unpin(cx) {
                Poll::Pending | Poll::Ready(None) => {}
                Poll::Ready(Some((info, Ok(protocol)))) => {
                    handler.on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(
                        FullyNegotiatedOutbound { protocol, info },
                    ));
                    continue;
                }
                Poll::Ready(Some((info, Err(error)))) => {
                    handler.on_connection_event(ConnectionEvent::DialUpgradeError(
                        DialUpgradeError { info, error },
                    ));
                    continue;
                }
            }
            // In case both the [`ConnectionHandler`] and the negotiating outbound streams can not
            // make any more progress, poll the negotiating inbound streams.
            match negotiating_in.poll_next_unpin(cx) {
                Poll::Pending | Poll::Ready(None) => {}
                Poll::Ready(Some((info, Ok(protocol)))) => {
                    handler.on_connection_event(ConnectionEvent::FullyNegotiatedInbound(
                        FullyNegotiatedInbound { protocol, info },
                    ));
                    continue;
                }
                Poll::Ready(Some((info, Err(StreamUpgradeError::Apply(error))))) => {
                    handler.on_connection_event(ConnectionEvent::ListenUpgradeError(
                        ListenUpgradeError { info, error },
                    ));
                    continue;
                }
                Poll::Ready(Some((_, Err(StreamUpgradeError::Io(e))))) => {
                    log::debug!("failed to upgrade inbound stream: {e}");
                    continue;
                }
                Poll::Ready(Some((_, Err(StreamUpgradeError::NegotiationFailed)))) => {
                    log::debug!("no protocol could be agreed upon for inbound stream");
                    continue;
                }
                Poll::Ready(Some((_, Err(StreamUpgradeError::Timeout)))) => {
                    log::debug!("inbound stream upgrade timed out");
                    continue;
                }
            }

            // Ask the handler whether it wants the connection (and the handler itself)
            // to be kept alive, which determines the planned shutdown, if any.
            let keep_alive = handler.connection_keep_alive();
            match (&mut *shutdown, keep_alive) {
                (Shutdown::Later(timer, deadline), KeepAlive::Until(t)) => {
                    if *deadline != t {
                        *deadline = t;
                        if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
                            timer.reset(dur)
                        }
                    }
                }
                (_, KeepAlive::Until(t)) => {
                    if let Some(dur) = t.checked_duration_since(Instant::now()) {
                        *shutdown = Shutdown::Later(Delay::new(dur), t)
                    }
                }
                (_, KeepAlive::No) => *shutdown = Shutdown::Asap,
                (_, KeepAlive::Yes) => *shutdown = Shutdown::None,
            };

            // Check if the connection (and handler) should be shut down.
            // As long as we're still negotiating substreams, shutdown is always postponed.
            if negotiating_in.is_empty()
                && negotiating_out.is_empty()
                && requested_substreams.is_empty()
            {
                match shutdown {
                    Shutdown::None => {}
                    Shutdown::Asap => return Poll::Ready(Err(ConnectionError::KeepAliveTimeout)),
                    Shutdown::Later(delay, _) => match Future::poll(Pin::new(delay), cx) {
                        Poll::Ready(_) => {
                            return Poll::Ready(Err(ConnectionError::KeepAliveTimeout))
                        }
                        Poll::Pending => {}
                    },
                }
            }

            match muxing.poll_unpin(cx)? {
                Poll::Pending => {}
                Poll::Ready(StreamMuxerEvent::AddressChange(address)) => {
                    handler.on_connection_event(ConnectionEvent::AddressChange(AddressChange {
                        new_address: &address,
                    }));
                    return Poll::Ready(Ok(Event::AddressChange(address)));
                }
            }

            if let Some(requested_substream) = requested_substreams.iter_mut().next() {
                match muxing.poll_outbound_unpin(cx)? {
                    Poll::Pending => {}
                    Poll::Ready(substream) => {
                        let (user_data, timeout, upgrade) = requested_substream.extract();

                        negotiating_out.push(StreamUpgrade::new_outbound(
                            substream,
                            user_data,
                            timeout,
                            upgrade,
                            *substream_upgrade_protocol_override,
                        ));

                        continue; // Go back to the top, handler can potentially make progress again.
                    }
                }
            }

            if negotiating_in.len() < *max_negotiating_inbound_streams {
                match muxing.poll_inbound_unpin(cx)? {
                    Poll::Pending => {}
                    Poll::Ready(substream) => {
                        let protocol = handler.listen_protocol();

                        negotiating_in.push(StreamUpgrade::new_inbound(substream, protocol));

                        continue; // Go back to the top, handler can potentially make progress again.
                    }
                }
            }

            let new_protocols = gather_supported_protocols(handler);
            let changes = ProtocolsChange::from_full_sets(supported_protocols, &new_protocols);

            if !changes.is_empty() {
                for change in changes {
                    handler.on_connection_event(ConnectionEvent::LocalProtocolsChange(change));
                }

                *supported_protocols = new_protocols;

                continue; // Go back to the top, handler can potentially make progress again.
            }
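            // Reaching this point means every sub-state machine above (pending substream
            // requests, the handler, outbound and inbound negotiations, the muxer and the local
            // protocol bookkeeping) returned `Pending` or had nothing new to report, so
            // returning `Pending` here parks the task until one of their wakers fires.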
            return Poll::Pending; // Nothing can make progress, return `Pending`.
        }
    }

    #[cfg(test)]
    fn poll_noop_waker(
        &mut self,
    ) -> Poll<Result<Event<THandler::ToBehaviour>, ConnectionError<THandler::Error>>> {
        Pin::new(self).poll(&mut Context::from_waker(futures::task::noop_waker_ref()))
    }
}

fn gather_supported_protocols(handler: &impl ConnectionHandler) -> HashSet<StreamProtocol> {
    handler
        .listen_protocol()
        .upgrade()
        .protocol_info()
        .filter_map(|i| StreamProtocol::try_from_owned(i.as_ref().to_owned()).ok())
        .collect()
}

/// Borrowed information about an incoming connection currently being negotiated.
#[derive(Debug, Copy, Clone)]
pub(crate) struct IncomingInfo<'a> {
    /// Local connection address.
    pub(crate) local_addr: &'a Multiaddr,
    /// Address used to send back data to the remote.
    pub(crate) send_back_addr: &'a Multiaddr,
}

impl<'a> IncomingInfo<'a> {
    /// Builds the [`ConnectedPoint`] corresponding to the incoming connection.
    pub(crate) fn create_connected_point(&self) -> ConnectedPoint {
        ConnectedPoint::Listener {
            local_addr: self.local_addr.clone(),
            send_back_addr: self.send_back_addr.clone(),
        }
    }
}

struct StreamUpgrade<UserData, TOk, TErr> {
    user_data: Option<UserData>,
    timeout: Delay,
    upgrade: BoxFuture<'static, Result<TOk, StreamUpgradeError<TErr>>>,
}

impl<UserData, TOk, TErr> StreamUpgrade<UserData, TOk, TErr> {
    fn new_outbound<Upgrade>(
        substream: SubstreamBox,
        user_data: UserData,
        timeout: Delay,
        upgrade: Upgrade,
        version_override: Option<upgrade::Version>,
    ) -> Self
    where
        Upgrade: OutboundUpgradeSend<Output = TOk, Error = TErr>,
    {
        let effective_version = match version_override {
            Some(version_override) if version_override != upgrade::Version::default() => {
                log::debug!(
                    "Substream upgrade protocol override: {:?} -> {:?}",
                    upgrade::Version::default(),
                    version_override
                );

                version_override
            }
            _ => upgrade::Version::default(),
        };
        let protocols = upgrade.protocol_info();

        Self {
            user_data: Some(user_data),
            timeout,
            upgrade: Box::pin(async move {
                let (info, stream) = multistream_select::dialer_select_proto(
                    substream,
                    protocols,
                    effective_version,
                )
                .await
                .map_err(to_stream_upgrade_error)?;

                let output = upgrade
                    .upgrade_outbound(Stream::new(stream), info)
                    .await
                    .map_err(StreamUpgradeError::Apply)?;

                Ok(output)
            }),
        }
    }
}

impl<UserData, TOk, TErr> StreamUpgrade<UserData, TOk, TErr> {
    fn new_inbound<Upgrade>(
        substream: SubstreamBox,
        protocol: SubstreamProtocol<Upgrade, UserData>,
    ) -> Self
    where
        Upgrade: InboundUpgradeSend<Output = TOk, Error = TErr>,
    {
        let timeout = *protocol.timeout();
        let (upgrade, open_info) = protocol.into_upgrade();
        let protocols = upgrade.protocol_info();

        Self {
            user_data: Some(open_info),
            timeout: Delay::new(timeout),
            upgrade: Box::pin(async move {
                let (info, stream) =
                    multistream_select::listener_select_proto(substream, protocols)
                        .await
                        .map_err(to_stream_upgrade_error)?;

                let output = upgrade
                    .upgrade_inbound(Stream::new(stream), info)
                    .await
                    .map_err(StreamUpgradeError::Apply)?;

                Ok(output)
            }),
        }
    }
}

fn to_stream_upgrade_error<T>(e: NegotiationError) -> StreamUpgradeError<T> {
    match e {
        NegotiationError::Failed => StreamUpgradeError::NegotiationFailed,
        NegotiationError::ProtocolError(ProtocolError::IoError(e)) => StreamUpgradeError::Io(e),
        NegotiationError::ProtocolError(other) => {
            StreamUpgradeError::Io(io::Error::new(io::ErrorKind::Other, other))
        }
    }
}

impl<UserData, TOk, TErr> Unpin for StreamUpgrade<UserData, TOk, TErr> {}

impl<UserData, TOk, TErr> Future for StreamUpgrade<UserData, TOk, TErr> {
    type Output = (UserData, Result<TOk, StreamUpgradeError<TErr>>);

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.timeout.poll_unpin(cx) {
            Poll::Ready(()) => {
                return Poll::Ready((
                    self.user_data
                        .take()
                        .expect("Future not to be polled again once ready."),
                    Err(StreamUpgradeError::Timeout),
                ))
            }
            Poll::Pending => {}
        }

        let result = futures::ready!(self.upgrade.poll_unpin(cx));
        let user_data = self
            .user_data
            .take()
            .expect("Future not to be polled again once ready.");

        Poll::Ready((user_data, result))
    }
}
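// Note on `StreamUpgrade::poll` above: the timeout is polled before the upgrade future, so an
// expired `Delay` always wins and the upgrade resolves with `StreamUpgradeError::Timeout`, even
// if the negotiation could still have completed on a later poll.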
enum SubstreamRequested<UserData, Upgrade> {
    Waiting {
        user_data: UserData,
        timeout: Delay,
        upgrade: Upgrade,
        /// A waker to notify our [`FuturesUnordered`] that we have extracted the data.
        ///
        /// This will ensure that we will get polled again in the next iteration which allows us to
        /// resolve with `Ok(())` and be removed from the [`FuturesUnordered`].
        extracted_waker: Option<Waker>,
    },
    Done,
}

impl<UserData, Upgrade> SubstreamRequested<UserData, Upgrade> {
    fn new(user_data: UserData, timeout: Duration, upgrade: Upgrade) -> Self {
        Self::Waiting {
            user_data,
            timeout: Delay::new(timeout),
            upgrade,
            extracted_waker: None,
        }
    }

    fn extract(&mut self) -> (UserData, Delay, Upgrade) {
        match mem::replace(self, Self::Done) {
            SubstreamRequested::Waiting {
                user_data,
                timeout,
                upgrade,
                extracted_waker: waker,
            } => {
                if let Some(waker) = waker {
                    waker.wake();
                }

                (user_data, timeout, upgrade)
            }
            SubstreamRequested::Done => panic!("cannot extract twice"),
        }
    }
}

impl<UserData, Upgrade> Unpin for SubstreamRequested<UserData, Upgrade> {}

impl<UserData, Upgrade> Future for SubstreamRequested<UserData, Upgrade> {
    type Output = Result<(), UserData>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        match mem::replace(this, Self::Done) {
            SubstreamRequested::Waiting {
                user_data,
                upgrade,
                mut timeout,
                ..
            } => match timeout.poll_unpin(cx) {
                Poll::Ready(()) => Poll::Ready(Err(user_data)),
                Poll::Pending => {
                    *this = Self::Waiting {
                        user_data,
                        upgrade,
                        timeout,
                        extracted_waker: Some(cx.waker().clone()),
                    };
                    Poll::Pending
                }
            },
            SubstreamRequested::Done => Poll::Ready(Ok(())),
        }
    }
}

/// The options for a planned connection & handler shutdown.
///
/// A shutdown is planned anew based on the return value of
/// [`ConnectionHandler::connection_keep_alive`] of the underlying handler
/// after every invocation of [`ConnectionHandler::poll`].
///
/// A planned shutdown is always postponed for as long as there are incoming
/// or outgoing substreams being negotiated, i.e. it is a graceful, "idle"
/// shutdown.
#[derive(Debug)]
enum Shutdown {
    /// No shutdown is planned.
    None,
    /// A shut down is planned as soon as possible.
    Asap,
    /// A shut down is planned for when a `Delay` has elapsed.
    Later(Delay, Instant),
}
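// Sketch of how `Connection::poll` maps `connection_keep_alive` onto `Shutdown` (a paraphrase
// of the match on `(shutdown, keep_alive)` above, not additional logic):
//
//     KeepAlive::Yes      => Shutdown::None            // never time out
//     KeepAlive::No       => Shutdown::Asap            // close once negotiations are done
//     KeepAlive::Until(t) => Shutdown::Later(delay, t) // close after the deadline, resetting
//                                                      // the timer whenever `t` changes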
#[cfg(test)]
mod tests {
    use super::*;
    use crate::keep_alive;
    use futures::future;
    use futures::AsyncRead;
    use futures::AsyncWrite;
    use libp2p_core::upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo};
    use libp2p_core::StreamMuxer;
    use quickcheck::*;
    use std::sync::{Arc, Weak};
    use void::Void;

    #[test]
    fn max_negotiating_inbound_streams() {
        fn prop(max_negotiating_inbound_streams: u8) {
            let max_negotiating_inbound_streams: usize = max_negotiating_inbound_streams.into();

            let alive_substream_counter = Arc::new(());
            let mut connection = Connection::new(
                StreamMuxerBox::new(DummyStreamMuxer {
                    counter: alive_substream_counter.clone(),
                }),
                keep_alive::ConnectionHandler,
                None,
                max_negotiating_inbound_streams,
            );

            let result = connection.poll_noop_waker();

            assert!(result.is_pending());
            assert_eq!(
                Arc::weak_count(&alive_substream_counter),
                max_negotiating_inbound_streams,
                "Expect no more than the maximum number of allowed streams"
            );
        }

        QuickCheck::new().quickcheck(prop as fn(_));
    }

    #[test]
    fn outbound_stream_timeout_starts_on_request() {
        let upgrade_timeout = Duration::from_secs(1);
        let mut connection = Connection::new(
            StreamMuxerBox::new(PendingStreamMuxer),
            MockConnectionHandler::new(upgrade_timeout),
            None,
            2,
        );

        connection.handler.open_new_outbound();
        let _ = connection.poll_noop_waker();

        std::thread::sleep(upgrade_timeout + Duration::from_secs(1));

        let _ = connection.poll_noop_waker();

        assert!(matches!(
            connection.handler.error.unwrap(),
            StreamUpgradeError::Timeout
        ))
    }

    #[test]
    fn propagates_changes_to_supported_inbound_protocols() {
        let mut connection = Connection::new(
            StreamMuxerBox::new(PendingStreamMuxer),
            ConfigurableProtocolConnectionHandler::default(),
            None,
            0,
        );

        // First, start listening on a single protocol.
        connection.handler.listen_on(&["/foo"]);
        let _ = connection.poll_noop_waker();

        assert_eq!(connection.handler.local_added, vec![vec!["/foo"]]);
        assert!(connection.handler.local_removed.is_empty());

        // Second, listen on two protocols.
        connection.handler.listen_on(&["/foo", "/bar"]);
        let _ = connection.poll_noop_waker();

        assert_eq!(
            connection.handler.local_added,
            vec![vec!["/foo"], vec!["/bar"]],
            "expect to only receive an event for the newly added protocols"
        );
        assert!(connection.handler.local_removed.is_empty());

        // Third, stop listening on the first protocol.
        connection.handler.listen_on(&["/bar"]);
        let _ = connection.poll_noop_waker();

        assert_eq!(
            connection.handler.local_added,
            vec![vec!["/foo"], vec!["/bar"]]
        );
        assert_eq!(connection.handler.local_removed, vec![vec!["/foo"]]);
    }

    #[test]
    fn only_propagates_actual_changes_to_remote_protocols_to_handler() {
        let mut connection = Connection::new(
            StreamMuxerBox::new(PendingStreamMuxer),
            ConfigurableProtocolConnectionHandler::default(),
            None,
            0,
        );

        // First, remote supports a single protocol.
        connection.handler.remote_adds_support_for(&["/foo"]);
        let _ = connection.poll_noop_waker();

        assert_eq!(connection.handler.remote_added, vec![vec!["/foo"]]);
        assert!(connection.handler.remote_removed.is_empty());

        // Second, it adds a protocol but also still includes the first one.
        connection
            .handler
            .remote_adds_support_for(&["/foo", "/bar"]);
        let _ = connection.poll_noop_waker();

        assert_eq!(
            connection.handler.remote_added,
            vec![vec!["/foo"], vec!["/bar"]],
            "expect to only receive an event for the newly added protocol"
        );
        assert!(connection.handler.remote_removed.is_empty());

        // Third, stop listening on a protocol it never advertised (we can't control what handlers
        // do so this needs to be handled gracefully).
        connection.handler.remote_removes_support_for(&["/baz"]);
        let _ = connection.poll_noop_waker();

        assert_eq!(
            connection.handler.remote_added,
            vec![vec!["/foo"], vec!["/bar"]]
        );
        assert!(&connection.handler.remote_removed.is_empty());

        // Fourth, stop listening on a protocol that was previously supported.
        connection.handler.remote_removes_support_for(&["/bar"]);
        let _ = connection.poll_noop_waker();

        assert_eq!(
            connection.handler.remote_added,
            vec![vec!["/foo"], vec!["/bar"]]
        );
        assert_eq!(connection.handler.remote_removed, vec![vec!["/bar"]]);
    }

    struct DummyStreamMuxer {
        counter: Arc<()>,
    }

    impl StreamMuxer for DummyStreamMuxer {
        type Substream = PendingSubstream;
        type Error = Void;

        fn poll_inbound(
            self: Pin<&mut Self>,
            _: &mut Context<'_>,
        ) -> Poll<Result<Self::Substream, Self::Error>> {
            Poll::Ready(Ok(PendingSubstream(Arc::downgrade(&self.counter))))
        }

        fn poll_outbound(
            self: Pin<&mut Self>,
            _: &mut Context<'_>,
        ) -> Poll<Result<Self::Substream, Self::Error>> {
            Poll::Pending
        }

        fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }

        fn poll(
            self: Pin<&mut Self>,
            _: &mut Context<'_>,
        ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
            Poll::Pending
        }
    }

    /// A [`StreamMuxer`] which never returns a stream.
    struct PendingStreamMuxer;

    impl StreamMuxer for PendingStreamMuxer {
        type Substream = PendingSubstream;
        type Error = Void;

        fn poll_inbound(
            self: Pin<&mut Self>,
            _: &mut Context<'_>,
        ) -> Poll<Result<Self::Substream, Self::Error>> {
            Poll::Pending
        }

        fn poll_outbound(
            self: Pin<&mut Self>,
            _: &mut Context<'_>,
        ) -> Poll<Result<Self::Substream, Self::Error>> {
            Poll::Pending
        }

        fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            Poll::Pending
        }

        fn poll(
            self: Pin<&mut Self>,
            _: &mut Context<'_>,
        ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
            Poll::Pending
        }
    }

    struct PendingSubstream(Weak<()>);

    impl AsyncRead for PendingSubstream {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut [u8],
        ) -> Poll<std::io::Result<usize>> {
            Poll::Pending
        }
    }

    impl AsyncWrite for PendingSubstream {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &[u8],
        ) -> Poll<std::io::Result<usize>> {
            Poll::Pending
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Poll::Pending
        }

        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Poll::Pending
        }
    }

    struct MockConnectionHandler {
        outbound_requested: bool,
        error: Option<StreamUpgradeError<Void>>,
        upgrade_timeout: Duration,
    }

    impl MockConnectionHandler {
        fn new(upgrade_timeout: Duration) -> Self {
            Self {
                outbound_requested: false,
                error: None,
                upgrade_timeout,
            }
        }

        fn open_new_outbound(&mut self) {
            self.outbound_requested = true;
        }
    }

    #[derive(Default)]
    struct ConfigurableProtocolConnectionHandler {
        events: Vec<ConnectionHandlerEvent<DeniedUpgrade, (), Void, Void>>,
        active_protocols: HashSet<StreamProtocol>,
        local_added: Vec<Vec<StreamProtocol>>,
        local_removed: Vec<Vec<StreamProtocol>>,
        remote_added: Vec<Vec<StreamProtocol>>,
        remote_removed: Vec<Vec<StreamProtocol>>,
    }

    impl ConfigurableProtocolConnectionHandler {
        fn listen_on(&mut self, protocols: &[&'static str]) {
            self.active_protocols = protocols.iter().copied().map(StreamProtocol::new).collect();
        }

        fn remote_adds_support_for(&mut self, protocols: &[&'static str]) {
            self.events
                .push(ConnectionHandlerEvent::ReportRemoteProtocols(
                    ProtocolSupport::Added(
                        protocols.iter().copied().map(StreamProtocol::new).collect(),
                    ),
                ));
        }

        fn remote_removes_support_for(&mut self, protocols: &[&'static str]) {
            self.events
                .push(ConnectionHandlerEvent::ReportRemoteProtocols(
                    ProtocolSupport::Removed(
                        protocols.iter().copied().map(StreamProtocol::new).collect(),
                    ),
                ));
        }
    }
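    // `MockConnectionHandler` requests a single outbound stream on demand and records the
    // resulting upgrade error; `ConfigurableProtocolConnectionHandler` records protocol-change
    // events so the tests above can assert on them.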
    impl ConnectionHandler for MockConnectionHandler {
        type FromBehaviour = Void;
        type ToBehaviour = Void;
        type Error = Void;
        type InboundProtocol = DeniedUpgrade;
        type OutboundProtocol = DeniedUpgrade;
        type InboundOpenInfo = ();
        type OutboundOpenInfo = ();

        fn listen_protocol(
            &self,
        ) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
            SubstreamProtocol::new(DeniedUpgrade, ()).with_timeout(self.upgrade_timeout)
        }

        fn on_connection_event(
            &mut self,
            event: ConnectionEvent<
                Self::InboundProtocol,
                Self::OutboundProtocol,
                Self::InboundOpenInfo,
                Self::OutboundOpenInfo,
            >,
        ) {
            match event {
                ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
                    protocol, ..
                }) => void::unreachable(protocol),
                ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
                    protocol,
                    ..
                }) => void::unreachable(protocol),
                ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => {
                    self.error = Some(error)
                }
                ConnectionEvent::AddressChange(_)
                | ConnectionEvent::ListenUpgradeError(_)
                | ConnectionEvent::LocalProtocolsChange(_)
                | ConnectionEvent::RemoteProtocolsChange(_) => {}
            }
        }

        fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
            void::unreachable(event)
        }

        fn connection_keep_alive(&self) -> KeepAlive {
            KeepAlive::Yes
        }

        fn poll(
            &mut self,
            _: &mut Context<'_>,
        ) -> Poll<
            ConnectionHandlerEvent<
                Self::OutboundProtocol,
                Self::OutboundOpenInfo,
                Self::ToBehaviour,
                Self::Error,
            >,
        > {
            if self.outbound_requested {
                self.outbound_requested = false;
                return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
                    protocol: SubstreamProtocol::new(DeniedUpgrade, ())
                        .with_timeout(self.upgrade_timeout),
                });
            }

            Poll::Pending
        }
    }

    impl ConnectionHandler for ConfigurableProtocolConnectionHandler {
        type FromBehaviour = Void;
        type ToBehaviour = Void;
        type Error = Void;
        type InboundProtocol = ManyProtocolsUpgrade;
        type OutboundProtocol = DeniedUpgrade;
        type InboundOpenInfo = ();
        type OutboundOpenInfo = ();

        fn listen_protocol(
            &self,
        ) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
            SubstreamProtocol::new(
                ManyProtocolsUpgrade {
                    protocols: Vec::from_iter(self.active_protocols.clone()),
                },
                (),
            )
        }

        fn on_connection_event(
            &mut self,
            event: ConnectionEvent<
                Self::InboundProtocol,
                Self::OutboundProtocol,
                Self::InboundOpenInfo,
                Self::OutboundOpenInfo,
            >,
        ) {
            match event {
                ConnectionEvent::LocalProtocolsChange(ProtocolsChange::Added(added)) => {
                    self.local_added.push(added.cloned().collect())
                }
                ConnectionEvent::LocalProtocolsChange(ProtocolsChange::Removed(removed)) => {
                    self.local_removed.push(removed.cloned().collect())
                }
                ConnectionEvent::RemoteProtocolsChange(ProtocolsChange::Added(added)) => {
                    self.remote_added.push(added.cloned().collect())
                }
                ConnectionEvent::RemoteProtocolsChange(ProtocolsChange::Removed(removed)) => {
                    self.remote_removed.push(removed.cloned().collect())
                }
                _ => {}
            }
        }

        fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
            void::unreachable(event)
        }

        fn connection_keep_alive(&self) -> KeepAlive {
            KeepAlive::Yes
        }

        fn poll(
            &mut self,
            _: &mut Context<'_>,
        ) -> Poll<
            ConnectionHandlerEvent<
                Self::OutboundProtocol,
                Self::OutboundOpenInfo,
                Self::ToBehaviour,
                Self::Error,
            >,
        > {
            if let Some(event) = self.events.pop() {
                return Poll::Ready(event);
            }

            Poll::Pending
        }
    }

    struct ManyProtocolsUpgrade {
        protocols: Vec<StreamProtocol>,
    }

    impl UpgradeInfo for ManyProtocolsUpgrade {
        type Info = StreamProtocol;
        type InfoIter = std::vec::IntoIter<Self::Info>;

        fn protocol_info(&self) -> Self::InfoIter {
            self.protocols.clone().into_iter()
        }
    }

    impl<C> InboundUpgrade<C> for ManyProtocolsUpgrade {
        type Output = C;
        type Error = Void;
        type Future = future::Ready<Result<Self::Output, Self::Error>>;

        fn upgrade_inbound(self, stream: C, _: Self::Info) -> Self::Future {
            future::ready(Ok(stream))
        }
    }

    impl<C> OutboundUpgrade<C> for ManyProtocolsUpgrade {
        type Output = C;
        type Error = Void;
        type Future = future::Ready<Result<Self::Output, Self::Error>>;

        fn upgrade_outbound(self, stream: C, _: Self::Info) -> Self::Future {
            future::ready(Ok(stream))
        }
    }
}

/// The endpoint roles associated with a pending peer-to-peer connection.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum PendingPoint {
    /// The socket comes from a dialer.
    ///
    /// There is no single address associated with the Dialer of a pending
    /// connection. Addresses are dialed in parallel. Only once the first dial
    /// is successful is the address of the connection known.
    Dialer {
        /// Same as [`ConnectedPoint::Dialer`] `role_override`.
        role_override: Endpoint,
    },
    /// The socket comes from a listener.
    Listener {
        /// Local connection address.
        local_addr: Multiaddr,
        /// Address used to send back data to the remote.
        send_back_addr: Multiaddr,
    },
}

impl From<ConnectedPoint> for PendingPoint {
    fn from(endpoint: ConnectedPoint) -> Self {
        match endpoint {
            ConnectedPoint::Dialer { role_override, .. } => PendingPoint::Dialer { role_override },
            ConnectedPoint::Listener {
                local_addr,
                send_back_addr,
            } => PendingPoint::Listener {
                local_addr,
                send_back_addr,
            },
        }
    }
}
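// Illustrative sketch: converting a `ConnectedPoint` into the corresponding `PendingPoint`
// simply drops the dialer's (now known) address, since a pending dial has no single address
// yet. The multiaddr below is hypothetical.
//
//     let connected = ConnectedPoint::Dialer {
//         address: "/ip4/127.0.0.1/tcp/4001".parse().unwrap(),
//         role_override: Endpoint::Dialer,
//     };
//     assert_eq!(
//         PendingPoint::from(connected),
//         PendingPoint::Dialer { role_override: Endpoint::Dialer }
//     );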