swarm/src: Remove ConnectionHandler (#2519)

The `ConnectionHandler` trait is not exposed to users, and its only
implementor is `NodeHandlerWrapper`. The trait is thus a superfluous
abstraction, and this commit removes it.
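
The core observation can be illustrated with a minimal, self-contained sketch. The types below are toy stand-ins with simplified signatures, not the actual libp2p-swarm definitions: once a trait's only implementor is a single concrete wrapper, code that was generic over the trait can simply name the wrapper.

```rust
// Toy illustration: a trait whose only implementor is one concrete wrapper
// adds nothing, so callers can store the wrapper directly.

trait ProtocolsHandler {
    type OutEvent;
    fn poll_event(&mut self) -> Option<Self::OutEvent>;
}

// Previously the only implementor of the internal `ConnectionHandler` trait.
struct NodeHandlerWrapper<H: ProtocolsHandler> {
    handler: H,
}

impl<H: ProtocolsHandler> NodeHandlerWrapper<H> {
    fn poll_event(&mut self) -> Option<H::OutEvent> {
        self.handler.poll_event()
    }
}

// After the change, `Connection` names the wrapper directly instead of taking
// a `THandler: ConnectionHandler<...>` type parameter.
struct Connection<H: ProtocolsHandler> {
    handler: NodeHandlerWrapper<H>,
}

// Hypothetical handler, only here to make the sketch runnable.
struct DummyHandler;

impl ProtocolsHandler for DummyHandler {
    type OutEvent = ();
    fn poll_event(&mut self) -> Option<()> {
        None
    }
}

fn main() {
    let mut connection = Connection {
        handler: NodeHandlerWrapper {
            handler: DummyHandler,
        },
    };
    assert!(connection.handler.poll_event().is_none());
}
```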

In addition to this larger change, this commit removes the `TMuxer` type
parameter. `Swarm` enforces dynamic dispatch via `StreamMuxerBox` anyway,
so the type parameter serves no purpose.
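
A similar toy sketch, again with simplified stand-ins rather than the real libp2p-core API (`TestMuxer` is a hypothetical muxer used only for illustration), shows why the parameter is redundant once the muxer is type-erased behind `StreamMuxerBox`.

```rust
// Toy illustration: once the muxer is boxed behind dynamic dispatch, every
// connection works with the same erased type, so a `TMuxer` parameter is noise.

trait StreamMuxer {
    fn open_outbound(&mut self) -> u32;
}

// Stand-in for `StreamMuxerBox`: a type-erased, dynamically dispatched muxer.
struct StreamMuxerBox(Box<dyn StreamMuxer + Send>);

impl StreamMuxerBox {
    fn open_outbound(&mut self) -> u32 {
        self.0.open_outbound()
    }
}

// Before: `Connection<TMuxer: StreamMuxer, ...>`. After: `Swarm` only ever
// produces `StreamMuxerBox`, so the connection can store it directly.
struct Connection {
    muxer: StreamMuxerBox,
}

// Hypothetical concrete muxer, only here to make the sketch runnable.
struct TestMuxer;

impl StreamMuxer for TestMuxer {
    fn open_outbound(&mut self) -> u32 {
        42
    }
}

fn main() {
    let mut connection = Connection {
        muxer: StreamMuxerBox(Box::new(TestMuxer)),
    };
    assert_eq!(connection.muxer.open_outbound(), 42);
}
```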

As a follow-up to this commit, one could rename `ProtocolsHandler` to
`ConnectionHandler` and `NodeHandlerWrapper` to `ConnectionHandlerWrapper`
or simply `Wrapper`.
Author: Max Inden
Date: 2022-02-16 13:15:52 +01:00
Committed by: GitHub
Parent: df4905d798
Commit: 8ffa84e786
8 changed files with 155 additions and 239 deletions


@ -31,9 +31,12 @@ use libp2p_core::{
use std::{task::Context, task::Poll}; use std::{task::Context, task::Poll};
/// Custom event that can be received by the [`ProtocolsHandler`]. /// Custom event that can be received by the [`ProtocolsHandler`].
type THandlerInEvent<THandler> = pub(crate) type THandlerInEvent<THandler> =
<<THandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent; <<THandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent;
pub(crate) type THandlerOutEvent<THandler> =
<<THandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent;
/// A [`NetworkBehaviour`] defines the behaviour of the local node on the network. /// A [`NetworkBehaviour`] defines the behaviour of the local node on the network.
/// ///
/// In contrast to [`Transport`](libp2p_core::Transport) which defines **how** to send bytes on the /// In contrast to [`Transport`](libp2p_core::Transport) which defines **how** to send bytes on the


@ -19,17 +19,19 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
mod error; mod error;
pub(crate) mod handler;
mod listeners; mod listeners;
mod substream; mod substream;
pub(crate) mod pool; pub(crate) mod pool;
use crate::protocols_handler::{
NodeHandlerWrapper, NodeHandlerWrapperError, NodeHandlerWrapperEvent,
NodeHandlerWrapperOutboundOpenInfo, ProtocolsHandler,
};
pub use error::{ pub use error::{
ConnectionError, PendingConnectionError, PendingInboundConnectionError, ConnectionError, PendingConnectionError, PendingInboundConnectionError,
PendingOutboundConnectionError, PendingOutboundConnectionError,
}; };
pub use handler::{ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler};
pub use listeners::{ListenersEvent, ListenersStream}; pub use listeners::{ListenersEvent, ListenersStream};
pub use pool::{ConnectionCounters, ConnectionLimits}; pub use pool::{ConnectionCounters, ConnectionLimits};
pub use pool::{EstablishedConnection, PendingConnection}; pub use pool::{EstablishedConnection, PendingConnection};
@ -37,7 +39,7 @@ pub use substream::{Close, Substream, SubstreamEndpoint};
use libp2p_core::connection::ConnectedPoint; use libp2p_core::connection::ConnectedPoint;
use libp2p_core::multiaddr::Multiaddr; use libp2p_core::multiaddr::Multiaddr;
use libp2p_core::muxing::StreamMuxer; use libp2p_core::muxing::StreamMuxerBox;
use libp2p_core::PeerId; use libp2p_core::PeerId;
use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll}; use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll};
use substream::{Muxing, SubstreamEvent}; use substream::{Muxing, SubstreamEvent};
@ -54,28 +56,26 @@ pub struct Connected {
/// Event generated by a [`Connection`]. /// Event generated by a [`Connection`].
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum Event<T> { pub enum Event<T> {
/// Event generated by the [`ConnectionHandler`]. /// Event generated by the [`NodeHandlerWrapper`].
Handler(T), Handler(T),
/// Address of the remote has changed. /// Address of the remote has changed.
AddressChange(Multiaddr), AddressChange(Multiaddr),
} }
/// A multiplexed connection to a peer with an associated `ConnectionHandler`. /// A multiplexed connection to a peer with an associated [`NodeHandlerWrapper`].
pub struct Connection<TMuxer, THandler> pub struct Connection<THandler>
where where
TMuxer: StreamMuxer, THandler: ProtocolsHandler,
THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{ {
/// Node that handles the muxing. /// Node that handles the muxing.
muxing: substream::Muxing<TMuxer, THandler::OutboundOpenInfo>, muxing: substream::Muxing<StreamMuxerBox, NodeHandlerWrapperOutboundOpenInfo<THandler>>,
/// Handler that processes substreams. /// Handler that processes substreams.
handler: THandler, handler: NodeHandlerWrapper<THandler>,
} }
impl<TMuxer, THandler> fmt::Debug for Connection<TMuxer, THandler> impl<THandler> fmt::Debug for Connection<THandler>
where where
TMuxer: StreamMuxer, THandler: ProtocolsHandler + fmt::Debug,
THandler: ConnectionHandler<Substream = Substream<TMuxer>> + fmt::Debug,
{ {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connection") f.debug_struct("Connection")
@ -85,21 +85,15 @@ where
} }
} }
impl<TMuxer, THandler> Unpin for Connection<TMuxer, THandler> impl<THandler> Unpin for Connection<THandler> where THandler: ProtocolsHandler {}
where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
}
impl<TMuxer, THandler> Connection<TMuxer, THandler> impl<THandler> Connection<THandler>
where where
TMuxer: StreamMuxer, THandler: ProtocolsHandler,
THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{ {
/// Builds a new `Connection` from the given substream multiplexer /// Builds a new `Connection` from the given substream multiplexer
/// and connection handler. /// and connection handler.
pub fn new(muxer: TMuxer, handler: THandler) -> Self { pub fn new(muxer: StreamMuxerBox, handler: NodeHandlerWrapper<THandler>) -> Self {
Connection { Connection {
muxing: Muxing::new(muxer), muxing: Muxing::new(muxer),
handler, handler,
@ -113,7 +107,7 @@ where
/// Begins an orderly shutdown of the connection, returning the connection /// Begins an orderly shutdown of the connection, returning the connection
/// handler and a `Future` that resolves when connection shutdown is complete. /// handler and a `Future` that resolves when connection shutdown is complete.
pub fn close(self) -> (THandler, Close<TMuxer>) { pub fn close(self) -> (NodeHandlerWrapper<THandler>, Close<StreamMuxerBox>) {
(self.handler, self.muxing.close().0) (self.handler, self.muxing.close().0)
} }
@ -122,7 +116,12 @@ where
pub fn poll( pub fn poll(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>> { ) -> Poll<
Result<
Event<THandler::OutEvent>,
ConnectionError<NodeHandlerWrapperError<THandler::Error>>,
>,
> {
loop { loop {
let mut io_pending = false; let mut io_pending = false;
@ -154,10 +153,10 @@ where
return Poll::Pending; // Nothing to do return Poll::Pending; // Nothing to do
} }
} }
Poll::Ready(Ok(ConnectionHandlerEvent::OutboundSubstreamRequest(user_data))) => { Poll::Ready(Ok(NodeHandlerWrapperEvent::OutboundSubstreamRequest(user_data))) => {
self.muxing.open_substream(user_data); self.muxing.open_substream(user_data);
} }
Poll::Ready(Ok(ConnectionHandlerEvent::Custom(event))) => { Poll::Ready(Ok(NodeHandlerWrapperEvent::Custom(event))) => {
return Poll::Ready(Ok(Event::Handler(event))); return Poll::Ready(Ok(Event::Handler(event)));
} }
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::Handler(err))), Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::Handler(err))),


@ -1,115 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use super::{Connected, SubstreamEndpoint};
use crate::Multiaddr;
use std::{fmt::Debug, task::Context, task::Poll};
/// The interface of a connection handler.
///
/// Each handler is responsible for a single connection.
pub trait ConnectionHandler {
/// The inbound type of events used to notify the handler through the `Network`.
///
/// See also [`EstablishedConnection::notify_handler`](super::EstablishedConnection::notify_handler)
/// and [`ConnectionHandler::inject_event`].
type InEvent: Debug + Send + 'static;
/// The outbound type of events that the handler emits to the `Network`
/// through [`ConnectionHandler::poll`].
///
/// See also [`PoolEvent::ConnectionEvent`](crate::connection::pool::PoolEvent::ConnectionEvent).
type OutEvent: Debug + Send + 'static;
/// The type of errors that the handler can produce when polled by the `Network`.
type Error: Debug + Send + 'static;
/// The type of the substream containing the data.
type Substream;
/// Information about a substream. Can be sent to the handler through a `SubstreamEndpoint`,
/// and will be passed back in `inject_substream` or `inject_outbound_closed`.
type OutboundOpenInfo;
/// Sends a new substream to the handler.
///
/// The handler is responsible for upgrading the substream to whatever protocol it wants.
///
/// # Panic
///
/// Implementations are allowed to panic in the case of dialing if the `user_data` in
/// `endpoint` doesn't correspond to what was returned earlier when polling, or is used
/// multiple times.
fn inject_substream(
&mut self,
substream: Self::Substream,
endpoint: SubstreamEndpoint<Self::OutboundOpenInfo>,
);
/// Notifies the handler of an event.
fn inject_event(&mut self, event: Self::InEvent);
/// Notifies the handler of a change in the address of the remote.
fn inject_address_change(&mut self, new_address: &Multiaddr);
/// Polls the handler for events.
///
/// Returning an error will close the connection to the remote.
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<ConnectionHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>>;
}
/// Prototype for a `ConnectionHandler`.
pub trait IntoConnectionHandler {
/// The node handler.
type Handler: ConnectionHandler;
/// Builds the node handler.
///
/// The implementation is given a `Connected` value that holds information about
/// the newly established connection for which a handler should be created.
fn into_handler(self, connected: &Connected) -> Self::Handler;
}
impl<T> IntoConnectionHandler for T
where
T: ConnectionHandler,
{
type Handler = Self;
fn into_handler(self, _: &Connected) -> Self {
self
}
}
pub(crate) type THandlerInEvent<THandler> =
<<THandler as IntoConnectionHandler>::Handler as ConnectionHandler>::InEvent;
pub(crate) type THandlerOutEvent<THandler> =
<<THandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent;
pub(crate) type THandlerError<THandler> =
<<THandler as IntoConnectionHandler>::Handler as ConnectionHandler>::Error;
/// Event produced by a handler.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ConnectionHandlerEvent<TOutboundOpenInfo, TCustom> {
/// Require a new outbound substream to be opened with the remote.
OutboundSubstreamRequest(TOutboundOpenInfo),
/// Other event.
Custom(TCustom),
}


@ -20,14 +20,14 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::{ use crate::{
behaviour::{THandlerInEvent, THandlerOutEvent},
connection::{ connection::{
handler::{THandlerError, THandlerInEvent, THandlerOutEvent}, Connected, ConnectionError, ConnectionLimit, IncomingInfo, PendingConnectionError,
Connected, ConnectionError, ConnectionHandler, ConnectionLimit, IncomingInfo, PendingInboundConnectionError, PendingOutboundConnectionError,
IntoConnectionHandler, PendingConnectionError, PendingInboundConnectionError,
PendingOutboundConnectionError, Substream,
}, },
protocols_handler::{NodeHandlerWrapper, NodeHandlerWrapperBuilder, NodeHandlerWrapperError},
transport::{Transport, TransportError}, transport::{Transport, TransportError},
ConnectedPoint, Executor, Multiaddr, PeerId, ConnectedPoint, Executor, IntoProtocolsHandler, Multiaddr, PeerId, ProtocolsHandler,
}; };
use concurrent_dial::ConcurrentDial; use concurrent_dial::ConcurrentDial;
use fnv::FnvHashMap; use fnv::FnvHashMap;
@ -39,7 +39,7 @@ use futures::{
stream::FuturesUnordered, stream::FuturesUnordered,
}; };
use libp2p_core::connection::{ConnectionId, Endpoint, PendingPoint}; use libp2p_core::connection::{ConnectionId, Endpoint, PendingPoint};
use libp2p_core::muxing::StreamMuxer; use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox};
use std::{ use std::{
collections::{hash_map, HashMap}, collections::{hash_map, HashMap},
convert::TryFrom as _, convert::TryFrom as _,
@ -55,7 +55,7 @@ mod concurrent_dial;
mod task; mod task;
/// A connection `Pool` manages a set of connections for each peer. /// A connection `Pool` manages a set of connections for each peer.
pub struct Pool<THandler: IntoConnectionHandler, TTrans> pub struct Pool<THandler: IntoProtocolsHandler, TTrans>
where where
TTrans: Transport, TTrans: Transport,
{ {
@ -67,7 +67,10 @@ where
/// The managed connections of each peer that are currently considered established. /// The managed connections of each peer that are currently considered established.
established: FnvHashMap< established: FnvHashMap<
PeerId, PeerId,
FnvHashMap<ConnectionId, EstablishedConnectionInfo<THandlerInEvent<THandler>>>, FnvHashMap<
ConnectionId,
EstablishedConnectionInfo<<THandler::Handler as ProtocolsHandler>::InEvent>,
>,
>, >,
/// The pending connections that are currently being negotiated. /// The pending connections that are currently being negotiated.
@ -100,10 +103,12 @@ where
/// Sender distributed to established tasks for reporting events back /// Sender distributed to established tasks for reporting events back
/// to the pool. /// to the pool.
established_connection_events_tx: mpsc::Sender<task::EstablishedConnectionEvent<THandler>>, established_connection_events_tx:
mpsc::Sender<task::EstablishedConnectionEvent<THandler::Handler>>,
/// Receiver for events reported from established tasks. /// Receiver for events reported from established tasks.
established_connection_events_rx: mpsc::Receiver<task::EstablishedConnectionEvent<THandler>>, established_connection_events_rx:
mpsc::Receiver<task::EstablishedConnectionEvent<THandler::Handler>>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -133,13 +138,13 @@ struct PendingConnectionInfo<THandler> {
/// [`PeerId`] of the remote peer. /// [`PeerId`] of the remote peer.
peer_id: Option<PeerId>, peer_id: Option<PeerId>,
/// Handler to handle connection once no longer pending but established. /// Handler to handle connection once no longer pending but established.
handler: THandler, handler: NodeHandlerWrapperBuilder<THandler>,
endpoint: PendingPoint, endpoint: PendingPoint,
/// When dropped, notifies the task which then knows to terminate. /// When dropped, notifies the task which then knows to terminate.
abort_notifier: Option<oneshot::Sender<Void>>, abort_notifier: Option<oneshot::Sender<Void>>,
} }
impl<THandler: IntoConnectionHandler, TTrans: Transport> fmt::Debug for Pool<THandler, TTrans> { impl<THandler: IntoProtocolsHandler, TTrans: Transport> fmt::Debug for Pool<THandler, TTrans> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("Pool") f.debug_struct("Pool")
.field("counters", &self.counters) .field("counters", &self.counters)
@ -148,13 +153,13 @@ impl<THandler: IntoConnectionHandler, TTrans: Transport> fmt::Debug for Pool<THa
} }
/// Event that can happen on the `Pool`. /// Event that can happen on the `Pool`.
pub enum PoolEvent<'a, THandler: IntoConnectionHandler, TTrans> pub enum PoolEvent<'a, THandler: IntoProtocolsHandler, TTrans>
where where
TTrans: Transport, TTrans: Transport,
{ {
/// A new connection has been established. /// A new connection has been established.
ConnectionEstablished { ConnectionEstablished {
connection: EstablishedConnection<'a, THandlerInEvent<THandler>>, connection: EstablishedConnection<'a, <THandler::Handler as ProtocolsHandler>::InEvent>,
/// List of other connections to the same peer. /// List of other connections to the same peer.
/// ///
/// Note: Does not include the connection reported through this event. /// Note: Does not include the connection reported through this event.
@ -182,12 +187,16 @@ where
connected: Connected, connected: Connected,
/// The error that occurred, if any. If `None`, the connection /// The error that occurred, if any. If `None`, the connection
/// was closed by the local peer. /// was closed by the local peer.
error: Option<ConnectionError<THandlerError<THandler>>>, error: Option<
ConnectionError<
NodeHandlerWrapperError<<THandler::Handler as ProtocolsHandler>::Error>,
>,
>,
/// A reference to the pool that used to manage the connection. /// A reference to the pool that used to manage the connection.
pool: &'a mut Pool<THandler, TTrans>, pool: &'a mut Pool<THandler, TTrans>,
/// The remaining established connections to the same peer. /// The remaining established connections to the same peer.
remaining_established_connection_ids: Vec<ConnectionId>, remaining_established_connection_ids: Vec<ConnectionId>,
handler: THandler::Handler, handler: NodeHandlerWrapper<THandler::Handler>,
}, },
/// An outbound connection attempt failed. /// An outbound connection attempt failed.
@ -197,7 +206,7 @@ where
/// The error that occurred. /// The error that occurred.
error: PendingOutboundConnectionError<TTrans::Error>, error: PendingOutboundConnectionError<TTrans::Error>,
/// The handler that was supposed to handle the connection. /// The handler that was supposed to handle the connection.
handler: THandler, handler: NodeHandlerWrapperBuilder<THandler>,
/// The (expected) peer of the failed connection. /// The (expected) peer of the failed connection.
peer: Option<PeerId>, peer: Option<PeerId>,
}, },
@ -213,7 +222,7 @@ where
/// The error that occurred. /// The error that occurred.
error: PendingInboundConnectionError<TTrans::Error>, error: PendingInboundConnectionError<TTrans::Error>,
/// The handler that was supposed to handle the connection. /// The handler that was supposed to handle the connection.
handler: THandler, handler: NodeHandlerWrapperBuilder<THandler>,
}, },
/// A node has produced an event. /// A node has produced an event.
@ -235,7 +244,7 @@ where
}, },
} }
impl<'a, THandler: IntoConnectionHandler, TTrans> fmt::Debug for PoolEvent<'a, THandler, TTrans> impl<'a, THandler: IntoProtocolsHandler, TTrans> fmt::Debug for PoolEvent<'a, THandler, TTrans>
where where
TTrans: Transport, TTrans: Transport,
TTrans::Error: fmt::Debug, TTrans::Error: fmt::Debug,
@ -304,7 +313,7 @@ where
impl<THandler, TTrans> Pool<THandler, TTrans> impl<THandler, TTrans> Pool<THandler, TTrans>
where where
THandler: IntoConnectionHandler, THandler: IntoProtocolsHandler,
TTrans: Transport, TTrans: Transport,
{ {
/// Creates a new empty `Pool`. /// Creates a new empty `Pool`.
@ -457,7 +466,7 @@ where
impl<THandler, TTrans> Pool<THandler, TTrans> impl<THandler, TTrans> Pool<THandler, TTrans>
where where
THandler: IntoConnectionHandler, THandler: IntoProtocolsHandler,
TTrans: Transport + 'static, TTrans: Transport + 'static,
TTrans::Output: Send + 'static, TTrans::Output: Send + 'static,
TTrans::Error: Send + 'static, TTrans::Error: Send + 'static,
@ -472,10 +481,10 @@ where
transport: TTrans, transport: TTrans,
addresses: impl Iterator<Item = Multiaddr> + Send + 'static, addresses: impl Iterator<Item = Multiaddr> + Send + 'static,
peer: Option<PeerId>, peer: Option<PeerId>,
handler: THandler, handler: NodeHandlerWrapperBuilder<THandler>,
role_override: Endpoint, role_override: Endpoint,
dial_concurrency_factor_override: Option<NonZeroU8>, dial_concurrency_factor_override: Option<NonZeroU8>,
) -> Result<ConnectionId, (ConnectionLimit, THandler)> ) -> Result<ConnectionId, (ConnectionLimit, NodeHandlerWrapperBuilder<THandler>)>
where where
TTrans: Clone + Send, TTrans: Clone + Send,
TTrans::Dial: Send + 'static, TTrans::Dial: Send + 'static,
@ -529,9 +538,9 @@ where
pub fn add_incoming<TFut>( pub fn add_incoming<TFut>(
&mut self, &mut self,
future: TFut, future: TFut,
handler: THandler, handler: NodeHandlerWrapperBuilder<THandler>,
info: IncomingInfo<'_>, info: IncomingInfo<'_>,
) -> Result<ConnectionId, (ConnectionLimit, THandler)> ) -> Result<ConnectionId, (ConnectionLimit, NodeHandlerWrapperBuilder<THandler>)>
where where
TFut: Future<Output = Result<TTrans::Output, TTrans::Error>> + Send + 'static, TFut: Future<Output = Result<TTrans::Output, TTrans::Error>> + Send + 'static,
{ {
@ -571,18 +580,12 @@ where
/// ///
/// > **Note**: We use a regular `poll` method instead of implementing `Stream`, /// > **Note**: We use a regular `poll` method instead of implementing `Stream`,
/// > because we want the `Pool` to stay borrowed if necessary. /// > because we want the `Pool` to stay borrowed if necessary.
pub fn poll<'a, TMuxer>( pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<PoolEvent<'a, THandler, TTrans>>
&'a mut self,
cx: &mut Context<'_>,
) -> Poll<PoolEvent<'a, THandler, TTrans>>
where where
TTrans: Transport<Output = (PeerId, TMuxer)>, TTrans: Transport<Output = (PeerId, StreamMuxerBox)>,
TMuxer: StreamMuxer + Send + Sync + 'static, THandler: IntoProtocolsHandler + 'static,
TMuxer::Error: std::fmt::Debug, THandler::Handler: ProtocolsHandler + Send,
TMuxer::OutboundSubstream: Send, <THandler::Handler as ProtocolsHandler>::OutboundOpenInfo: Send,
THandler: IntoConnectionHandler + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>> + Send,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
{ {
// Poll for events of established connections. // Poll for events of established connections.
// //
@ -896,17 +899,17 @@ where
} }
/// A connection in a [`Pool`]. /// A connection in a [`Pool`].
pub enum PoolConnection<'a, THandler: IntoConnectionHandler> { pub enum PoolConnection<'a, THandler: IntoProtocolsHandler> {
Pending(PendingConnection<'a, THandler>), Pending(PendingConnection<'a, THandler>),
Established(EstablishedConnection<'a, THandlerInEvent<THandler>>), Established(EstablishedConnection<'a, THandlerInEvent<THandler>>),
} }
/// A pending connection in a pool. /// A pending connection in a pool.
pub struct PendingConnection<'a, THandler: IntoConnectionHandler> { pub struct PendingConnection<'a, THandler: IntoProtocolsHandler> {
entry: hash_map::OccupiedEntry<'a, ConnectionId, PendingConnectionInfo<THandler>>, entry: hash_map::OccupiedEntry<'a, ConnectionId, PendingConnectionInfo<THandler>>,
} }
impl<THandler: IntoConnectionHandler> PendingConnection<'_, THandler> { impl<THandler: IntoProtocolsHandler> PendingConnection<'_, THandler> {
/// Aborts the connection attempt, closing the connection. /// Aborts the connection attempt, closing the connection.
pub fn abort(mut self) { pub fn abort(mut self) {
if let Some(notifier) = self.entry.get_mut().abort_notifier.take() { if let Some(notifier) = self.entry.get_mut().abort_notifier.take() {
@ -1259,7 +1262,7 @@ impl PoolConfig {
/// delivery to the connection handler. /// delivery to the connection handler.
/// ///
/// When the buffer for a particular connection is full, `notify_handler` will no /// When the buffer for a particular connection is full, `notify_handler` will no
/// longer be able to deliver events to the associated `ConnectionHandler`, /// longer be able to deliver events to the associated [`Connection`](super::Connection),
/// thus exerting back-pressure on the connection and peer API. /// thus exerting back-pressure on the connection and peer API.
pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self { pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
self.task_command_buffer_size = n.get() - 1; self.task_command_buffer_size = n.get() - 1;


@ -24,13 +24,11 @@
use super::concurrent_dial::ConcurrentDial; use super::concurrent_dial::ConcurrentDial;
use crate::{ use crate::{
connection::{ connection::{
self, self, ConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError,
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
ConnectionError, ConnectionHandler, IntoConnectionHandler, PendingInboundConnectionError,
PendingOutboundConnectionError, Substream,
}, },
protocols_handler::{NodeHandlerWrapper, NodeHandlerWrapperError},
transport::{Transport, TransportError}, transport::{Transport, TransportError},
Multiaddr, PeerId, Multiaddr, PeerId, ProtocolsHandler,
}; };
use futures::{ use futures::{
channel::{mpsc, oneshot}, channel::{mpsc, oneshot},
@ -38,7 +36,6 @@ use futures::{
SinkExt, StreamExt, SinkExt, StreamExt,
}; };
use libp2p_core::connection::ConnectionId; use libp2p_core::connection::ConnectionId;
use libp2p_core::muxing::StreamMuxer;
use std::pin::Pin; use std::pin::Pin;
use void::Void; use void::Void;
@ -76,7 +73,7 @@ where
} }
#[derive(Debug)] #[derive(Debug)]
pub enum EstablishedConnectionEvent<THandler: IntoConnectionHandler> { pub enum EstablishedConnectionEvent<THandler: ProtocolsHandler> {
/// A node we are connected to has changed its address. /// A node we are connected to has changed its address.
AddressChange { AddressChange {
id: ConnectionId, id: ConnectionId,
@ -87,7 +84,7 @@ pub enum EstablishedConnectionEvent<THandler: IntoConnectionHandler> {
Notify { Notify {
id: ConnectionId, id: ConnectionId,
peer_id: PeerId, peer_id: PeerId,
event: THandlerOutEvent<THandler>, event: THandler::OutEvent,
}, },
/// A connection closed, possibly due to an error. /// A connection closed, possibly due to an error.
/// ///
@ -96,8 +93,8 @@ pub enum EstablishedConnectionEvent<THandler: IntoConnectionHandler> {
Closed { Closed {
id: ConnectionId, id: ConnectionId,
peer_id: PeerId, peer_id: PeerId,
error: Option<ConnectionError<THandlerError<THandler>>>, error: Option<ConnectionError<NodeHandlerWrapperError<THandler::Error>>>,
handler: THandler::Handler, handler: NodeHandlerWrapper<THandler>,
}, },
} }
@ -180,16 +177,14 @@ pub async fn new_for_pending_incoming_connection<TFut, TTrans>(
} }
} }
pub async fn new_for_established_connection<TMuxer, THandler>( pub async fn new_for_established_connection<THandler>(
connection_id: ConnectionId, connection_id: ConnectionId,
peer_id: PeerId, peer_id: PeerId,
mut connection: crate::connection::Connection<TMuxer, THandler::Handler>, mut connection: crate::connection::Connection<THandler>,
mut command_receiver: mpsc::Receiver<Command<THandlerInEvent<THandler>>>, mut command_receiver: mpsc::Receiver<Command<THandler::InEvent>>,
mut events: mpsc::Sender<EstablishedConnectionEvent<THandler>>, mut events: mpsc::Sender<EstablishedConnectionEvent<THandler>>,
) where ) where
TMuxer: StreamMuxer, THandler: ProtocolsHandler,
THandler: IntoConnectionHandler,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>>,
{ {
loop { loop {
match futures::future::select( match futures::future::select(


@ -79,8 +79,8 @@ use crate::connection::IncomingInfo;
use crate::connection::{pool::PoolEvent, ListenersEvent, ListenersStream}; use crate::connection::{pool::PoolEvent, ListenersEvent, ListenersStream};
use connection::pool::{ConnectionCounters, ConnectionLimits, Pool, PoolConfig}; use connection::pool::{ConnectionCounters, ConnectionLimits, Pool, PoolConfig};
use connection::{ use connection::{
ConnectionError, ConnectionHandler, ConnectionLimit, EstablishedConnection, ConnectionError, ConnectionLimit, EstablishedConnection, PendingOutboundConnectionError,
IntoConnectionHandler, PendingOutboundConnectionError, Substream, Substream,
}; };
use dial_opts::{DialOpts, PeerCondition}; use dial_opts::{DialOpts, PeerCondition};
use either::Either; use either::Either;
@ -95,7 +95,7 @@ use libp2p_core::{
upgrade::ProtocolName, upgrade::ProtocolName,
Executor, Multiaddr, Negotiated, PeerId, Transport, Executor, Multiaddr, Negotiated, PeerId, Transport,
}; };
use protocols_handler::{NodeHandlerWrapperBuilder, NodeHandlerWrapperError}; use protocols_handler::NodeHandlerWrapperError;
use registry::{AddressIntoIter, Addresses}; use registry::{AddressIntoIter, Addresses};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::collections::HashSet; use std::collections::HashSet;
@ -264,10 +264,7 @@ where
listeners: ListenersStream<transport::Boxed<(PeerId, StreamMuxerBox)>>, listeners: ListenersStream<transport::Boxed<(PeerId, StreamMuxerBox)>>,
/// The nodes currently active. /// The nodes currently active.
pool: Pool< pool: Pool<THandler<TBehaviour>, transport::Boxed<(PeerId, StreamMuxerBox)>>,
NodeHandlerWrapperBuilder<THandler<TBehaviour>>,
transport::Boxed<(PeerId, StreamMuxerBox)>,
>,
/// The local peer ID. /// The local peer ID.
local_peer_id: PeerId, local_peer_id: PeerId,
@ -1147,8 +1144,8 @@ where
TTrans: Transport, TTrans: Transport,
TTrans::Error: Send + 'static, TTrans::Error: Send + 'static,
TBehaviour: NetworkBehaviour, TBehaviour: NetworkBehaviour,
THandler: IntoConnectionHandler, THandler: IntoProtocolsHandler,
THandler::Handler: ConnectionHandler< THandler::Handler: ProtocolsHandler<
InEvent = THandlerInEvent<TBehaviour>, InEvent = THandlerInEvent<TBehaviour>,
OutEvent = THandlerOutEvent<TBehaviour>, OutEvent = THandlerOutEvent<TBehaviour>,
>, >,


@ -55,7 +55,10 @@ use std::{cmp::Ordering, error, fmt, task::Context, task::Poll, time::Duration};
pub use dummy::DummyProtocolsHandler; pub use dummy::DummyProtocolsHandler;
pub use map_in::MapInEvent; pub use map_in::MapInEvent;
pub use map_out::MapOutEvent; pub use map_out::MapOutEvent;
pub use node_handler::{NodeHandlerWrapper, NodeHandlerWrapperBuilder, NodeHandlerWrapperError}; pub use node_handler::{
NodeHandlerWrapper, NodeHandlerWrapperBuilder, NodeHandlerWrapperError,
NodeHandlerWrapperEvent, NodeHandlerWrapperOutboundOpenInfo,
};
pub use one_shot::{OneShotHandler, OneShotHandlerConfig}; pub use one_shot::{OneShotHandler, OneShotHandlerConfig};
pub use select::{IntoProtocolsHandlerSelect, ProtocolsHandlerSelect}; pub use select::{IntoProtocolsHandlerSelect, ProtocolsHandlerSelect};


@ -18,10 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::connection::{ use crate::connection::{Connected, Substream, SubstreamEndpoint};
Connected, ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler, Substream,
SubstreamEndpoint,
};
use crate::protocols_handler::{ use crate::protocols_handler::{
IntoProtocolsHandler, KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, IntoProtocolsHandler, KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr, ProtocolsHandlerUpgrErr,
@ -72,15 +69,12 @@ where
} }
} }
impl<TIntoProtoHandler, TProtoHandler> IntoConnectionHandler impl<TIntoProtoHandler, TProtoHandler> NodeHandlerWrapperBuilder<TIntoProtoHandler>
for NodeHandlerWrapperBuilder<TIntoProtoHandler>
where where
TIntoProtoHandler: IntoProtocolsHandler<Handler = TProtoHandler>, TIntoProtoHandler: IntoProtocolsHandler<Handler = TProtoHandler>,
TProtoHandler: ProtocolsHandler, TProtoHandler: ProtocolsHandler,
{ {
type Handler = NodeHandlerWrapper<TIntoProtoHandler::Handler>; pub fn into_handler(self, connected: &Connected) -> NodeHandlerWrapper<TProtoHandler> {
fn into_handler(self, connected: &Connected) -> Self::Handler {
NodeHandlerWrapper { NodeHandlerWrapper {
handler: self handler: self
.handler .handler
@ -95,8 +89,12 @@ where
} }
} }
// A `ConnectionHandler` for an underlying `ProtocolsHandler`. /// A wrapper for an underlying [`ProtocolsHandler`].
/// Wraps around an implementation of `ProtocolsHandler`, and implements `NodeHandler`. ///
/// It extends [`ProtocolsHandler`] with:
/// - Enforced substream upgrade timeouts
/// - Driving substream upgrades
/// - Handling connection timeout
// TODO: add a caching system for protocols that are supported or not // TODO: add a caching system for protocols that are supported or not
pub struct NodeHandlerWrapper<TProtoHandler> pub struct NodeHandlerWrapper<TProtoHandler>
where where
@ -135,6 +133,21 @@ where
substream_upgrade_protocol_override: Option<upgrade::Version>, substream_upgrade_protocol_override: Option<upgrade::Version>,
} }
impl<TProtoHandler: ProtocolsHandler> std::fmt::Debug for NodeHandlerWrapper<TProtoHandler> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NodeHandlerWrapper")
.field("negotiating_in", &self.negotiating_in)
.field("negotiating_out", &self.negotiating_out)
.field("unique_dial_upgrade_id", &self.unique_dial_upgrade_id)
.field("shutdown", &self.shutdown)
.field(
"substream_upgrade_protocol_override",
&self.substream_upgrade_protocol_override,
)
.finish()
}
}
impl<TProtoHandler: ProtocolsHandler> NodeHandlerWrapper<TProtoHandler> { impl<TProtoHandler: ProtocolsHandler> NodeHandlerWrapper<TProtoHandler> {
pub(crate) fn into_protocols_handler(self) -> TProtoHandler { pub(crate) fn into_protocols_handler(self) -> TProtoHandler {
self.handler self.handler
@ -199,6 +212,7 @@ where
/// A planned shutdown is always postponed for as long as there are ingoing /// A planned shutdown is always postponed for as long as there are ingoing
/// or outgoing substreams being negotiated, i.e. it is a graceful, "idle" /// or outgoing substreams being negotiated, i.e. it is a graceful, "idle"
/// shutdown. /// shutdown.
#[derive(Debug)]
enum Shutdown { enum Shutdown {
/// No shutdown is planned. /// No shutdown is planned.
None, None,
@ -249,22 +263,22 @@ where
} }
} }
impl<TProtoHandler> ConnectionHandler for NodeHandlerWrapper<TProtoHandler> pub type NodeHandlerWrapperOutboundOpenInfo<TProtoHandler> = (
u64,
<TProtoHandler as ProtocolsHandler>::OutboundOpenInfo,
Duration,
);
impl<TProtoHandler> NodeHandlerWrapper<TProtoHandler>
where where
TProtoHandler: ProtocolsHandler, TProtoHandler: ProtocolsHandler,
{ {
type InEvent = TProtoHandler::InEvent; pub fn inject_substream(
type OutEvent = TProtoHandler::OutEvent;
type Error = NodeHandlerWrapperError<TProtoHandler::Error>;
type Substream = Substream<StreamMuxerBox>;
// The first element of the tuple is the unique upgrade identifier
// (see `unique_dial_upgrade_id`).
type OutboundOpenInfo = (u64, TProtoHandler::OutboundOpenInfo, Duration);
fn inject_substream(
&mut self, &mut self,
substream: Self::Substream, substream: Substream<StreamMuxerBox>,
endpoint: SubstreamEndpoint<Self::OutboundOpenInfo>, // The first element of the tuple is the unique upgrade identifier
// (see `unique_dial_upgrade_id`).
endpoint: SubstreamEndpoint<NodeHandlerWrapperOutboundOpenInfo<TProtoHandler>>,
) { ) {
match endpoint { match endpoint {
SubstreamEndpoint::Listener => { SubstreamEndpoint::Listener => {
@ -315,19 +329,26 @@ where
} }
} }
fn inject_event(&mut self, event: Self::InEvent) { pub fn inject_event(&mut self, event: TProtoHandler::InEvent) {
self.handler.inject_event(event); self.handler.inject_event(event);
} }
fn inject_address_change(&mut self, new_address: &Multiaddr) { pub fn inject_address_change(&mut self, new_address: &Multiaddr) {
self.handler.inject_address_change(new_address); self.handler.inject_address_change(new_address);
} }
fn poll( pub fn poll(
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Result<ConnectionHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>> ) -> Poll<
{ Result<
NodeHandlerWrapperEvent<
NodeHandlerWrapperOutboundOpenInfo<TProtoHandler>,
TProtoHandler::OutEvent,
>,
NodeHandlerWrapperError<TProtoHandler::Error>,
>,
> {
while let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) { while let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) {
match res { match res {
Ok(upgrade) => self Ok(upgrade) => self
@ -372,7 +393,7 @@ where
match poll_result { match poll_result {
Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => { Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => {
return Poll::Ready(Ok(ConnectionHandlerEvent::Custom(event))); return Poll::Ready(Ok(NodeHandlerWrapperEvent::Custom(event)));
} }
Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) => { Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) => {
let id = self.unique_dial_upgrade_id; let id = self.unique_dial_upgrade_id;
@ -380,7 +401,7 @@ where
self.unique_dial_upgrade_id += 1; self.unique_dial_upgrade_id += 1;
let (upgrade, info) = protocol.into_upgrade(); let (upgrade, info) = protocol.into_upgrade();
self.queued_dial_upgrades.push((id, SendWrapper(upgrade))); self.queued_dial_upgrades.push((id, SendWrapper(upgrade)));
return Poll::Ready(Ok(ConnectionHandlerEvent::OutboundSubstreamRequest(( return Poll::Ready(Ok(NodeHandlerWrapperEvent::OutboundSubstreamRequest((
id, info, timeout, id, info, timeout,
)))); ))));
} }
@ -408,3 +429,13 @@ where
Poll::Pending Poll::Pending
} }
} }
/// Event produced by a [`NodeHandlerWrapper`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum NodeHandlerWrapperEvent<TOutboundOpenInfo, TCustom> {
/// Require a new outbound substream to be opened with the remote.
OutboundSubstreamRequest(TOutboundOpenInfo),
/// Other event.
Custom(TCustom),
}