[core] Simplify accepting incoming connections. (#1732)

* Simplify incoming connection handling.

Instead of handing out a mutable borrow to the connection
pool in the `IncomingConnectionEvent`, so one can call
`IncomingConnectionEvent::accept()`, just provide
`Network::accept()`.

* Update docs.

* Update CHANGELOG.
This commit is contained in:
Roman Borschel 2020-09-09 11:21:37 +02:00 committed by GitHub
parent f86bc01e20
commit f0133a0213
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 67 additions and 124 deletions

View File

@ -1,5 +1,10 @@
# 0.22.0 [unreleased] # 0.22.0 [unreleased]
- Simplify incoming connection handling. The `IncomingConnectionEvent`
has been removed. Instead, pass the `IncomingConnection` obtained
from `NetworkEvent::IncomingConnection` to `Network::accept()`.
[PR 1732](https://github.com/libp2p/rust-libp2p/pull/1732).
- Allow any closure to be passed as an executor. - Allow any closure to be passed as an executor.
[PR 1686](https://github.com/libp2p/rust-libp2p/pull/1686) [PR 1686](https://github.com/libp2p/rust-libp2p/pull/1686)

View File

@ -21,7 +21,7 @@
mod event; mod event;
pub mod peer; pub mod peer;
pub use event::{NetworkEvent, IncomingConnectionEvent}; pub use event::{NetworkEvent, IncomingConnection};
pub use peer::Peer; pub use peer::Peer;
use crate::{ use crate::{
@ -334,6 +334,35 @@ where
Peer::new(self, peer_id) Peer::new(self, peer_id)
} }
/// Accepts a pending incoming connection obtained via [`NetworkEvent::IncomingConnection`],
/// adding it to the `Network`s connection pool subject to the configured limits.
///
/// Once the connection is established and all transport protocol upgrades
/// completed, the connection is associated with the provided `handler`.
pub fn accept(
&mut self,
connection: IncomingConnection<TTrans::ListenerUpgrade>,
handler: THandler,
) -> Result<ConnectionId, ConnectionLimit>
where
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TPeerId: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
TTrans::Error: Send + 'static,
TTrans::ListenerUpgrade: Send + 'static,
{
let upgrade = connection.upgrade.map_err(|err|
PendingConnectionError::Transport(TransportError::Other(err)));
let info = IncomingInfo {
local_addr: &connection.local_addr,
send_back_addr: &connection.send_back_addr,
};
self.pool.add_incoming(upgrade, handler, info)
}
/// Provides an API similar to `Stream`, except that it cannot error. /// Provides an API similar to `Stream`, except that it cannot error.
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>> pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>>
where where
@ -360,14 +389,14 @@ where
local_addr, local_addr,
send_back_addr send_back_addr
}) => { }) => {
return Poll::Ready(NetworkEvent::IncomingConnection( return Poll::Ready(NetworkEvent::IncomingConnection {
IncomingConnectionEvent {
listener_id, listener_id,
connection: IncomingConnection {
upgrade, upgrade,
local_addr, local_addr,
send_back_addr, send_back_addr,
pool: &mut self.pool, }
})) })
} }
Poll::Ready(ListenersEvent::NewAddress { listener_id, listen_addr }) => { Poll::Ready(ListenersEvent::NewAddress { listener_id, listen_addr }) => {
return Poll::Ready(NetworkEvent::NewListenerAddress { listener_id, listen_addr }) return Poll::Ready(NetworkEvent::NewListenerAddress { listener_id, listen_addr })

View File

@ -27,22 +27,15 @@ use crate::{
ConnectedPoint, ConnectedPoint,
ConnectionError, ConnectionError,
ConnectionHandler, ConnectionHandler,
ConnectionInfo,
ConnectionLimit,
Connected, Connected,
EstablishedConnection, EstablishedConnection,
IncomingInfo,
IntoConnectionHandler, IntoConnectionHandler,
ListenerId, ListenerId,
PendingConnectionError, PendingConnectionError,
Substream,
pool::Pool,
}, },
muxing::StreamMuxer, transport::Transport,
transport::{Transport, TransportError},
}; };
use futures::prelude::*; use std::{fmt, num::NonZeroU32};
use std::{error, fmt, hash::Hash, num::NonZeroU32};
/// Event that can happen on the `Network`. /// Event that can happen on the `Network`.
pub enum NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> pub enum NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
@ -86,7 +79,14 @@ where
}, },
/// A new connection arrived on a listener. /// A new connection arrived on a listener.
IncomingConnection(IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>), ///
/// To accept the connection, see [`Network::accept`](crate::Network::accept).
IncomingConnection {
/// The listener who received the connection.
listener_id: ListenerId,
/// The pending incoming connection.
connection: IncomingConnection<TTrans::ListenerUpgrade>,
},
/// An error happened on a connection during its initial handshake. /// An error happened on a connection during its initial handshake.
/// ///
@ -215,10 +215,10 @@ where
.field("error", error) .field("error", error)
.finish() .finish()
} }
NetworkEvent::IncomingConnection(event) => { NetworkEvent::IncomingConnection { connection, .. } => {
f.debug_struct("IncomingConnection") f.debug_struct("IncomingConnection")
.field("local_addr", &event.local_addr) .field("local_addr", &connection.local_addr)
.field("send_back_addr", &event.send_back_addr) .field("send_back_addr", &connection.send_back_addr)
.finish() .finish()
} }
NetworkEvent::IncomingConnectionError { local_addr, send_back_addr, error } => { NetworkEvent::IncomingConnectionError { local_addr, send_back_addr, error } => {
@ -271,103 +271,12 @@ where
} }
} }
/// A new connection arrived on a listener. /// A pending incoming connection produced by a listener.
pub struct IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> pub struct IncomingConnection<TUpgrade> {
where /// The connection upgrade.
TTrans: Transport, pub(crate) upgrade: TUpgrade,
THandler: IntoConnectionHandler<TConnInfo>,
{
/// The listener who received the connection.
pub(super) listener_id: ListenerId,
/// The produced upgrade.
pub(super) upgrade: TTrans::ListenerUpgrade,
/// Local connection address. /// Local connection address.
pub(super) local_addr: Multiaddr, pub local_addr: Multiaddr,
/// Address used to send back data to the remote. /// Address used to send back data to the remote.
pub(super) send_back_addr: Multiaddr, pub send_back_addr: Multiaddr,
/// Reference to the `peers` field of the `Network`.
pub(super) pool: &'a mut Pool<
TInEvent,
TOutEvent,
THandler,
TTrans::Error,
<THandler::Handler as ConnectionHandler>::Error,
TConnInfo,
TPeerId
>,
}
impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, TConnInfo, TPeerId>
IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
where
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
TTrans::Error: Send + 'static,
TTrans::ListenerUpgrade: Send + 'static,
THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TMuxer::Substream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Send + 'static,
TPeerId: Eq + Hash + Clone + Send + 'static,
{
/// The ID of the listener with the incoming connection.
pub fn listener_id(&self) -> ListenerId {
self.listener_id
}
/// Starts processing the incoming connection and sets the handler to use for it.
pub fn accept(self, handler: THandler) -> Result<ConnectionId, ConnectionLimit> {
self.accept_with_builder(|_| handler)
}
/// Same as `accept`, but accepts a closure that turns a `IncomingInfo` into a handler.
pub fn accept_with_builder<TBuilder>(self, builder: TBuilder)
-> Result<ConnectionId, ConnectionLimit>
where
TBuilder: FnOnce(IncomingInfo<'_>) -> THandler
{
let handler = builder(self.info());
let upgrade = self.upgrade
.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err)));
let info = IncomingInfo {
local_addr: &self.local_addr,
send_back_addr: &self.send_back_addr,
};
self.pool.add_incoming(upgrade, handler, info)
}
}
impl<TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
IncomingConnectionEvent<'_, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
where
TTrans: Transport,
THandler: IntoConnectionHandler<TConnInfo>,
{
/// Returns the `IncomingInfo` corresponding to this incoming connection.
pub fn info(&self) -> IncomingInfo<'_> {
IncomingInfo {
local_addr: &self.local_addr,
send_back_addr: &self.send_back_addr,
}
}
/// Local connection address.
pub fn local_addr(&self) -> &Multiaddr {
&self.local_addr
}
/// Address used to send back data to the dialer.
pub fn send_back_addr(&self) -> &Multiaddr {
&self.send_back_addr
}
/// Builds the `ConnectedPoint` corresponding to the incoming connection.
pub fn to_connected_point(&self) -> ConnectedPoint {
self.info().to_connected_point()
}
} }

