diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 0f78a21a..dc567c8c 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -19,15 +19,12 @@ // DEALINGS IN THE SOFTWARE. mod error; +mod handler_wrapper; mod listeners; mod substream; pub(crate) mod pool; -use crate::protocols_handler::{ - NodeHandlerWrapper, NodeHandlerWrapperError, NodeHandlerWrapperEvent, - NodeHandlerWrapperOutboundOpenInfo, ProtocolsHandler, -}; pub use error::{ ConnectionError, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, @@ -37,9 +34,12 @@ pub use pool::{ConnectionCounters, ConnectionLimits}; pub use pool::{EstablishedConnection, PendingConnection}; pub use substream::{Close, Substream, SubstreamEndpoint}; +use crate::protocols_handler::ProtocolsHandler; +use handler_wrapper::HandlerWrapper; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Multiaddr; use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::upgrade; use libp2p_core::PeerId; use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll}; use substream::{Muxing, SubstreamEvent}; @@ -56,21 +56,21 @@ pub struct Connected { /// Event generated by a [`Connection`]. #[derive(Debug, Clone)] pub enum Event { - /// Event generated by the [`NodeHandlerWrapper`]. + /// Event generated by the [`ProtocolsHandler`]. Handler(T), /// Address of the remote has changed. AddressChange(Multiaddr), } -/// A multiplexed connection to a peer with an associated [`NodeHandlerWrapper`]. +/// A multiplexed connection to a peer with an associated [`ProtocolsHandler`]. pub struct Connection where THandler: ProtocolsHandler, { /// Node that handles the muxing. - muxing: substream::Muxing>, + muxing: substream::Muxing>, /// Handler that processes substreams. - handler: NodeHandlerWrapper, + handler: HandlerWrapper, } impl fmt::Debug for Connection @@ -93,10 +93,15 @@ where { /// Builds a new `Connection` from the given substream multiplexer /// and connection handler. - pub fn new(muxer: StreamMuxerBox, handler: NodeHandlerWrapper) -> Self { + pub fn new( + muxer: StreamMuxerBox, + handler: THandler, + substream_upgrade_protocol_override: Option, + ) -> Self { + let wrapped_handler = HandlerWrapper::new(handler, substream_upgrade_protocol_override); Connection { muxing: Muxing::new(muxer), - handler, + handler: wrapped_handler, } } @@ -107,8 +112,8 @@ where /// Begins an orderly shutdown of the connection, returning the connection /// handler and a `Future` that resolves when connection shutdown is complete. - pub fn close(self) -> (NodeHandlerWrapper, Close) { - (self.handler, self.muxing.close().0) + pub fn close(self) -> (THandler, Close) { + (self.handler.into_protocols_handler(), self.muxing.close().0) } /// Polls the connection for events produced by the associated handler @@ -116,12 +121,7 @@ where pub fn poll( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll< - Result< - Event, - ConnectionError>, - >, - > { + ) -> Poll, ConnectionError>> { loop { let mut io_pending = false; @@ -153,13 +153,13 @@ where return Poll::Pending; // Nothing to do } } - Poll::Ready(Ok(NodeHandlerWrapperEvent::OutboundSubstreamRequest(user_data))) => { + Poll::Ready(Ok(handler_wrapper::Event::OutboundSubstreamRequest(user_data))) => { self.muxing.open_substream(user_data); } - Poll::Ready(Ok(NodeHandlerWrapperEvent::Custom(event))) => { + Poll::Ready(Ok(handler_wrapper::Event::Custom(event))) => { return Poll::Ready(Ok(Event::Handler(event))); } - Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::Handler(err))), + Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), } } } diff --git a/swarm/src/connection/error.rs b/swarm/src/connection/error.rs index 33d97ff9..4e8d85a8 100644 --- a/swarm/src/connection/error.rs +++ b/swarm/src/connection/error.rs @@ -18,6 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use super::handler_wrapper; use crate::transport::TransportError; use crate::Multiaddr; use crate::{connection::ConnectionLimit, ConnectedPoint, PeerId}; @@ -30,6 +31,9 @@ pub enum ConnectionError { // TODO: Eventually this should also be a custom error? IO(io::Error), + /// The connection keep-alive timeout expired. + KeepAliveTimeout, + /// The connection handler produced an error. Handler(THandlerErr), } @@ -41,6 +45,9 @@ where fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { ConnectionError::IO(err) => write!(f, "Connection error: I/O error: {}", err), + ConnectionError::KeepAliveTimeout => { + write!(f, "Connection closed due to expired keep-alive timeout.") + } ConnectionError::Handler(err) => write!(f, "Connection error: Handler error: {}", err), } } @@ -53,11 +60,21 @@ where fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { ConnectionError::IO(err) => Some(err), + ConnectionError::KeepAliveTimeout => None, ConnectionError::Handler(err) => Some(err), } } } +impl From> for ConnectionError { + fn from(error: handler_wrapper::Error) -> Self { + match error { + handler_wrapper::Error::Handler(e) => Self::Handler(e), + handler_wrapper::Error::KeepAliveTimeout => Self::KeepAliveTimeout, + } + } +} + /// Errors that can occur in the context of a pending outgoing `Connection`. /// /// Note: Addresses for an outbound connection are dialed in parallel. Thus, compared to diff --git a/swarm/src/protocols_handler/node_handler.rs b/swarm/src/connection/handler_wrapper.rs similarity index 77% rename from swarm/src/protocols_handler/node_handler.rs rename to swarm/src/connection/handler_wrapper.rs index 22efbff6..672d7b62 100644 --- a/swarm/src/protocols_handler/node_handler.rs +++ b/swarm/src/connection/handler_wrapper.rs @@ -18,10 +18,9 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::connection::{Connected, Substream, SubstreamEndpoint}; +use crate::connection::{Substream, SubstreamEndpoint}; use crate::protocols_handler::{ - IntoProtocolsHandler, KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, - ProtocolsHandlerUpgrErr, + KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, }; use crate::upgrade::SendWrapper; @@ -36,59 +35,6 @@ use libp2p_core::{ }; use std::{error, fmt, pin::Pin, task::Context, task::Poll, time::Duration}; -/// Prototype for a `NodeHandlerWrapper`. -pub struct NodeHandlerWrapperBuilder { - /// The underlying handler. - handler: TIntoProtoHandler, - /// The substream upgrade protocol override, if any. - substream_upgrade_protocol_override: Option, -} - -impl NodeHandlerWrapperBuilder -where - TIntoProtoHandler: IntoProtocolsHandler, -{ - /// Builds a `NodeHandlerWrapperBuilder`. - pub(crate) fn new(handler: TIntoProtoHandler) -> Self { - NodeHandlerWrapperBuilder { - handler, - substream_upgrade_protocol_override: None, - } - } - - pub(crate) fn with_substream_upgrade_protocol_override( - mut self, - version: Option, - ) -> Self { - self.substream_upgrade_protocol_override = version; - self - } - - pub(crate) fn into_protocols_handler(self) -> TIntoProtoHandler { - self.handler - } -} - -impl NodeHandlerWrapperBuilder -where - TIntoProtoHandler: IntoProtocolsHandler, - TProtoHandler: ProtocolsHandler, -{ - pub fn into_handler(self, connected: &Connected) -> NodeHandlerWrapper { - NodeHandlerWrapper { - handler: self - .handler - .into_handler(&connected.peer_id, &connected.endpoint), - negotiating_in: Default::default(), - negotiating_out: Default::default(), - queued_dial_upgrades: Vec::new(), - unique_dial_upgrade_id: 0, - shutdown: Shutdown::None, - substream_upgrade_protocol_override: self.substream_upgrade_protocol_override, - } - } -} - /// A wrapper for an underlying [`ProtocolsHandler`]. /// /// It extends [`ProtocolsHandler`] with: @@ -96,7 +42,7 @@ where /// - Driving substream upgrades /// - Handling connection timeout // TODO: add a caching system for protocols that are supported or not -pub struct NodeHandlerWrapper +pub struct HandlerWrapper where TProtoHandler: ProtocolsHandler, { @@ -133,9 +79,9 @@ where substream_upgrade_protocol_override: Option, } -impl std::fmt::Debug for NodeHandlerWrapper { +impl std::fmt::Debug for HandlerWrapper { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("NodeHandlerWrapper") + f.debug_struct("HandlerWrapper") .field("negotiating_in", &self.negotiating_in) .field("negotiating_out", &self.negotiating_out) .field("unique_dial_upgrade_id", &self.unique_dial_upgrade_id) @@ -148,7 +94,22 @@ impl std::fmt::Debug for NodeHandlerWrapper NodeHandlerWrapper { +impl HandlerWrapper { + pub(crate) fn new( + handler: TProtoHandler, + substream_upgrade_protocol_override: Option, + ) -> Self { + Self { + handler, + negotiating_in: Default::default(), + negotiating_out: Default::default(), + queued_dial_upgrades: Vec::new(), + unique_dial_upgrade_id: 0, + shutdown: Shutdown::None, + substream_upgrade_protocol_override, + } + } + pub(crate) fn into_protocols_handler(self) -> TProtoHandler { self.handler } @@ -222,54 +183,54 @@ enum Shutdown { Later(Delay, Instant), } -/// Error generated by the `NodeHandlerWrapper`. +/// Error generated by the [`HandlerWrapper`]. #[derive(Debug)] -pub enum NodeHandlerWrapperError { +pub enum Error { /// The connection handler encountered an error. Handler(TErr), /// The connection keep-alive timeout expired. KeepAliveTimeout, } -impl From for NodeHandlerWrapperError { - fn from(err: TErr) -> NodeHandlerWrapperError { - NodeHandlerWrapperError::Handler(err) +impl From for Error { + fn from(err: TErr) -> Error { + Error::Handler(err) } } -impl fmt::Display for NodeHandlerWrapperError +impl fmt::Display for Error where TErr: fmt::Display, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - NodeHandlerWrapperError::Handler(err) => write!(f, "{}", err), - NodeHandlerWrapperError::KeepAliveTimeout => { + Error::Handler(err) => write!(f, "{}", err), + Error::KeepAliveTimeout => { write!(f, "Connection closed due to expired keep-alive timeout.") } } } } -impl error::Error for NodeHandlerWrapperError +impl error::Error for Error where TErr: error::Error + 'static, { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { - NodeHandlerWrapperError::Handler(err) => Some(err), - NodeHandlerWrapperError::KeepAliveTimeout => None, + Error::Handler(err) => Some(err), + Error::KeepAliveTimeout => None, } } } -pub type NodeHandlerWrapperOutboundOpenInfo = ( +pub type OutboundOpenInfo = ( u64, ::OutboundOpenInfo, Duration, ); -impl NodeHandlerWrapper +impl HandlerWrapper where TProtoHandler: ProtocolsHandler, { @@ -278,7 +239,7 @@ where substream: Substream, // The first element of the tuple is the unique upgrade identifier // (see `unique_dial_upgrade_id`). - endpoint: SubstreamEndpoint>, + endpoint: SubstreamEndpoint>, ) { match endpoint { SubstreamEndpoint::Listener => { @@ -342,11 +303,8 @@ where cx: &mut Context<'_>, ) -> Poll< Result< - NodeHandlerWrapperEvent< - NodeHandlerWrapperOutboundOpenInfo, - TProtoHandler::OutEvent, - >, - NodeHandlerWrapperError, + Event, TProtoHandler::OutEvent>, + Error, >, > { while let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) { @@ -393,7 +351,7 @@ where match poll_result { Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => { - return Poll::Ready(Ok(NodeHandlerWrapperEvent::Custom(event))); + return Poll::Ready(Ok(Event::Custom(event))); } Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) => { let id = self.unique_dial_upgrade_id; @@ -401,9 +359,7 @@ where self.unique_dial_upgrade_id += 1; let (upgrade, info) = protocol.into_upgrade(); self.queued_dial_upgrades.push((id, SendWrapper(upgrade))); - return Poll::Ready(Ok(NodeHandlerWrapperEvent::OutboundSubstreamRequest(( - id, info, timeout, - )))); + return Poll::Ready(Ok(Event::OutboundSubstreamRequest((id, info, timeout)))); } Poll::Ready(ProtocolsHandlerEvent::Close(err)) => return Poll::Ready(Err(err.into())), Poll::Pending => (), @@ -414,13 +370,9 @@ where if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() { match self.shutdown { Shutdown::None => {} - Shutdown::Asap => { - return Poll::Ready(Err(NodeHandlerWrapperError::KeepAliveTimeout)) - } + Shutdown::Asap => return Poll::Ready(Err(Error::KeepAliveTimeout)), Shutdown::Later(ref mut delay, _) => match Future::poll(Pin::new(delay), cx) { - Poll::Ready(_) => { - return Poll::Ready(Err(NodeHandlerWrapperError::KeepAliveTimeout)) - } + Poll::Ready(_) => return Poll::Ready(Err(Error::KeepAliveTimeout)), Poll::Pending => {} }, } @@ -430,9 +382,9 @@ where } } -/// Event produced by a [`NodeHandlerWrapper`]. +/// Event produced by a [`HandlerWrapper`]. #[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum NodeHandlerWrapperEvent { +pub enum Event { /// Require a new outbound substream to be opened with the remote. OutboundSubstreamRequest(TOutboundOpenInfo), diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index d9903f61..bb8e4760 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -25,7 +25,6 @@ use crate::{ Connected, ConnectionError, ConnectionLimit, IncomingInfo, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }, - protocols_handler::{NodeHandlerWrapper, NodeHandlerWrapperBuilder, NodeHandlerWrapperError}, transport::{Transport, TransportError}, ConnectedPoint, Executor, IntoProtocolsHandler, Multiaddr, PeerId, ProtocolsHandler, }; @@ -85,6 +84,9 @@ where /// Number of addresses concurrently dialed for a single outbound connection attempt. dial_concurrency_factor: NonZeroU8, + /// The configured override for substream protocol upgrades, if any. + substream_upgrade_protocol_override: Option, + /// The executor to use for running the background tasks. If `None`, /// the tasks are kept in `local_spawns` instead and polled on the /// current thread when the [`Pool`] is polled for new events. @@ -138,7 +140,7 @@ struct PendingConnectionInfo { /// [`PeerId`] of the remote peer. peer_id: Option, /// Handler to handle connection once no longer pending but established. - handler: NodeHandlerWrapperBuilder, + handler: THandler, endpoint: PendingPoint, /// When dropped, notifies the task which then knows to terminate. abort_notifier: Option>, @@ -187,16 +189,12 @@ where connected: Connected, /// The error that occurred, if any. If `None`, the connection /// was closed by the local peer. - error: Option< - ConnectionError< - NodeHandlerWrapperError<::Error>, - >, - >, + error: Option::Error>>, /// A reference to the pool that used to manage the connection. pool: &'a mut Pool, /// The remaining established connections to the same peer. remaining_established_connection_ids: Vec, - handler: NodeHandlerWrapper, + handler: THandler::Handler, }, /// An outbound connection attempt failed. @@ -206,7 +204,7 @@ where /// The error that occurred. error: PendingOutboundConnectionError, /// The handler that was supposed to handle the connection. - handler: NodeHandlerWrapperBuilder, + handler: THandler, /// The (expected) peer of the failed connection. peer: Option, }, @@ -222,7 +220,7 @@ where /// The error that occurred. error: PendingInboundConnectionError, /// The handler that was supposed to handle the connection. - handler: NodeHandlerWrapperBuilder, + handler: THandler, }, /// A node has produced an event. @@ -330,6 +328,7 @@ where next_connection_id: ConnectionId::new(0), task_command_buffer_size: config.task_command_buffer_size, dial_concurrency_factor: config.dial_concurrency_factor, + substream_upgrade_protocol_override: config.substream_upgrade_protocol_override, executor: config.executor, local_spawns: FuturesUnordered::new(), pending_connection_events_tx, @@ -481,10 +480,10 @@ where transport: TTrans, addresses: impl Iterator + Send + 'static, peer: Option, - handler: NodeHandlerWrapperBuilder, + handler: THandler, role_override: Endpoint, dial_concurrency_factor_override: Option, - ) -> Result)> + ) -> Result where TTrans: Clone + Send, TTrans::Dial: Send + 'static, @@ -538,9 +537,9 @@ where pub fn add_incoming( &mut self, future: TFut, - handler: NodeHandlerWrapperBuilder, + handler: THandler, info: IncomingInfo<'_>, - ) -> Result)> + ) -> Result where TFut: Future> + Send + 'static, { @@ -816,13 +815,11 @@ where }, ); - let connected = Connected { - peer_id: obtained_peer_id, - endpoint, - }; - - let connection = - super::Connection::new(muxer, handler.into_handler(&connected)); + let connection = super::Connection::new( + muxer, + handler.into_handler(&obtained_peer_id, &endpoint), + self.substream_upgrade_protocol_override, + ); self.spawn( task::new_for_established_connection( id, @@ -1226,6 +1223,9 @@ pub struct PoolConfig { /// Number of addresses concurrently dialed for a single outbound connection attempt. pub dial_concurrency_factor: NonZeroU8, + + /// The configured override for substream protocol upgrades, if any. + substream_upgrade_protocol_override: Option, } impl Default for PoolConfig { @@ -1236,6 +1236,7 @@ impl Default for PoolConfig { task_command_buffer_size: 7, // By default, addresses of a single connection attempt are dialed in sequence. dial_concurrency_factor: NonZeroU8::new(1).expect("1 > 0"), + substream_upgrade_protocol_override: None, } } } @@ -1285,6 +1286,15 @@ impl PoolConfig { self.dial_concurrency_factor = factor; self } + + /// Configures an override for the substream upgrade protocol to use. + pub fn with_substream_upgrade_protocol_override( + mut self, + v: libp2p_core::upgrade::Version, + ) -> Self { + self.substream_upgrade_protocol_override = Some(v); + self + } } trait EntryExt<'a, K, V> { diff --git a/swarm/src/connection/pool/task.rs b/swarm/src/connection/pool/task.rs index 60bdd87b..7464b7a1 100644 --- a/swarm/src/connection/pool/task.rs +++ b/swarm/src/connection/pool/task.rs @@ -26,7 +26,6 @@ use crate::{ connection::{ self, ConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }, - protocols_handler::{NodeHandlerWrapper, NodeHandlerWrapperError}, transport::{Transport, TransportError}, Multiaddr, PeerId, ProtocolsHandler, }; @@ -93,8 +92,8 @@ pub enum EstablishedConnectionEvent { Closed { id: ConnectionId, peer_id: PeerId, - error: Option>>, - handler: NodeHandlerWrapper, + error: Option>, + handler: THandler, }, } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index d2e17999..462fabaf 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -93,7 +93,6 @@ use libp2p_core::{ upgrade::ProtocolName, Executor, Multiaddr, Negotiated, PeerId, Transport, }; -use protocols_handler::NodeHandlerWrapperError; use registry::{AddressIntoIter, Addresses}; use smallvec::SmallVec; use std::collections::HashSet; @@ -163,7 +162,7 @@ pub enum SwarmEvent { num_established: u32, /// Reason for the disconnection, if it was not a successful /// active close. - cause: Option>>, + cause: Option>, }, /// A new connection arrived on a listener and is in the process of protocol negotiation. /// @@ -294,9 +293,6 @@ where /// (or dropped if the peer disconnected) before the `behaviour` /// can be polled again. pending_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent)>, - - /// The configured override for substream protocol upgrades, if any. - substream_upgrade_protocol_override: Option, } impl Unpin for Swarm where TBehaviour: NetworkBehaviour {} @@ -506,10 +502,6 @@ where } }; - let handler = handler - .into_node_handler_builder() - .with_substream_upgrade_protocol_override(self.substream_upgrade_protocol_override); - match self.pool.add_outgoing( self.listeners.transport().clone(), addresses, @@ -521,8 +513,7 @@ where Ok(_connection_id) => Ok(()), Err((connection_limit, handler)) => { let error = DialError::ConnectionLimit(connection_limit); - self.behaviour - .inject_dial_failure(None, handler.into_protocols_handler(), &error); + self.behaviour.inject_dial_failure(None, handler, &error); return Err(error); } } @@ -675,13 +666,7 @@ where local_addr, send_back_addr, }) => { - let handler = this - .behaviour - .new_handler() - .into_node_handler_builder() - .with_substream_upgrade_protocol_override( - this.substream_upgrade_protocol_override, - ); + let handler = this.behaviour.new_handler(); match this.pool.add_incoming( upgrade, handler, @@ -700,7 +685,7 @@ where this.behaviour.inject_listen_failure( &local_addr, &send_back_addr, - handler.into_protocols_handler(), + handler, ); log::warn!("Incoming connection rejected: {:?}", connection_limit); } @@ -828,11 +813,7 @@ where }) => { let error = error.into(); - this.behaviour.inject_dial_failure( - peer, - handler.into_protocols_handler(), - &error, - ); + this.behaviour.inject_dial_failure(peer, handler, &error); if let Some(peer) = peer { log::debug!("Connection attempt to {:?} failed with {:?}.", peer, error,); @@ -853,11 +834,8 @@ where handler, }) => { log::debug!("Incoming connection failed: {:?}", error); - this.behaviour.inject_listen_failure( - &local_addr, - &send_back_addr, - handler.into_protocols_handler(), - ); + this.behaviour + .inject_listen_failure(&local_addr, &send_back_addr, handler); return Poll::Ready(SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, @@ -900,7 +878,7 @@ where &peer_id, &id, &endpoint, - handler.into_protocols_handler(), + handler, remaining_non_banned, ); } @@ -1242,7 +1220,6 @@ pub struct SwarmBuilder { behaviour: TBehaviour, pool_config: PoolConfig, connection_limits: ConnectionLimits, - substream_upgrade_protocol_override: Option, } impl SwarmBuilder @@ -1263,7 +1240,6 @@ where behaviour, pool_config: Default::default(), connection_limits: Default::default(), - substream_upgrade_protocol_override: None, } } @@ -1341,7 +1317,7 @@ where /// > individual [`SubstreamProtocol`]s emitted by the `NetworkBehaviour` /// > are ignored. pub fn substream_upgrade_protocol_override(mut self, v: libp2p_core::upgrade::Version) -> Self { - self.substream_upgrade_protocol_override = Some(v); + self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v); self } @@ -1382,7 +1358,6 @@ where banned_peers: HashSet::new(), banned_peer_connections: HashSet::new(), pending_event: None, - substream_upgrade_protocol_override: self.substream_upgrade_protocol_override, } } } diff --git a/swarm/src/protocols_handler.rs b/swarm/src/protocols_handler.rs index d4ebec1d..5a9d71cb 100644 --- a/swarm/src/protocols_handler.rs +++ b/swarm/src/protocols_handler.rs @@ -42,7 +42,6 @@ pub mod either; mod map_in; mod map_out; pub mod multi; -mod node_handler; mod one_shot; mod select; @@ -55,10 +54,6 @@ use std::{cmp::Ordering, error, fmt, task::Context, task::Poll, time::Duration}; pub use dummy::DummyProtocolsHandler; pub use map_in::MapInEvent; pub use map_out::MapOutEvent; -pub use node_handler::{ - NodeHandlerWrapper, NodeHandlerWrapperBuilder, NodeHandlerWrapperError, - NodeHandlerWrapperEvent, NodeHandlerWrapperOutboundOpenInfo, -}; pub use one_shot::{OneShotHandler, OneShotHandlerConfig}; pub use select::{IntoProtocolsHandlerSelect, ProtocolsHandlerSelect}; @@ -222,17 +217,6 @@ pub trait ProtocolsHandler: Send + 'static { { ProtocolsHandlerSelect::new(self, other) } - - /// Creates a builder that allows creating a `NodeHandler` that handles this protocol - /// exclusively. - /// - /// > **Note**: This method should not be redefined in a custom `ProtocolsHandler`. - fn into_node_handler_builder(self) -> NodeHandlerWrapperBuilder - where - Self: Sized, - { - IntoProtocolsHandler::into_node_handler_builder(self) - } } /// Configuration of inbound or outbound substream protocol(s) @@ -493,15 +477,6 @@ pub trait IntoProtocolsHandler: Send + 'static { { IntoProtocolsHandlerSelect::new(self, other) } - - /// Creates a builder that will allow creating a `NodeHandler` that handles this protocol - /// exclusively. - fn into_node_handler_builder(self) -> NodeHandlerWrapperBuilder - where - Self: Sized, - { - NodeHandlerWrapperBuilder::new(self) - } } impl IntoProtocolsHandler for T