swarm/: Make NodeHandlerWrapper an impl detail of connection (#2523)

Previously one would wrap a `ProtocolsHandler` into a
`NodeHandlerWrapper` as early as possible, even though the functionality
of `NodeHandlerWrapper` is only needed within `mod connection`.

This commit makes `NodeHandlerWrapper` an implementation detail of `mod
connection`, so neither `mod protocols_handler`, `mod pool`, nor the
root level (`libp2p-swarm`) needs to bother with the abstraction.

In addition to the above, this commit:

- Renames `NodeHandlerWrapper` to `HandlerWrapper`. The word `Node` is
  outdated.
- Removes `NodeHandlerWrapperBuilder`. With this simplification it is no
  longer needed.
- Folds `NodeHandlerWrapperError` into `ConnectionError`. No need for
  upper layers to be aware of the fact that `ProtocolsHandler`s are
  wrapped (see the sketch below).
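
For a concrete picture of the last point, here is a minimal, standalone
model of the resulting shape. This is not the crate's actual code (the
real types carry negotiation state, more variants, and stricter bounds);
the names mirror the diff below:

    // Standalone sketch, not libp2p-swarm's real code: models how the
    // wrapper and its error type stay private to `mod connection` while
    // `ConnectionError` absorbs the keep-alive case.
    #![allow(dead_code)]

    mod connection {
        mod handler_wrapper {
            /// Wrapper around the user's handler; never leaves `mod connection`.
            pub(super) struct HandlerWrapper<H> {
                pub(super) handler: H,
                // negotiating_in, negotiating_out, shutdown timer, ... elided
            }

            /// Error produced by the wrapper.
            pub(super) enum Error<E> {
                Handler(E),
                KeepAliveTimeout,
            }
        }

        /// The only error type upper layers get to see.
        #[derive(Debug)]
        pub enum ConnectionError<E> {
            KeepAliveTimeout,
            Handler(E),
        }

        impl<E> From<handler_wrapper::Error<E>> for ConnectionError<E> {
            fn from(e: handler_wrapper::Error<E>) -> Self {
                match e {
                    handler_wrapper::Error::Handler(e) => Self::Handler(e),
                    handler_wrapper::Error::KeepAliveTimeout => Self::KeepAliveTimeout,
                }
            }
        }

        pub struct Connection<H> {
            handler: handler_wrapper::HandlerWrapper<H>,
        }

        impl<H> Connection<H> {
            /// Callers now hand over the bare handler; wrapping happens here.
            pub fn new(handler: H) -> Self {
                Self {
                    handler: handler_wrapper::HandlerWrapper { handler },
                }
            }

            /// ...and `close` hands the bare handler back.
            pub fn close(self) -> H {
                self.handler.handler
            }

            /// Wrapper errors are converted with `err.into()` before escaping.
            pub fn poll_stub(&mut self) -> Result<(), ConnectionError<String>> {
                Err(handler_wrapper::Error::KeepAliveTimeout.into())
            }
        }
    }

    fn main() {
        let mut conn = connection::Connection::new("some-protocols-handler");
        if let Err(e) = conn.poll_stub() {
            println!("{:?}", e); // prints: KeepAliveTimeout
        }
        let _handler = conn.close();
    }

The `From` impl is the crux: converting with `err.into()` at the `mod
connection` boundary is the only place the wrapper's error type appears,
which is what lets `NodeHandlerWrapperError` disappear from the public API.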

Author:    Max Inden
Date:      2022-02-18 11:32:58 +01:00
Committer: GitHub
Parent:    eeb3504d5f
Commit:    2ff9acee22

7 changed files with 123 additions and 195 deletions

[File 1 of 7: the connection module root. File paths were not preserved in this capture; the descriptions are inferred from the hunks.]

@@ -19,15 +19,12 @@
 // DEALINGS IN THE SOFTWARE.
 
 mod error;
+mod handler_wrapper;
 mod listeners;
 mod substream;
 
 pub(crate) mod pool;
 
-use crate::protocols_handler::{
-    NodeHandlerWrapper, NodeHandlerWrapperError, NodeHandlerWrapperEvent,
-    NodeHandlerWrapperOutboundOpenInfo, ProtocolsHandler,
-};
 pub use error::{
     ConnectionError, PendingConnectionError, PendingInboundConnectionError,
     PendingOutboundConnectionError,
@@ -37,9 +34,12 @@ pub use pool::{ConnectionCounters, ConnectionLimits};
 pub use pool::{EstablishedConnection, PendingConnection};
 pub use substream::{Close, Substream, SubstreamEndpoint};
 
+use crate::protocols_handler::ProtocolsHandler;
+use handler_wrapper::HandlerWrapper;
 use libp2p_core::connection::ConnectedPoint;
 use libp2p_core::multiaddr::Multiaddr;
 use libp2p_core::muxing::StreamMuxerBox;
+use libp2p_core::upgrade;
 use libp2p_core::PeerId;
 use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll};
 use substream::{Muxing, SubstreamEvent};
@@ -56,21 +56,21 @@ pub struct Connected {
 
 /// Event generated by a [`Connection`].
 #[derive(Debug, Clone)]
 pub enum Event<T> {
-    /// Event generated by the [`NodeHandlerWrapper`].
+    /// Event generated by the [`ProtocolsHandler`].
     Handler(T),
     /// Address of the remote has changed.
     AddressChange(Multiaddr),
 }
 
-/// A multiplexed connection to a peer with an associated [`NodeHandlerWrapper`].
+/// A multiplexed connection to a peer with an associated [`ProtocolsHandler`].
 pub struct Connection<THandler>
 where
     THandler: ProtocolsHandler,
 {
     /// Node that handles the muxing.
-    muxing: substream::Muxing<StreamMuxerBox, NodeHandlerWrapperOutboundOpenInfo<THandler>>,
+    muxing: substream::Muxing<StreamMuxerBox, handler_wrapper::OutboundOpenInfo<THandler>>,
     /// Handler that processes substreams.
-    handler: NodeHandlerWrapper<THandler>,
+    handler: HandlerWrapper<THandler>,
 }
 
@@ -93,10 +93,15 @@ where
 {
     /// Builds a new `Connection` from the given substream multiplexer
     /// and connection handler.
-    pub fn new(muxer: StreamMuxerBox, handler: NodeHandlerWrapper<THandler>) -> Self {
+    pub fn new(
+        muxer: StreamMuxerBox,
+        handler: THandler,
+        substream_upgrade_protocol_override: Option<upgrade::Version>,
+    ) -> Self {
+        let wrapped_handler = HandlerWrapper::new(handler, substream_upgrade_protocol_override);
         Connection {
             muxing: Muxing::new(muxer),
-            handler,
+            handler: wrapped_handler,
         }
     }
 
@@ -107,8 +112,8 @@ where
 
     /// Begins an orderly shutdown of the connection, returning the connection
     /// handler and a `Future` that resolves when connection shutdown is complete.
-    pub fn close(self) -> (NodeHandlerWrapper<THandler>, Close<StreamMuxerBox>) {
-        (self.handler, self.muxing.close().0)
+    pub fn close(self) -> (THandler, Close<StreamMuxerBox>) {
+        (self.handler.into_protocols_handler(), self.muxing.close().0)
     }
 
     /// Polls the connection for events produced by the associated handler
@@ -116,12 +121,7 @@ where
     pub fn poll(
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
-    ) -> Poll<
-        Result<
-            Event<THandler::OutEvent>,
-            ConnectionError<NodeHandlerWrapperError<THandler::Error>>,
-        >,
-    > {
+    ) -> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>> {
         loop {
             let mut io_pending = false;
 
@@ -153,13 +153,13 @@ where
                     return Poll::Pending; // Nothing to do
                 }
            }
-            Poll::Ready(Ok(NodeHandlerWrapperEvent::OutboundSubstreamRequest(user_data))) => {
+            Poll::Ready(Ok(handler_wrapper::Event::OutboundSubstreamRequest(user_data))) => {
                self.muxing.open_substream(user_data);
            }
-            Poll::Ready(Ok(NodeHandlerWrapperEvent::Custom(event))) => {
+            Poll::Ready(Ok(handler_wrapper::Event::Custom(event))) => {
                return Poll::Ready(Ok(Event::Handler(event)));
            }
-            Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::Handler(err))),
+            Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
         }
     }
 }

[File 2 of 7: connection error types]

@@ -18,6 +18,7 @@
 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 // DEALINGS IN THE SOFTWARE.
 
+use super::handler_wrapper;
 use crate::transport::TransportError;
 use crate::Multiaddr;
 use crate::{connection::ConnectionLimit, ConnectedPoint, PeerId};
@@ -30,6 +31,9 @@ pub enum ConnectionError<THandlerErr> {
     // TODO: Eventually this should also be a custom error?
     IO(io::Error),
 
+    /// The connection keep-alive timeout expired.
+    KeepAliveTimeout,
+
     /// The connection handler produced an error.
     Handler(THandlerErr),
 }
@@ -41,6 +45,9 @@ where
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
             ConnectionError::IO(err) => write!(f, "Connection error: I/O error: {}", err),
+            ConnectionError::KeepAliveTimeout => {
+                write!(f, "Connection closed due to expired keep-alive timeout.")
+            }
             ConnectionError::Handler(err) => write!(f, "Connection error: Handler error: {}", err),
         }
     }
@@ -53,11 +60,21 @@ where
     fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
         match self {
             ConnectionError::IO(err) => Some(err),
+            ConnectionError::KeepAliveTimeout => None,
             ConnectionError::Handler(err) => Some(err),
         }
     }
 }
 
+impl<THandlerErr> From<handler_wrapper::Error<THandlerErr>> for ConnectionError<THandlerErr> {
+    fn from(error: handler_wrapper::Error<THandlerErr>) -> Self {
+        match error {
+            handler_wrapper::Error::Handler(e) => Self::Handler(e),
+            handler_wrapper::Error::KeepAliveTimeout => Self::KeepAliveTimeout,
+        }
+    }
+}
+
 /// Errors that can occur in the context of a pending outgoing `Connection`.
 ///
 /// Note: Addresses for an outbound connection are dialed in parallel. Thus, compared to

[File 3 of 7: the handler wrapper, formerly node_handler]

@@ -18,10 +18,9 @@
 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 // DEALINGS IN THE SOFTWARE.
 
-use crate::connection::{Connected, Substream, SubstreamEndpoint};
+use crate::connection::{Substream, SubstreamEndpoint};
 use crate::protocols_handler::{
-    IntoProtocolsHandler, KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent,
-    ProtocolsHandlerUpgrErr,
+    KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr,
 };
 use crate::upgrade::SendWrapper;
 
@@ -36,59 +35,6 @@ use libp2p_core::{
 };
 use std::{error, fmt, pin::Pin, task::Context, task::Poll, time::Duration};
 
-/// Prototype for a `NodeHandlerWrapper`.
-pub struct NodeHandlerWrapperBuilder<TIntoProtoHandler> {
-    /// The underlying handler.
-    handler: TIntoProtoHandler,
-    /// The substream upgrade protocol override, if any.
-    substream_upgrade_protocol_override: Option<upgrade::Version>,
-}
-
-impl<TIntoProtoHandler> NodeHandlerWrapperBuilder<TIntoProtoHandler>
-where
-    TIntoProtoHandler: IntoProtocolsHandler,
-{
-    /// Builds a `NodeHandlerWrapperBuilder`.
-    pub(crate) fn new(handler: TIntoProtoHandler) -> Self {
-        NodeHandlerWrapperBuilder {
-            handler,
-            substream_upgrade_protocol_override: None,
-        }
-    }
-
-    pub(crate) fn with_substream_upgrade_protocol_override(
-        mut self,
-        version: Option<upgrade::Version>,
-    ) -> Self {
-        self.substream_upgrade_protocol_override = version;
-        self
-    }
-
-    pub(crate) fn into_protocols_handler(self) -> TIntoProtoHandler {
-        self.handler
-    }
-}
-
-impl<TIntoProtoHandler, TProtoHandler> NodeHandlerWrapperBuilder<TIntoProtoHandler>
-where
-    TIntoProtoHandler: IntoProtocolsHandler<Handler = TProtoHandler>,
-    TProtoHandler: ProtocolsHandler,
-{
-    pub fn into_handler(self, connected: &Connected) -> NodeHandlerWrapper<TProtoHandler> {
-        NodeHandlerWrapper {
-            handler: self
-                .handler
-                .into_handler(&connected.peer_id, &connected.endpoint),
-            negotiating_in: Default::default(),
-            negotiating_out: Default::default(),
-            queued_dial_upgrades: Vec::new(),
-            unique_dial_upgrade_id: 0,
-            shutdown: Shutdown::None,
-            substream_upgrade_protocol_override: self.substream_upgrade_protocol_override,
-        }
-    }
-}
-
 /// A wrapper for an underlying [`ProtocolsHandler`].
 ///
 /// It extends [`ProtocolsHandler`] with:
@@ -96,7 +42,7 @@ where
 /// - Driving substream upgrades
 /// - Handling connection timeout
 // TODO: add a caching system for protocols that are supported or not
-pub struct NodeHandlerWrapper<TProtoHandler>
+pub struct HandlerWrapper<TProtoHandler>
 where
     TProtoHandler: ProtocolsHandler,
 {
@@ -133,9 +79,9 @@ where
     substream_upgrade_protocol_override: Option<upgrade::Version>,
 }
 
-impl<TProtoHandler: ProtocolsHandler> std::fmt::Debug for NodeHandlerWrapper<TProtoHandler> {
+impl<TProtoHandler: ProtocolsHandler> std::fmt::Debug for HandlerWrapper<TProtoHandler> {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("NodeHandlerWrapper")
+        f.debug_struct("HandlerWrapper")
             .field("negotiating_in", &self.negotiating_in)
             .field("negotiating_out", &self.negotiating_out)
             .field("unique_dial_upgrade_id", &self.unique_dial_upgrade_id)
@@ -148,7 +94,22 @@ impl<TProtoHandler: ProtocolsHandler> std::fmt::Debug for NodeHandlerWrapper<TPr
     }
 }
 
-impl<TProtoHandler: ProtocolsHandler> NodeHandlerWrapper<TProtoHandler> {
+impl<TProtoHandler: ProtocolsHandler> HandlerWrapper<TProtoHandler> {
+    pub(crate) fn new(
+        handler: TProtoHandler,
+        substream_upgrade_protocol_override: Option<upgrade::Version>,
+    ) -> Self {
+        Self {
+            handler,
+            negotiating_in: Default::default(),
+            negotiating_out: Default::default(),
+            queued_dial_upgrades: Vec::new(),
+            unique_dial_upgrade_id: 0,
+            shutdown: Shutdown::None,
+            substream_upgrade_protocol_override,
+        }
+    }
+
     pub(crate) fn into_protocols_handler(self) -> TProtoHandler {
         self.handler
     }
@@ -222,54 +183,54 @@ enum Shutdown {
     Later(Delay, Instant),
 }
 
-/// Error generated by the `NodeHandlerWrapper`.
+/// Error generated by the [`HandlerWrapper`].
 #[derive(Debug)]
-pub enum NodeHandlerWrapperError<TErr> {
+pub enum Error<TErr> {
     /// The connection handler encountered an error.
     Handler(TErr),
     /// The connection keep-alive timeout expired.
     KeepAliveTimeout,
 }
 
-impl<TErr> From<TErr> for NodeHandlerWrapperError<TErr> {
-    fn from(err: TErr) -> NodeHandlerWrapperError<TErr> {
-        NodeHandlerWrapperError::Handler(err)
+impl<TErr> From<TErr> for Error<TErr> {
+    fn from(err: TErr) -> Error<TErr> {
+        Error::Handler(err)
     }
 }
 
-impl<TErr> fmt::Display for NodeHandlerWrapperError<TErr>
+impl<TErr> fmt::Display for Error<TErr>
 where
     TErr: fmt::Display,
 {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
-            NodeHandlerWrapperError::Handler(err) => write!(f, "{}", err),
-            NodeHandlerWrapperError::KeepAliveTimeout => {
+            Error::Handler(err) => write!(f, "{}", err),
+            Error::KeepAliveTimeout => {
                 write!(f, "Connection closed due to expired keep-alive timeout.")
             }
         }
     }
 }
 
-impl<TErr> error::Error for NodeHandlerWrapperError<TErr>
+impl<TErr> error::Error for Error<TErr>
 where
     TErr: error::Error + 'static,
 {
     fn source(&self) -> Option<&(dyn error::Error + 'static)> {
         match self {
-            NodeHandlerWrapperError::Handler(err) => Some(err),
-            NodeHandlerWrapperError::KeepAliveTimeout => None,
+            Error::Handler(err) => Some(err),
+            Error::KeepAliveTimeout => None,
         }
     }
 }
 
-pub type NodeHandlerWrapperOutboundOpenInfo<TProtoHandler> = (
+pub type OutboundOpenInfo<TProtoHandler> = (
     u64,
     <TProtoHandler as ProtocolsHandler>::OutboundOpenInfo,
     Duration,
 );
 
-impl<TProtoHandler> NodeHandlerWrapper<TProtoHandler>
+impl<TProtoHandler> HandlerWrapper<TProtoHandler>
 where
     TProtoHandler: ProtocolsHandler,
 {
@@ -278,7 +239,7 @@ where
         substream: Substream<StreamMuxerBox>,
         // The first element of the tuple is the unique upgrade identifier
        // (see `unique_dial_upgrade_id`).
-        endpoint: SubstreamEndpoint<NodeHandlerWrapperOutboundOpenInfo<TProtoHandler>>,
+        endpoint: SubstreamEndpoint<OutboundOpenInfo<TProtoHandler>>,
     ) {
         match endpoint {
             SubstreamEndpoint::Listener => {
@@ -342,11 +303,8 @@ where
         cx: &mut Context<'_>,
     ) -> Poll<
         Result<
-            NodeHandlerWrapperEvent<
-                NodeHandlerWrapperOutboundOpenInfo<TProtoHandler>,
-                TProtoHandler::OutEvent,
-            >,
-            NodeHandlerWrapperError<TProtoHandler::Error>,
+            Event<OutboundOpenInfo<TProtoHandler>, TProtoHandler::OutEvent>,
+            Error<TProtoHandler::Error>,
         >,
     > {
         while let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) {
@@ -393,7 +351,7 @@ where
 
         match poll_result {
             Poll::Ready(ProtocolsHandlerEvent::Custom(event)) => {
-                return Poll::Ready(Ok(NodeHandlerWrapperEvent::Custom(event)));
+                return Poll::Ready(Ok(Event::Custom(event)));
             }
             Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol }) => {
                 let id = self.unique_dial_upgrade_id;
@@ -401,9 +359,7 @@ where
                 self.unique_dial_upgrade_id += 1;
                 let (upgrade, info) = protocol.into_upgrade();
                 self.queued_dial_upgrades.push((id, SendWrapper(upgrade)));
-                return Poll::Ready(Ok(NodeHandlerWrapperEvent::OutboundSubstreamRequest((
-                    id, info, timeout,
-                ))));
+                return Poll::Ready(Ok(Event::OutboundSubstreamRequest((id, info, timeout))));
             }
             Poll::Ready(ProtocolsHandlerEvent::Close(err)) => return Poll::Ready(Err(err.into())),
             Poll::Pending => (),
@@ -414,13 +370,9 @@ where
         if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() {
             match self.shutdown {
                 Shutdown::None => {}
-                Shutdown::Asap => {
-                    return Poll::Ready(Err(NodeHandlerWrapperError::KeepAliveTimeout))
-                }
+                Shutdown::Asap => return Poll::Ready(Err(Error::KeepAliveTimeout)),
                 Shutdown::Later(ref mut delay, _) => match Future::poll(Pin::new(delay), cx) {
-                    Poll::Ready(_) => {
-                        return Poll::Ready(Err(NodeHandlerWrapperError::KeepAliveTimeout))
-                    }
+                    Poll::Ready(_) => return Poll::Ready(Err(Error::KeepAliveTimeout)),
                     Poll::Pending => {}
                 },
             }
@@ -430,9 +382,9 @@ where
 
     }
 }
 
-/// Event produced by a [`NodeHandlerWrapper`].
+/// Event produced by a [`HandlerWrapper`].
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
-pub enum NodeHandlerWrapperEvent<TOutboundOpenInfo, TCustom> {
+pub enum Event<TOutboundOpenInfo, TCustom> {
     /// Require a new outbound substream to be opened with the remote.
     OutboundSubstreamRequest(TOutboundOpenInfo),

[File 4 of 7: the connection pool]

@@ -25,7 +25,6 @@ use crate::{
         Connected, ConnectionError, ConnectionLimit, IncomingInfo, PendingConnectionError,
         PendingInboundConnectionError, PendingOutboundConnectionError,
     },
-    protocols_handler::{NodeHandlerWrapper, NodeHandlerWrapperBuilder, NodeHandlerWrapperError},
     transport::{Transport, TransportError},
     ConnectedPoint, Executor, IntoProtocolsHandler, Multiaddr, PeerId, ProtocolsHandler,
 };
@@ -85,6 +84,9 @@ where
     /// Number of addresses concurrently dialed for a single outbound connection attempt.
     dial_concurrency_factor: NonZeroU8,
 
+    /// The configured override for substream protocol upgrades, if any.
+    substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
+
     /// The executor to use for running the background tasks. If `None`,
     /// the tasks are kept in `local_spawns` instead and polled on the
     /// current thread when the [`Pool`] is polled for new events.
@@ -138,7 +140,7 @@ struct PendingConnectionInfo<THandler> {
     /// [`PeerId`] of the remote peer.
     peer_id: Option<PeerId>,
     /// Handler to handle connection once no longer pending but established.
-    handler: NodeHandlerWrapperBuilder<THandler>,
+    handler: THandler,
     endpoint: PendingPoint,
     /// When dropped, notifies the task which then knows to terminate.
     abort_notifier: Option<oneshot::Sender<Void>>,
@@ -187,16 +189,12 @@ where
         connected: Connected,
         /// The error that occurred, if any. If `None`, the connection
         /// was closed by the local peer.
-        error: Option<
-            ConnectionError<
-                NodeHandlerWrapperError<<THandler::Handler as ProtocolsHandler>::Error>,
-            >,
-        >,
+        error: Option<ConnectionError<<THandler::Handler as ProtocolsHandler>::Error>>,
         /// A reference to the pool that used to manage the connection.
         pool: &'a mut Pool<THandler, TTrans>,
         /// The remaining established connections to the same peer.
         remaining_established_connection_ids: Vec<ConnectionId>,
-        handler: NodeHandlerWrapper<THandler::Handler>,
+        handler: THandler::Handler,
     },
 
     /// An outbound connection attempt failed.
@@ -206,7 +204,7 @@ where
         /// The error that occurred.
         error: PendingOutboundConnectionError<TTrans::Error>,
         /// The handler that was supposed to handle the connection.
-        handler: NodeHandlerWrapperBuilder<THandler>,
+        handler: THandler,
         /// The (expected) peer of the failed connection.
         peer: Option<PeerId>,
     },
@@ -222,7 +220,7 @@ where
         /// The error that occurred.
         error: PendingInboundConnectionError<TTrans::Error>,
         /// The handler that was supposed to handle the connection.
-        handler: NodeHandlerWrapperBuilder<THandler>,
+        handler: THandler,
     },
 
     /// A node has produced an event.
@@ -330,6 +328,7 @@ where
             next_connection_id: ConnectionId::new(0),
             task_command_buffer_size: config.task_command_buffer_size,
             dial_concurrency_factor: config.dial_concurrency_factor,
+            substream_upgrade_protocol_override: config.substream_upgrade_protocol_override,
             executor: config.executor,
             local_spawns: FuturesUnordered::new(),
             pending_connection_events_tx,
@@ -481,10 +480,10 @@ where
         transport: TTrans,
         addresses: impl Iterator<Item = Multiaddr> + Send + 'static,
         peer: Option<PeerId>,
-        handler: NodeHandlerWrapperBuilder<THandler>,
+        handler: THandler,
         role_override: Endpoint,
         dial_concurrency_factor_override: Option<NonZeroU8>,
-    ) -> Result<ConnectionId, (ConnectionLimit, NodeHandlerWrapperBuilder<THandler>)>
+    ) -> Result<ConnectionId, (ConnectionLimit, THandler)>
     where
         TTrans: Clone + Send,
         TTrans::Dial: Send + 'static,
@@ -538,9 +537,9 @@ where
     pub fn add_incoming<TFut>(
         &mut self,
         future: TFut,
-        handler: NodeHandlerWrapperBuilder<THandler>,
+        handler: THandler,
         info: IncomingInfo<'_>,
-    ) -> Result<ConnectionId, (ConnectionLimit, NodeHandlerWrapperBuilder<THandler>)>
+    ) -> Result<ConnectionId, (ConnectionLimit, THandler)>
     where
         TFut: Future<Output = Result<TTrans::Output, TTrans::Error>> + Send + 'static,
     {
@@ -816,13 +815,11 @@ where
                     },
                 );
-                let connected = Connected {
-                    peer_id: obtained_peer_id,
-                    endpoint,
-                };
-
-                let connection =
-                    super::Connection::new(muxer, handler.into_handler(&connected));
+                let connection = super::Connection::new(
+                    muxer,
+                    handler.into_handler(&obtained_peer_id, &endpoint),
+                    self.substream_upgrade_protocol_override,
+                );
 
                 self.spawn(
                     task::new_for_established_connection(
                         id,
@@ -1226,6 +1223,9 @@ pub struct PoolConfig {
 
     /// Number of addresses concurrently dialed for a single outbound connection attempt.
     pub dial_concurrency_factor: NonZeroU8,
+
+    /// The configured override for substream protocol upgrades, if any.
+    substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
 }
 
 impl Default for PoolConfig {
@@ -1236,6 +1236,7 @@ impl Default for PoolConfig {
             task_command_buffer_size: 7,
             // By default, addresses of a single connection attempt are dialed in sequence.
             dial_concurrency_factor: NonZeroU8::new(1).expect("1 > 0"),
+            substream_upgrade_protocol_override: None,
         }
     }
 }
@@ -1285,6 +1286,15 @@ impl PoolConfig {
         self.dial_concurrency_factor = factor;
         self
     }
+
+    /// Configures an override for the substream upgrade protocol to use.
+    pub fn with_substream_upgrade_protocol_override(
+        mut self,
+        v: libp2p_core::upgrade::Version,
+    ) -> Self {
+        self.substream_upgrade_protocol_override = Some(v);
+        self
+    }
 }
 
 trait EntryExt<'a, K, V> {

[File 5 of 7: connection pool task events]

@@ -26,7 +26,6 @@ use crate::{
     connection::{
         self, ConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError,
     },
-    protocols_handler::{NodeHandlerWrapper, NodeHandlerWrapperError},
     transport::{Transport, TransportError},
     Multiaddr, PeerId, ProtocolsHandler,
 };
@@ -93,8 +92,8 @@ pub enum EstablishedConnectionEvent<THandler: ProtocolsHandler> {
     Closed {
         id: ConnectionId,
         peer_id: PeerId,
-        error: Option<ConnectionError<NodeHandlerWrapperError<THandler::Error>>>,
-        handler: NodeHandlerWrapper<THandler>,
+        error: Option<ConnectionError<THandler::Error>>,
+        handler: THandler,
     },
 }

[File 6 of 7: the libp2p-swarm crate root]

@@ -93,7 +93,6 @@ use libp2p_core::{
     upgrade::ProtocolName,
     Executor, Multiaddr, Negotiated, PeerId, Transport,
 };
-use protocols_handler::NodeHandlerWrapperError;
 use registry::{AddressIntoIter, Addresses};
 use smallvec::SmallVec;
 use std::collections::HashSet;
@@ -163,7 +162,7 @@ pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
         num_established: u32,
         /// Reason for the disconnection, if it was not a successful
         /// active close.
-        cause: Option<ConnectionError<NodeHandlerWrapperError<THandlerErr>>>,
+        cause: Option<ConnectionError<THandlerErr>>,
     },
     /// A new connection arrived on a listener and is in the process of protocol negotiation.
     ///
@@ -294,9 +293,6 @@ where
     /// (or dropped if the peer disconnected) before the `behaviour`
     /// can be polled again.
     pending_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,
-
-    /// The configured override for substream protocol upgrades, if any.
-    substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
 }
 
 impl<TBehaviour> Unpin for Swarm<TBehaviour> where TBehaviour: NetworkBehaviour {}
@@ -506,10 +502,6 @@ where
             }
         };
 
-        let handler = handler
-            .into_node_handler_builder()
-            .with_substream_upgrade_protocol_override(self.substream_upgrade_protocol_override);
-
         match self.pool.add_outgoing(
             self.listeners.transport().clone(),
             addresses,
@@ -521,8 +513,7 @@ where
             Ok(_connection_id) => Ok(()),
             Err((connection_limit, handler)) => {
                 let error = DialError::ConnectionLimit(connection_limit);
-                self.behaviour
-                    .inject_dial_failure(None, handler.into_protocols_handler(), &error);
+                self.behaviour.inject_dial_failure(None, handler, &error);
                 return Err(error);
             }
         }
@@ -675,13 +666,7 @@ where
                     local_addr,
                     send_back_addr,
                 }) => {
-                    let handler = this
-                        .behaviour
-                        .new_handler()
-                        .into_node_handler_builder()
-                        .with_substream_upgrade_protocol_override(
-                            this.substream_upgrade_protocol_override,
-                        );
+                    let handler = this.behaviour.new_handler();
                     match this.pool.add_incoming(
                         upgrade,
                         handler,
@@ -700,7 +685,7 @@ where
                         this.behaviour.inject_listen_failure(
                             &local_addr,
                             &send_back_addr,
-                            handler.into_protocols_handler(),
+                            handler,
                         );
                         log::warn!("Incoming connection rejected: {:?}", connection_limit);
                     }
@@ -828,11 +813,7 @@ where
                 }) => {
                     let error = error.into();
 
-                    this.behaviour.inject_dial_failure(
-                        peer,
-                        handler.into_protocols_handler(),
-                        &error,
-                    );
+                    this.behaviour.inject_dial_failure(peer, handler, &error);
 
                     if let Some(peer) = peer {
                         log::debug!("Connection attempt to {:?} failed with {:?}.", peer, error,);
@@ -853,11 +834,8 @@ where
                     handler,
                 }) => {
                     log::debug!("Incoming connection failed: {:?}", error);
-                    this.behaviour.inject_listen_failure(
-                        &local_addr,
-                        &send_back_addr,
-                        handler.into_protocols_handler(),
-                    );
+                    this.behaviour
+                        .inject_listen_failure(&local_addr, &send_back_addr, handler);
                     return Poll::Ready(SwarmEvent::IncomingConnectionError {
                         local_addr,
                         send_back_addr,
@@ -900,7 +878,7 @@ where
                         &peer_id,
                         &id,
                         &endpoint,
-                        handler.into_protocols_handler(),
+                        handler,
                         remaining_non_banned,
                     );
                 }
@@ -1242,7 +1220,6 @@ pub struct SwarmBuilder<TBehaviour> {
     behaviour: TBehaviour,
     pool_config: PoolConfig,
     connection_limits: ConnectionLimits,
-    substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
 }
 
 impl<TBehaviour> SwarmBuilder<TBehaviour>
@@ -1263,7 +1240,6 @@ where
             behaviour,
             pool_config: Default::default(),
             connection_limits: Default::default(),
-            substream_upgrade_protocol_override: None,
         }
     }
 
@@ -1341,7 +1317,7 @@ where
     /// > individual [`SubstreamProtocol`]s emitted by the `NetworkBehaviour`
     /// > are ignored.
     pub fn substream_upgrade_protocol_override(mut self, v: libp2p_core::upgrade::Version) -> Self {
-        self.substream_upgrade_protocol_override = Some(v);
+        self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
         self
     }
 
@@ -1382,7 +1358,6 @@ where
             banned_peers: HashSet::new(),
             banned_peer_connections: HashSet::new(),
             pending_event: None,
-            substream_upgrade_protocol_override: self.substream_upgrade_protocol_override,
         }
     }
 }

[File 7 of 7: the protocols_handler module]

@@ -42,7 +42,6 @@ pub mod either;
 mod map_in;
 mod map_out;
 pub mod multi;
-mod node_handler;
 mod one_shot;
 mod select;
 
@@ -55,10 +54,6 @@ use std::{cmp::Ordering, error, fmt, task::Context, task::Poll, time::Duration};
 
 pub use dummy::DummyProtocolsHandler;
 pub use map_in::MapInEvent;
 pub use map_out::MapOutEvent;
-pub use node_handler::{
-    NodeHandlerWrapper, NodeHandlerWrapperBuilder, NodeHandlerWrapperError,
-    NodeHandlerWrapperEvent, NodeHandlerWrapperOutboundOpenInfo,
-};
 pub use one_shot::{OneShotHandler, OneShotHandlerConfig};
 pub use select::{IntoProtocolsHandlerSelect, ProtocolsHandlerSelect};
@@ -222,17 +217,6 @@ pub trait ProtocolsHandler: Send + 'static {
     {
         ProtocolsHandlerSelect::new(self, other)
     }
-
-    /// Creates a builder that allows creating a `NodeHandler` that handles this protocol
-    /// exclusively.
-    ///
-    /// > **Note**: This method should not be redefined in a custom `ProtocolsHandler`.
-    fn into_node_handler_builder(self) -> NodeHandlerWrapperBuilder<Self>
-    where
-        Self: Sized,
-    {
-        IntoProtocolsHandler::into_node_handler_builder(self)
-    }
 }
 
 /// Configuration of inbound or outbound substream protocol(s)
@@ -493,15 +477,6 @@ pub trait IntoProtocolsHandler: Send + 'static {
     {
         IntoProtocolsHandlerSelect::new(self, other)
     }
-
-    /// Creates a builder that will allow creating a `NodeHandler` that handles this protocol
-    /// exclusively.
-    fn into_node_handler_builder(self) -> NodeHandlerWrapperBuilder<Self>
-    where
-        Self: Sized,
-    {
-        NodeHandlerWrapperBuilder::new(self)
-    }
 }
 
 impl<T> IntoProtocolsHandler for T