Multiple connections per peer (#1440)

* Allow multiple connections per peer in libp2p-core.

Instead of trying to enforce a single connection per peer,
which involves quite a bit of additional complexity, e.g.
to prioritise simultaneously opened connections, and can
have other undesirable consequences [1], we now
make multiple connections per peer a feature.

The gist of these changes is as follows:

The concept of a "node" with an implicit 1-1 correspondence
to a connection has been replaced with the "first-class"
concept of a "connection". The code from `src/nodes` has moved
(with varying degrees of modification) to `src/connection`.
A `HandledNode` has become a `Connection`, a `NodeHandler` a
`ConnectionHandler`, the `CollectionStream` was the basis for
the new `connection::Pool`, and so forth.

Conceptually, a `Network` contains a `connection::Pool` which
in turn internally employs the `connection::Manager` for
handling the background `connection::manager::Task`s, one
per connection, as before. These are all considered implementation
details. On the public API, `Peer`s are managed as before through
the `Network`, except now the API has changed with the shift of focus
to (potentially multiple) connections per peer. The `NetworkEvent`s have
accordingly also undergone changes.
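
As a rough sketch of this layering (simplified stand-ins for
illustration only; the real types carry many generic parameters
and more state):

struct Task;                         // connection::manager::Task, one per connection
struct Manager { tasks: Vec<Task> }  // connection::Manager: drives the background tasks
struct Pool { manager: Manager }     // connection::Pool: per-peer connection bookkeeping
struct Network { pool: Pool }        // public API: `Peer`s and their connections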

The Swarm APIs remain largely unchanged, except that
`inject_replaced` is no longer called. In practice, multiple
`ProtocolsHandler`s may now be associated with a single
`NetworkBehaviour`, one per connection. Implementations of
`NetworkBehaviour` that rely on communicating with exactly
one `ProtocolsHandler` may run into issues, but this is unlikely.
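
For example, a behaviour that previously kept per-peer state can key
that state by connection instead. A minimal sketch with stand-in
types (the `PeerId`/`ConnectionId` aliases here are illustrative
only, not the real libp2p types):

use std::collections::HashMap;

type PeerId = String;    // stand-in for libp2p_core::PeerId
type ConnectionId = u64; // stand-in for connection::ConnectionId

#[derive(Default)]
struct PerConnectionState {
    // One entry per (peer, connection): a peer may now have several
    // connections and therefore several `ProtocolsHandler`s.
    events_seen: HashMap<(PeerId, ConnectionId), u64>,
}

impl PerConnectionState {
    fn on_handler_event(&mut self, peer: PeerId, conn: ConnectionId) {
        *self.events_seen.entry((peer, conn)).or_insert(0) += 1;
    }
}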

[1]: https://github.com/paritytech/substrate/issues/4272

* Fix intra-rustdoc links.

* Update core/src/connection/pool.rs

Co-Authored-By: Max Inden <mail@max-inden.de>

* Address some review feedback and fix doc links.

* Allow responses to be sent on the same connection.

* Remove unnecessary remnants of inject_replaced.

* Update swarm/src/behaviour.rs

Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com>

* Update swarm/src/lib.rs

Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com>

* Update core/src/connection/manager.rs

Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com>

* Update core/src/connection/manager.rs

Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com>

* Update core/src/connection/pool.rs

Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com>

* Incorporate more review feedback.

* Move module declaration below imports.

* Update core/src/connection/manager.rs

Co-Authored-By: Toralf Wittner <tw@dtex.org>

* Update core/src/connection/manager.rs

Co-Authored-By: Toralf Wittner <tw@dtex.org>

* Simplify as per review.

* Fix rustdoc link.

* Add try_notify_handler and simplify.

* Relocate DialingConnection and DialingAttempt.

For better visibility constraints.

* Small cleanup.

* Small cleanup. More robust EstablishedConnectionIter.

* Clarify semantics of `DialingPeer::connect`.

* Don't call inject_disconnected on InvalidPeerId.

To preserve the previous behavior and ensure calls to
`inject_disconnected` are always paired with calls to
`inject_connected`.

* Provide public ConnectionId constructor.

Mainly needed for testing purposes, e.g. in substrate.
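
For instance, a test can fabricate an ID directly (see
`ConnectionId::new` in core/src/connection/manager.rs below):

    let conn_id = ConnectionId::new(1);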

* Move the established connection limit check to the right place.

* Clean up connection error handling.

Separate connection errors into those occurring during
connection setup or upon rejecting a newly established
connection (the `PendingConnectionError`) and those
errors occurring on previously established connections,
i.e. for which a `ConnectionEstablished` event has
been emitted by the connection pool earlier.

* Revert change in log level and clarify an invariant.

* Remove inject_replaced entirely.

* Allow notifying all connection handlers.

Thereby simplify by introducing a new enum `NotifyHandler`,
used with a single constructor `NetworkBehaviourAction::NotifyHandler`.
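
A sketch of the new enum (a close approximation; see the swarm
sources for the authoritative definition):

pub enum NotifyHandler {
    /// Notify the handler of a particular connection to the peer.
    One(ConnectionId),
    /// Notify the handlers of all connections to the peer.
    All,
}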

* Finishing touches.

Small API simplifications and code deduplication.
Some more useful debug logging.

Co-authored-by: Max Inden <mail@max-inden.de>
Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
Co-authored-by: Toralf Wittner <tw@dtex.org>
Author: Roman Borschel
Date: 2020-03-04 13:49:25 +01:00 (committed by GitHub)
Commit: 8337687b3a (parent 7cbb3cf8f3)
45 changed files with 4560 additions and 4704 deletions

@@ -0,0 +1,115 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::connection::ConnectionLimit;
use crate::transport::TransportError;
use std::{io, fmt};
/// Errors that can occur in the context of an established `Connection`.
#[derive(Debug)]
pub enum ConnectionError<THandlerErr> {
/// An I/O error occurred on the connection.
// TODO: Eventually this should also be a custom error?
IO(io::Error),
/// The connection handler produced an error.
Handler(THandlerErr),
}
impl<THandlerErr> fmt::Display
for ConnectionError<THandlerErr>
where
THandlerErr: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ConnectionError::IO(err) =>
write!(f, "Connection error: I/O error: {}", err),
ConnectionError::Handler(err) =>
write!(f, "Connection error: Handler error: {}", err),
}
}
}
impl<THandlerErr> std::error::Error
for ConnectionError<THandlerErr>
where
THandlerErr: std::error::Error + 'static,
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
ConnectionError::IO(err) => Some(err),
ConnectionError::Handler(err) => Some(err),
}
}
}
/// Errors that can occur in the context of a pending `Connection`.
#[derive(Debug)]
pub enum PendingConnectionError<TTransErr> {
/// An error occurred while negotiating the transport protocol(s).
Transport(TransportError<TTransErr>),
/// The peer identity obtained on the connection did not
/// match the one that was expected or is otherwise invalid.
InvalidPeerId,
/// The pending connection was successfully negotiated but dropped
/// because the connection limit for a peer has been reached.
ConnectionLimit(ConnectionLimit),
/// An I/O error occurred on the connection.
// TODO: Eventually this should also be a custom error?
IO(io::Error),
}
impl<TTransErr> fmt::Display
for PendingConnectionError<TTransErr>
where
TTransErr: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PendingConnectionError::IO(err) =>
write!(f, "Pending connection: I/O error: {}", err),
PendingConnectionError::Transport(err) =>
write!(f, "Pending connection: Transport error: {}", err),
PendingConnectionError::InvalidPeerId =>
write!(f, "Pending connection: Invalid peer ID."),
PendingConnectionError::ConnectionLimit(l) =>
write!(f, "Pending connection: Connection limit: {}.", l)
}
}
}
impl<TTransErr> std::error::Error
for PendingConnectionError<TTransErr>
where
TTransErr: std::error::Error + 'static
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
PendingConnectionError::IO(err) => Some(err),
PendingConnectionError::Transport(err) => Some(err),
PendingConnectionError::InvalidPeerId => None,
PendingConnectionError::ConnectionLimit(..) => None,
}
}
}
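
A small usage sketch for the error types above; the `classify` helper
is hypothetical and for illustration only:

fn classify<TE>(err: &PendingConnectionError<TE>) -> &'static str {
    match err {
        PendingConnectionError::Transport(_) => "transport negotiation failed",
        PendingConnectionError::InvalidPeerId => "unexpected peer identity",
        PendingConnectionError::ConnectionLimit(_) => "per-peer connection limit reached",
        PendingConnectionError::IO(_) => "i/o error during connection setup",
    }
}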

@@ -0,0 +1,127 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::PeerId;
use std::{task::Context, task::Poll};
use super::{Connected, SubstreamEndpoint};
/// The interface of a connection handler.
///
/// Each handler is responsible for a single connection.
pub trait ConnectionHandler {
/// The inbound type of events used to notify the handler through the `Network`.
///
/// See also [`EstablishedConnection::notify_handler`](super::EstablishedConnection::notify_handler)
/// and [`ConnectionHandler::inject_event`].
type InEvent;
/// The outbound type of events that the handler emits to the `Network`
/// through [`ConnectionHandler::poll`].
///
/// See also [`NetworkEvent::ConnectionEvent`](crate::network::NetworkEvent::ConnectionEvent).
type OutEvent;
/// The type of errors that the handler can produce when polled by the `Network`.
type Error;
/// The type of the substream containing the data.
type Substream;
/// Information about a substream. Can be sent to the handler through a `SubstreamEndpoint`,
/// and will be passed back via [`ConnectionHandler::inject_substream`].
type OutboundOpenInfo;
/// Sends a new substream to the handler.
///
/// The handler is responsible for upgrading the substream to whatever protocol it wants.
///
/// # Panics
///
/// Implementations are allowed to panic in the case of dialing if the `user_data` in
/// `endpoint` doesn't correspond to what was returned earlier when polling, or is used
/// multiple times.
fn inject_substream(&mut self, substream: Self::Substream, endpoint: SubstreamEndpoint<Self::OutboundOpenInfo>);
/// Notifies the handler of an event.
fn inject_event(&mut self, event: Self::InEvent);
/// Polls the handler for events.
///
/// Returning an error will close the connection to the remote.
fn poll(&mut self, cx: &mut Context)
-> Poll<Result<ConnectionHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>>;
}
/// Prototype for a `ConnectionHandler`.
pub trait IntoConnectionHandler<TConnInfo = PeerId> {
/// The node handler.
type Handler: ConnectionHandler;
/// Builds the node handler.
///
/// The implementation is given a `Connected` value that holds information about
/// the newly established connection for which a handler should be created.
fn into_handler(self, connected: &Connected<TConnInfo>) -> Self::Handler;
}
impl<T, TConnInfo> IntoConnectionHandler<TConnInfo> for T
where
T: ConnectionHandler
{
type Handler = Self;
fn into_handler(self, _: &Connected<TConnInfo>) -> Self {
self
}
}
/// Event produced by a handler.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ConnectionHandlerEvent<TOutboundOpenInfo, TCustom> {
/// Require a new outbound substream to be opened with the remote.
OutboundSubstreamRequest(TOutboundOpenInfo),
/// Other event.
Custom(TCustom),
}
impl<TOutboundOpenInfo, TCustom> ConnectionHandlerEvent<TOutboundOpenInfo, TCustom> {
/// If this is `OutboundSubstreamRequest`, maps the content to something else.
pub fn map_outbound_open_info<F, I>(self, map: F) -> ConnectionHandlerEvent<I, TCustom>
where F: FnOnce(TOutboundOpenInfo) -> I
{
match self {
ConnectionHandlerEvent::OutboundSubstreamRequest(val) => {
ConnectionHandlerEvent::OutboundSubstreamRequest(map(val))
},
ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(val),
}
}
/// If this is `Custom`, maps the content to something else.
pub fn map_custom<F, I>(self, map: F) -> ConnectionHandlerEvent<TOutboundOpenInfo, I>
where F: FnOnce(TCustom) -> I
{
match self {
ConnectionHandlerEvent::OutboundSubstreamRequest(val) => {
ConnectionHandlerEvent::OutboundSubstreamRequest(val)
},
ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(map(val)),
}
}
}
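
A brief usage sketch for the mapping combinators above; the `relabel`
function is hypothetical:

fn relabel(ev: ConnectionHandlerEvent<u32, &'static str>)
    -> ConnectionHandlerEvent<String, usize>
{
    // Each combinator transforms only its matching variant and
    // passes the other through unchanged.
    ev.map_outbound_open_info(|info| format!("substream-{}", info))
        .map_custom(|s| s.len())
}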

@@ -0,0 +1,479 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Manage listening on multiple multiaddresses at once.
use crate::{Multiaddr, Transport, transport::{TransportError, ListenerEvent}};
use futures::{prelude::*, task::Context, task::Poll};
use log::debug;
use smallvec::SmallVec;
use std::{collections::VecDeque, fmt, pin::Pin};
/// Implementation of `futures::Stream` that allows listening on multiaddresses.
///
/// To start using a `ListenersStream`, create one with `new` by passing an implementation of
/// `Transport`. This `Transport` will be used to start listening, therefore you want to pass
/// a `Transport` that supports the protocols you wish to listen on.
///
/// Then, call `ListenersStream::listen_on` for all addresses you want to start listening on.
///
/// The `ListenersStream` never ends and never produces errors. If a listener errors or closes,
/// an event is generated on the stream and the listener is then dropped, but the `ListenersStream`
/// itself continues.
///
/// # Example
///
/// ```no_run
/// # fn main() {
/// use futures::prelude::*;
/// use libp2p_core::connection::{ListenersEvent, ListenersStream};
///
/// let mut listeners = ListenersStream::new(libp2p_tcp::TcpConfig::new());
///
/// // Ask the `listeners` to start listening on the given multiaddress.
/// listeners.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap();
///
/// // The `listeners` will now generate events when polled.
/// futures::executor::block_on(async move {
/// while let Some(event) = listeners.next().await {
/// match event {
/// ListenersEvent::NewAddress { listener_id, listen_addr } => {
/// println!("Listener {:?} is listening at address {}", listener_id, listen_addr);
/// },
/// ListenersEvent::AddressExpired { listener_id, listen_addr } => {
/// println!("Listener {:?} is no longer listening at address {}", listener_id, listen_addr);
/// },
/// ListenersEvent::Closed { listener_id, .. } => {
/// println!("Listener {:?} has been closed", listener_id);
/// },
/// ListenersEvent::Error { listener_id, error } => {
/// println!("Listener {:?} has experienced an error: {}", listener_id, error);
/// },
/// ListenersEvent::Incoming { listener_id, upgrade, local_addr, .. } => {
/// println!("Listener {:?} has a new connection on {}", listener_id, local_addr);
/// // We don't do anything with the newly-opened connection, but in a real-life
/// // program you probably want to use it!
/// drop(upgrade);
/// },
/// }
/// }
/// })
/// # }
/// ```
pub struct ListenersStream<TTrans>
where
TTrans: Transport,
{
/// Transport used to spawn listeners.
transport: TTrans,
/// All the active listeners.
/// The `Listener` struct contains a stream that we want to be pinned. Since the `VecDeque`
/// can be resized, the only way is to use a `Pin<Box<>>`.
listeners: VecDeque<Pin<Box<Listener<TTrans>>>>,
/// The next listener ID to assign.
next_id: ListenerId
}
/// The ID of a single listener.
///
/// It is part of most [`ListenersEvent`]s and can be used to remove
/// individual listeners from the [`ListenersStream`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ListenerId(u64);
/// A single active listener.
#[pin_project::pin_project]
#[derive(Debug)]
struct Listener<TTrans>
where
TTrans: Transport,
{
/// The ID of this listener.
id: ListenerId,
/// The object that actually listens.
#[pin]
listener: TTrans::Listener,
/// Addresses it is listening on.
addresses: SmallVec<[Multiaddr; 4]>
}
/// Event that can happen on the `ListenersStream`.
pub enum ListenersEvent<TTrans>
where
TTrans: Transport,
{
/// A new address is being listened on.
NewAddress {
/// The listener that is listening on the new address.
listener_id: ListenerId,
/// The new address that is being listened on.
listen_addr: Multiaddr
},
/// An address is no longer being listened on.
AddressExpired {
/// The listener that is no longer listening on the address.
listener_id: ListenerId,
/// The address that is no longer being listened on.
listen_addr: Multiaddr
},
/// A connection is incoming on one of the listeners.
Incoming {
/// The listener that produced the upgrade.
listener_id: ListenerId,
/// The produced upgrade.
upgrade: TTrans::ListenerUpgrade,
/// Local connection address.
local_addr: Multiaddr,
/// Address used to send back data to the incoming client.
send_back_addr: Multiaddr,
},
/// A listener closed.
Closed {
/// The ID of the listener that closed.
listener_id: ListenerId,
/// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
/// if the stream produced an error.
reason: Result<(), TTrans::Error>,
},
/// A listener errored.
///
/// The listener will continue to be polled for new events and the event
/// is for informational purposes only.
Error {
/// The ID of the listener that errored.
listener_id: ListenerId,
/// The error value.
error: TTrans::Error,
}
}
impl<TTrans> ListenersStream<TTrans>
where
TTrans: Transport,
{
/// Starts a new stream of listeners.
pub fn new(transport: TTrans) -> Self {
ListenersStream {
transport,
listeners: VecDeque::new(),
next_id: ListenerId(1)
}
}
/// Same as `new`, but pre-allocates enough memory for the given number of
/// simultaneous listeners.
pub fn with_capacity(transport: TTrans, capacity: usize) -> Self {
ListenersStream {
transport,
listeners: VecDeque::with_capacity(capacity),
next_id: ListenerId(1)
}
}
/// Start listening on a multiaddress.
///
/// Returns an error if the transport doesn't support the given multiaddress.
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<TTrans::Error>>
where
TTrans: Clone,
{
let listener = self.transport.clone().listen_on(addr)?;
self.listeners.push_back(Box::pin(Listener {
id: self.next_id,
listener,
addresses: SmallVec::new()
}));
let id = self.next_id;
self.next_id = ListenerId(self.next_id.0 + 1);
Ok(id)
}
/// Remove the listener matching the given `ListenerId`.
///
/// Return `Ok(())` if a listener with this ID was in the list.
pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> {
if let Some(i) = self.listeners.iter().position(|l| l.id == id) {
self.listeners.remove(i);
Ok(())
} else {
Err(())
}
}
/// Returns the transport passed when building this object.
pub fn transport(&self) -> &TTrans {
&self.transport
}
/// Returns an iterator that produces the list of addresses we're listening on.
pub fn listen_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
self.listeners.iter().flat_map(|l| l.addresses.iter())
}
/// Provides an API similar to `Stream`, except that it cannot end.
pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<ListenersEvent<TTrans>> {
// We remove each element from `listeners` one by one and add them back.
let mut remaining = self.listeners.len();
while let Some(mut listener) = self.listeners.pop_back() {
let mut listener_project = listener.as_mut().project();
match TryStream::try_poll_next(listener_project.listener.as_mut(), cx) {
Poll::Pending => {
self.listeners.push_front(listener);
remaining -= 1;
if remaining == 0 { break }
}
Poll::Ready(Some(Ok(ListenerEvent::Upgrade { upgrade, local_addr, remote_addr }))) => {
let id = *listener_project.id;
self.listeners.push_front(listener);
return Poll::Ready(ListenersEvent::Incoming {
listener_id: id,
upgrade,
local_addr,
send_back_addr: remote_addr
})
}
Poll::Ready(Some(Ok(ListenerEvent::NewAddress(a)))) => {
if listener_project.addresses.contains(&a) {
debug!("Transport has reported address {} multiple times", a)
} else {
listener_project.addresses.push(a.clone());
}
let id = *listener_project.id;
self.listeners.push_front(listener);
return Poll::Ready(ListenersEvent::NewAddress {
listener_id: id,
listen_addr: a
})
}
Poll::Ready(Some(Ok(ListenerEvent::AddressExpired(a)))) => {
listener_project.addresses.retain(|x| x != &a);
let id = *listener_project.id;
self.listeners.push_front(listener);
return Poll::Ready(ListenersEvent::AddressExpired {
listener_id: id,
listen_addr: a
})
}
Poll::Ready(Some(Ok(ListenerEvent::Error(error)))) => {
let id = *listener_project.id;
self.listeners.push_front(listener);
return Poll::Ready(ListenersEvent::Error {
listener_id: id,
error,
})
}
Poll::Ready(None) => {
return Poll::Ready(ListenersEvent::Closed {
listener_id: *listener_project.id,
reason: Ok(()),
})
}
Poll::Ready(Some(Err(err))) => {
return Poll::Ready(ListenersEvent::Closed {
listener_id: *listener_project.id,
reason: Err(err),
})
}
}
}
// We register the current task to be woken up if a new listener is added.
Poll::Pending
}
}
impl<TTrans> Stream for ListenersStream<TTrans>
where
TTrans: Transport,
{
type Item = ListenersEvent<TTrans>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
ListenersStream::poll(self, cx).map(Option::Some)
}
}
impl<TTrans> Unpin for ListenersStream<TTrans>
where
TTrans: Transport,
{
}
impl<TTrans> fmt::Debug for ListenersStream<TTrans>
where
TTrans: Transport + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("ListenersStream")
.field("transport", &self.transport)
.field("listen_addrs", &self.listen_addrs().collect::<Vec<_>>())
.finish()
}
}
impl<TTrans> fmt::Debug for ListenersEvent<TTrans>
where
TTrans: Transport,
TTrans::Error: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match self {
ListenersEvent::NewAddress { listener_id, listen_addr } => f
.debug_struct("ListenersEvent::NewAddress")
.field("listener_id", listener_id)
.field("listen_addr", listen_addr)
.finish(),
ListenersEvent::AddressExpired { listener_id, listen_addr } => f
.debug_struct("ListenersEvent::AddressExpired")
.field("listener_id", listener_id)
.field("listen_addr", listen_addr)
.finish(),
ListenersEvent::Incoming { listener_id, local_addr, .. } => f
.debug_struct("ListenersEvent::Incoming")
.field("listener_id", listener_id)
.field("local_addr", local_addr)
.finish(),
ListenersEvent::Closed { listener_id, reason } => f
.debug_struct("ListenersEvent::Closed")
.field("listener_id", listener_id)
.field("reason", reason)
.finish(),
ListenersEvent::Error { listener_id, error } => f
.debug_struct("ListenersEvent::Error")
.field("listener_id", listener_id)
.field("error", error)
.finish()
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::transport;
#[test]
fn incoming_event() {
async_std::task::block_on(async move {
let mem_transport = transport::MemoryTransport::default();
let mut listeners = ListenersStream::new(mem_transport);
listeners.listen_on("/memory/0".parse().unwrap()).unwrap();
let address = {
let event = listeners.next().await.unwrap();
if let ListenersEvent::NewAddress { listen_addr, .. } = event {
listen_addr
} else {
panic!("Was expecting the listen address to be reported")
}
};
let address2 = address.clone();
async_std::task::spawn(async move {
mem_transport.dial(address2).unwrap().await.unwrap();
});
match listeners.next().await.unwrap() {
ListenersEvent::Incoming { local_addr, send_back_addr, .. } => {
assert_eq!(local_addr, address);
assert_eq!(send_back_addr, address);
},
_ => panic!()
}
});
}
#[test]
fn listener_event_error_isnt_fatal() {
// Tests that a listener continues to be polled even after producing
// a `ListenerEvent::Error`.
#[derive(Clone)]
struct DummyTrans;
impl transport::Transport for DummyTrans {
type Output = ();
type Error = std::io::Error;
type Listener = Pin<Box<dyn Stream<Item = Result<ListenerEvent<Self::ListenerUpgrade, std::io::Error>, std::io::Error>>>>;
type ListenerUpgrade = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
fn listen_on(self, _: Multiaddr) -> Result<Self::Listener, transport::TransportError<Self::Error>> {
Ok(Box::pin(stream::unfold((), |()| async move {
Some((Ok(ListenerEvent::Error(std::io::Error::from(std::io::ErrorKind::Other))), ()))
})))
}
fn dial(self, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}
}
async_std::task::block_on(async move {
let transport = DummyTrans;
let mut listeners = ListenersStream::new(transport);
listeners.listen_on("/memory/0".parse().unwrap()).unwrap();
for _ in 0..10 {
match listeners.next().await.unwrap() {
ListenersEvent::Error { .. } => {},
_ => panic!()
}
}
});
}
#[test]
fn listener_error_is_fatal() {
// Tests that a listener stops after producing an error on the stream itself.
#[derive(Clone)]
struct DummyTrans;
impl transport::Transport for DummyTrans {
type Output = ();
type Error = std::io::Error;
type Listener = Pin<Box<dyn Stream<Item = Result<ListenerEvent<Self::ListenerUpgrade, std::io::Error>, std::io::Error>>>>;
type ListenerUpgrade = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
fn listen_on(self, _: Multiaddr) -> Result<Self::Listener, transport::TransportError<Self::Error>> {
Ok(Box::pin(stream::unfold((), |()| async move {
Some((Err(std::io::Error::from(std::io::ErrorKind::Other)), ()))
})))
}
fn dial(self, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}
}
async_std::task::block_on(async move {
let transport = DummyTrans;
let mut listeners = ListenersStream::new(transport);
listeners.listen_on("/memory/0".parse().unwrap()).unwrap();
match listeners.next().await.unwrap() {
ListenersEvent::Closed { .. } => {},
_ => panic!()
}
});
}
}

@@ -0,0 +1,490 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::{
Executor,
muxing::StreamMuxer,
};
use fnv::FnvHashMap;
use futures::{
prelude::*,
channel::mpsc,
stream::FuturesUnordered
};
use std::{
collections::hash_map,
error,
fmt,
pin::Pin,
task::{Context, Poll},
};
use super::{
Connected,
Connection,
ConnectionError,
ConnectionHandler,
IntoConnectionHandler,
PendingConnectionError,
Substream
};
use task::{Task, TaskId};
mod task;
// Implementation Notes
// ====================
//
// A `Manager` is decoupled from the background tasks through channels.
// The state of a `Manager` therefore "lags behind" the progress of
// the tasks -- it is only made aware of progress in the background tasks
// when it is `poll()`ed.
//
// A `Manager` is ignorant of substreams and does not emit any events
// related to specific substreams.
//
// A `Manager` is unaware of any association between connections and peers
// / peer identities (i.e. the type parameter `C` is completely opaque).
//
// There is a 1-1 correspondence between (internal) task IDs and (public)
// connection IDs, i.e. the task IDs are "re-exported" as connection IDs
// by the manager. The notion of a (background) task is internal to the
// manager.
/// The result of a pending connection attempt.
type ConnectResult<C, M, TE> = Result<(Connected<C>, M), PendingConnectionError<TE>>;
/// Connection identifier.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ConnectionId(TaskId);
impl ConnectionId {
/// Creates a `ConnectionId` from a non-negative integer.
///
/// This is primarily useful for creating connection IDs
/// in test environments. There is in general no guarantee
/// that all connection IDs are based on non-negative integers.
pub fn new(id: usize) -> Self {
ConnectionId(TaskId(id))
}
}
/// A connection `Manager` orchestrates the I/O of a set of connections.
pub struct Manager<I, O, H, E, HE, C> {
/// The tasks of the managed connections.
///
/// Each managed connection is associated with a (background) task
/// spawned onto an executor. Each `TaskInfo` in `tasks` is linked to such a
/// background task via a channel. Closing that channel (i.e. dropping
/// the sender in the associated `TaskInfo`) stops the background task,
/// which will attempt to gracefully close the connection.
tasks: FnvHashMap<TaskId, TaskInfo<I, C>>,
/// Next available identifier for a new connection / task.
next_task_id: TaskId,
/// The executor to use for running the background tasks. If `None`,
/// the tasks are kept in `local_spawns` instead and polled on the
/// current thread when the manager is polled for new events.
executor: Option<Box<dyn Executor + Send>>,
/// If no `executor` is configured, tasks are kept in this set and
/// polled on the current thread when the manager is polled for new events.
local_spawns: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>,
/// Sender distributed to managed tasks for reporting events back
/// to the manager.
events_tx: mpsc::Sender<task::Event<O, H, E, HE, C>>,
/// Receiver for events reported from managed tasks.
events_rx: mpsc::Receiver<task::Event<O, H, E, HE, C>>
}
impl<I, O, H, E, HE, C> fmt::Debug for Manager<I, O, H, E, HE, C>
where
C: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_map()
.entries(self.tasks.iter().map(|(id, task)| (id, &task.state)))
.finish()
}
}
/// Internal information about a running task.
///
/// Contains the sender to deliver event messages to the task, and
/// the associated user data.
#[derive(Debug)]
struct TaskInfo<I, C> {
/// channel endpoint to send messages to the task
sender: mpsc::Sender<task::Command<I>>,
/// The state of the task as seen by the `Manager`.
state: TaskState<C>,
}
/// Internal state of a running task as seen by the `Manager`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum TaskState<C> {
/// The connection is being established.
Pending,
/// The connection is established.
Established(Connected<C>),
}
/// Events produced by the [`Manager`].
#[derive(Debug)]
pub enum Event<'a, I, O, H, TE, HE, C> {
/// A connection attempt has failed.
PendingConnectionError {
/// The connection ID.
///
/// As a result of the error, the pending connection has been removed
/// from the `Manager` and is being closed. Hence this ID will
/// no longer resolve to a valid entry in the manager.
id: ConnectionId,
/// What happened.
error: PendingConnectionError<TE>,
/// The handler that was supposed to handle the failed connection.
handler: H
},
/// An established connection has encountered an error.
ConnectionError {
/// The connection ID.
///
/// As a result of the error, the connection has been removed
/// from the `Manager` and is being closed. Hence this ID will
/// no longer resolve to a valid entry in the manager.
id: ConnectionId,
/// Information about the connection that encountered the error.
connected: Connected<C>,
/// The error that occurred.
error: ConnectionError<HE>,
},
/// A connection has been established.
ConnectionEstablished {
/// The entry associated with the new connection.
entry: EstablishedEntry<'a, I, C>,
},
/// A connection handler has produced an event.
ConnectionEvent {
/// The entry associated with the connection that produced the event.
entry: EstablishedEntry<'a, I, C>,
/// The produced event.
event: O
}
}
impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
/// Creates a new connection manager.
pub fn new(executor: Option<Box<dyn Executor + Send>>) -> Self {
let (tx, rx) = mpsc::channel(1);
Self {
tasks: FnvHashMap::default(),
next_task_id: TaskId(0),
executor,
local_spawns: FuturesUnordered::new(),
events_tx: tx,
events_rx: rx
}
}
/// Adds to the manager a future that tries to reach a node.
///
/// This method spawns a task dedicated to resolving this future and
/// processing the node's events.
pub fn add_pending<F, M>(&mut self, future: F, handler: H) -> ConnectionId
where
I: Send + 'static,
O: Send + 'static,
TE: error::Error + Send + 'static,
HE: error::Error + Send + 'static,
C: Send + 'static,
M: StreamMuxer + Send + Sync + 'static,
M::OutboundSubstream: Send + 'static,
F: Future<Output = ConnectResult<C, M, TE>> + Send + 'static,
H: IntoConnectionHandler<C> + Send + 'static,
H::Handler: ConnectionHandler<
Substream = Substream<M>,
InEvent = I,
OutEvent = O,
Error = HE
> + Send + 'static,
<H::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
{
let task_id = self.next_task_id;
self.next_task_id.0 += 1;
let (tx, rx) = mpsc::channel(4);
self.tasks.insert(task_id, TaskInfo { sender: tx, state: TaskState::Pending });
let task = Box::pin(Task::pending(task_id, self.events_tx.clone(), rx, future, handler));
if let Some(executor) = &mut self.executor {
executor.exec(task);
} else {
self.local_spawns.push(task);
}
ConnectionId(task_id)
}
/// Adds an existing connection to the manager.
pub fn add<M>(&mut self, conn: Connection<M, H::Handler>, info: Connected<C>) -> ConnectionId
where
H: IntoConnectionHandler<C> + Send + 'static,
H::Handler: ConnectionHandler<
Substream = Substream<M>,
InEvent = I,
OutEvent = O,
Error = HE
> + Send + 'static,
<H::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TE: error::Error + Send + 'static,
HE: error::Error + Send + 'static,
I: Send + 'static,
O: Send + 'static,
M: StreamMuxer + Send + Sync + 'static,
M::OutboundSubstream: Send + 'static,
C: Send + 'static
{
let task_id = self.next_task_id;
self.next_task_id.0 += 1;
let (tx, rx) = mpsc::channel(4);
self.tasks.insert(task_id, TaskInfo {
sender: tx, state: TaskState::Established(info)
});
let task: Pin<Box<Task<Pin<Box<future::Pending<_>>>, _, _, _, _, _, _>>> =
Box::pin(Task::established(task_id, self.events_tx.clone(), rx, conn));
if let Some(executor) = &mut self.executor {
executor.exec(task);
} else {
self.local_spawns.push(task);
}
ConnectionId(task_id)
}
/// Notifies the handlers of all managed connections of an event.
///
/// This function is "atomic", in the sense that if `Poll::Pending` is
/// returned then no event has been sent.
#[must_use]
pub fn poll_broadcast(&mut self, event: &I, cx: &mut Context) -> Poll<()>
where
I: Clone
{
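// Phase 1: check that every task's channel can accept one message.
// If any cannot, nothing is sent and `Pending` is returned,
// preserving the all-or-nothing guarantee documented above.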
for task in self.tasks.values_mut() {
if let Poll::Pending = task.sender.poll_ready(cx) { // (*)
return Poll::Pending;
}
}
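// Phase 2: every sender was ready above, so `start_send` cannot
// fail with `is_full` here.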
for (id, task) in self.tasks.iter_mut() {
let cmd = task::Command::NotifyHandler(event.clone());
match task.sender.start_send(cmd) {
Ok(()) => {},
Err(e) if e.is_full() => unreachable!("by (*)"),
Err(e) if e.is_disconnected() => {
// The background task ended. The manager will eventually be
// informed through an `Error` event from the task.
log::trace!("Connection dropped: {:?}", id);
},
Err(e) => {
log::error!("Unexpected error: {:?}", e);
}
}
}
Poll::Ready(())
}
/// Gets an entry for a managed connection, if it exists.
pub fn entry(&mut self, id: ConnectionId) -> Option<Entry<'_, I, C>> {
if let hash_map::Entry::Occupied(task) = self.tasks.entry(id.0) {
Some(Entry::new(task))
} else {
None
}
}
/// Checks whether an established connection with the given ID is currently managed.
pub fn is_established(&self, id: &ConnectionId) -> bool {
match self.tasks.get(&id.0) {
Some(TaskInfo { state: TaskState::Established(..), .. }) => true,
_ => false
}
}
/// Polls the manager for events relating to the managed connections.
pub fn poll<'a>(&'a mut self, cx: &mut Context) -> Poll<Event<'a, I, O, H, TE, HE, C>> {
// Advance the content of `local_spawns`.
while let Poll::Ready(Some(_)) = Stream::poll_next(Pin::new(&mut self.local_spawns), cx) {}
// Poll for the first event for which the manager still has a registered task, if any.
let event = loop {
match Stream::poll_next(Pin::new(&mut self.events_rx), cx) {
Poll::Ready(Some(event)) => {
if self.tasks.contains_key(event.id()) { // (1)
break event
}
}
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => unreachable!("Manager holds both sender and receiver."),
}
};
if let hash_map::Entry::Occupied(mut task) = self.tasks.entry(*event.id()) {
Poll::Ready(match event {
task::Event::Notify { id: _, event } =>
Event::ConnectionEvent {
entry: EstablishedEntry { task },
event
},
task::Event::Established { id: _, info } => { // (2)
task.get_mut().state = TaskState::Established(info); // (3)
Event::ConnectionEstablished {
entry: EstablishedEntry { task },
}
}
task::Event::Failed { id, error, handler } => {
let id = ConnectionId(id);
let _ = task.remove();
Event::PendingConnectionError { id, error, handler }
}
task::Event::Error { id, error } => {
let id = ConnectionId(id);
let task = task.remove();
match task.state {
TaskState::Established(connected) =>
Event::ConnectionError { id, connected, error },
TaskState::Pending => unreachable!(
"`Event::Error` implies (2) occurred on that task and thus (3)."
),
}
}
})
} else {
unreachable!("By (1)")
}
}
}
/// An entry for a connection in the manager.
#[derive(Debug)]
pub enum Entry<'a, I, C> {
Pending(PendingEntry<'a, I, C>),
Established(EstablishedEntry<'a, I, C>)
}
impl<'a, I, C> Entry<'a, I, C> {
fn new(task: hash_map::OccupiedEntry<'a, TaskId, TaskInfo<I, C>>) -> Self {
match &task.get().state {
TaskState::Pending => Entry::Pending(PendingEntry { task }),
TaskState::Established(_) => Entry::Established(EstablishedEntry { task })
}
}
}
/// An entry for a managed connection that is considered established.
#[derive(Debug)]
pub struct EstablishedEntry<'a, I, C> {
task: hash_map::OccupiedEntry<'a, TaskId, TaskInfo<I, C>>,
}
impl<'a, I, C> EstablishedEntry<'a, I, C> {
/// (Asynchronously) sends an event to the connection handler.
///
/// If the handler is not ready to receive the event, either because
/// it is busy or the connection is about to close, the given event
/// is returned with an `Err`.
///
/// If execution of this method is preceded by successful execution of
/// `poll_ready_notify_handler` without another intervening execution
/// of `notify_handler`, it only fails if the connection is now about
/// to close.
///
/// > **Note**: As this method does not take a `Context`, the current
/// > task _may not be notified_ if sending the event fails due to
/// > the connection handler not being ready at this time.
pub fn notify_handler(&mut self, event: I) -> Result<(), I> {
let cmd = task::Command::NotifyHandler(event);
self.task.get_mut().sender.try_send(cmd)
.map_err(|e| match e.into_inner() {
task::Command::NotifyHandler(event) => event
})
}
/// Checks if `notify_handler` is ready to accept an event.
///
/// Returns `Ok(())` if the handler is ready to receive an event via `notify_handler`.
///
/// Returns `Err(())` if the background task associated with the connection
/// is terminating and the connection is about to close.
pub fn poll_ready_notify_handler(&mut self, cx: &mut Context) -> Poll<Result<(),()>> {
self.task.get_mut().sender.poll_ready(cx).map_err(|_| ())
}
/// Obtains information about the established connection.
pub fn connected(&self) -> &Connected<C> {
match &self.task.get().state {
TaskState::Established(c) => c,
TaskState::Pending => unreachable!("By Entry::new()")
}
}
/// Closes the connection represented by this entry,
/// returning the connection information.
pub fn close(self) -> Connected<C> {
match self.task.remove().state {
TaskState::Established(c) => c,
TaskState::Pending => unreachable!("By Entry::new()")
}
}
/// Returns the connection id.
pub fn id(&self) -> ConnectionId {
ConnectionId(*self.task.key())
}
}
/// An entry for a managed connection that is currently being established
/// (i.e. pending).
#[derive(Debug)]
pub struct PendingEntry<'a, I, C> {
task: hash_map::OccupiedEntry<'a, TaskId, TaskInfo<I, C>>
}
impl<'a, I, C> PendingEntry<'a, I, C> {
/// Returns the connection id.
pub fn id(&self) -> ConnectionId {
ConnectionId(*self.task.key())
}
/// Aborts the pending connection attempt.
pub fn abort(self) {
self.task.remove();
}
}
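
A usage sketch for the entry API above; the `abort_if_pending` helper
is hypothetical:

fn abort_if_pending<I, O, H, E, HE, C>(
    manager: &mut Manager<I, O, H, E, HE, C>,
    id: ConnectionId,
) {
    if let Some(Entry::Pending(pending)) = manager.entry(id) {
        // Removes the task entry; dropping the sender causes the
        // background task to shut down gracefully.
        pending.abort();
    }
}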

@@ -0,0 +1,341 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::{
muxing::StreamMuxer,
connection::{
Close,
Connected,
Connection,
ConnectionError,
ConnectionHandler,
IntoConnectionHandler,
PendingConnectionError,
Substream,
},
};
use futures::{prelude::*, channel::mpsc, stream};
use std::{pin::Pin, task::Context, task::Poll};
use super::ConnectResult;
/// Identifier of a [`Task`] in a [`Manager`](super::Manager).
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct TaskId(pub(super) usize);
/// Commands that can be sent to a [`Task`].
#[derive(Debug)]
pub enum Command<T> {
/// Notify the connection handler of an event.
NotifyHandler(T),
}
/// Events that a task can emit to its manager.
#[derive(Debug)]
pub enum Event<T, H, TE, HE, C> {
/// A connection to a node has succeeded.
Established { id: TaskId, info: Connected<C> },
/// An established connection produced an error.
Error { id: TaskId, error: ConnectionError<HE> },
/// A pending connection failed.
Failed { id: TaskId, error: PendingConnectionError<TE>, handler: H },
/// Notify the manager of an event from the connection.
Notify { id: TaskId, event: T }
}
impl<T, H, TE, HE, C> Event<T, H, TE, HE, C> {
pub fn id(&self) -> &TaskId {
match self {
Event::Established { id, .. } => id,
Event::Error { id, .. } => id,
Event::Notify { id, .. } => id,
Event::Failed { id, .. } => id,
}
}
}
/// A `Task` is a [`Future`] that handles a single connection.
pub struct Task<F, M, H, I, O, E, C>
where
M: StreamMuxer,
H: IntoConnectionHandler<C>,
H::Handler: ConnectionHandler<Substream = Substream<M>>
{
/// The ID of this task.
id: TaskId,
/// Sender to emit events to the manager of this task.
events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error, C>>,
/// Receiver for commands sent by the manager of this task.
commands: stream::Fuse<mpsc::Receiver<Command<I>>>,
/// Inner state of this `Task`.
state: State<F, M, H, I, O, E, C>,
}
impl<F, M, H, I, O, E, C> Task<F, M, H, I, O, E, C>
where
M: StreamMuxer,
H: IntoConnectionHandler<C>,
H::Handler: ConnectionHandler<Substream = Substream<M>>
{
/// Create a new task to connect and handle some node.
pub fn pending(
id: TaskId,
events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error, C>>,
commands: mpsc::Receiver<Command<I>>,
future: F,
handler: H
) -> Self {
Task {
id,
events,
commands: commands.fuse(),
state: State::Pending {
future: Box::pin(future),
handler,
events: Vec::new()
},
}
}
/// Create a task for an existing node we are already connected to.
pub fn established(
id: TaskId,
events: mpsc::Sender<Event<O, H, E, <H::Handler as ConnectionHandler>::Error, C>>,
commands: mpsc::Receiver<Command<I>>,
connection: Connection<M, H::Handler>
) -> Self {
Task {
id,
events,
commands: commands.fuse(),
state: State::EstablishedPending(connection),
}
}
}
/// The state associated with the `Task` of a connection.
enum State<F, M, H, I, O, E, C>
where
M: StreamMuxer,
H: IntoConnectionHandler<C>,
H::Handler: ConnectionHandler<Substream = Substream<M>>
{
/// The task is waiting for the connection to be established.
Pending {
/// The future that will attempt to reach the node.
// TODO: don't pin this Future; this requires deeper changes though
future: Pin<Box<F>>,
/// The intended handler for the established connection.
handler: H,
/// While we are dialing the future, we need to buffer the events received via
/// `Command::NotifyHandler` so that they get delivered to the `handler`
/// once the connection is established. We can't leave these in `Task::receiver`
/// because we have to detect if the connection attempt has been aborted (by
/// dropping the corresponding `sender` owned by the manager).
events: Vec<I>
},
/// The connection is established and a new event is ready to be emitted.
EstablishedReady {
/// The connection, if still available.
connection: Option<Connection<M, H::Handler>>,
/// The actual event message to send.
event: Event<O, H, E, <H::Handler as ConnectionHandler>::Error, C>
},
/// The connection is established and pending a new event to occur.
EstablishedPending(Connection<M, H::Handler>),
/// The task is closing the connection.
Closing(Close<M>),
/// The task has finished.
Done
}
impl<F, M, H, I, O, E, C> Unpin for Task<F, M, H, I, O, E, C>
where
M: StreamMuxer,
H: IntoConnectionHandler<C>,
H::Handler: ConnectionHandler<Substream = Substream<M>>
{
}
impl<F, M, H, I, O, E, C> Future for Task<F, M, H, I, O, E, C>
where
M: StreamMuxer,
F: Future<Output = ConnectResult<C, M, E>>,
H: IntoConnectionHandler<C>,
H::Handler: ConnectionHandler<Substream = Substream<M>, InEvent = I, OutEvent = O>
{
type Output = ();
// NOTE: It is imperative to always consume all incoming commands from
// the manager first, in order to not prevent it from making progress because
// it is blocked on the channel capacity.
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
let this = &mut *self;
let id = this.id;
'poll: loop {
match std::mem::replace(&mut this.state, State::Done) {
State::Pending { mut future, handler, mut events } => {
// Process commands from the manager.
loop {
match Stream::poll_next(Pin::new(&mut this.commands), cx) {
Poll::Pending => break,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(Command::NotifyHandler(event))) =>
events.push(event),
}
}
// Check if the connection succeeded.
match Future::poll(Pin::new(&mut future), cx) {
Poll::Ready(Ok((info, muxer))) => {
let mut c = Connection::new(muxer, handler.into_handler(&info));
for event in events {
c.inject_event(event)
}
this.state = State::EstablishedReady {
connection: Some(c),
event: Event::Established { id, info }
}
}
Poll::Pending => {
this.state = State::Pending { future, handler, events };
return Poll::Pending
}
Poll::Ready(Err(error)) => {
let event = Event::Failed { id, handler, error };
this.state = State::EstablishedReady { connection: None, event }
}
}
}
State::EstablishedPending(mut connection) => {
// Start by handling commands received from the manager, if any.
loop {
match Stream::poll_next(Pin::new(&mut this.commands), cx) {
Poll::Pending => break,
Poll::Ready(Some(Command::NotifyHandler(event))) =>
connection.inject_event(event),
Poll::Ready(None) => {
// The manager has dropped the task, thus initiate a
// graceful shutdown of the connection.
this.state = State::Closing(connection.close());
continue 'poll
}
}
}
// Poll the connection for new events.
loop {
match Connection::poll(Pin::new(&mut connection), cx) {
Poll::Pending => {
this.state = State::EstablishedPending(connection);
return Poll::Pending
}
Poll::Ready(Ok(event)) => {
this.state = State::EstablishedReady {
connection: Some(connection),
event: Event::Notify { id, event }
};
continue 'poll
}
Poll::Ready(Err(error)) => {
// Notify the manager of the error via an event,
// dropping the connection.
let event = Event::Error { id, error };
this.state = State::EstablishedReady { connection: None, event };
continue 'poll
}
}
}
}
// Deliver an event to the manager.
State::EstablishedReady { mut connection, event } => {
// Process commands received from the manager, if any.
loop {
match Stream::poll_next(Pin::new(&mut this.commands), cx) {
Poll::Pending => break,
Poll::Ready(Some(Command::NotifyHandler(event))) =>
if let Some(ref mut c) = connection {
c.inject_event(event)
}
Poll::Ready(None) =>
// The manager has dropped the task, thus initiate a
// graceful shutdown of the connection, if given.
if let Some(c) = connection {
this.state = State::Closing(c.close());
continue 'poll
} else {
return Poll::Ready(())
}
}
}
// Send the event to the manager.
match this.events.poll_ready(cx) {
Poll::Pending => {
self.state = State::EstablishedReady { connection, event };
return Poll::Pending
}
Poll::Ready(Ok(())) => {
// We assume that if `poll_ready` has succeeded, then sending the event
// will succeed as well. If it turns out that it didn't, we will detect
// the closing at the next loop iteration.
let _ = this.events.start_send(event);
if let Some(c) = connection {
this.state = State::EstablishedPending(c)
} else {
// The connection has been dropped, thus this was the last event
// to send to the manager and the task is done.
return Poll::Ready(())
}
},
Poll::Ready(Err(_)) => {
// The manager is no longer reachable, maybe due to
// application shutdown. Try a graceful shutdown of the
// connection, if available, and end the task.
if let Some(c) = connection {
this.state = State::Closing(c.close());
continue 'poll
}
return Poll::Ready(())
}
}
}
State::Closing(mut closing) =>
match Future::poll(Pin::new(&mut closing), cx) {
Poll::Ready(_) => return Poll::Ready(()), // end task
Poll::Pending => {
this.state = State::Closing(closing);
return Poll::Pending
}
}
State::Done => panic!("`Task::poll()` called after completion.")
}
}
}
}

core/src/connection/pool.rs
@@ -0,0 +1,875 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::{
Executor,
ConnectedPoint,
PeerId,
connection::{
self,
Connected,
Connection,
ConnectionId,
ConnectionLimit,
ConnectionError,
ConnectionHandler,
ConnectionInfo,
IncomingInfo,
IntoConnectionHandler,
OutgoingInfo,
Substream,
PendingConnectionError,
manager::{self, Manager},
},
muxing::StreamMuxer,
};
use either::Either;
use fnv::FnvHashMap;
use futures::prelude::*;
use smallvec::SmallVec;
use std::{error, fmt, hash::Hash, task::Context, task::Poll};
/// A connection `Pool` manages a set of connections for each peer.
pub struct Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo = PeerId, TPeerId = PeerId> {
local_id: TPeerId,
/// The configuration of the pool.
limits: PoolLimits,
/// The connection manager that handles the connection I/O for both
/// established and pending connections.
///
/// For every established connection there is a corresponding entry in `established`.
manager: Manager<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo>,
/// The managed connections of each peer that are currently considered
/// established, as witnessed by the associated `ConnectedPoint`.
established: FnvHashMap<TPeerId, FnvHashMap<ConnectionId, ConnectedPoint>>,
/// The pending connections that are currently being negotiated.
pending: FnvHashMap<ConnectionId, (ConnectedPoint, Option<TPeerId>)>,
}
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> fmt::Debug
for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
// TODO: More useful debug impl?
f.debug_struct("Pool")
.field("limits", &self.limits)
.finish()
}
}
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> Unpin
for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> {}
/// Event that can happen on the `Pool`.
pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> {
/// A new connection has been established.
ConnectionEstablished {
connection: EstablishedConnection<'a, TInEvent, TConnInfo, TPeerId>,
num_established: usize,
},
/// An established connection has encountered an error.
ConnectionError {
id: ConnectionId,
/// Information about the connection that errored.
connected: Connected<TConnInfo>,
/// The error that occurred.
error: ConnectionError<THandlerErr>,
/// A reference to the pool that used to manage the connection.
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>,
/// The remaining number of established connections to the same peer.
num_established: usize,
},
/// A connection attempt failed.
PendingConnectionError {
/// The ID of the failed connection.
id: ConnectionId,
/// The local endpoint of the failed connection.
endpoint: ConnectedPoint,
/// The error that occurred.
error: PendingConnectionError<TTransErr>,
/// The handler that was supposed to handle the connection,
/// if the connection failed before the handler was consumed.
handler: Option<THandler>,
/// The (expected) peer of the failed connection.
peer: Option<TPeerId>,
/// A reference to the pool that managed the connection.
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>,
},
/// A node has produced an event.
ConnectionEvent {
/// The connection that has generated the event.
connection: EstablishedConnection<'a, TInEvent, TConnInfo, TPeerId>,
/// The produced event.
event: TOutEvent,
},
}
impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> fmt::Debug
for PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
where
TOutEvent: fmt::Debug,
TTransErr: fmt::Debug,
THandlerErr: fmt::Debug,
TConnInfo: fmt::Debug,
TInEvent: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match *self {
PoolEvent::ConnectionEstablished { ref connection, .. } => {
f.debug_tuple("PoolEvent::ConnectionEstablished")
.field(connection)
.finish()
},
PoolEvent::ConnectionError { ref id, ref connected, ref error, .. } => {
f.debug_struct("PoolEvent::ConnectionError")
.field("id", id)
.field("connected", connected)
.field("error", error)
.finish()
},
PoolEvent::PendingConnectionError { ref id, ref error, .. } => {
f.debug_struct("PoolEvent::PendingConnectionError")
.field("id", id)
.field("error", error)
.finish()
},
PoolEvent::ConnectionEvent { ref connection, ref event } => {
f.debug_struct("PoolEvent::ConnectionEvent")
.field("conn_info", connection.info())
.field("event", event)
.finish()
},
}
}
}
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
where
TPeerId: Eq + Hash,
{
/// Creates a new empty `Pool`.
pub fn new(
local_id: TPeerId,
executor: Option<Box<dyn Executor + Send>>,
limits: PoolLimits
) -> Self {
Pool {
local_id,
limits,
manager: Manager::new(executor),
established: Default::default(),
pending: Default::default(),
}
}
/// Gets the configured connection limits of the pool.
pub fn limits(&self) -> &PoolLimits {
&self.limits
}
/// Adds a pending incoming connection to the pool in the form of a
/// `Future` that establishes and negotiates the connection.
///
/// Returns an error if the limit of pending incoming connections
/// has been reached.
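///
/// A hedged usage sketch (the upgrade future, handler and addresses are
/// illustrative, not part of this API):
///
/// ```ignore
/// let id = pool.add_incoming(upgrade_future, handler, IncomingInfo {
///     local_addr: &local_addr,
///     send_back_addr: &send_back_addr,
/// })?;
/// ```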
pub fn add_incoming<TFut, TMuxer>(
&mut self,
future: TFut,
handler: THandler,
info: IncomingInfo,
) -> Result<ConnectionId, ConnectionLimit>
where
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Send + 'static,
TFut: Future<
Output = Result<(TConnInfo, TMuxer), PendingConnectionError<TTransErr>>
> + Send + 'static,
THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
THandler::Handler: ConnectionHandler<
Substream = Substream<TMuxer>,
InEvent = TInEvent,
OutEvent = TOutEvent,
Error = THandlerErr
> + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TTransErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
{
let endpoint = info.to_connected_point();
if let Some(limit) = self.limits.max_pending_incoming {
let current = self.iter_pending_incoming().count();
if current >= limit {
return Err(ConnectionLimit { limit, current })
}
}
Ok(self.add_pending(future, handler, endpoint, None))
}
/// Adds a pending outgoing connection to the pool in the form of a `Future`
/// that establishes and negotiates the connection.
///
/// Returns an error if the limit of pending outgoing connections
/// has been reached.
pub fn add_outgoing<TFut, TMuxer>(
&mut self,
future: TFut,
handler: THandler,
info: OutgoingInfo<TPeerId>,
) -> Result<ConnectionId, ConnectionLimit>
where
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Send + 'static,
TFut: Future<
Output = Result<(TConnInfo, TMuxer), PendingConnectionError<TTransErr>>
> + Send + 'static,
THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
THandler::Handler: ConnectionHandler<
Substream = Substream<TMuxer>,
InEvent = TInEvent,
OutEvent = TOutEvent,
Error = THandlerErr
> + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TTransErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TPeerId: Clone,
{
self.limits.check_outgoing(|| self.iter_pending_outgoing().count())?;
let endpoint = info.to_connected_point();
Ok(self.add_pending(future, handler, endpoint, info.peer_id.cloned()))
}
/// Adds a pending connection to the pool in the form of a
/// `Future` that establishes and negotiates the connection.
fn add_pending<TFut, TMuxer>(
&mut self,
future: TFut,
handler: THandler,
endpoint: ConnectedPoint,
peer: Option<TPeerId>,
) -> ConnectionId
where
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Send + 'static,
TFut: Future<
Output = Result<(TConnInfo, TMuxer), PendingConnectionError<TTransErr>>
> + Send + 'static,
THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
THandler::Handler: ConnectionHandler<
Substream = Substream<TMuxer>,
InEvent = TInEvent,
OutEvent = TOutEvent,
Error = THandlerErr
> + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TTransErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
{
let future = future.and_then({
let endpoint = endpoint.clone();
move |(info, muxer)| {
let connected = Connected { info, endpoint };
future::ready(Ok((connected, muxer)))
}
});
let id = self.manager.add_pending(future, handler);
self.pending.insert(id, (endpoint, peer));
id
}
/// Sends an event to the handlers of all established connections.
///
/// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event
/// has been sent to any handler yet.
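///
/// A hedged usage sketch (inside a poll function; `event` is illustrative):
///
/// ```ignore
/// // Deliver `event` to every connection handler before proceeding.
/// if pool.poll_broadcast(&event, cx).is_pending() {
///     return Poll::Pending; // no handler has received the event yet
/// }
/// ```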
#[must_use]
pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()>
where
TInEvent: Clone
{
self.manager.poll_broadcast(event, cx)
}
/// Adds an existing established connection to the pool.
///
/// Returns the assigned connection ID on success. An error is returned
/// if the configured maximum number of established connections for the
/// connected peer has been reached.
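///
/// A hedged sketch (assuming `connection` and its connection `info` and
/// `endpoint` were obtained elsewhere; names illustrative):
///
/// ```ignore
/// let id = pool.add(connection, Connected { info, endpoint })?;
/// ```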
pub fn add<TMuxer>(&mut self, c: Connection<TMuxer, THandler::Handler>, i: Connected<TConnInfo>)
-> Result<ConnectionId, ConnectionLimit>
where
THandler: IntoConnectionHandler<TConnInfo> + Send + 'static,
THandler::Handler: ConnectionHandler<
Substream = connection::Substream<TMuxer>,
InEvent = TInEvent,
OutEvent = TOutEvent,
Error = THandlerErr
> + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
TTransErr: error::Error + Send + 'static,
THandlerErr: error::Error + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TConnInfo: Clone + Send + 'static,
TPeerId: Clone,
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
{
if let Some(limit) = self.limits.max_established_per_peer {
let current = self.num_peer_established(i.peer_id());
if current >= limit {
return Err(ConnectionLimit { limit, current })
}
}
let id = self.manager.add(c, i.clone());
self.established.entry(i.peer_id().clone()).or_default().insert(id, i.endpoint);
Ok(id)
}
/// Gets an entry representing a connection in the pool.
///
/// Returns `None` if the pool has no connection with the given ID.
pub fn get(&mut self, id: ConnectionId)
-> Option<PoolConnection<'_, TInEvent, TConnInfo, TPeerId>>
{
match self.manager.entry(id) {
Some(manager::Entry::Established(entry)) =>
Some(PoolConnection::Established(EstablishedConnection {
entry,
established: &mut self.established,
})),
Some(manager::Entry::Pending(entry)) =>
Some(PoolConnection::Pending(PendingConnection {
entry,
pending: &mut self.pending,
})),
None => None
}
}
/// Gets an established connection from the pool by ID.
pub fn get_established(&mut self, id: ConnectionId)
-> Option<EstablishedConnection<'_, TInEvent, TConnInfo, TPeerId>>
{
match self.get(id) {
Some(PoolConnection::Established(c)) => Some(c),
_ => None
}
}
/// Gets a pending outgoing connection by ID.
pub fn get_outgoing(&mut self, id: ConnectionId)
-> Option<PendingConnection<'_, TInEvent, TConnInfo, TPeerId>>
{
match self.pending.get(&id) {
Some((ConnectedPoint::Dialer { .. }, _peer)) =>
match self.manager.entry(id) {
Some(manager::Entry::Pending(entry)) =>
Some(PendingConnection {
entry,
pending: &mut self.pending,
}),
_ => unreachable!("by consistency of `self.pending` with `self.manager`")
}
_ => None
}
}
/// Returns true if we are connected to the given peer.
///
/// This will return true only after a `ConnectionEstablished` event for that peer
/// has been produced by `poll()`.
pub fn is_connected(&self, id: &TPeerId) -> bool {
self.established.contains_key(id)
}
/// Returns the number of connected peers, i.e. those with at least one
/// established connection in the pool.
pub fn num_connected(&self) -> usize {
self.established.len()
}
/// Close all connections to the given peer.
pub fn disconnect(&mut self, peer: &TPeerId) {
if let Some(conns) = self.established.get(peer) {
for id in conns.keys() {
match self.manager.entry(*id) {
Some(manager::Entry::Established(e)) => { e.close(); },
_ => {}
}
}
}
for (id, (_endpoint, peer2)) in &self.pending {
if Some(peer) == peer2.as_ref() {
match self.manager.entry(*id) {
Some(manager::Entry::Pending(e)) => { e.abort(); },
_ => {}
}
}
}
}
/// Counts the number of established connections in the pool.
pub fn num_established(&self) -> usize {
self.established.iter().fold(0, |n, (_, conns)| n + conns.len())
}
/// Counts the number of pending connections in the pool.
pub fn num_pending(&self) -> usize {
self.iter_pending_info().count()
}
/// Counts the number of established connections to the given peer.
pub fn num_peer_established(&self, peer: &TPeerId) -> usize {
self.established.get(peer).map_or(0, |conns| conns.len())
}
/// Returns an iterator over all established connections of `peer`.
pub fn iter_peer_established<'a>(&'a mut self, peer: &TPeerId)
-> EstablishedConnectionIter<'a,
impl Iterator<Item = ConnectionId>,
TInEvent,
TOutEvent,
THandler,
TTransErr,
THandlerErr,
TConnInfo,
TPeerId>
{
let ids = self.iter_peer_established_info(peer)
.map(|(id, _endpoint)| *id)
.collect::<SmallVec<[ConnectionId; 10]>>()
.into_iter();
EstablishedConnectionIter { pool: self, ids }
}
/// Returns an iterator for information on all pending incoming connections.
pub fn iter_pending_incoming(&self) -> impl Iterator<Item = IncomingInfo<'_>> {
self.iter_pending_info()
.filter_map(|(_, ref endpoint, _)| {
match endpoint {
ConnectedPoint::Listener { local_addr, send_back_addr } => {
Some(IncomingInfo { local_addr, send_back_addr })
},
ConnectedPoint::Dialer { .. } => None,
}
})
}
/// Returns an iterator for information on all pending outgoing connections.
pub fn iter_pending_outgoing(&self) -> impl Iterator<Item = OutgoingInfo<'_, TPeerId>> {
self.iter_pending_info()
.filter_map(|(_, ref endpoint, ref peer_id)| {
match endpoint {
ConnectedPoint::Listener { .. } => None,
ConnectedPoint::Dialer { address } =>
Some(OutgoingInfo { address, peer_id: peer_id.as_ref() }),
}
})
}
/// Returns an iterator over all connection IDs and associated endpoints
/// of established connections to `peer` known to the pool.
pub fn iter_peer_established_info(&self, peer: &TPeerId)
-> impl Iterator<Item = (&ConnectionId, &ConnectedPoint)> + fmt::Debug + '_
{
match self.established.get(peer) {
Some(conns) => Either::Left(conns.iter()),
None => Either::Right(std::iter::empty())
}
}
/// Returns an iterator over all pending connection IDs together
/// with associated endpoints and expected peer IDs in the pool.
pub fn iter_pending_info(&self)
-> impl Iterator<Item = (&ConnectionId, &ConnectedPoint, &Option<TPeerId>)> + '_
{
self.pending.iter().map(|(id, (endpoint, info))| (id, endpoint, info))
}
/// Returns an iterator over all connected peers, i.e. those that have
/// at least one established connection in the pool.
pub fn iter_connected<'a>(&'a self) -> impl Iterator<Item = &'a TPeerId> + 'a {
self.established.keys()
}
/// Polls the connection pool for events.
///
/// > **Note**: We use a regular `poll` method instead of implementing `Stream`,
/// > because we want the `Pool` to stay borrowed if necessary.
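///
/// A hedged sketch of driving the pool from within a task's `poll` (names
/// outside the `Pool` API are illustrative):
///
/// ```ignore
/// loop {
///     match pool.poll(cx) {
///         Poll::Pending => return Poll::Pending,
///         Poll::Ready(PoolEvent::ConnectionEstablished { connection, num_established }) => {
///             // `num_established` includes the new connection.
///         }
///         Poll::Ready(_) => { /* connection errors and handler events */ }
///     }
/// }
/// ```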
pub fn poll<'a>(&'a mut self, cx: &mut Context) -> Poll<
PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
> where
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Clone,
TPeerId: Clone,
{
loop {
let item = match self.manager.poll(cx) {
Poll::Ready(item) => item,
Poll::Pending => return Poll::Pending,
};
match item {
manager::Event::PendingConnectionError { id, error, handler } => {
if let Some((endpoint, peer)) = self.pending.remove(&id) {
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
endpoint,
error,
handler: Some(handler),
peer,
pool: self
})
}
},
manager::Event::ConnectionError { id, connected, error } => {
let num_established =
if let Some(conns) = self.established.get_mut(connected.peer_id()) {
conns.remove(&id);
conns.len()
} else {
0
};
if num_established == 0 {
self.established.remove(connected.peer_id());
}
return Poll::Ready(PoolEvent::ConnectionError {
id, connected, error, num_established, pool: self
})
},
manager::Event::ConnectionEstablished { entry } => {
let id = entry.id();
if let Some((endpoint, peer)) = self.pending.remove(&id) {
// Check connection limit.
let established = &self.established;
let current = || established.get(entry.connected().peer_id())
.map_or(0, |conns| conns.len());
if let Err(e) = self.limits.check_established(current) {
let connected = entry.close();
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
endpoint: connected.endpoint,
peer: Some(connected.info.peer_id().clone()),
error: PendingConnectionError::ConnectionLimit(e),
pool: self,
handler: None,
})
}
// Check peer ID.
if let Some(peer) = peer {
if &peer != entry.connected().peer_id() {
let connected = entry.close();
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
endpoint: connected.endpoint,
peer: Some(connected.info.peer_id().clone()),
error: PendingConnectionError::InvalidPeerId,
pool: self,
handler: None,
})
}
}
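// Check that the connection is not to the local peer.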
if &self.local_id == entry.connected().peer_id() {
let connected = entry.close();
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
endpoint: connected.endpoint,
peer: Some(connected.info.peer_id().clone()),
error: PendingConnectionError::InvalidPeerId,
pool: self,
handler: None,
})
}
// Add the connection to the pool.
let peer = entry.connected().peer_id().clone();
let conns = self.established.entry(peer).or_default();
let num_established = conns.len() + 1;
conns.insert(id, endpoint);
match self.get(id) {
Some(PoolConnection::Established(connection)) =>
return Poll::Ready(PoolEvent::ConnectionEstablished {
connection, num_established
}),
_ => unreachable!("since `entry` is an `EstablishedEntry`.")
}
}
},
manager::Event::ConnectionEvent { entry, event } => {
let id = entry.id();
match self.get(id) {
Some(PoolConnection::Established(connection)) =>
return Poll::Ready(PoolEvent::ConnectionEvent {
connection,
event,
}),
_ => unreachable!("since `entry` is an `EstablishedEntry`.")
}
}
}
}
}
}
/// A connection in a [`Pool`].
pub enum PoolConnection<'a, TInEvent, TConnInfo, TPeerId> {
Pending(PendingConnection<'a, TInEvent, TConnInfo, TPeerId>),
Established(EstablishedConnection<'a, TInEvent, TConnInfo, TPeerId>),
}
/// A pending connection in a [`Pool`].
pub struct PendingConnection<'a, TInEvent, TConnInfo, TPeerId> {
entry: manager::PendingEntry<'a, TInEvent, TConnInfo>,
pending: &'a mut FnvHashMap<ConnectionId, (ConnectedPoint, Option<TPeerId>)>,
}
impl<TInEvent, TConnInfo, TPeerId>
PendingConnection<'_, TInEvent, TConnInfo, TPeerId>
{
/// Returns the local connection ID.
pub fn id(&self) -> ConnectionId {
self.entry.id()
}
/// Returns the (expected) identity of the remote peer, if known.
pub fn peer_id(&self) -> &Option<TPeerId> {
&self.pending.get(&self.entry.id()).expect("`entry` is a pending entry").1
}
/// Returns information about this endpoint of the connection.
pub fn endpoint(&self) -> &ConnectedPoint {
&self.pending.get(&self.entry.id()).expect("`entry` is a pending entry").0
}
/// Aborts the connection attempt, closing the connection.
pub fn abort(self) {
self.pending.remove(&self.entry.id());
self.entry.abort();
}
}
/// An established connection in a [`Pool`].
pub struct EstablishedConnection<'a, TInEvent, TConnInfo, TPeerId> {
entry: manager::EstablishedEntry<'a, TInEvent, TConnInfo>,
established: &'a mut FnvHashMap<TPeerId, FnvHashMap<ConnectionId, ConnectedPoint>>,
}
impl<TInEvent, TConnInfo, TPeerId> fmt::Debug
for EstablishedConnection<'_, TInEvent, TConnInfo, TPeerId>
where
TInEvent: fmt::Debug,
TConnInfo: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("EstablishedConnection")
.field("entry", &self.entry)
.finish()
}
}
impl<TInEvent, TConnInfo, TPeerId>
EstablishedConnection<'_, TInEvent, TConnInfo, TPeerId>
{
pub fn connected(&self) -> &Connected<TConnInfo> {
self.entry.connected()
}
/// Returns information about the connected endpoint.
pub fn endpoint(&self) -> &ConnectedPoint {
&self.entry.connected().endpoint
}
/// Returns connection information obtained from the transport.
pub fn info(&self) -> &TConnInfo {
&self.entry.connected().info
}
}
impl<TInEvent, TConnInfo, TPeerId>
EstablishedConnection<'_, TInEvent, TConnInfo, TPeerId>
where
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
TPeerId: Eq + Hash + Clone,
{
/// Returns the local connection ID.
pub fn id(&self) -> ConnectionId {
self.entry.id()
}
/// Returns the identity of the connected peer.
pub fn peer_id(&self) -> &TPeerId {
self.info().peer_id()
}
/// (Asynchronously) sends an event to the connection handler.
///
/// If the handler is not ready to receive the event, either because
/// it is busy or the connection is about to close, the given event
/// is returned with an `Err`.
///
/// If execution of this method is preceded by successful execution of
/// `poll_ready_notify_handler` without another intervening execution
/// of `notify_handler`, it only fails if the connection is now about
/// to close.
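///
/// A hedged sketch pairing `poll_ready_notify_handler` with `notify_handler`
/// (the surrounding poll function is illustrative):
///
/// ```ignore
/// if let Poll::Ready(Ok(())) = conn.poll_ready_notify_handler(cx) {
///     // The handler has capacity; `notify_handler` now only fails
///     // if the connection is about to close.
///     let _ = conn.notify_handler(event);
/// }
/// ```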
pub fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> {
self.entry.notify_handler(event)
}
/// Checks if `notify_handler` is ready to accept an event.
///
/// Returns `Ok(())` if the handler is ready to receive an event via `notify_handler`.
///
/// Returns `Err(())` if the background task associated with the connection
/// is terminating and the connection is about to close.
pub fn poll_ready_notify_handler(&mut self, cx: &mut Context) -> Poll<Result<(),()>> {
self.entry.poll_ready_notify_handler(cx)
}
/// Closes the connection, returning the connection information.
pub fn close(self) -> Connected<TConnInfo> {
let id = self.entry.id();
let info = self.entry.close();
let empty =
if let Some(conns) = self.established.get_mut(info.peer_id()) {
conns.remove(&id);
conns.is_empty()
} else {
false
};
if empty {
self.established.remove(info.peer_id());
}
info
}
}
/// An iterator over established connections in a [`Pool`].
pub struct EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> {
pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>,
ids: I
}
// Note: Ideally this would be an implementation of `Iterator`, but that
// requires GATs (cf. https://github.com/rust-lang/rust/issues/44265) and
// a different definition of `Iterator`.
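//
// A hedged usage sketch (names illustrative): because `next` reborrows the
// iterator for each item, connections are processed strictly one at a time:
//
//     let mut iter = pool.iter_peer_established(&peer);
//     while let Some(mut conn) = iter.next() {
//         let _ = conn.notify_handler(event.clone());
//     }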
impl<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
where
I: Iterator<Item = ConnectionId>
{
/// Obtains the next connection, if any.
pub fn next<'b>(&'b mut self) -> Option<EstablishedConnection<'b, TInEvent, TConnInfo, TPeerId>>
{
while let Some(id) = self.ids.next() {
if self.pool.manager.is_established(&id) { // (*)
match self.pool.manager.entry(id) {
Some(manager::Entry::Established(entry)) => {
let established = &mut self.pool.established;
return Some(EstablishedConnection { entry, established })
}
_ => unreachable!("by (*)")
}
}
}
None
}
/// Turns the iterator into an iterator over just the connection IDs.
pub fn into_ids(self) -> impl Iterator<Item = ConnectionId> {
self.ids
}
/// Returns the first connection, if any, consuming the iterator.
pub fn into_first<'b>(mut self)
-> Option<EstablishedConnection<'b, TInEvent, TConnInfo, TPeerId>>
where 'a: 'b
{
while let Some(id) = self.ids.next() {
if self.pool.manager.is_established(&id) { // (*)
match self.pool.manager.entry(id) {
Some(manager::Entry::Established(entry)) => {
let established = &mut self.pool.established;
return Some(EstablishedConnection { entry, established })
}
_ => unreachable!("by (*)")
}
}
}
None
}
}
/// The configurable limits of a connection [`Pool`].
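///
/// All fields are public and the type implements `Default`; a limit of
/// `None` means "unlimited". A minimal construction sketch:
///
/// ```ignore
/// let limits = PoolLimits {
///     max_established_per_peer: Some(5),
///     ..PoolLimits::default()
/// };
/// ```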
#[derive(Debug, Clone, Default)]
pub struct PoolLimits {
pub max_pending_outgoing: Option<usize>,
pub max_pending_incoming: Option<usize>,
pub max_established_per_peer: Option<usize>,
}
impl PoolLimits {
fn check_established<F>(&self, current: F) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize
{
Self::check(current, self.max_established_per_peer)
}
fn check_outgoing<F>(&self, current: F) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize
{
Self::check(current, self.max_pending_outgoing)
}
fn check<F>(current: F, limit: Option<usize>) -> Result<(), ConnectionLimit>
where
F: FnOnce() -> usize
{
if let Some(limit) = limit {
let current = current();
if current >= limit {
return Err(ConnectionLimit { limit, current })
}
}
Ok(())
}
}

@@ -0,0 +1,265 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::muxing::{StreamMuxer, SubstreamRef, substream_from_ref};
use futures::prelude::*;
use smallvec::SmallVec;
use std::sync::Arc;
use std::{fmt, io::Error as IoError, pin::Pin, task::Context, task::Poll};
/// Endpoint for a received substream.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum SubstreamEndpoint<TDialInfo> {
Dialer(TDialInfo),
Listener,
}
impl<TDialInfo> SubstreamEndpoint<TDialInfo> {
/// Returns true for `Dialer`.
pub fn is_dialer(&self) -> bool {
match self {
SubstreamEndpoint::Dialer(_) => true,
SubstreamEndpoint::Listener => false,
}
}
/// Returns true for `Listener`.
pub fn is_listener(&self) -> bool {
match self {
SubstreamEndpoint::Dialer(_) => false,
SubstreamEndpoint::Listener => true,
}
}
}
/// Stream-like structure that handles substream multiplexing.
///
/// The stream will receive substreams and can be used to open new outgoing substreams. Destroying
/// the `Muxing` will **not** close the existing substreams.
///
/// The stream will close once both the inbound and outbound channels are closed, and no more
/// outbound substream attempt is pending.
pub struct Muxing<TMuxer, TUserData>
where
TMuxer: StreamMuxer,
{
/// The muxer used to manage substreams.
inner: Arc<TMuxer>,
/// List of substreams we are currently opening.
outbound_substreams: SmallVec<[(TUserData, TMuxer::OutboundSubstream); 8]>,
}
/// Future that signals the remote that we have closed the connection.
pub struct Close<TMuxer> {
/// Muxer to close.
muxer: Arc<TMuxer>,
}
/// A successfully opened substream.
pub type Substream<TMuxer> = SubstreamRef<Arc<TMuxer>>;
/// Event that can happen on the `Muxing`.
pub enum SubstreamEvent<TMuxer, TUserData>
where
TMuxer: StreamMuxer,
{
/// A new inbound substream arrived.
InboundSubstream {
/// The newly-opened substream. Will return EOF or an error if the `Muxing` is
/// destroyed or `close` is called.
substream: Substream<TMuxer>,
},
/// An outbound substream has successfully been opened.
OutboundSubstream {
/// User data that has been passed to the `open_substream` method.
user_data: TUserData,
/// The newly-opened substream. Will return EOF or an error if the `Muxing` is
/// destroyed or `close` is called.
substream: Substream<TMuxer>,
},
}
/// Identifier for a substream being opened.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct OutboundSubstreamId(usize);
impl<TMuxer, TUserData> Muxing<TMuxer, TUserData>
where
TMuxer: StreamMuxer,
{
/// Creates a new `Muxing` stream around the given muxer.
pub fn new(muxer: TMuxer) -> Self {
Muxing {
inner: Arc::new(muxer),
outbound_substreams: SmallVec::new(),
}
}
/// Starts the process of opening a new outbound substream.
///
/// After calling this method, polling the stream should eventually produce an
/// `OutboundSubstream` event containing the user data that has been passed to
/// this method, or an error if opening the substream fails.
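///
/// A hedged usage sketch (the user data and surrounding poll loop are illustrative):
///
/// ```ignore
/// muxing.open_substream(request_id);
/// // ... later, when polling:
/// match muxing.poll(cx) {
///     Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { user_data, substream })) => {
///         // `user_data` is the `request_id` given to `open_substream`.
///     }
///     _ => {}
/// }
/// ```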
pub fn open_substream(&mut self, user_data: TUserData) {
let raw = self.inner.open_outbound();
self.outbound_substreams.push((user_data, raw));
}
/// Returns `true` if the remote has shown any sign of activity after the muxer has been opened.
///
/// See `StreamMuxer::is_remote_acknowledged`.
pub fn is_remote_acknowledged(&self) -> bool {
self.inner.is_remote_acknowledged()
}
/// Destroys the node stream and returns all the pending outbound substreams, plus an object
/// that signals the remote that we shut down the connection.
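///
/// A hedged sketch (assuming an async context; names illustrative):
///
/// ```ignore
/// let (close, cancelled) = muxing.close();
/// // `cancelled` holds the user data of substreams that never finished opening.
/// close.await?; // drive the `Close` future to complete the shutdown
/// ```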
#[must_use]
pub fn close(mut self) -> (Close<TMuxer>, Vec<TUserData>) {
let substreams = self.cancel_outgoing();
let close = Close { muxer: self.inner.clone() };
(close, substreams)
}
/// Destroys all outbound streams and returns the corresponding user data.
pub fn cancel_outgoing(&mut self) -> Vec<TUserData> {
let mut out = Vec::with_capacity(self.outbound_substreams.len());
for (user_data, outbound) in self.outbound_substreams.drain(..) {
out.push(user_data);
self.inner.destroy_outbound(outbound);
}
out
}
/// Provides an API similar to `Stream`, yielding the next substream event.
pub fn poll(&mut self, cx: &mut Context) -> Poll<Result<SubstreamEvent<TMuxer, TUserData>, IoError>> {
// Polling inbound substream.
match self.inner.poll_inbound(cx) {
Poll::Ready(Ok(substream)) => {
let substream = substream_from_ref(self.inner.clone(), substream);
return Poll::Ready(Ok(SubstreamEvent::InboundSubstream {
substream,
}));
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
Poll::Pending => {}
}
// Polling outbound substreams.
// We remove each element from `outbound_substreams` one by one and add them back.
for n in (0..self.outbound_substreams.len()).rev() {
let (user_data, mut outbound) = self.outbound_substreams.swap_remove(n);
match self.inner.poll_outbound(cx, &mut outbound) {
Poll::Ready(Ok(substream)) => {
let substream = substream_from_ref(self.inner.clone(), substream);
self.inner.destroy_outbound(outbound);
return Poll::Ready(Ok(SubstreamEvent::OutboundSubstream {
user_data,
substream,
}));
}
Poll::Pending => {
self.outbound_substreams.push((user_data, outbound));
}
Poll::Ready(Err(err)) => {
self.inner.destroy_outbound(outbound);
return Poll::Ready(Err(err.into()));
}
}
}
// Nothing happened. Register our task to be notified and return.
Poll::Pending
}
}
impl<TMuxer, TUserData> fmt::Debug for Muxing<TMuxer, TUserData>
where
TMuxer: StreamMuxer,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("Muxing")
.field("outbound_substreams", &self.outbound_substreams.len())
.finish()
}
}
impl<TMuxer, TUserData> Drop for Muxing<TMuxer, TUserData>
where
TMuxer: StreamMuxer,
{
fn drop(&mut self) {
// The substreams that were produced will continue to work, as the muxer is held in an Arc.
// However we will no longer process any further inbound or outbound substream, and we
// therefore close everything.
for (_, outbound) in self.outbound_substreams.drain(..) {
self.inner.destroy_outbound(outbound);
}
}
}
impl<TMuxer> Future for Close<TMuxer>
where
TMuxer: StreamMuxer,
{
type Output = Result<(), IoError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.muxer.close(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
}
}
}
impl<TMuxer> fmt::Debug for Close<TMuxer>
where
TMuxer: StreamMuxer,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("Close")
.finish()
}
}
impl<TMuxer, TUserData> fmt::Debug for SubstreamEvent<TMuxer, TUserData>
where
TMuxer: StreamMuxer,
TMuxer::Substream: fmt::Debug,
TUserData: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SubstreamEvent::InboundSubstream { substream } => {
f.debug_struct("SubstreamEvent::OutboundClosed")
.field("substream", substream)
.finish()
},
SubstreamEvent::OutboundSubstream { user_data, substream } => {
f.debug_struct("SubstreamEvent::OutboundSubstream")
.field("user_data", user_data)
.field("substream", substream)
.finish()
},
}
}
}