diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index e96957b5..ee476695 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,5 +1,10 @@ # 0.22.0 [unreleased] +- Simplify incoming connection handling. The `IncomingConnectionEvent` + has been removed. Instead, pass the `IncomingConnection` obtained + from `NetworkEvent::IncomingConnection` to `Network::accept()`. + [PR 1732](https://github.com/libp2p/rust-libp2p/pull/1732). + - Allow any closure to be passed as an executor. [PR 1686](https://github.com/libp2p/rust-libp2p/pull/1686) diff --git a/core/src/network.rs b/core/src/network.rs index f7a5ee96..6695809f 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -21,7 +21,7 @@ mod event; pub mod peer; -pub use event::{NetworkEvent, IncomingConnectionEvent}; +pub use event::{NetworkEvent, IncomingConnection}; pub use peer::Peer; use crate::{ @@ -334,6 +334,35 @@ where Peer::new(self, peer_id) } + /// Accepts a pending incoming connection obtained via [`NetworkEvent::IncomingConnection`], + /// adding it to the `Network`s connection pool subject to the configured limits. + /// + /// Once the connection is established and all transport protocol upgrades + /// completed, the connection is associated with the provided `handler`. + pub fn accept( + &mut self, + connection: IncomingConnection, + handler: THandler, + ) -> Result + where + TInEvent: Send + 'static, + TOutEvent: Send + 'static, + TPeerId: Send + 'static, + TMuxer: StreamMuxer + Send + Sync + 'static, + TMuxer::OutboundSubstream: Send, + TTrans: Transport, + TTrans::Error: Send + 'static, + TTrans::ListenerUpgrade: Send + 'static, + { + let upgrade = connection.upgrade.map_err(|err| + PendingConnectionError::Transport(TransportError::Other(err))); + let info = IncomingInfo { + local_addr: &connection.local_addr, + send_back_addr: &connection.send_back_addr, + }; + self.pool.add_incoming(upgrade, handler, info) + } + /// Provides an API similar to `Stream`, except that it cannot error. pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll> where @@ -360,14 +389,14 @@ where local_addr, send_back_addr }) => { - return Poll::Ready(NetworkEvent::IncomingConnection( - IncomingConnectionEvent { - listener_id, + return Poll::Ready(NetworkEvent::IncomingConnection { + listener_id, + connection: IncomingConnection { upgrade, local_addr, send_back_addr, - pool: &mut self.pool, - })) + } + }) } Poll::Ready(ListenersEvent::NewAddress { listener_id, listen_addr }) => { return Poll::Ready(NetworkEvent::NewListenerAddress { listener_id, listen_addr }) diff --git a/core/src/network/event.rs b/core/src/network/event.rs index c740cf0f..1638f1da 100644 --- a/core/src/network/event.rs +++ b/core/src/network/event.rs @@ -27,22 +27,15 @@ use crate::{ ConnectedPoint, ConnectionError, ConnectionHandler, - ConnectionInfo, - ConnectionLimit, Connected, EstablishedConnection, - IncomingInfo, IntoConnectionHandler, ListenerId, PendingConnectionError, - Substream, - pool::Pool, }, - muxing::StreamMuxer, - transport::{Transport, TransportError}, + transport::Transport, }; -use futures::prelude::*; -use std::{error, fmt, hash::Hash, num::NonZeroU32}; +use std::{fmt, num::NonZeroU32}; /// Event that can happen on the `Network`. pub enum NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> @@ -86,7 +79,14 @@ where }, /// A new connection arrived on a listener. - IncomingConnection(IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>), + /// + /// To accept the connection, see [`Network::accept`](crate::Network::accept). + IncomingConnection { + /// The listener who received the connection. + listener_id: ListenerId, + /// The pending incoming connection. + connection: IncomingConnection, + }, /// An error happened on a connection during its initial handshake. /// @@ -215,10 +215,10 @@ where .field("error", error) .finish() } - NetworkEvent::IncomingConnection(event) => { + NetworkEvent::IncomingConnection { connection, .. } => { f.debug_struct("IncomingConnection") - .field("local_addr", &event.local_addr) - .field("send_back_addr", &event.send_back_addr) + .field("local_addr", &connection.local_addr) + .field("send_back_addr", &connection.send_back_addr) .finish() } NetworkEvent::IncomingConnectionError { local_addr, send_back_addr, error } => { @@ -271,103 +271,12 @@ where } } -/// A new connection arrived on a listener. -pub struct IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> -where - TTrans: Transport, - THandler: IntoConnectionHandler, -{ - /// The listener who received the connection. - pub(super) listener_id: ListenerId, - /// The produced upgrade. - pub(super) upgrade: TTrans::ListenerUpgrade, +/// A pending incoming connection produced by a listener. +pub struct IncomingConnection { + /// The connection upgrade. + pub(crate) upgrade: TUpgrade, /// Local connection address. - pub(super) local_addr: Multiaddr, + pub local_addr: Multiaddr, /// Address used to send back data to the remote. - pub(super) send_back_addr: Multiaddr, - /// Reference to the `peers` field of the `Network`. - pub(super) pool: &'a mut Pool< - TInEvent, - TOutEvent, - THandler, - TTrans::Error, - ::Error, - TConnInfo, - TPeerId - >, -} - -impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, TConnInfo, TPeerId> - IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> -where - TTrans: Transport, - TTrans::Error: Send + 'static, - TTrans::ListenerUpgrade: Send + 'static, - THandler: IntoConnectionHandler + Send + 'static, - THandler::Handler: ConnectionHandler, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static, - ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary - ::Error: error::Error + Send + 'static, - TMuxer: StreamMuxer + Send + Sync + 'static, - TMuxer::OutboundSubstream: Send, - TMuxer::Substream: Send, - TInEvent: Send + 'static, - TOutEvent: Send + 'static, - TConnInfo: fmt::Debug + ConnectionInfo + Send + 'static, - TPeerId: Eq + Hash + Clone + Send + 'static, -{ - /// The ID of the listener with the incoming connection. - pub fn listener_id(&self) -> ListenerId { - self.listener_id - } - - /// Starts processing the incoming connection and sets the handler to use for it. - pub fn accept(self, handler: THandler) -> Result { - self.accept_with_builder(|_| handler) - } - - /// Same as `accept`, but accepts a closure that turns a `IncomingInfo` into a handler. - pub fn accept_with_builder(self, builder: TBuilder) - -> Result - where - TBuilder: FnOnce(IncomingInfo<'_>) -> THandler - { - let handler = builder(self.info()); - let upgrade = self.upgrade - .map_err(|err| PendingConnectionError::Transport(TransportError::Other(err))); - let info = IncomingInfo { - local_addr: &self.local_addr, - send_back_addr: &self.send_back_addr, - }; - self.pool.add_incoming(upgrade, handler, info) - } -} - -impl - IncomingConnectionEvent<'_, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> -where - TTrans: Transport, - THandler: IntoConnectionHandler, -{ - /// Returns the `IncomingInfo` corresponding to this incoming connection. - pub fn info(&self) -> IncomingInfo<'_> { - IncomingInfo { - local_addr: &self.local_addr, - send_back_addr: &self.send_back_addr, - } - } - - /// Local connection address. - pub fn local_addr(&self) -> &Multiaddr { - &self.local_addr - } - - /// Address used to send back data to the dialer. - pub fn send_back_addr(&self) -> &Multiaddr { - &self.send_back_addr - } - - /// Builds the `ConnectedPoint` corresponding to the incoming connection. - pub fn to_connected_point(&self) -> ConnectedPoint { - self.info().to_connected_point() - } + pub send_back_addr: Multiaddr, } diff --git a/core/tests/network_dial_error.rs b/core/tests/network_dial_error.rs index 638b936f..94792898 100644 --- a/core/tests/network_dial_error.rs +++ b/core/tests/network_dial_error.rs @@ -96,7 +96,7 @@ fn deny_incoming_connec() { async_std::task::block_on(future::poll_fn(|cx| -> Poll> { match swarm1.poll(cx) { - Poll::Ready(NetworkEvent::IncomingConnection(inc)) => drop(inc), + Poll::Ready(NetworkEvent::IncomingConnection { connection, .. }) => drop(connection), Poll::Ready(_) => unreachable!(), Poll::Pending => (), } @@ -175,9 +175,9 @@ fn dial_self() { return Poll::Ready(Ok(())) } }, - Poll::Ready(NetworkEvent::IncomingConnection(inc)) => { - assert_eq!(*inc.local_addr(), local_address); - inc.accept(TestHandler()).unwrap(); + Poll::Ready(NetworkEvent::IncomingConnection { connection, .. }) => { + assert_eq!(&connection.local_addr, &local_address); + swarm.accept(connection, TestHandler()).unwrap(); }, Poll::Ready(ev) => { panic!("Unexpected event: {:?}", ev) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index b6bdf634..78149956 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -550,11 +550,11 @@ where TBehaviour: NetworkBehaviour, num_established, }); }, - Poll::Ready(NetworkEvent::IncomingConnection(incoming)) => { + Poll::Ready(NetworkEvent::IncomingConnection { connection, .. }) => { let handler = this.behaviour.new_handler(); - let local_addr = incoming.local_addr().clone(); - let send_back_addr = incoming.send_back_addr().clone(); - if let Err(e) = incoming.accept(handler.into_node_handler_builder()) { + let local_addr = connection.local_addr.clone(); + let send_back_addr = connection.send_back_addr.clone(); + if let Err(e) = this.network.accept(connection, handler.into_node_handler_builder()) { log::warn!("Incoming connection rejected: {:?}", e); } return Poll::Ready(SwarmEvent::IncomingConnection {