diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 75404d29..0be2c4ca 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -31,9 +31,12 @@ use libp2p_core::{ use std::{task::Context, task::Poll}; /// Custom event that can be received by the [`ProtocolsHandler`]. -type THandlerInEvent = +pub(crate) type THandlerInEvent = <::Handler as ProtocolsHandler>::InEvent; +pub(crate) type THandlerOutEvent = + <::Handler as ProtocolsHandler>::OutEvent; + /// A [`NetworkBehaviour`] defines the behaviour of the local node on the network. /// /// In contrast to [`Transport`](libp2p_core::Transport) which defines **how** to send bytes on the diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 5b7f09a8..0f78a21a 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -19,17 +19,19 @@ // DEALINGS IN THE SOFTWARE. mod error; -pub(crate) mod handler; mod listeners; mod substream; pub(crate) mod pool; +use crate::protocols_handler::{ + NodeHandlerWrapper, NodeHandlerWrapperError, NodeHandlerWrapperEvent, + NodeHandlerWrapperOutboundOpenInfo, ProtocolsHandler, +}; pub use error::{ ConnectionError, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }; -pub use handler::{ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler}; pub use listeners::{ListenersEvent, ListenersStream}; pub use pool::{ConnectionCounters, ConnectionLimits}; pub use pool::{EstablishedConnection, PendingConnection}; @@ -37,7 +39,7 @@ pub use substream::{Close, Substream, SubstreamEndpoint}; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::muxing::StreamMuxer; +use libp2p_core::muxing::StreamMuxerBox; use libp2p_core::PeerId; use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll}; use substream::{Muxing, SubstreamEvent}; @@ -54,28 +56,26 @@ pub struct Connected { /// Event generated by a [`Connection`]. #[derive(Debug, Clone)] pub enum Event { - /// Event generated by the [`ConnectionHandler`]. + /// Event generated by the [`NodeHandlerWrapper`]. Handler(T), /// Address of the remote has changed. AddressChange(Multiaddr), } -/// A multiplexed connection to a peer with an associated `ConnectionHandler`. -pub struct Connection +/// A multiplexed connection to a peer with an associated [`NodeHandlerWrapper`]. +pub struct Connection where - TMuxer: StreamMuxer, - THandler: ConnectionHandler>, + THandler: ProtocolsHandler, { /// Node that handles the muxing. - muxing: substream::Muxing, + muxing: substream::Muxing>, /// Handler that processes substreams. - handler: THandler, + handler: NodeHandlerWrapper, } -impl fmt::Debug for Connection +impl fmt::Debug for Connection where - TMuxer: StreamMuxer, - THandler: ConnectionHandler> + fmt::Debug, + THandler: ProtocolsHandler + fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Connection") @@ -85,21 +85,15 @@ where } } -impl Unpin for Connection -where - TMuxer: StreamMuxer, - THandler: ConnectionHandler>, -{ -} +impl Unpin for Connection where THandler: ProtocolsHandler {} -impl Connection +impl Connection where - TMuxer: StreamMuxer, - THandler: ConnectionHandler>, + THandler: ProtocolsHandler, { /// Builds a new `Connection` from the given substream multiplexer /// and connection handler. - pub fn new(muxer: TMuxer, handler: THandler) -> Self { + pub fn new(muxer: StreamMuxerBox, handler: NodeHandlerWrapper) -> Self { Connection { muxing: Muxing::new(muxer), handler, @@ -113,7 +107,7 @@ where /// Begins an orderly shutdown of the connection, returning the connection /// handler and a `Future` that resolves when connection shutdown is complete. - pub fn close(self) -> (THandler, Close) { + pub fn close(self) -> (NodeHandlerWrapper, Close) { (self.handler, self.muxing.close().0) } @@ -122,7 +116,12 @@ where pub fn poll( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll, ConnectionError>> { + ) -> Poll< + Result< + Event, + ConnectionError>, + >, + > { loop { let mut io_pending = false; @@ -154,10 +153,10 @@ where return Poll::Pending; // Nothing to do } } - Poll::Ready(Ok(ConnectionHandlerEvent::OutboundSubstreamRequest(user_data))) => { + Poll::Ready(Ok(NodeHandlerWrapperEvent::OutboundSubstreamRequest(user_data))) => { self.muxing.open_substream(user_data); } - Poll::Ready(Ok(ConnectionHandlerEvent::Custom(event))) => { + Poll::Ready(Ok(NodeHandlerWrapperEvent::Custom(event))) => { return Poll::Ready(Ok(Event::Handler(event))); } Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::Handler(err))), diff --git a/swarm/src/connection/handler.rs b/swarm/src/connection/handler.rs deleted file mode 100644 index 4802b391..00000000 --- a/swarm/src/connection/handler.rs +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use super::{Connected, SubstreamEndpoint}; -use crate::Multiaddr; -use std::{fmt::Debug, task::Context, task::Poll}; - -/// The interface of a connection handler. -/// -/// Each handler is responsible for a single connection. -pub trait ConnectionHandler { - /// The inbound type of events used to notify the handler through the `Network`. - /// - /// See also [`EstablishedConnection::notify_handler`](super::EstablishedConnection::notify_handler) - /// and [`ConnectionHandler::inject_event`]. - type InEvent: Debug + Send + 'static; - /// The outbound type of events that the handler emits to the `Network` - /// through [`ConnectionHandler::poll`]. - /// - /// See also [`PoolEvent::ConnectionEvent`](crate::connection::pool::PoolEvent::ConnectionEvent). - type OutEvent: Debug + Send + 'static; - /// The type of errors that the handler can produce when polled by the `Network`. - type Error: Debug + Send + 'static; - /// The type of the substream containing the data. - type Substream; - /// Information about a substream. Can be sent to the handler through a `SubstreamEndpoint`, - /// and will be passed back in `inject_substream` or `inject_outbound_closed`. - type OutboundOpenInfo; - - /// Sends a new substream to the handler. - /// - /// The handler is responsible for upgrading the substream to whatever protocol it wants. - /// - /// # Panic - /// - /// Implementations are allowed to panic in the case of dialing if the `user_data` in - /// `endpoint` doesn't correspond to what was returned earlier when polling, or is used - /// multiple times. - fn inject_substream( - &mut self, - substream: Self::Substream, - endpoint: SubstreamEndpoint, - ); - - /// Notifies the handler of an event. - fn inject_event(&mut self, event: Self::InEvent); - - /// Notifies the handler of a change in the address of the remote. - fn inject_address_change(&mut self, new_address: &Multiaddr); - - /// Polls the handler for events. - /// - /// Returning an error will close the connection to the remote. - fn poll( - &mut self, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>>; -} - -/// Prototype for a `ConnectionHandler`. -pub trait IntoConnectionHandler { - /// The node handler. - type Handler: ConnectionHandler; - - /// Builds the node handler. - /// - /// The implementation is given a `Connected` value that holds information about - /// the newly established connection for which a handler should be created. - fn into_handler(self, connected: &Connected) -> Self::Handler; -} - -impl IntoConnectionHandler for T -where - T: ConnectionHandler, -{ - type Handler = Self; - - fn into_handler(self, _: &Connected) -> Self { - self - } -} - -pub(crate) type THandlerInEvent = - <::Handler as ConnectionHandler>::InEvent; -pub(crate) type THandlerOutEvent = - <::Handler as ConnectionHandler>::OutEvent; -pub(crate) type THandlerError = - <::Handler as ConnectionHandler>::Error; - -/// Event produced by a handler. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum ConnectionHandlerEvent { - /// Require a new outbound substream to be opened with the remote. - OutboundSubstreamRequest(TOutboundOpenInfo), - - /// Other event. - Custom(TCustom), -} diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 139f4f8c..d9903f61 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -20,14 +20,14 @@ // DEALINGS IN THE SOFTWARE. use crate::{ + behaviour::{THandlerInEvent, THandlerOutEvent}, connection::{ - handler::{THandlerError, THandlerInEvent, THandlerOutEvent}, - Connected, ConnectionError, ConnectionHandler, ConnectionLimit, IncomingInfo, - IntoConnectionHandler, PendingConnectionError, PendingInboundConnectionError, - PendingOutboundConnectionError, Substream, + Connected, ConnectionError, ConnectionLimit, IncomingInfo, PendingConnectionError, + PendingInboundConnectionError, PendingOutboundConnectionError, }, + protocols_handler::{NodeHandlerWrapper, NodeHandlerWrapperBuilder, NodeHandlerWrapperError}, transport::{Transport, TransportError}, - ConnectedPoint, Executor, Multiaddr, PeerId, + ConnectedPoint, Executor, IntoProtocolsHandler, Multiaddr, PeerId, ProtocolsHandler, }; use concurrent_dial::ConcurrentDial; use fnv::FnvHashMap; @@ -39,7 +39,7 @@ use futures::{ stream::FuturesUnordered, }; use libp2p_core::connection::{ConnectionId, Endpoint, PendingPoint}; -use libp2p_core::muxing::StreamMuxer; +use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox}; use std::{ collections::{hash_map, HashMap}, convert::TryFrom as _, @@ -55,7 +55,7 @@ mod concurrent_dial; mod task; /// A connection `Pool` manages a set of connections for each peer. -pub struct Pool +pub struct Pool where TTrans: Transport, { @@ -67,7 +67,10 @@ where /// The managed connections of each peer that are currently considered established. established: FnvHashMap< PeerId, - FnvHashMap>>, + FnvHashMap< + ConnectionId, + EstablishedConnectionInfo<::InEvent>, + >, >, /// The pending connections that are currently being negotiated. @@ -100,10 +103,12 @@ where /// Sender distributed to established tasks for reporting events back /// to the pool. - established_connection_events_tx: mpsc::Sender>, + established_connection_events_tx: + mpsc::Sender>, /// Receiver for events reported from established tasks. - established_connection_events_rx: mpsc::Receiver>, + established_connection_events_rx: + mpsc::Receiver>, } #[derive(Debug)] @@ -133,13 +138,13 @@ struct PendingConnectionInfo { /// [`PeerId`] of the remote peer. peer_id: Option, /// Handler to handle connection once no longer pending but established. - handler: THandler, + handler: NodeHandlerWrapperBuilder, endpoint: PendingPoint, /// When dropped, notifies the task which then knows to terminate. abort_notifier: Option>, } -impl fmt::Debug for Pool { +impl fmt::Debug for Pool { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { f.debug_struct("Pool") .field("counters", &self.counters) @@ -148,13 +153,13 @@ impl fmt::Debug for Pool +pub enum PoolEvent<'a, THandler: IntoProtocolsHandler, TTrans> where TTrans: Transport, { /// A new connection has been established. ConnectionEstablished { - connection: EstablishedConnection<'a, THandlerInEvent>, + connection: EstablishedConnection<'a, ::InEvent>, /// List of other connections to the same peer. /// /// Note: Does not include the connection reported through this event. @@ -182,12 +187,16 @@ where connected: Connected, /// The error that occurred, if any. If `None`, the connection /// was closed by the local peer. - error: Option>>, + error: Option< + ConnectionError< + NodeHandlerWrapperError<::Error>, + >, + >, /// A reference to the pool that used to manage the connection. pool: &'a mut Pool, /// The remaining established connections to the same peer. remaining_established_connection_ids: Vec, - handler: THandler::Handler, + handler: NodeHandlerWrapper, }, /// An outbound connection attempt failed. @@ -197,7 +206,7 @@ where /// The error that occurred. error: PendingOutboundConnectionError, /// The handler that was supposed to handle the connection. - handler: THandler, + handler: NodeHandlerWrapperBuilder, /// The (expected) peer of the failed connection. peer: Option, }, @@ -213,7 +222,7 @@ where /// The error that occurred. error: PendingInboundConnectionError, /// The handler that was supposed to handle the connection. - handler: THandler, + handler: NodeHandlerWrapperBuilder, }, /// A node has produced an event. @@ -235,7 +244,7 @@ where }, } -impl<'a, THandler: IntoConnectionHandler, TTrans> fmt::Debug for PoolEvent<'a, THandler, TTrans> +impl<'a, THandler: IntoProtocolsHandler, TTrans> fmt::Debug for PoolEvent<'a, THandler, TTrans> where TTrans: Transport, TTrans::Error: fmt::Debug, @@ -304,7 +313,7 @@ where impl Pool where - THandler: IntoConnectionHandler, + THandler: IntoProtocolsHandler, TTrans: Transport, { /// Creates a new empty `Pool`. @@ -457,7 +466,7 @@ where impl Pool where - THandler: IntoConnectionHandler, + THandler: IntoProtocolsHandler, TTrans: Transport + 'static, TTrans::Output: Send + 'static, TTrans::Error: Send + 'static, @@ -472,10 +481,10 @@ where transport: TTrans, addresses: impl Iterator + Send + 'static, peer: Option, - handler: THandler, + handler: NodeHandlerWrapperBuilder, role_override: Endpoint, dial_concurrency_factor_override: Option, - ) -> Result + ) -> Result)> where TTrans: Clone + Send, TTrans::Dial: Send + 'static, @@ -529,9 +538,9 @@ where pub fn add_incoming( &mut self, future: TFut, - handler: THandler, + handler: NodeHandlerWrapperBuilder, info: IncomingInfo<'_>, - ) -> Result + ) -> Result)> where TFut: Future> + Send + 'static, { @@ -571,18 +580,12 @@ where /// /// > **Note**: We use a regular `poll` method instead of implementing `Stream`, /// > because we want the `Pool` to stay borrowed if necessary. - pub fn poll<'a, TMuxer>( - &'a mut self, - cx: &mut Context<'_>, - ) -> Poll> + pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll> where - TTrans: Transport, - TMuxer: StreamMuxer + Send + Sync + 'static, - TMuxer::Error: std::fmt::Debug, - TMuxer::OutboundSubstream: Send, - THandler: IntoConnectionHandler + 'static, - THandler::Handler: ConnectionHandler> + Send, - ::OutboundOpenInfo: Send, + TTrans: Transport, + THandler: IntoProtocolsHandler + 'static, + THandler::Handler: ProtocolsHandler + Send, + ::OutboundOpenInfo: Send, { // Poll for events of established connections. // @@ -896,17 +899,17 @@ where } /// A connection in a [`Pool`]. -pub enum PoolConnection<'a, THandler: IntoConnectionHandler> { +pub enum PoolConnection<'a, THandler: IntoProtocolsHandler> { Pending(PendingConnection<'a, THandler>), Established(EstablishedConnection<'a, THandlerInEvent>), } /// A pending connection in a pool. -pub struct PendingConnection<'a, THandler: IntoConnectionHandler> { +pub struct PendingConnection<'a, THandler: IntoProtocolsHandler> { entry: hash_map::OccupiedEntry<'a, ConnectionId, PendingConnectionInfo>, } -impl PendingConnection<'_, THandler> { +impl PendingConnection<'_, THandler> { /// Aborts the connection attempt, closing the connection. pub fn abort(mut self) { if let Some(notifier) = self.entry.get_mut().abort_notifier.take() { @@ -1259,7 +1262,7 @@ impl PoolConfig { /// delivery to the connection handler. /// /// When the buffer for a particular connection is full, `notify_handler` will no - /// longer be able to deliver events to the associated `ConnectionHandler`, + /// longer be able to deliver events to the associated [`Connection`](super::Connection), /// thus exerting back-pressure on the connection and peer API. pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self { self.task_command_buffer_size = n.get() - 1; diff --git a/swarm/src/connection/pool/task.rs b/swarm/src/connection/pool/task.rs index d3ea7bee..60bdd87b 100644 --- a/swarm/src/connection/pool/task.rs +++ b/swarm/src/connection/pool/task.rs @@ -24,13 +24,11 @@ use super::concurrent_dial::ConcurrentDial; use crate::{ connection::{ - self, - handler::{THandlerError, THandlerInEvent, THandlerOutEvent}, - ConnectionError, ConnectionHandler, IntoConnectionHandler, PendingInboundConnectionError, - PendingOutboundConnectionError, Substream, + self, ConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }, + protocols_handler::{NodeHandlerWrapper, NodeHandlerWrapperError}, transport::{Transport, TransportError}, - Multiaddr, PeerId, + Multiaddr, PeerId, ProtocolsHandler, }; use futures::{ channel::{mpsc, oneshot}, @@ -38,7 +36,6 @@ use futures::{ SinkExt, StreamExt, }; use libp2p_core::connection::ConnectionId; -use libp2p_core::muxing::StreamMuxer; use std::pin::Pin; use void::Void; @@ -76,7 +73,7 @@ where } #[derive(Debug)] -pub enum EstablishedConnectionEvent { +pub enum EstablishedConnectionEvent { /// A node we are connected to has changed its address. AddressChange { id: ConnectionId, @@ -87,7 +84,7 @@ pub enum EstablishedConnectionEvent { Notify { id: ConnectionId, peer_id: PeerId, - event: THandlerOutEvent, + event: THandler::OutEvent, }, /// A connection closed, possibly due to an error. /// @@ -96,8 +93,8 @@ pub enum EstablishedConnectionEvent { Closed { id: ConnectionId, peer_id: PeerId, - error: Option>>, - handler: THandler::Handler, + error: Option>>, + handler: NodeHandlerWrapper, }, } @@ -180,16 +177,14 @@ pub async fn new_for_pending_incoming_connection( } } -pub async fn new_for_established_connection( +pub async fn new_for_established_connection( connection_id: ConnectionId, peer_id: PeerId, - mut connection: crate::connection::Connection, - mut command_receiver: mpsc::Receiver>>, + mut connection: crate::connection::Connection, + mut command_receiver: mpsc::Receiver>, mut events: mpsc::Sender>, ) where - TMuxer: StreamMuxer, - THandler: IntoConnectionHandler, - THandler::Handler: ConnectionHandler>, + THandler: ProtocolsHandler, { loop { match futures::future::select( diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 206bf7a1..faf142fa 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -79,8 +79,8 @@ use crate::connection::IncomingInfo; use crate::connection::{pool::PoolEvent, ListenersEvent, ListenersStream}; use connection::pool::{ConnectionCounters, ConnectionLimits, Pool, PoolConfig}; use connection::{ - ConnectionError, ConnectionHandler, ConnectionLimit, EstablishedConnection, - IntoConnectionHandler, PendingOutboundConnectionError, Substream, + ConnectionError, ConnectionLimit, EstablishedConnection, PendingOutboundConnectionError, + Substream, }; use dial_opts::{DialOpts, PeerCondition}; use either::Either; @@ -95,7 +95,7 @@ use libp2p_core::{ upgrade::ProtocolName, Executor, Multiaddr, Negotiated, PeerId, Transport, }; -use protocols_handler::{NodeHandlerWrapperBuilder, NodeHandlerWrapperError}; +use protocols_handler::NodeHandlerWrapperError; use registry::{AddressIntoIter, Addresses}; use smallvec::SmallVec; use std::collections::HashSet; @@ -264,10 +264,7 @@ where listeners: ListenersStream>, /// The nodes currently active. - pool: Pool< - NodeHandlerWrapperBuilder>, - transport::Boxed<(PeerId, StreamMuxerBox)>, - >, + pool: Pool, transport::Boxed<(PeerId, StreamMuxerBox)>>, /// The local peer ID. local_peer_id: PeerId, @@ -1147,8 +1144,8 @@ where TTrans: Transport, TTrans::Error: Send + 'static, TBehaviour: NetworkBehaviour, - THandler: IntoConnectionHandler, - THandler::Handler: ConnectionHandler< + THandler: IntoProtocolsHandler, + THandler::Handler: ProtocolsHandler< InEvent = THandlerInEvent, OutEvent = THandlerOutEvent, >, diff --git a/swarm/src/protocols_handler.rs b/swarm/src/protocols_handler.rs index 9d6a2fae..d4ebec1d 100644 --- a/swarm/src/protocols_handler.rs +++ b/swarm/src/protocols_handler.rs @@ -55,7 +55,10 @@ use std::{cmp::Ordering, error, fmt, task::Context, task::Poll, time::Duration}; pub use dummy::DummyProtocolsHandler; pub use map_in::MapInEvent; pub use map_out::MapOutEvent; -pub use node_handler::{NodeHandlerWrapper, NodeHandlerWrapperBuilder, NodeHandlerWrapperError}; +pub use node_handler::{ + NodeHandlerWrapper, NodeHandlerWrapperBuilder, NodeHandlerWrapperError, + NodeHandlerWrapperEvent, NodeHandlerWrapperOutboundOpenInfo, +}; pub use one_shot::{OneShotHandler, OneShotHandlerConfig}; pub use select::{IntoProtocolsHandlerSelect, ProtocolsHandlerSelect}; diff --git a/swarm/src/protocols_handler/node_handler.rs b/swarm/src/protocols_handler/node_handler.rs index 112ba62b..22efbff6 100644 --- a/swarm/src/protocols_handler/node_handler.rs +++ b/swarm/src/protocols_handler/node_handler.rs @@ -18,10 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::connection::{ - Connected, ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler, Substream, - SubstreamEndpoint, -}; +use crate::connection::{Connected, Substream, SubstreamEndpoint}; use crate::protocols_handler::{ IntoProtocolsHandler, KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, @@ -72,15 +69,12 @@ where } } -impl IntoConnectionHandler - for NodeHandlerWrapperBuilder +impl NodeHandlerWrapperBuilder where TIntoProtoHandler: IntoProtocolsHandler, TProtoHandler: ProtocolsHandler, { - type Handler = NodeHandlerWrapper; - - fn into_handler(self, connected: &Connected) -> Self::Handler { + pub fn into_handler(self, connected: &Connected) -> NodeHandlerWrapper { NodeHandlerWrapper { handler: self .handler @@ -95,8 +89,12 @@ where } } -// A `ConnectionHandler` for an underlying `ProtocolsHandler`. -/// Wraps around an implementation of `ProtocolsHandler`, and implements `NodeHandler`. +/// A wrapper for an underlying [`ProtocolsHandler`]. +/// +/// It extends [`ProtocolsHandler`] with: +/// - Enforced substream upgrade timeouts +/// - Driving substream upgrades +/// - Handling connection timeout // TODO: add a caching system for protocols that are supported or not pub struct NodeHandlerWrapper where @@ -135,6 +133,21 @@ where substream_upgrade_protocol_override: Option, } +impl std::fmt::Debug for NodeHandlerWrapper { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("NodeHandlerWrapper") + .field("negotiating_in", &self.negotiating_in) + .field("negotiating_out", &self.negotiating_out) + .field("unique_dial_upgrade_id", &self.unique_dial_upgrade_id) + .field("shutdown", &self.shutdown) + .field( + "substream_upgrade_protocol_override", + &self.substream_upgrade_protocol_override, + ) + .finish() + } +} + impl NodeHandlerWrapper { pub(crate) fn into_protocols_handler(self) -> TProtoHandler { self.handler @@ -199,6 +212,7 @@ where /// A planned shutdown is always postponed for as long as there are ingoing /// or outgoing substreams being negotiated, i.e. it is a graceful, "idle" /// shutdown. +#[derive(Debug)] enum Shutdown { /// No shutdown is planned. None, @@ -249,22 +263,22 @@ where } } -impl ConnectionHandler for NodeHandlerWrapper +pub type NodeHandlerWrapperOutboundOpenInfo = ( + u64, + ::OutboundOpenInfo, + Duration, +); + +impl NodeHandlerWrapper where TProtoHandler: ProtocolsHandler, { - type InEvent = TProtoHandler::InEvent; - type OutEvent = TProtoHandler::OutEvent; - type Error = NodeHandlerWrapperError; - type Substream = Substream; - // The first element of the tuple is the unique upgrade identifier - // (see `unique_dial_upgrade_id`). - type OutboundOpenInfo = (u64, TProtoHandler::OutboundOpenInfo, Duration); - - fn inject_substream( + pub fn inject_substream( &mut self, - substream: Self::Substream, - endpoint: SubstreamEndpoint, + substream: Substream, + // The first element of the tuple is the unique upgrade identifier + // (see `unique_dial_upgrade_id`). + endpoint: SubstreamEndpoint>, ) { match endpoint { SubstreamEndpoint::Listener => { @@ -315,19 +329,26 @@ where } } - fn inject_event(&mut self, event: Self::InEvent) { + pub fn inject_event(&mut self, event: TProtoHandler::InEvent) { self.handler.inject_event(event); } - fn inject_address_change(&mut self, new_address: &Multiaddr) { + pub fn inject_address_change(&mut self, new_address: &Multiaddr) { self.handler.inject_address_change(new_address); } - fn poll( + pub fn poll( &mut self, cx: &mut Context<'_>, - ) -> Poll, Self::Error>> - { + ) -> Poll< + Result< + NodeHandlerWrapperEvent< + NodeHandlerWrapperOutboundOpenInfo, + TProtoHandler::OutEvent, + >, + NodeHandlerWrapperError, + >, + > { while let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) { match res { Ok(upgrade) => self @@ -372,7 +393,7 @@ where match poll_result { Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => { - return Poll::Ready(Ok(ConnectionHandlerEvent::Custom(event))); + return Poll::Ready(Ok(NodeHandlerWrapperEvent::Custom(event))); } Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) => { let id = self.unique_dial_upgrade_id; @@ -380,7 +401,7 @@ where self.unique_dial_upgrade_id += 1; let (upgrade, info) = protocol.into_upgrade(); self.queued_dial_upgrades.push((id, SendWrapper(upgrade))); - return Poll::Ready(Ok(ConnectionHandlerEvent::OutboundSubstreamRequest(( + return Poll::Ready(Ok(NodeHandlerWrapperEvent::OutboundSubstreamRequest(( id, info, timeout, )))); } @@ -408,3 +429,13 @@ where Poll::Pending } } + +/// Event produced by a [`NodeHandlerWrapper`]. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum NodeHandlerWrapperEvent { + /// Require a new outbound substream to be opened with the remote. + OutboundSubstreamRequest(TOutboundOpenInfo), + + /// Other event. + Custom(TCustom), +}