View File

@ -96,7 +96,7 @@ fn deny_incoming_connec() {
async_std::task::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> { async_std::task::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
match swarm1.poll(cx) { match swarm1.poll(cx) {
Poll::Ready(NetworkEvent::IncomingConnection(inc)) => drop(inc), Poll::Ready(NetworkEvent::IncomingConnection { connection, .. }) => drop(connection),
Poll::Ready(_) => unreachable!(), Poll::Ready(_) => unreachable!(),
Poll::Pending => (), Poll::Pending => (),
} }
@ -175,9 +175,9 @@ fn dial_self() {
return Poll::Ready(Ok(())) return Poll::Ready(Ok(()))
} }
}, },
Poll::Ready(NetworkEvent::IncomingConnection(inc)) => { Poll::Ready(NetworkEvent::IncomingConnection { connection, .. }) => {
assert_eq!(*inc.local_addr(), local_address); assert_eq!(&connection.local_addr, &local_address);
inc.accept(TestHandler()).unwrap(); swarm.accept(connection, TestHandler()).unwrap();
}, },
Poll::Ready(ev) => { Poll::Ready(ev) => {
panic!("Unexpected event: {:?}", ev) panic!("Unexpected event: {:?}", ev)

View File

@ -550,11 +550,11 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
num_established, num_established,
}); });
}, },
Poll::Ready(NetworkEvent::IncomingConnection(incoming)) => { Poll::Ready(NetworkEvent::IncomingConnection { connection, .. }) => {
let handler = this.behaviour.new_handler(); let handler = this.behaviour.new_handler();
let local_addr = incoming.local_addr().clone(); let local_addr = connection.local_addr.clone();
let send_back_addr = incoming.send_back_addr().clone(); let send_back_addr = connection.send_back_addr.clone();
if let Err(e) = incoming.accept(handler.into_node_handler_builder()) { if let Err(e) = this.network.accept(connection, handler.into_node_handler_builder()) {
log::warn!("Incoming connection rejected: {:?}", e); log::warn!("Incoming connection rejected: {:?}", e);
} }
return Poll::Ready(SwarmEvent::IncomingConnection { return Poll::Ready(SwarmEvent::IncomingConnection {