{core,swarm}: Remove Network abstraction (#2492)

This commit removes the `Network` abstraction, thus managing `Listeners`
and the connection `Pool` in `Swarm` directly. This is done under the
assumption that no one uses the `Network` abstraction directly, but
instead everyone always uses it through `Swarm`. Both `Listeners` and
`Pool` are moved from `libp2p-core` into `libp2p-swarm`. Given that they
are no longer exposed via `Network`, they can be treated as an
implementation detail of `libp2p-swarm` and `Swarm`.
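
For orientation, a rough sketch of the resulting ownership, based on the shape of the `Network` struct removed below; the field names are illustrative and this is not the exact `libp2p-swarm` definition:

    // Sketch only: `Swarm` now owns the listeners and the connection pool
    // directly, with no intermediate `Network`.
    struct Swarm<TBehaviour, TTrans, THandler> {
        local_peer_id: PeerId,
        listeners: ListenersStream<TTrans>, // moved from libp2p-core
        pool: Pool<THandler, TTrans>,       // moved from libp2p-core
        behaviour: TBehaviour,
    }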

This commit does not introduce any behavioural changes.

This change has the following benefits:

- Removal of `NetworkEvent`, which was mostly an isomorphism of
  `SwarmEvent`.
- Removal of the never-directly-used `Network` abstraction.
- Removal of the now-obsolete and verbose `Peer` construct
  (`core/src/network/peer.rs`).
- Removal of the `libp2p-core` `DialOpts`, which was a direct mapping of
  the `libp2p-swarm` `DialOpts` (see the sketch after this list).
- Allowing breaking changes to the connection handling and the `Swarm`
  API without a breaking change in `libp2p-core`, and thus without a
  breaking change in the `/transport` protocols.
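
To illustrate the `DialOpts` point above: with the `libp2p-core` type gone, dialing is expressed solely in terms of the `libp2p-swarm` builder. A hedged sketch against the `libp2p-swarm` API of this period (`swarm` and `peer_id` are assumed to exist in the caller's scope; exact builder methods may differ):

    use libp2p_swarm::dial_opts::{DialOpts, PeerCondition};

    // Dial a known peer unless a connection to it is already established.
    swarm.dial(
        DialOpts::peer_id(peer_id)
            .condition(PeerCondition::Disconnected)
            .build(),
    )?;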

This change enables the following potential future changes:

- Removal of `NodeHandler` and `ConnectionHandler`, thus allowing
  `ProtocolsHandler` to be renamed to `ConnectionHandler`.
- Moving `NetworkBehaviour` and `ProtocolsHandler` into `libp2p-core`,
  having `libp2p-xxx` protocol crates depend only on `libp2p-core` and
  thus allowing general breaking changes to `Swarm` without breaking all
  `libp2p-xxx` crates.
Author: Max Inden
Date: 2022-02-13 21:57:38 +01:00
Committer: GitHub
parent 861e15dabb
commit 7fc342e6c0
29 changed files with 1332 additions and 3211 deletions


@@ -18,29 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
mod error;
pub(crate) mod handler;
mod listeners;
mod substream;
pub(crate) mod pool;
pub use error::{
ConnectionError, PendingConnectionError, PendingInboundConnectionError,
PendingOutboundConnectionError,
};
pub use handler::{ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler};
pub use listeners::{ListenerId, ListenersEvent, ListenersStream};
pub use pool::{ConnectionCounters, ConnectionLimits};
pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnection};
pub use substream::{Close, Substream, SubstreamEndpoint};
use crate::multiaddr::{Multiaddr, Protocol};
use crate::muxing::StreamMuxer;
use crate::PeerId;
use std::hash::Hash;
use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll};
use substream::{Muxing, SubstreamEvent};
/// Connection identifier.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
@@ -53,7 +31,34 @@ impl ConnectionId {
/// in test environments. There is in general no guarantee
/// that all connection IDs are based on non-negative integers.
pub fn new(id: usize) -> Self {
ConnectionId(id)
Self(id)
}
}
impl std::ops::Add<usize> for ConnectionId {
type Output = Self;
fn add(self, other: usize) -> Self {
Self(self.0 + other)
}
}
/// The ID of a single listener.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ListenerId(u64);
impl ListenerId {
/// Creates a `ListenerId` from a non-negative integer.
pub fn new(id: u64) -> Self {
Self(id)
}
}
impl std::ops::Add<u64> for ListenerId {
type Output = Self;
fn add(self, other: u64) -> Self {
Self(self.0 + other)
}
}
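// (Illustrative aside, not part of the diff: the `Add` impls above let the
// pool and listeners derive successive identifiers without reaching into the
// inner integer, e.g. `ListenerId::new(1) + 1 == ListenerId::new(2)`.)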
@@ -236,181 +241,3 @@ impl ConnectedPoint {
}
}
}
/// Information about a successfully established connection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Connected {
/// The connected endpoint, including network address information.
pub endpoint: ConnectedPoint,
/// Information obtained from the transport.
pub peer_id: PeerId,
}
/// Event generated by a [`Connection`].
#[derive(Debug, Clone)]
pub enum Event<T> {
/// Event generated by the [`ConnectionHandler`].
Handler(T),
/// Address of the remote has changed.
AddressChange(Multiaddr),
}
/// A multiplexed connection to a peer with an associated `ConnectionHandler`.
pub struct Connection<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
/// Node that handles the muxing.
muxing: substream::Muxing<TMuxer, THandler::OutboundOpenInfo>,
/// Handler that processes substreams.
handler: THandler,
}
impl<TMuxer, THandler> fmt::Debug for Connection<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>> + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connection")
.field("muxing", &self.muxing)
.field("handler", &self.handler)
.finish()
}
}
impl<TMuxer, THandler> Unpin for Connection<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
}
impl<TMuxer, THandler> Connection<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
/// Builds a new `Connection` from the given substream multiplexer
/// and connection handler.
pub fn new(muxer: TMuxer, handler: THandler) -> Self {
Connection {
muxing: Muxing::new(muxer),
handler,
}
}
/// Returns a reference to the `ConnectionHandler`
pub fn handler(&self) -> &THandler {
&self.handler
}
/// Returns a mutable reference to the `ConnectionHandler`
pub fn handler_mut(&mut self) -> &mut THandler {
&mut self.handler
}
/// Notifies the connection handler of an event.
pub fn inject_event(&mut self, event: THandler::InEvent) {
self.handler.inject_event(event);
}
/// Begins an orderly shutdown of the connection, returning the connection
/// handler and a `Future` that resolves when connection shutdown is complete.
pub fn close(self) -> (THandler, Close<TMuxer>) {
(self.handler, self.muxing.close().0)
}
/// Polls the connection for events produced by the associated handler
/// as a result of I/O activity on the substream multiplexer.
pub fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>> {
loop {
let mut io_pending = false;
// Perform I/O on the connection through the muxer, informing the handler
// of new substreams.
match self.muxing.poll(cx) {
Poll::Pending => io_pending = true,
Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => self
.handler
.inject_substream(substream, SubstreamEndpoint::Listener),
Poll::Ready(Ok(SubstreamEvent::OutboundSubstream {
user_data,
substream,
})) => {
let endpoint = SubstreamEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint)
}
Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => {
self.handler.inject_address_change(&address);
return Poll::Ready(Ok(Event::AddressChange(address)));
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::IO(err))),
}
// Poll the handler for new events.
match self.handler.poll(cx) {
Poll::Pending => {
if io_pending {
return Poll::Pending; // Nothing to do
}
}
Poll::Ready(Ok(ConnectionHandlerEvent::OutboundSubstreamRequest(user_data))) => {
self.muxing.open_substream(user_data);
}
Poll::Ready(Ok(ConnectionHandlerEvent::Custom(event))) => {
return Poll::Ready(Ok(Event::Handler(event)));
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::Handler(err))),
}
}
}
}
/// Borrowed information about an incoming connection currently being negotiated.
#[derive(Debug, Copy, Clone)]
pub struct IncomingInfo<'a> {
/// Local connection address.
pub local_addr: &'a Multiaddr,
/// Address used to send back data to the remote.
pub send_back_addr: &'a Multiaddr,
}
impl<'a> IncomingInfo<'a> {
/// Builds the [`PendingPoint`] corresponding to the incoming connection.
pub fn to_pending_point(&self) -> PendingPoint {
PendingPoint::Listener {
local_addr: self.local_addr.clone(),
send_back_addr: self.send_back_addr.clone(),
}
}
/// Builds the [`ConnectedPoint`] corresponding to the incoming connection.
pub fn to_connected_point(&self) -> ConnectedPoint {
ConnectedPoint::Listener {
local_addr: self.local_addr.clone(),
send_back_addr: self.send_back_addr.clone(),
}
}
}
/// Information about a connection limit.
#[derive(Debug, Clone)]
pub struct ConnectionLimit {
/// The maximum number of connections.
pub limit: u32,
/// The current number of connections.
pub current: u32,
}
impl fmt::Display for ConnectionLimit {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}/{}", self.current, self.limit)
}
}
/// A `ConnectionLimit` can represent an error if it has been exceeded.
impl Error for ConnectionLimit {}


@@ -1,156 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::transport::TransportError;
use crate::Multiaddr;
use crate::{connection::ConnectionLimit, ConnectedPoint, PeerId};
use std::{fmt, io};
/// Errors that can occur in the context of an established `Connection`.
#[derive(Debug)]
pub enum ConnectionError<THandlerErr> {
/// An I/O error occurred on the connection.
// TODO: Eventually this should also be a custom error?
IO(io::Error),
/// The connection handler produced an error.
Handler(THandlerErr),
}
impl<THandlerErr> fmt::Display for ConnectionError<THandlerErr>
where
THandlerErr: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ConnectionError::IO(err) => write!(f, "Connection error: I/O error: {}", err),
ConnectionError::Handler(err) => write!(f, "Connection error: Handler error: {}", err),
}
}
}
impl<THandlerErr> std::error::Error for ConnectionError<THandlerErr>
where
THandlerErr: std::error::Error + 'static,
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
ConnectionError::IO(err) => Some(err),
ConnectionError::Handler(err) => Some(err),
}
}
}
/// Errors that can occur in the context of a pending outgoing `Connection`.
///
/// Note: Addresses for an outbound connection are dialed in parallel. Thus, compared to
/// [`PendingInboundConnectionError`], one or more [`TransportError`]s can occur for a single
/// connection.
pub type PendingOutboundConnectionError<TTransErr> =
PendingConnectionError<Vec<(Multiaddr, TransportError<TTransErr>)>>;
/// Errors that can occur in the context of a pending incoming `Connection`.
pub type PendingInboundConnectionError<TTransErr> =
PendingConnectionError<TransportError<TTransErr>>;
/// Errors that can occur in the context of a pending `Connection`.
#[derive(Debug)]
pub enum PendingConnectionError<TTransErr> {
/// An error occurred while negotiating the transport protocol(s) on a connection.
Transport(TTransErr),
/// The connection was dropped because the connection limit
/// for a peer has been reached.
ConnectionLimit(ConnectionLimit),
/// Pending connection attempt has been aborted.
Aborted,
/// The peer identity obtained on the connection did not
/// match the one that was expected or is the local one.
WrongPeerId {
obtained: PeerId,
endpoint: ConnectedPoint,
},
/// An I/O error occurred on the connection.
// TODO: Eventually this should also be a custom error?
IO(io::Error),
}
impl<T> PendingConnectionError<T> {
pub fn map<U>(self, f: impl FnOnce(T) -> U) -> PendingConnectionError<U> {
match self {
PendingConnectionError::Transport(t) => PendingConnectionError::Transport(f(t)),
PendingConnectionError::ConnectionLimit(l) => {
PendingConnectionError::ConnectionLimit(l)
}
PendingConnectionError::Aborted => PendingConnectionError::Aborted,
PendingConnectionError::WrongPeerId { obtained, endpoint } => {
PendingConnectionError::WrongPeerId { obtained, endpoint }
}
PendingConnectionError::IO(e) => PendingConnectionError::IO(e),
}
}
}
impl<TTransErr> fmt::Display for PendingConnectionError<TTransErr>
where
TTransErr: fmt::Display + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PendingConnectionError::IO(err) => write!(f, "Pending connection: I/O error: {}", err),
PendingConnectionError::Aborted => write!(f, "Pending connection: Aborted."),
PendingConnectionError::Transport(err) => {
write!(
f,
"Pending connection: Transport error on connection: {}",
err
)
}
PendingConnectionError::ConnectionLimit(l) => {
write!(f, "Connection error: Connection limit: {}.", l)
}
PendingConnectionError::WrongPeerId { obtained, endpoint } => {
write!(
f,
"Pending connection: Unexpected peer ID {} at {:?}.",
obtained, endpoint
)
}
}
}
}
impl<TTransErr> std::error::Error for PendingConnectionError<TTransErr>
where
TTransErr: std::error::Error + 'static,
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
PendingConnectionError::IO(err) => Some(err),
PendingConnectionError::Transport(_) => None,
PendingConnectionError::WrongPeerId { .. } => None,
PendingConnectionError::Aborted => None,
PendingConnectionError::ConnectionLimit(..) => None,
}
}
}
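
As a small illustration of the `map` helper above (a sketch, not part of the diff; the function name is made up):

    // Convert the transport error type carried by a pending-connection error,
    // e.g. into a plain string for logging.
    fn stringify_error(
        err: PendingConnectionError<std::io::Error>,
    ) -> PendingConnectionError<String> {
        err.map(|io_err| io_err.to_string())
    }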


@@ -1,144 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use super::{Connected, SubstreamEndpoint};
use crate::Multiaddr;
use std::{fmt::Debug, task::Context, task::Poll};
/// The interface of a connection handler.
///
/// Each handler is responsible for a single connection.
pub trait ConnectionHandler {
/// The inbound type of events used to notify the handler through the `Network`.
///
/// See also [`EstablishedConnection::notify_handler`](super::EstablishedConnection::notify_handler)
/// and [`ConnectionHandler::inject_event`].
type InEvent: Debug + Send + 'static;
/// The outbound type of events that the handler emits to the `Network`
/// through [`ConnectionHandler::poll`].
///
/// See also [`NetworkEvent::ConnectionEvent`](crate::network::NetworkEvent::ConnectionEvent).
type OutEvent: Debug + Send + 'static;
/// The type of errors that the handler can produce when polled by the `Network`.
type Error: Debug + Send + 'static;
/// The type of the substream containing the data.
type Substream;
/// Information about a substream. Can be sent to the handler through a `SubstreamEndpoint`,
/// and will be passed back in `inject_substream` or `inject_outbound_closed`.
type OutboundOpenInfo;
/// Sends a new substream to the handler.
///
/// The handler is responsible for upgrading the substream to whatever protocol it wants.
///
/// # Panic
///
/// Implementations are allowed to panic in the case of dialing if the `user_data` in
/// `endpoint` doesn't correspond to what was returned earlier when polling, or is used
/// multiple times.
fn inject_substream(
&mut self,
substream: Self::Substream,
endpoint: SubstreamEndpoint<Self::OutboundOpenInfo>,
);
/// Notifies the handler of an event.
fn inject_event(&mut self, event: Self::InEvent);
/// Notifies the handler of a change in the address of the remote.
fn inject_address_change(&mut self, new_address: &Multiaddr);
/// Polls the handler for events.
///
/// Returning an error will close the connection to the remote.
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<ConnectionHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>>;
}
/// Prototype for a `ConnectionHandler`.
pub trait IntoConnectionHandler {
/// The node handler.
type Handler: ConnectionHandler;
/// Builds the node handler.
///
/// The implementation is given a `Connected` value that holds information about
/// the newly established connection for which a handler should be created.
fn into_handler(self, connected: &Connected) -> Self::Handler;
}
impl<T> IntoConnectionHandler for T
where
T: ConnectionHandler,
{
type Handler = Self;
fn into_handler(self, _: &Connected) -> Self {
self
}
}
pub(crate) type THandlerInEvent<THandler> =
<<THandler as IntoConnectionHandler>::Handler as ConnectionHandler>::InEvent;
pub(crate) type THandlerOutEvent<THandler> =
<<THandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent;
pub(crate) type THandlerError<THandler> =
<<THandler as IntoConnectionHandler>::Handler as ConnectionHandler>::Error;
/// Event produced by a handler.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ConnectionHandlerEvent<TOutboundOpenInfo, TCustom> {
/// Require a new outbound substream to be opened with the remote.
OutboundSubstreamRequest(TOutboundOpenInfo),
/// Other event.
Custom(TCustom),
}
/// Event produced by a handler.
impl<TOutboundOpenInfo, TCustom> ConnectionHandlerEvent<TOutboundOpenInfo, TCustom> {
/// If this is `OutboundSubstreamRequest`, maps the content to something else.
pub fn map_outbound_open_info<F, I>(self, map: F) -> ConnectionHandlerEvent<I, TCustom>
where
F: FnOnce(TOutboundOpenInfo) -> I,
{
match self {
ConnectionHandlerEvent::OutboundSubstreamRequest(val) => {
ConnectionHandlerEvent::OutboundSubstreamRequest(map(val))
}
ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(val),
}
}
/// If this is `Custom`, maps the content to something else.
pub fn map_custom<F, I>(self, map: F) -> ConnectionHandlerEvent<TOutboundOpenInfo, I>
where
F: FnOnce(TCustom) -> I,
{
match self {
ConnectionHandlerEvent::OutboundSubstreamRequest(val) => {
ConnectionHandlerEvent::OutboundSubstreamRequest(val)
}
ConnectionHandlerEvent::Custom(val) => ConnectionHandlerEvent::Custom(map(val)),
}
}
}
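
A quick test-style sketch of the two mapping helpers above (not part of the diff; it only uses the types defined in this file):

    #[test]
    fn map_helpers() {
        let request: ConnectionHandlerEvent<u8, ()> =
            ConnectionHandlerEvent::OutboundSubstreamRequest(1);
        assert_eq!(
            request.map_outbound_open_info(|n| n + 1),
            ConnectionHandlerEvent::OutboundSubstreamRequest(2)
        );

        let custom: ConnectionHandlerEvent<(), &str> = ConnectionHandlerEvent::Custom("ping");
        assert_eq!(
            custom.map_custom(str::to_uppercase),
            ConnectionHandlerEvent::Custom("PING".to_string())
        );
    }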


@@ -1,607 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Manage listening on multiple multiaddresses at once.
use crate::{
transport::{ListenerEvent, TransportError},
Multiaddr, Transport,
};
use futures::{prelude::*, task::Context, task::Poll};
use log::debug;
use smallvec::SmallVec;
use std::{collections::VecDeque, fmt, mem, pin::Pin};
/// Implementation of `futures::Stream` that allows listening on multiaddresses.
///
/// To start using a `ListenersStream`, create one with `new` by passing an implementation of
/// `Transport`. This `Transport` will be used to start listening, therefore you want to pass
/// a `Transport` that supports the protocols you wish to listen on.
///
/// Then, call `ListenersStream::listen_on` for all addresses you want to start listening on.
///
/// The `ListenersStream` never ends and never produces errors. If a listener errors or closes,
/// an event is generated on the stream and the listener is then dropped, but the `ListenersStream`
/// itself continues.
///
/// # Example
///
/// ```no_run
/// use futures::prelude::*;
/// use libp2p_core::connection::{ListenersEvent, ListenersStream};
///
/// let mut listeners = ListenersStream::new(libp2p_tcp::TcpConfig::new());
///
/// // Ask the `listeners` to start listening on the given multiaddress.
/// listeners.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap();
///
/// // The `listeners` will now generate events when polled.
/// futures::executor::block_on(async move {
/// while let Some(event) = listeners.next().await {
/// match event {
/// ListenersEvent::NewAddress { listener_id, listen_addr } => {
/// println!("Listener {:?} is listening at address {}", listener_id, listen_addr);
/// },
/// ListenersEvent::AddressExpired { listener_id, listen_addr } => {
/// println!("Listener {:?} is no longer listening at address {}", listener_id, listen_addr);
/// },
/// ListenersEvent::Closed { listener_id, .. } => {
/// println!("Listener {:?} has been closed", listener_id);
/// },
/// ListenersEvent::Error { listener_id, error } => {
/// println!("Listener {:?} has experienced an error: {}", listener_id, error);
/// },
/// ListenersEvent::Incoming { listener_id, upgrade, local_addr, .. } => {
/// println!("Listener {:?} has a new connection on {}", listener_id, local_addr);
/// // We don't do anything with the newly-opened connection, but in a real-life
/// // program you probably want to use it!
/// drop(upgrade);
/// },
/// }
/// }
/// })
/// ```
pub struct ListenersStream<TTrans>
where
TTrans: Transport,
{
/// Transport used to spawn listeners.
transport: TTrans,
/// All the active listeners.
/// The `Listener` struct contains a stream that we want to be pinned. Since the `VecDeque`
/// can be resized, the only way is to use a `Pin<Box<>>`.
listeners: VecDeque<Pin<Box<Listener<TTrans>>>>,
/// The next listener ID to assign.
next_id: ListenerId,
/// Pending listeners events to return from [`ListenersStream::poll`].
pending_events: VecDeque<ListenersEvent<TTrans>>,
}
/// The ID of a single listener.
///
/// It is part of most [`ListenersEvent`]s and can be used to remove
/// individual listeners from the [`ListenersStream`].
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ListenerId(u64);
/// A single active listener.
#[pin_project::pin_project]
#[derive(Debug)]
struct Listener<TTrans>
where
TTrans: Transport,
{
/// The ID of this listener.
id: ListenerId,
/// The object that actually listens.
#[pin]
listener: TTrans::Listener,
/// Addresses it is listening on.
addresses: SmallVec<[Multiaddr; 4]>,
}
/// Event that can happen on the `ListenersStream`.
pub enum ListenersEvent<TTrans>
where
TTrans: Transport,
{
/// A new address is being listened on.
NewAddress {
/// The listener that is listening on the new address.
listener_id: ListenerId,
/// The new address that is being listened on.
listen_addr: Multiaddr,
},
/// An address is no longer being listened on.
AddressExpired {
/// The listener that is no longer listening on the address.
listener_id: ListenerId,
/// The address that is no longer being listened on.
listen_addr: Multiaddr,
},
/// A connection is incoming on one of the listeners.
Incoming {
/// The listener that produced the upgrade.
listener_id: ListenerId,
/// The produced upgrade.
upgrade: TTrans::ListenerUpgrade,
/// Local connection address.
local_addr: Multiaddr,
/// Address used to send back data to the incoming client.
send_back_addr: Multiaddr,
},
/// A listener closed.
Closed {
/// The ID of the listener that closed.
listener_id: ListenerId,
/// The addresses that the listener was listening on.
addresses: Vec<Multiaddr>,
/// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
/// if the stream produced an error.
reason: Result<(), TTrans::Error>,
},
/// A listener errored.
///
/// The listener will continue to be polled for new events and the event
/// is for informational purposes only.
Error {
/// The ID of the listener that errored.
listener_id: ListenerId,
/// The error value.
error: TTrans::Error,
},
}
impl<TTrans> ListenersStream<TTrans>
where
TTrans: Transport,
{
/// Starts a new stream of listeners.
pub fn new(transport: TTrans) -> Self {
ListenersStream {
transport,
listeners: VecDeque::new(),
next_id: ListenerId(1),
pending_events: VecDeque::new(),
}
}
/// Same as `new`, but pre-allocates enough memory for the given number of
/// simultaneous listeners.
pub fn with_capacity(transport: TTrans, capacity: usize) -> Self {
ListenersStream {
transport,
listeners: VecDeque::with_capacity(capacity),
next_id: ListenerId(1),
pending_events: VecDeque::new(),
}
}
/// Start listening on a multiaddress.
///
/// Returns an error if the transport doesn't support the given multiaddress.
pub fn listen_on(
&mut self,
addr: Multiaddr,
) -> Result<ListenerId, TransportError<TTrans::Error>>
where
TTrans: Clone,
{
let listener = self.transport.clone().listen_on(addr)?;
self.listeners.push_back(Box::pin(Listener {
id: self.next_id,
listener,
addresses: SmallVec::new(),
}));
let id = self.next_id;
self.next_id = ListenerId(self.next_id.0 + 1);
Ok(id)
}
/// Remove the listener matching the given `ListenerId`.
///
/// Returns `true` if there was a listener with this ID, `false`
/// otherwise.
pub fn remove_listener(&mut self, id: ListenerId) -> bool {
if let Some(i) = self.listeners.iter().position(|l| l.id == id) {
let mut listener = self
.listeners
.remove(i)
.expect("Index can not be out of bounds.");
let listener_project = listener.as_mut().project();
let addresses = mem::take(listener_project.addresses).into_vec();
self.pending_events.push_back(ListenersEvent::Closed {
listener_id: *listener_project.id,
addresses,
reason: Ok(()),
});
true
} else {
false
}
}
/// Returns the transport passed when building this object.
pub fn transport(&self) -> &TTrans {
&self.transport
}
/// Returns an iterator that produces the list of addresses we're listening on.
pub fn listen_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
self.listeners.iter().flat_map(|l| l.addresses.iter())
}
/// Provides an API similar to `Stream`, except that it cannot end.
pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ListenersEvent<TTrans>> {
// Return pending events from closed listeners.
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(event);
}
// We remove each element from `listeners` one by one and add them back.
let mut remaining = self.listeners.len();
while let Some(mut listener) = self.listeners.pop_back() {
let mut listener_project = listener.as_mut().project();
match TryStream::try_poll_next(listener_project.listener.as_mut(), cx) {
Poll::Pending => {
self.listeners.push_front(listener);
remaining -= 1;
if remaining == 0 {
break;
}
}
Poll::Ready(Some(Ok(ListenerEvent::Upgrade {
upgrade,
local_addr,
remote_addr,
}))) => {
let id = *listener_project.id;
self.listeners.push_front(listener);
return Poll::Ready(ListenersEvent::Incoming {
listener_id: id,
upgrade,
local_addr,
send_back_addr: remote_addr,
});
}
Poll::Ready(Some(Ok(ListenerEvent::NewAddress(a)))) => {
if listener_project.addresses.contains(&a) {
debug!("Transport has reported address {} multiple times", a)
} else {
listener_project.addresses.push(a.clone());
}
let id = *listener_project.id;
self.listeners.push_front(listener);
return Poll::Ready(ListenersEvent::NewAddress {
listener_id: id,
listen_addr: a,
});
}
Poll::Ready(Some(Ok(ListenerEvent::AddressExpired(a)))) => {
listener_project.addresses.retain(|x| x != &a);
let id = *listener_project.id;
self.listeners.push_front(listener);
return Poll::Ready(ListenersEvent::AddressExpired {
listener_id: id,
listen_addr: a,
});
}
Poll::Ready(Some(Ok(ListenerEvent::Error(error)))) => {
let id = *listener_project.id;
self.listeners.push_front(listener);
return Poll::Ready(ListenersEvent::Error {
listener_id: id,
error,
});
}
Poll::Ready(None) => {
let addresses = mem::take(listener_project.addresses).into_vec();
return Poll::Ready(ListenersEvent::Closed {
listener_id: *listener_project.id,
addresses,
reason: Ok(()),
});
}
Poll::Ready(Some(Err(err))) => {
let addresses = mem::take(listener_project.addresses).into_vec();
return Poll::Ready(ListenersEvent::Closed {
listener_id: *listener_project.id,
addresses,
reason: Err(err),
});
}
}
}
// We register the current task to be woken up if a new listener is added.
Poll::Pending
}
}
impl<TTrans> Stream for ListenersStream<TTrans>
where
TTrans: Transport,
{
type Item = ListenersEvent<TTrans>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
ListenersStream::poll(self, cx).map(Option::Some)
}
}
impl<TTrans> Unpin for ListenersStream<TTrans> where TTrans: Transport {}
impl<TTrans> fmt::Debug for ListenersStream<TTrans>
where
TTrans: Transport + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("ListenersStream")
.field("transport", &self.transport)
.field("listen_addrs", &self.listen_addrs().collect::<Vec<_>>())
.finish()
}
}
impl<TTrans> fmt::Debug for ListenersEvent<TTrans>
where
TTrans: Transport,
TTrans::Error: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match self {
ListenersEvent::NewAddress {
listener_id,
listen_addr,
} => f
.debug_struct("ListenersEvent::NewAddress")
.field("listener_id", listener_id)
.field("listen_addr", listen_addr)
.finish(),
ListenersEvent::AddressExpired {
listener_id,
listen_addr,
} => f
.debug_struct("ListenersEvent::AddressExpired")
.field("listener_id", listener_id)
.field("listen_addr", listen_addr)
.finish(),
ListenersEvent::Incoming {
listener_id,
local_addr,
..
} => f
.debug_struct("ListenersEvent::Incoming")
.field("listener_id", listener_id)
.field("local_addr", local_addr)
.finish(),
ListenersEvent::Closed {
listener_id,
addresses,
reason,
} => f
.debug_struct("ListenersEvent::Closed")
.field("listener_id", listener_id)
.field("addresses", addresses)
.field("reason", reason)
.finish(),
ListenersEvent::Error { listener_id, error } => f
.debug_struct("ListenersEvent::Error")
.field("listener_id", listener_id)
.field("error", error)
.finish(),
}
}
}
#[cfg(test)]
mod tests {
use futures::{future::BoxFuture, stream::BoxStream};
use super::*;
use crate::transport;
#[test]
fn incoming_event() {
async_std::task::block_on(async move {
let mem_transport = transport::MemoryTransport::default();
let mut listeners = ListenersStream::new(mem_transport);
listeners.listen_on("/memory/0".parse().unwrap()).unwrap();
let address = {
let event = listeners.next().await.unwrap();
if let ListenersEvent::NewAddress { listen_addr, .. } = event {
listen_addr
} else {
panic!("Was expecting the listen address to be reported")
}
};
let address2 = address.clone();
async_std::task::spawn(async move {
mem_transport.dial(address2).unwrap().await.unwrap();
});
match listeners.next().await.unwrap() {
ListenersEvent::Incoming {
local_addr,
send_back_addr,
..
} => {
assert_eq!(local_addr, address);
assert!(send_back_addr != address);
}
_ => panic!(),
}
});
}
#[test]
fn listener_event_error_isnt_fatal() {
// Tests that a listener continues to be polled even after producing
// a `ListenerEvent::Error`.
#[derive(Clone)]
struct DummyTrans;
impl transport::Transport for DummyTrans {
type Output = ();
type Error = std::io::Error;
type Listener = BoxStream<
'static,
Result<ListenerEvent<Self::ListenerUpgrade, std::io::Error>, std::io::Error>,
>;
type ListenerUpgrade = BoxFuture<'static, Result<Self::Output, Self::Error>>;
type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn listen_on(
self,
_: Multiaddr,
) -> Result<Self::Listener, transport::TransportError<Self::Error>> {
Ok(Box::pin(stream::unfold((), |()| async move {
Some((
Ok(ListenerEvent::Error(std::io::Error::from(
std::io::ErrorKind::Other,
))),
(),
))
})))
}
fn dial(
self,
_: Multiaddr,
) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}
fn dial_as_listener(
self,
_: Multiaddr,
) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}
fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> {
None
}
}
async_std::task::block_on(async move {
let transport = DummyTrans;
let mut listeners = ListenersStream::new(transport);
listeners.listen_on("/memory/0".parse().unwrap()).unwrap();
for _ in 0..10 {
match listeners.next().await.unwrap() {
ListenersEvent::Error { .. } => {}
_ => panic!(),
}
}
});
}
#[test]
fn listener_error_is_fatal() {
// Tests that a listener stops after producing an error on the stream itself.
#[derive(Clone)]
struct DummyTrans;
impl transport::Transport for DummyTrans {
type Output = ();
type Error = std::io::Error;
type Listener = BoxStream<
'static,
Result<ListenerEvent<Self::ListenerUpgrade, std::io::Error>, std::io::Error>,
>;
type ListenerUpgrade = BoxFuture<'static, Result<Self::Output, Self::Error>>;
type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn listen_on(
self,
_: Multiaddr,
) -> Result<Self::Listener, transport::TransportError<Self::Error>> {
Ok(Box::pin(stream::unfold((), |()| async move {
Some((Err(std::io::Error::from(std::io::ErrorKind::Other)), ()))
})))
}
fn dial(
self,
_: Multiaddr,
) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}
fn dial_as_listener(
self,
_: Multiaddr,
) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}
fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> {
None
}
}
async_std::task::block_on(async move {
let transport = DummyTrans;
let mut listeners = ListenersStream::new(transport);
listeners.listen_on("/memory/0".parse().unwrap()).unwrap();
match listeners.next().await.unwrap() {
ListenersEvent::Closed { .. } => {}
_ => panic!(),
}
});
}
#[test]
fn listener_closed() {
async_std::task::block_on(async move {
let mem_transport = transport::MemoryTransport::default();
let mut listeners = ListenersStream::new(mem_transport);
let id = listeners.listen_on("/memory/0".parse().unwrap()).unwrap();
let event = listeners.next().await.unwrap();
let addr;
if let ListenersEvent::NewAddress { listen_addr, .. } = event {
addr = listen_addr
} else {
panic!("Was expecting the listen address to be reported")
}
assert!(listeners.remove_listener(id));
match listeners.next().await.unwrap() {
ListenersEvent::Closed {
listener_id,
addresses,
reason: Ok(()),
} => {
assert_eq!(listener_id, id);
assert!(addresses.contains(&addr));
}
other => panic!("Unexpected listeners event: {:?}", other),
}
});
}
}

File diff suppressed because it is too large.


@@ -1,164 +0,0 @@
// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::{
connection::Endpoint,
transport::{Transport, TransportError},
Multiaddr, PeerId,
};
use futures::{
future::{BoxFuture, Future, FutureExt},
ready,
stream::{FuturesUnordered, StreamExt},
};
use std::{
num::NonZeroU8,
pin::Pin,
task::{Context, Poll},
};
type Dial<TTrans> = BoxFuture<
'static,
(
Multiaddr,
Result<<TTrans as Transport>::Output, TransportError<<TTrans as Transport>::Error>>,
),
>;
pub struct ConcurrentDial<TTrans: Transport> {
dials: FuturesUnordered<Dial<TTrans>>,
pending_dials: Box<dyn Iterator<Item = Dial<TTrans>> + Send>,
errors: Vec<(Multiaddr, TransportError<TTrans::Error>)>,
}
impl<TTrans: Transport> Unpin for ConcurrentDial<TTrans> {}
impl<TTrans> ConcurrentDial<TTrans>
where
TTrans: Transport + Clone + Send + 'static,
TTrans::Output: Send,
TTrans::Error: Send,
TTrans::Dial: Send + 'static,
{
pub(crate) fn new(
transport: TTrans,
peer: Option<PeerId>,
addresses: impl Iterator<Item = Multiaddr> + Send + 'static,
concurrency_factor: NonZeroU8,
role_override: Endpoint,
) -> Self {
let mut pending_dials = addresses.map(move |address| match p2p_addr(peer, address) {
Ok(address) => {
let dial = match role_override {
Endpoint::Dialer => transport.clone().dial(address.clone()),
Endpoint::Listener => transport.clone().dial_as_listener(address.clone()),
};
match dial {
Ok(fut) => fut
.map(|r| (address, r.map_err(|e| TransportError::Other(e))))
.boxed(),
Err(err) => futures::future::ready((address, Err(err))).boxed(),
}
}
Err(address) => futures::future::ready((
address.clone(),
Err(TransportError::MultiaddrNotSupported(address)),
))
.boxed(),
});
let dials = FuturesUnordered::new();
while let Some(dial) = pending_dials.next() {
dials.push(dial);
if dials.len() == concurrency_factor.get() as usize {
break;
}
}
Self {
dials,
errors: Default::default(),
pending_dials: Box::new(pending_dials),
}
}
}
impl<TTrans> Future for ConcurrentDial<TTrans>
where
TTrans: Transport,
{
type Output = Result<
// Either one dial succeeded, returning the negotiated [`PeerId`], the address, the
// muxer and the addresses and errors of the dials that failed before.
(
Multiaddr,
TTrans::Output,
Vec<(Multiaddr, TransportError<TTrans::Error>)>,
),
// Or all dials failed, thus returning the address and error for each dial.
Vec<(Multiaddr, TransportError<TTrans::Error>)>,
>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
match ready!(self.dials.poll_next_unpin(cx)) {
Some((addr, Ok(output))) => {
let errors = std::mem::replace(&mut self.errors, vec![]);
return Poll::Ready(Ok((addr, output, errors)));
}
Some((addr, Err(e))) => {
self.errors.push((addr, e));
if let Some(dial) = self.pending_dials.next() {
self.dials.push(dial)
}
}
None => {
return Poll::Ready(Err(std::mem::replace(&mut self.errors, vec![])));
}
}
}
}
}
/// Ensures a given `Multiaddr` is a `/p2p/...` address for the given peer.
///
/// If the given address is already a `p2p` address for the given peer,
/// i.e. the last encapsulated protocol is `/p2p/<peer-id>`, this is a no-op.
///
/// If the given address is already a `p2p` address for a different peer
/// than the one given, the given `Multiaddr` is returned as an `Err`.
///
/// If the given address is not yet a `p2p` address for the given peer,
/// the `/p2p/<peer-id>` protocol is appended to the returned address.
fn p2p_addr(peer: Option<PeerId>, addr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
let peer = match peer {
Some(p) => p,
None => return Ok(addr),
};
if let Some(multiaddr::Protocol::P2p(hash)) = addr.iter().last() {
if &hash != peer.as_ref() {
return Err(addr);
}
Ok(addr)
} else {
Ok(addr.with(multiaddr::Protocol::P2p(peer.into())))
}
}
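
For illustration, the three documented cases of `p2p_addr`, written as a test-style sketch (not part of the diff; it assumes `PeerId`, `Multiaddr` and the `multiaddr` crate are in scope, as in the file above):

    #[test]
    fn p2p_addr_cases() {
        let peer = PeerId::random();
        let addr: Multiaddr = "/ip4/127.0.0.1/tcp/4001".parse().unwrap();

        // No peer given: the address is returned unchanged.
        assert_eq!(p2p_addr(None, addr.clone()), Ok(addr.clone()));

        // Peer given, no `/p2p/...` suffix present: the suffix is appended.
        assert_eq!(
            p2p_addr(Some(peer), addr.clone()),
            Ok(addr.clone().with(multiaddr::Protocol::P2p(peer.into())))
        );

        // Address already ends in `/p2p/<other-peer>`: returned as an error.
        let other = addr.with(multiaddr::Protocol::P2p(PeerId::random().into()));
        assert_eq!(p2p_addr(Some(peer), other.clone()), Err(other));
    }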


@@ -1,260 +0,0 @@
// Copyright 2021 Protocol Labs.
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Async functions driving pending and established connections in the form of a task.
use super::concurrent_dial::ConcurrentDial;
use crate::{
connection::{
self,
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
ConnectionError, ConnectionHandler, ConnectionId, IntoConnectionHandler,
PendingInboundConnectionError, PendingOutboundConnectionError, Substream,
},
muxing::StreamMuxer,
transport::{Transport, TransportError},
Multiaddr, PeerId,
};
use futures::{
channel::{mpsc, oneshot},
future::{poll_fn, Either, Future},
SinkExt, StreamExt,
};
use std::pin::Pin;
use void::Void;
/// Commands that can be sent to a task.
#[derive(Debug)]
pub enum Command<T> {
/// Notify the connection handler of an event.
NotifyHandler(T),
/// Gracefully close the connection (active close) before
/// terminating the task.
Close,
}
#[derive(Debug)]
pub enum PendingConnectionEvent<TTrans>
where
TTrans: Transport,
{
ConnectionEstablished {
id: ConnectionId,
output: TTrans::Output,
/// [`Some`] when the new connection is an outgoing connection.
/// Addresses are dialed in parallel. Contains the addresses and errors
/// of dial attempts that failed before the one successful dial.
outgoing: Option<(Multiaddr, Vec<(Multiaddr, TransportError<TTrans::Error>)>)>,
},
/// A pending connection failed.
PendingFailed {
id: ConnectionId,
error: Either<
PendingOutboundConnectionError<TTrans::Error>,
PendingInboundConnectionError<TTrans::Error>,
>,
},
}
#[derive(Debug)]
pub enum EstablishedConnectionEvent<THandler: IntoConnectionHandler> {
/// A node we are connected to has changed its address.
AddressChange {
id: ConnectionId,
peer_id: PeerId,
new_address: Multiaddr,
},
/// Notify the manager of an event from the connection.
Notify {
id: ConnectionId,
peer_id: PeerId,
event: THandlerOutEvent<THandler>,
},
/// A connection closed, possibly due to an error.
///
/// If `error` is `None`, the connection has completed
/// an active orderly close.
Closed {
id: ConnectionId,
peer_id: PeerId,
error: Option<ConnectionError<THandlerError<THandler>>>,
handler: THandler::Handler,
},
}
pub async fn new_for_pending_outgoing_connection<TTrans>(
connection_id: ConnectionId,
dial: ConcurrentDial<TTrans>,
drop_receiver: oneshot::Receiver<Void>,
mut events: mpsc::Sender<PendingConnectionEvent<TTrans>>,
) where
TTrans: Transport,
{
match futures::future::select(drop_receiver, Box::pin(dial)).await {
Either::Left((Err(oneshot::Canceled), _)) => {
let _ = events
.send(PendingConnectionEvent::PendingFailed {
id: connection_id,
error: Either::Left(PendingOutboundConnectionError::Aborted),
})
.await;
}
Either::Left((Ok(v), _)) => void::unreachable(v),
Either::Right((Ok((address, output, errors)), _)) => {
let _ = events
.send(PendingConnectionEvent::ConnectionEstablished {
id: connection_id,
output,
outgoing: Some((address, errors)),
})
.await;
}
Either::Right((Err(e), _)) => {
let _ = events
.send(PendingConnectionEvent::PendingFailed {
id: connection_id,
error: Either::Left(PendingOutboundConnectionError::Transport(e)),
})
.await;
}
}
}
pub async fn new_for_pending_incoming_connection<TFut, TTrans>(
connection_id: ConnectionId,
future: TFut,
drop_receiver: oneshot::Receiver<Void>,
mut events: mpsc::Sender<PendingConnectionEvent<TTrans>>,
) where
TTrans: Transport,
TFut: Future<Output = Result<TTrans::Output, TTrans::Error>> + Send + 'static,
{
match futures::future::select(drop_receiver, Box::pin(future)).await {
Either::Left((Err(oneshot::Canceled), _)) => {
let _ = events
.send(PendingConnectionEvent::PendingFailed {
id: connection_id,
error: Either::Right(PendingInboundConnectionError::Aborted),
})
.await;
}
Either::Left((Ok(v), _)) => void::unreachable(v),
Either::Right((Ok(output), _)) => {
let _ = events
.send(PendingConnectionEvent::ConnectionEstablished {
id: connection_id,
output,
outgoing: None,
})
.await;
}
Either::Right((Err(e), _)) => {
let _ = events
.send(PendingConnectionEvent::PendingFailed {
id: connection_id,
error: Either::Right(PendingInboundConnectionError::Transport(
TransportError::Other(e),
)),
})
.await;
}
}
}
pub async fn new_for_established_connection<TMuxer, THandler>(
connection_id: ConnectionId,
peer_id: PeerId,
mut connection: crate::connection::Connection<TMuxer, THandler::Handler>,
mut command_receiver: mpsc::Receiver<Command<THandlerInEvent<THandler>>>,
mut events: mpsc::Sender<EstablishedConnectionEvent<THandler>>,
) where
TMuxer: StreamMuxer,
THandler: IntoConnectionHandler,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
loop {
match futures::future::select(
command_receiver.next(),
poll_fn(|cx| Pin::new(&mut connection).poll(cx)),
)
.await
{
Either::Left((Some(command), _)) => match command {
Command::NotifyHandler(event) => connection.inject_event(event),
Command::Close => {
command_receiver.close();
let (handler, closing_muxer) = connection.close();
let error = closing_muxer.await.err().map(ConnectionError::IO);
let _ = events
.send(EstablishedConnectionEvent::Closed {
id: connection_id,
peer_id,
error,
handler,
})
.await;
return;
}
},
// The manager has disappeared; abort.
Either::Left((None, _)) => return,
Either::Right((event, _)) => {
match event {
Ok(connection::Event::Handler(event)) => {
let _ = events
.send(EstablishedConnectionEvent::Notify {
id: connection_id,
peer_id,
event,
})
.await;
}
Ok(connection::Event::AddressChange(new_address)) => {
let _ = events
.send(EstablishedConnectionEvent::AddressChange {
id: connection_id,
peer_id,
new_address,
})
.await;
}
Err(error) => {
command_receiver.close();
let (handler, _closing_muxer) = connection.close();
// Terminate the task with the error, dropping the connection.
let _ = events
.send(EstablishedConnectionEvent::Closed {
id: connection_id,
peer_id,
error: Some(error),
handler,
})
.await;
return;
}
}
}
}
}
}


@@ -1,275 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::muxing::{substream_from_ref, StreamMuxer, StreamMuxerEvent, SubstreamRef};
use futures::prelude::*;
use multiaddr::Multiaddr;
use smallvec::SmallVec;
use std::sync::Arc;
use std::{fmt, io::Error as IoError, pin::Pin, task::Context, task::Poll};
/// Endpoint for a received substream.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum SubstreamEndpoint<TDialInfo> {
Dialer(TDialInfo),
Listener,
}
impl<TDialInfo> SubstreamEndpoint<TDialInfo> {
/// Returns true for `Dialer`.
pub fn is_dialer(&self) -> bool {
match self {
SubstreamEndpoint::Dialer(_) => true,
SubstreamEndpoint::Listener => false,
}
}
/// Returns true for `Listener`.
pub fn is_listener(&self) -> bool {
match self {
SubstreamEndpoint::Dialer(_) => false,
SubstreamEndpoint::Listener => true,
}
}
}
/// Implementation of `Stream` that handles substream multiplexing.
///
/// The stream will receive substreams and can be used to open new outgoing substreams. Destroying
/// the `Muxing` will **not** close the existing substreams.
///
/// The stream will close once both the inbound and outbound channels are closed, and no more
/// outbound substream attempt is pending.
pub struct Muxing<TMuxer, TUserData>
where
TMuxer: StreamMuxer,
{
/// The muxer used to manage substreams.
inner: Arc<TMuxer>,
/// List of substreams we are currently opening.
outbound_substreams: SmallVec<[(TUserData, TMuxer::OutboundSubstream); 8]>,
}
/// Future that signals the remote that we have closed the connection.
pub struct Close<TMuxer> {
/// Muxer to close.
muxer: Arc<TMuxer>,
}
/// A successfully opened substream.
pub type Substream<TMuxer> = SubstreamRef<Arc<TMuxer>>;
/// Event that can happen on the `Muxing`.
pub enum SubstreamEvent<TMuxer, TUserData>
where
TMuxer: StreamMuxer,
{
/// A new inbound substream arrived.
InboundSubstream {
/// The newly-opened substream. Will return EOF or an error if the `Muxing` is
/// destroyed or `close_graceful` is called.
substream: Substream<TMuxer>,
},
/// An outbound substream has successfully been opened.
OutboundSubstream {
/// User data that has been passed to the `open_substream` method.
user_data: TUserData,
/// The newly-opened substream. Will return EOF or an error if the `Muxing` is
/// destroyed or `close_graceful` is called.
substream: Substream<TMuxer>,
},
/// Address to the remote has changed. The previous one is now obsolete.
///
/// > **Note**: This can for example happen when using the QUIC protocol, where the two nodes
/// > can change their IP address while retaining the same QUIC connection.
AddressChange(Multiaddr),
}
/// Identifier for a substream being opened.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct OutboundSubstreamId(usize);
impl<TMuxer, TUserData> Muxing<TMuxer, TUserData>
where
TMuxer: StreamMuxer,
{
/// Creates a new node events stream.
pub fn new(muxer: TMuxer) -> Self {
Muxing {
inner: Arc::new(muxer),
outbound_substreams: SmallVec::new(),
}
}
/// Starts the process of opening a new outbound substream.
///
/// After calling this method, polling the stream should eventually produce either an
/// `OutboundSubstream` event or an `OutboundClosed` event containing the user data that has
/// been passed to this method.
pub fn open_substream(&mut self, user_data: TUserData) {
let raw = self.inner.open_outbound();
self.outbound_substreams.push((user_data, raw));
}
/// Destroys the node stream and returns all the pending outbound substreams, plus an object
/// that signals the remote that we shut down the connection.
#[must_use]
pub fn close(mut self) -> (Close<TMuxer>, Vec<TUserData>) {
let substreams = self.cancel_outgoing();
let close = Close {
muxer: self.inner.clone(),
};
(close, substreams)
}
/// Destroys all outbound streams and returns the corresponding user data.
pub fn cancel_outgoing(&mut self) -> Vec<TUserData> {
let mut out = Vec::with_capacity(self.outbound_substreams.len());
for (user_data, outbound) in self.outbound_substreams.drain(..) {
out.push(user_data);
self.inner.destroy_outbound(outbound);
}
out
}
/// Provides an API similar to `Future`.
pub fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<SubstreamEvent<TMuxer, TUserData>, IoError>> {
// Polling inbound substream.
match self.inner.poll_event(cx) {
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) => {
let substream = substream_from_ref(self.inner.clone(), substream);
return Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream }));
}
Poll::Ready(Ok(StreamMuxerEvent::AddressChange(addr))) => {
return Poll::Ready(Ok(SubstreamEvent::AddressChange(addr)))
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
Poll::Pending => {}
}
// Polling outbound substreams.
// We remove each element from `outbound_substreams` one by one and add them back.
for n in (0..self.outbound_substreams.len()).rev() {
let (user_data, mut outbound) = self.outbound_substreams.swap_remove(n);
match self.inner.poll_outbound(cx, &mut outbound) {
Poll::Ready(Ok(substream)) => {
let substream = substream_from_ref(self.inner.clone(), substream);
self.inner.destroy_outbound(outbound);
return Poll::Ready(Ok(SubstreamEvent::OutboundSubstream {
user_data,
substream,
}));
}
Poll::Pending => {
self.outbound_substreams.push((user_data, outbound));
}
Poll::Ready(Err(err)) => {
self.inner.destroy_outbound(outbound);
return Poll::Ready(Err(err.into()));
}
}
}
// Nothing happened. Register our task to be notified and return.
Poll::Pending
}
}
impl<TMuxer, TUserData> fmt::Debug for Muxing<TMuxer, TUserData>
where
TMuxer: StreamMuxer,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("Muxing")
.field("outbound_substreams", &self.outbound_substreams.len())
.finish()
}
}
impl<TMuxer, TUserData> Drop for Muxing<TMuxer, TUserData>
where
TMuxer: StreamMuxer,
{
fn drop(&mut self) {
// The substreams that were produced will continue to work, as the muxer is held in an Arc.
// However we will no longer process any further inbound or outbound substream, and we
// therefore close everything.
for (_, outbound) in self.outbound_substreams.drain(..) {
self.inner.destroy_outbound(outbound);
}
}
}
impl<TMuxer> Future for Close<TMuxer>
where
TMuxer: StreamMuxer,
{
type Output = Result<(), IoError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.muxer.close(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
}
}
}
impl<TMuxer> fmt::Debug for Close<TMuxer>
where
TMuxer: StreamMuxer,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("Close").finish()
}
}
impl<TMuxer, TUserData> fmt::Debug for SubstreamEvent<TMuxer, TUserData>
where
TMuxer: StreamMuxer,
TMuxer::Substream: fmt::Debug,
TUserData: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SubstreamEvent::InboundSubstream { substream } => f
.debug_struct("SubstreamEvent::OutboundClosed")
.field("substream", substream)
.finish(),
SubstreamEvent::OutboundSubstream {
user_data,
substream,
} => f
.debug_struct("SubstreamEvent::OutboundSubstream")
.field("user_data", user_data)
.field("substream", substream)
.finish(),
SubstreamEvent::AddressChange(address) => f
.debug_struct("SubstreamEvent::AddressChange")
.field("address", address)
.finish(),
}
}
}


@@ -61,18 +61,16 @@ pub mod connection;
pub mod either;
pub mod identity;
pub mod muxing;
pub mod network;
pub mod peer_record;
pub mod signed_envelope;
pub mod transport;
pub mod upgrade;
pub use connection::{Connected, ConnectedPoint, Endpoint};
pub use connection::{ConnectedPoint, Endpoint};
pub use identity::PublicKey;
pub use multiaddr::Multiaddr;
pub use multihash;
pub use muxing::StreamMuxer;
pub use network::{DialOpts, Network};
pub use peer_id::PeerId;
pub use peer_record::PeerRecord;
pub use signed_envelope::SignedEnvelope;
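
Downstream code that imported the removed re-exports from `libp2p-core` now reaches the same functionality through `libp2p-swarm`. A hedged sketch of the import change (module paths as of early 2022):

    // Before this commit:
    // use libp2p_core::{DialOpts, Network};
    //
    // After this commit, the swarm is the single entry point:
    use libp2p_swarm::{dial_opts::DialOpts, Swarm, SwarmEvent};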


@@ -1,738 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
mod event;
pub mod peer;
pub use crate::connection::{ConnectionCounters, ConnectionLimits, Endpoint};
pub use event::{IncomingConnection, NetworkEvent};
pub use peer::Peer;
use crate::{
connection::{
handler::{THandlerInEvent, THandlerOutEvent},
pool::{Pool, PoolConfig, PoolEvent},
ConnectionHandler, ConnectionId, ConnectionLimit, IncomingInfo, IntoConnectionHandler,
ListenerId, ListenersEvent, ListenersStream, PendingPoint, Substream,
},
muxing::StreamMuxer,
transport::{Transport, TransportError},
Executor, Multiaddr, PeerId,
};
use either::Either;
use multihash::Multihash;
use std::{
convert::TryFrom as _,
error, fmt,
num::{NonZeroU8, NonZeroUsize},
pin::Pin,
task::{Context, Poll},
};
use thiserror::Error;
/// Manages the listeners and the pool of connections to other nodes, exposing a
/// `Stream`-like `poll` API that never terminates.
pub struct Network<TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
/// The local peer ID.
local_peer_id: PeerId,
/// Listeners for incoming connections.
listeners: ListenersStream<TTrans>,
/// The nodes currently active.
pool: Pool<THandler, TTrans>,
}
impl<TTrans, THandler> fmt::Debug for Network<TTrans, THandler>
where
TTrans: fmt::Debug + Transport,
THandler: fmt::Debug + ConnectionHandler,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("ReachAttempts")
.field("local_peer_id", &self.local_peer_id)
.field("listeners", &self.listeners)
.field("peers", &self.pool)
.finish()
}
}
impl<TTrans, THandler> Unpin for Network<TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
}
impl<TTrans, THandler> Network<TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
/// Checks whether the network has an established connection to a peer.
pub fn is_connected(&self, peer: &PeerId) -> bool {
self.pool.is_connected(peer)
}
fn dialing_attempts(&self, peer: PeerId) -> impl Iterator<Item = &ConnectionId> {
self.pool
.iter_pending_info()
.filter(move |(_, endpoint, peer_id)| {
matches!(endpoint, PendingPoint::Dialer { .. }) && peer_id.as_ref() == Some(&peer)
})
.map(|(connection_id, _, _)| connection_id)
}
/// Checks whether the network has an ongoing dialing attempt to a peer.
pub fn is_dialing(&self, peer: &PeerId) -> bool {
self.dialing_attempts(*peer).next().is_some()
}
fn disconnect(&mut self, peer: &PeerId) {
self.pool.disconnect(peer);
}
}
impl<TTrans, THandler> Network<TTrans, THandler>
where
TTrans: Transport + Clone + 'static,
<TTrans as Transport>::Error: Send + 'static,
THandler: IntoConnectionHandler + Send + 'static,
{
/// Creates a new node events stream.
pub fn new(transport: TTrans, local_peer_id: PeerId, config: NetworkConfig) -> Self {
Network {
local_peer_id,
listeners: ListenersStream::new(transport),
pool: Pool::new(local_peer_id, config.pool_config, config.limits),
}
}
/// Returns the transport passed when building this object.
pub fn transport(&self) -> &TTrans {
self.listeners.transport()
}
/// Start listening on the given multiaddress.
pub fn listen_on(
&mut self,
addr: Multiaddr,
) -> Result<ListenerId, TransportError<TTrans::Error>> {
self.listeners.listen_on(addr)
}
/// Remove a previously added listener.
///
/// Returns `true` if there was a listener with this ID, `false`
/// otherwise.
pub fn remove_listener(&mut self, id: ListenerId) -> bool {
self.listeners.remove_listener(id)
}
/// Returns an iterator that produces the list of addresses we are listening on.
pub fn listen_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
self.listeners.listen_addrs()
}
/// Maps the given `observed_addr`, representing an address of the local
/// node observed by a remote peer, onto the locally known listen addresses
/// to yield one or more addresses of the local node that may be publicly
/// reachable.
///
/// I.e. this method incorporates the view of other peers into the listen
/// addresses seen by the local node to account for possible IP and port
/// mappings performed by intermediate network devices in an effort to
/// obtain addresses for the local peer that are also reachable for peers
/// other than the peer who reported the `observed_addr`.
///
/// The translation is transport-specific. See [`Transport::address_translation`].
pub fn address_translation<'a>(&'a self, observed_addr: &'a Multiaddr) -> Vec<Multiaddr>
where
THandler: 'a,
{
let transport = self.listeners.transport();
let mut addrs: Vec<_> = self
.listen_addrs()
.filter_map(move |server| transport.address_translation(server, observed_addr))
.collect();
// remove duplicates
addrs.sort_unstable();
addrs.dedup();
addrs
}
/// Returns the peer id of the local node.
pub fn local_peer_id(&self) -> &PeerId {
&self.local_peer_id
}
/// Dial a known or unknown peer.
///
/// The given `handler` will be used to create the
/// [`Connection`](crate::connection::Connection) upon success and the
/// connection ID is returned.
pub fn dial(
&mut self,
handler: THandler,
opts: impl Into<DialOpts>,
) -> Result<ConnectionId, DialError<THandler>>
where
TTrans: Transport + Send,
TTrans::Output: Send + 'static,
TTrans::Dial: Send + 'static,
TTrans::Error: Send + 'static,
{
let opts = opts.into();
let (peer_id, addresses, dial_concurrency_factor_override, role_override) = match opts.0 {
// Dial a known peer.
Opts::WithPeerIdWithAddresses(WithPeerIdWithAddresses {
peer_id,
addresses,
dial_concurrency_factor_override,
role_override,
}) => (
Some(peer_id),
Either::Left(addresses.into_iter()),
dial_concurrency_factor_override,
role_override,
),
// Dial an unknown peer.
Opts::WithoutPeerIdWithAddress(WithoutPeerIdWithAddress {
address,
role_override,
}) => {
// If the address ultimately encapsulates an expected peer ID, dial that peer
// such that any mismatch is detected. We do not "pop off" the `P2p` protocol
// from the address, because it may be used by the `Transport`, i.e. `P2p`
// is a protocol component that can influence any transport, like `libp2p-dns`.
let peer_id = match address
.iter()
.last()
.and_then(|p| {
if let multiaddr::Protocol::P2p(ma) = p {
Some(PeerId::try_from(ma))
} else {
None
}
})
.transpose()
{
Ok(peer_id) => peer_id,
Err(multihash) => return Err(DialError::InvalidPeerId { handler, multihash }),
};
(
peer_id,
Either::Right(std::iter::once(address)),
None,
role_override,
)
}
};
self.pool.add_outgoing(
self.transport().clone(),
addresses,
peer_id,
handler,
role_override,
dial_concurrency_factor_override,
)
}
/// Returns information about the state of the `Network`.
pub fn info(&self) -> NetworkInfo {
let num_peers = self.pool.num_peers();
let connection_counters = self.pool.counters().clone();
NetworkInfo {
num_peers,
connection_counters,
}
}
/// Returns an iterator for information on all pending incoming connections.
pub fn incoming_info(&self) -> impl Iterator<Item = IncomingInfo<'_>> {
self.pool.iter_pending_incoming()
}
/// Returns a list of all connected peers, i.e. peers to whom the `Network`
/// has at least one established connection.
pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
self.pool.iter_connected()
}
/// Checks whether the network has neither an ongoing dialing attempt,
/// nor an established connection to a peer.
pub fn is_disconnected(&self, peer: &PeerId) -> bool {
!self.is_connected(peer) && !self.is_dialing(peer)
}
/// Returns a list of all the peers to whom a new outgoing connection
/// is currently being established.
pub fn dialing_peers(&self) -> impl Iterator<Item = &PeerId> {
self.pool
.iter_pending_info()
.filter(|(_, endpoint, _)| matches!(endpoint, PendingPoint::Dialer { .. }))
.filter_map(|(_, _, peer)| peer.as_ref())
}
/// Obtains a view of a [`Peer`] with the given ID in the network.
pub fn peer(&mut self, peer_id: PeerId) -> Peer<'_, TTrans, THandler> {
Peer::new(self, peer_id)
}
/// Accepts a pending incoming connection obtained via [`NetworkEvent::IncomingConnection`],
/// adding it to the `Network`'s connection pool, subject to the configured limits.
///
/// Once the connection is established and all transport protocol upgrades
/// completed, the connection is associated with the provided `handler`.
pub fn accept(
&mut self,
IncomingConnection {
upgrade,
local_addr,
send_back_addr,
}: IncomingConnection<TTrans::ListenerUpgrade>,
handler: THandler,
) -> Result<ConnectionId, (ConnectionLimit, THandler)>
where
TTrans: Transport,
TTrans::Output: Send + 'static,
TTrans::Error: Send + 'static,
TTrans::ListenerUpgrade: Send + 'static,
{
self.pool.add_incoming(
upgrade,
handler,
IncomingInfo {
local_addr: &local_addr,
send_back_addr: &send_back_addr,
},
)
}
/// Provides an API similar to `Stream`, except that it does not terminate.
pub fn poll<'a, TMuxer>(
&'a mut self,
cx: &mut Context<'_>,
) -> Poll<
NetworkEvent<'a, TTrans, THandlerInEvent<THandler>, THandlerOutEvent<THandler>, THandler>,
>
where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
TTrans::ListenerUpgrade: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::Error: std::fmt::Debug,
TMuxer::OutboundSubstream: Send,
THandler: IntoConnectionHandler + Send + 'static,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>> + Send,
{
// Poll the listener(s) for new connections.
match ListenersStream::poll(Pin::new(&mut self.listeners), cx) {
Poll::Pending => (),
Poll::Ready(ListenersEvent::Incoming {
listener_id,
upgrade,
local_addr,
send_back_addr,
}) => {
return Poll::Ready(NetworkEvent::IncomingConnection {
listener_id,
connection: IncomingConnection {
upgrade,
local_addr,
send_back_addr,
},
})
}
Poll::Ready(ListenersEvent::NewAddress {
listener_id,
listen_addr,
}) => {
return Poll::Ready(NetworkEvent::NewListenerAddress {
listener_id,
listen_addr,
})
}
Poll::Ready(ListenersEvent::AddressExpired {
listener_id,
listen_addr,
}) => {
return Poll::Ready(NetworkEvent::ExpiredListenerAddress {
listener_id,
listen_addr,
})
}
Poll::Ready(ListenersEvent::Closed {
listener_id,
addresses,
reason,
}) => {
return Poll::Ready(NetworkEvent::ListenerClosed {
listener_id,
addresses,
reason,
})
}
Poll::Ready(ListenersEvent::Error { listener_id, error }) => {
return Poll::Ready(NetworkEvent::ListenerError { listener_id, error })
}
}
// Poll the known peers.
let event = match self.pool.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(PoolEvent::ConnectionEstablished {
connection,
other_established_connection_ids,
concurrent_dial_errors,
}) => NetworkEvent::ConnectionEstablished {
connection,
other_established_connection_ids,
concurrent_dial_errors,
},
Poll::Ready(PoolEvent::PendingOutboundConnectionError {
id: _,
error,
handler,
peer,
}) => {
if let Some(peer) = peer {
NetworkEvent::DialError {
handler,
peer_id: peer,
error,
}
} else {
NetworkEvent::UnknownPeerDialError { error, handler }
}
}
Poll::Ready(PoolEvent::PendingInboundConnectionError {
id: _,
send_back_addr,
local_addr,
error,
handler,
}) => NetworkEvent::IncomingConnectionError {
error,
handler,
send_back_addr,
local_addr,
},
Poll::Ready(PoolEvent::ConnectionClosed {
id,
connected,
error,
remaining_established_connection_ids,
handler,
..
}) => NetworkEvent::ConnectionClosed {
id,
connected,
remaining_established_connection_ids,
error,
handler,
},
Poll::Ready(PoolEvent::ConnectionEvent { connection, event }) => {
NetworkEvent::ConnectionEvent { connection, event }
}
Poll::Ready(PoolEvent::AddressChange {
connection,
new_endpoint,
old_endpoint,
}) => NetworkEvent::AddressChange {
connection,
new_endpoint,
old_endpoint,
},
};
Poll::Ready(event)
}
}
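// Two minimal usage sketches, assuming a `network: Network<TTrans, THandler>`
// built as above; the function names and the example addresses are
// illustrative only, not part of the removed API.
//
// First: the intended use of `address_translation`, mapping an address that a
// remote peer observed for us onto our listen addresses.
fn observed_addr_candidates<TTrans, THandler>(network: &Network<TTrans, THandler>) -> Vec<Multiaddr>
where
    TTrans: Transport + Clone + 'static,
    TTrans::Error: Send + 'static,
    THandler: IntoConnectionHandler + Send + 'static,
{
    let observed: Multiaddr = "/ip4/203.0.113.7/tcp/4001"
        .parse()
        .expect("valid multiaddr literal");
    // Yields deduplicated, transport-translated candidate external addresses.
    network.address_translation(&observed)
}

// Second: dialing a known peer with an explicit address set via
// `Network::dial`. The bounds mirror those required by `dial` above.
fn dial_known_peer<TTrans, THandler>(
    network: &mut Network<TTrans, THandler>,
    handler: THandler,
    peer_id: PeerId,
    addresses: Vec<Multiaddr>,
) -> Result<ConnectionId, DialError<THandler>>
where
    TTrans: Transport + Clone + Send + 'static,
    TTrans::Output: Send + 'static,
    TTrans::Dial: Send + 'static,
    TTrans::Error: Send + 'static,
    THandler: IntoConnectionHandler + Send + 'static,
{
    // Build the `libp2p-core` `DialOpts` and hand them to the pool together
    // with the handler for the eventual connection.
    let opts = DialOpts::peer_id(peer_id).addresses(addresses).build();
    network.dial(handler, opts)
}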
/// Information about the network obtained by [`Network::info()`].
#[derive(Clone, Debug)]
pub struct NetworkInfo {
/// The total number of connected peers.
num_peers: usize,
/// Counters of ongoing network connections.
connection_counters: ConnectionCounters,
}
impl NetworkInfo {
/// The number of connected peers, i.e. peers with whom at least
/// one established connection exists.
pub fn num_peers(&self) -> usize {
self.num_peers
}
/// Gets counters for ongoing network connections.
pub fn connection_counters(&self) -> &ConnectionCounters {
&self.connection_counters
}
}
/// The (optional) configuration for a [`Network`].
///
/// The default configuration specifies no dedicated task executor, no
/// connection limits, a connection event buffer size of 32, and a
/// `notify_handler` buffer size of 8.
#[derive(Default)]
pub struct NetworkConfig {
/// Connection [`Pool`] configuration.
pool_config: PoolConfig,
/// The effective connection limits.
limits: ConnectionLimits,
}
impl NetworkConfig {
/// Configures the executor to use for spawning connection background tasks.
pub fn with_executor(mut self, e: Box<dyn Executor + Send>) -> Self {
self.pool_config.executor = Some(e);
self
}
/// Configures the executor to use for spawning connection background tasks,
/// only if no executor has already been configured.
pub fn or_else_with_executor<F>(mut self, f: F) -> Self
where
F: FnOnce() -> Option<Box<dyn Executor + Send>>,
{
self.pool_config.executor = self.pool_config.executor.or_else(f);
self
}
/// Sets the maximum number of events sent to a connection's background task
/// that may be buffered, if the task cannot keep up with their consumption and
/// delivery to the connection handler.
///
/// When the buffer for a particular connection is full, `notify_handler` will no
/// longer be able to deliver events to the associated `ConnectionHandler`,
/// thus exerting back-pressure on the connection and peer API.
pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
self.pool_config.task_command_buffer_size = n.get() - 1;
self
}
/// Sets the maximum number of buffered connection events (beyond a guaranteed
/// buffer of 1 event per connection).
///
/// When the buffer is full, the background tasks of all connections will stall.
/// In this way, the consumers of network events exert back-pressure on
/// the network connection I/O.
pub fn with_connection_event_buffer_size(mut self, n: usize) -> Self {
self.pool_config.task_event_buffer_size = n;
self
}
/// Number of addresses concurrently dialed for a single outbound connection attempt.
pub fn with_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
self.pool_config.dial_concurrency_factor = factor;
self
}
/// Sets the connection limits to enforce.
pub fn with_connection_limits(mut self, limits: ConnectionLimits) -> Self {
self.limits = limits;
self
}
}
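// A minimal sketch, using arbitrary example values, of composing the
// `NetworkConfig` builder methods documented above. The function name is
// illustrative only.
fn example_network_config() -> NetworkConfig {
    NetworkConfig::default()
        .with_notify_handler_buffer_size(NonZeroUsize::new(8).expect("8 > 0"))
        .with_connection_event_buffer_size(32)
        .with_dial_concurrency_factor(NonZeroU8::new(8).expect("8 > 0"))
        .with_connection_limits(ConnectionLimits::default())
}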
/// Possible (synchronous) errors when dialing a peer.
#[derive(Debug, Clone, Error)]
pub enum DialError<THandler> {
/// The dialing attempt is rejected because of a connection limit.
#[error("The dialing attempt was rejected because of a connection limit: {limit}")]
ConnectionLimit {
limit: ConnectionLimit,
handler: THandler,
},
/// The dialing attempt is rejected because the peer being dialed is the local peer.
#[error("The dialing attempt was rejected because the peer being dialed is the local peer")]
LocalPeerId { handler: THandler },
/// The dialing attempt is rejected because the PeerId is invalid.
#[error("The dialing attempt was rejected because a valid PeerId could not be constructed from: {multihash:?}")]
InvalidPeerId {
handler: THandler,
multihash: Multihash,
},
}
/// Options to configure a dial to a known or unknown peer.
///
/// Used in [`Network::dial`].
///
/// To construct, use either of:
///
/// - [`DialOpts::peer_id`] dialing a known peer
///
/// - [`DialOpts::unknown_peer_id`] dialing an unknown peer
#[derive(Debug, Clone, PartialEq)]
pub struct DialOpts(pub(super) Opts);
impl DialOpts {
/// Dial a known peer.
pub fn peer_id(peer_id: PeerId) -> WithPeerId {
WithPeerId { peer_id }
}
/// Dial an unknown peer.
pub fn unknown_peer_id() -> WithoutPeerId {
WithoutPeerId {}
}
}
impl From<Multiaddr> for DialOpts {
fn from(address: Multiaddr) -> Self {
DialOpts::unknown_peer_id().address(address).build()
}
}
/// Internal options type.
///
/// Not to be constructed manually. Use either of the below instead:
///
/// - [`DialOpts::peer_id`] dialing a known peer
/// - [`DialOpts::unknown_peer_id`] dialing an unknown peer
#[derive(Debug, Clone, PartialEq)]
pub(super) enum Opts {
WithPeerIdWithAddresses(WithPeerIdWithAddresses),
WithoutPeerIdWithAddress(WithoutPeerIdWithAddress),
}
#[derive(Debug, Clone, PartialEq)]
pub struct WithPeerId {
pub(crate) peer_id: PeerId,
}
impl WithPeerId {
/// Specify a set of addresses to be used to dial the known peer.
pub fn addresses(self, addresses: Vec<Multiaddr>) -> WithPeerIdWithAddresses {
WithPeerIdWithAddresses {
peer_id: self.peer_id,
addresses,
dial_concurrency_factor_override: Default::default(),
role_override: Endpoint::Dialer,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct WithPeerIdWithAddresses {
pub(crate) peer_id: PeerId,
pub(crate) addresses: Vec<Multiaddr>,
pub(crate) dial_concurrency_factor_override: Option<NonZeroU8>,
pub(crate) role_override: Endpoint,
}
impl WithPeerIdWithAddresses {
/// Override [`NetworkConfig::with_dial_concurrency_factor`].
pub fn override_dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
self.dial_concurrency_factor_override = Some(factor);
self
}
/// Override the role of the local node on the connection, i.e. execute the
/// dial _as a listener_.
///
/// See
/// [`ConnectedPoint::Dialer`](crate::connection::ConnectedPoint::Dialer)
/// for details.
pub fn override_role(mut self, role: Endpoint) -> Self {
self.role_override = role;
self
}
/// Build the final [`DialOpts`].
pub fn build(self) -> DialOpts {
DialOpts(Opts::WithPeerIdWithAddresses(self))
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct WithoutPeerId {}
impl WithoutPeerId {
/// Specify a single address to dial the unknown peer.
pub fn address(self, address: Multiaddr) -> WithoutPeerIdWithAddress {
WithoutPeerIdWithAddress {
address,
role_override: Endpoint::Dialer,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct WithoutPeerIdWithAddress {
pub(crate) address: Multiaddr,
pub(crate) role_override: Endpoint,
}
impl WithoutPeerIdWithAddress {
/// Override the role of the local node on the connection, i.e. execute the
/// dial _as a listener_.
///
/// See
/// [`ConnectedPoint::Dialer`](crate::connection::ConnectedPoint::Dialer)
/// for details.
pub fn override_role(mut self, role: Endpoint) -> Self {
self.role_override = role;
self
}
/// Build the final [`DialOpts`].
pub fn build(self) -> DialOpts {
DialOpts(Opts::WithoutPeerIdWithAddress(self))
}
}
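// A minimal sketch of the two builder paths documented above; the peer ID and
// the addresses are example inputs, and the function name is illustrative only.
fn example_dial_opts(peer_id: PeerId) -> (DialOpts, DialOpts) {
    // Known peer: dial a fixed set of addresses with a custom concurrency factor.
    let known = DialOpts::peer_id(peer_id)
        .addresses(vec!["/ip4/127.0.0.1/tcp/4001".parse().expect("valid multiaddr")])
        .override_dial_concurrency_factor(NonZeroU8::new(2).expect("2 > 0"))
        .build();
    // Unknown peer: dial a single address; a trailing `/p2p/...` component, if
    // any, is validated by `Network::dial`.
    let unknown = DialOpts::unknown_peer_id()
        .address("/ip4/192.0.2.1/tcp/4001".parse().expect("valid multiaddr"))
        .build();
    (known, unknown)
}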
#[cfg(test)]
mod tests {
use super::*;
use futures::future::Future;
struct Dummy;
impl Executor for Dummy {
fn exec(&self, _: Pin<Box<dyn Future<Output = ()> + Send>>) {}
}
#[test]
fn set_executor() {
NetworkConfig::default()
.with_executor(Box::new(Dummy))
.with_executor(Box::new(|f| {
async_std::task::spawn(f);
}));
}
}

View File

@ -1,300 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Network events and associated information.
use crate::{
connection::{
Connected, ConnectedPoint, ConnectionError, ConnectionHandler, ConnectionId,
EstablishedConnection, IntoConnectionHandler, ListenerId, PendingInboundConnectionError,
PendingOutboundConnectionError,
},
transport::{Transport, TransportError},
Multiaddr, PeerId,
};
use std::fmt;
/// Event that can happen on the `Network`.
pub enum NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
/// One of the listeners gracefully closed.
ListenerClosed {
/// The listener ID that closed.
listener_id: ListenerId,
/// The addresses that the listener was listening on.
addresses: Vec<Multiaddr>,
/// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
/// if the stream produced an error.
reason: Result<(), TTrans::Error>,
},
/// One of the listeners reported a non-fatal error.
ListenerError {
/// The listener that errored.
listener_id: ListenerId,
/// The listener error.
error: TTrans::Error,
},
/// One of the listeners is now listening on an additional address.
NewListenerAddress {
/// The listener that is listening on the new address.
listener_id: ListenerId,
/// The new address the listener is now also listening on.
listen_addr: Multiaddr,
},
/// One of the listeners is no longer listening on some address.
ExpiredListenerAddress {
/// The listener that is no longer listening on some address.
listener_id: ListenerId,
/// The expired address.
listen_addr: Multiaddr,
},
/// A new connection arrived on a listener.
///
/// To accept the connection, see [`Network::accept`](crate::Network::accept).
IncomingConnection {
/// The listener who received the connection.
listener_id: ListenerId,
/// The pending incoming connection.
connection: IncomingConnection<TTrans::ListenerUpgrade>,
},
/// An error happened on a connection during its initial handshake.
///
/// This can include, for example, an error during the handshake of the encryption layer, or
/// the connection unexpectedly closed.
IncomingConnectionError {
/// Local connection address.
local_addr: Multiaddr,
/// Address used to send back data to the remote.
send_back_addr: Multiaddr,
/// The error that happened.
error: PendingInboundConnectionError<TTrans::Error>,
handler: THandler,
},
/// A new connection to a peer has been established.
ConnectionEstablished {
/// The newly established connection.
connection: EstablishedConnection<'a, TInEvent>,
/// List of other connections to the same peer.
///
/// Note: Does not include the connection reported through this event.
other_established_connection_ids: Vec<ConnectionId>,
/// [`Some`] when the new connection is an outgoing connection.
/// Addresses are dialed in parallel. Contains the addresses and errors
/// of dial attempts that failed before the one successful dial.
concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<TTrans::Error>)>>,
},
/// An established connection to a peer has been closed.
///
/// A connection may close if
///
/// * it encounters an error, which includes the connection being
/// closed by the remote. In this case `error` is `Some`.
/// * it was actively closed by [`EstablishedConnection::start_close`],
/// i.e. a successful, orderly close. In this case `error` is `None`.
/// * it was actively closed by [`super::peer::ConnectedPeer::disconnect`] or
/// [`super::peer::DialingPeer::disconnect`], i.e. dropped without an
/// orderly close. In this case `error` is `None`.
///
ConnectionClosed {
/// The ID of the connection that encountered an error.
id: ConnectionId,
/// Information about the connection that encountered the error.
connected: Connected,
/// The error that occurred.
error: Option<ConnectionError<<THandler::Handler as ConnectionHandler>::Error>>,
/// List of remaining established connections to the same peer.
remaining_established_connection_ids: Vec<ConnectionId>,
handler: THandler::Handler,
},
/// A dialing attempt to an address of a peer failed.
DialError {
/// The handler that was supplied when initiating the dialing attempt.
handler: THandler,
/// Id of the peer we were trying to dial.
peer_id: PeerId,
/// The error that happened.
error: PendingOutboundConnectionError<TTrans::Error>,
},
/// Failed to reach a peer that we were trying to dial.
UnknownPeerDialError {
/// The error that happened.
error: PendingOutboundConnectionError<TTrans::Error>,
handler: THandler,
},
/// An established connection produced an event.
ConnectionEvent {
/// The connection on which the event occurred.
connection: EstablishedConnection<'a, TInEvent>,
/// Event that was produced by the node.
event: TOutEvent,
},
/// An established connection has changed its address.
AddressChange {
/// The connection whose address has changed.
connection: EstablishedConnection<'a, TInEvent>,
/// New endpoint of this connection.
new_endpoint: ConnectedPoint,
/// Old endpoint of this connection.
old_endpoint: ConnectedPoint,
},
}
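// A minimal sketch, not tied to any particular caller, of how the events above
// were typically consumed. A real caller obtains `event` from `Network::poll`;
// the function name is illustrative only.
fn handle_network_event<TTrans, TInEvent, TOutEvent, THandler>(
    event: NetworkEvent<'_, TTrans, TInEvent, TOutEvent, THandler>,
) where
    TTrans: Transport,
    THandler: IntoConnectionHandler,
{
    match event {
        NetworkEvent::IncomingConnection { listener_id, connection } => {
            // Typically followed by `Network::accept(connection, handler)`,
            // which is subject to the configured connection limits.
            let _ = (listener_id, connection.local_addr, connection.send_back_addr);
        }
        NetworkEvent::ConnectionEstablished { connection, other_established_connection_ids, .. } => {
            // `connection` gives mutable access to the newly established connection.
            let _ = (connection, other_established_connection_ids);
        }
        NetworkEvent::ConnectionClosed { id, error, .. } => {
            // `error` is `None` for an orderly, locally initiated close.
            let _ = (id, error);
        }
        _ => { /* listener updates and dial errors elided */ }
    }
}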
impl<TTrans, TInEvent, TOutEvent, THandler> fmt::Debug
for NetworkEvent<'_, TTrans, TInEvent, TOutEvent, THandler>
where
TInEvent: fmt::Debug,
TOutEvent: fmt::Debug,
TTrans: Transport,
TTrans::Error: fmt::Debug,
THandler: IntoConnectionHandler,
<THandler::Handler as ConnectionHandler>::Error: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match self {
NetworkEvent::NewListenerAddress {
listener_id,
listen_addr,
} => f
.debug_struct("NewListenerAddress")
.field("listener_id", listener_id)
.field("listen_addr", listen_addr)
.finish(),
NetworkEvent::ExpiredListenerAddress {
listener_id,
listen_addr,
} => f
.debug_struct("ExpiredListenerAddress")
.field("listener_id", listener_id)
.field("listen_addr", listen_addr)
.finish(),
NetworkEvent::ListenerClosed {
listener_id,
addresses,
reason,
} => f
.debug_struct("ListenerClosed")
.field("listener_id", listener_id)
.field("addresses", addresses)
.field("reason", reason)
.finish(),
NetworkEvent::ListenerError { listener_id, error } => f
.debug_struct("ListenerError")
.field("listener_id", listener_id)
.field("error", error)
.finish(),
NetworkEvent::IncomingConnection { connection, .. } => f
.debug_struct("IncomingConnection")
.field("local_addr", &connection.local_addr)
.field("send_back_addr", &connection.send_back_addr)
.finish(),
NetworkEvent::IncomingConnectionError {
local_addr,
send_back_addr,
error,
handler: _,
} => f
.debug_struct("IncomingConnectionError")
.field("local_addr", local_addr)
.field("send_back_addr", send_back_addr)
.field("error", error)
.finish(),
NetworkEvent::ConnectionEstablished {
connection,
concurrent_dial_errors,
..
} => f
.debug_struct("OutgoingConnectionEstablished")
.field("connection", connection)
.field("concurrent_dial_errors", concurrent_dial_errors)
.finish(),
NetworkEvent::ConnectionClosed {
id,
connected,
error,
..
} => f
.debug_struct("ConnectionClosed")
.field("id", id)
.field("connected", connected)
.field("error", error)
.finish(),
NetworkEvent::DialError {
handler: _,
peer_id,
error,
} => f
.debug_struct("DialError")
.field("peer_id", peer_id)
.field("error", error)
.finish(),
NetworkEvent::UnknownPeerDialError { error, .. } => f
.debug_struct("UnknownPeerDialError")
.field("error", error)
.finish(),
NetworkEvent::ConnectionEvent { connection, event } => f
.debug_struct("ConnectionEvent")
.field("connection", connection)
.field("event", event)
.finish(),
NetworkEvent::AddressChange {
connection,
new_endpoint,
old_endpoint,
} => f
.debug_struct("AddressChange")
.field("connection", connection)
.field("new_endpoint", new_endpoint)
.field("old_endpoint", old_endpoint)
.finish(),
}
}
}
/// A pending incoming connection produced by a listener.
pub struct IncomingConnection<TUpgrade> {
/// The connection upgrade.
pub(crate) upgrade: TUpgrade,
/// Local connection address.
pub local_addr: Multiaddr,
/// Address used to send back data to the remote.
pub send_back_addr: Multiaddr,
}

View File

@ -1,513 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use super::Network;
use crate::{
connection::{
handler::THandlerInEvent, pool::Pool, ConnectionHandler, ConnectionId,
EstablishedConnection, EstablishedConnectionIter, IntoConnectionHandler, PendingConnection,
},
PeerId, Transport,
};
use std::{collections::VecDeque, error, fmt};
/// The possible representations of a peer in a [`Network`], as
/// seen by the local node.
///
/// > **Note**: In any state there may always be a pending incoming
/// > connection attempt from the peer, however, the remote identity
/// > of a peer is only known once a connection is fully established.
pub enum Peer<'a, TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
/// At least one established connection exists to the peer.
Connected(ConnectedPeer<'a, TTrans, THandler>),
/// There is an ongoing dialing (i.e. outgoing connection) attempt
/// to the peer. There may already be other established connections
/// to the peer.
Dialing(DialingPeer<'a, TTrans, THandler>),
/// There exists no established connection to the peer and there is
/// currently no ongoing dialing (i.e. outgoing connection) attempt
/// in progress.
Disconnected(DisconnectedPeer<'a, TTrans, THandler>),
/// The peer represents the local node.
Local,
}
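// A minimal sketch, assuming a `network` and `peer_id` as above, of classifying
// a peer through this view and aborting any in-flight dials; the function name
// is illustrative only.
fn inspect_peer<TTrans, THandler>(network: &mut Network<TTrans, THandler>, peer_id: PeerId)
where
    TTrans: Transport + Clone + 'static,
    TTrans::Error: Send + 'static,
    THandler: IntoConnectionHandler + Send + 'static,
{
    match network.peer(peer_id) {
        Peer::Connected(connected) => {
            // At least one established connection exists.
            let _ = connected.num_connections();
        }
        Peer::Dialing(mut dialing) => {
            // Abort every pending outgoing connection attempt to this peer.
            let mut attempts = dialing.attempts();
            while let Some(attempt) = attempts.next() {
                attempt.abort();
            }
        }
        Peer::Disconnected(_) => { /* neither connected nor dialing */ }
        Peer::Local => { /* `peer_id` is the local node's own ID */ }
    }
}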
impl<'a, TTrans, THandler> fmt::Debug for Peer<'a, TTrans, THandler>
where
TTrans: Transport,
TTrans::Error: Send + 'static,
THandler: IntoConnectionHandler,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match self {
Peer::Connected(p) => f.debug_struct("Connected").field("peer", &p).finish(),
Peer::Dialing(p) => f.debug_struct("Dialing").field("peer", &p).finish(),
Peer::Disconnected(p) => f.debug_struct("Disconnected").field("peer", &p).finish(),
Peer::Local => f.debug_struct("Local").finish(),
}
}
}
impl<'a, TTrans, THandler> Peer<'a, TTrans, THandler>
where
TTrans: Transport,
TTrans::Error: Send + 'static,
THandler: IntoConnectionHandler,
{
pub(super) fn new(network: &'a mut Network<TTrans, THandler>, peer_id: PeerId) -> Self {
if peer_id == network.local_peer_id {
return Peer::Local;
}
if network.pool.is_connected(&peer_id) {
return Self::connected(network, peer_id);
}
if network.is_dialing(&peer_id) {
return Self::dialing(network, peer_id);
}
Self::disconnected(network, peer_id)
}
fn disconnected(network: &'a mut Network<TTrans, THandler>, peer_id: PeerId) -> Self {
Peer::Disconnected(DisconnectedPeer {
_network: network,
peer_id,
})
}
fn connected(network: &'a mut Network<TTrans, THandler>, peer_id: PeerId) -> Self {
Peer::Connected(ConnectedPeer { network, peer_id })
}
fn dialing(network: &'a mut Network<TTrans, THandler>, peer_id: PeerId) -> Self {
Peer::Dialing(DialingPeer { network, peer_id })
}
}
impl<'a, TTrans, THandler> Peer<'a, TTrans, THandler>
where
TTrans: Transport + Clone + Send + 'static,
TTrans::Output: Send + 'static,
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
THandler: IntoConnectionHandler + Send + 'static,
THandler::Handler: ConnectionHandler + Send,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
{
/// Checks whether the peer is currently connected.
///
/// Returns `true` iff [`Peer::into_connected`] returns `Some`.
pub fn is_connected(&self) -> bool {
match self {
Peer::Connected(..) => true,
Peer::Dialing(peer) => peer.is_connected(),
Peer::Disconnected(..) => false,
Peer::Local => false,
}
}
/// Checks whether the peer is currently being dialed.
///
/// Returns `true` iff [`Peer::into_dialing`] returns `Some`.
pub fn is_dialing(&self) -> bool {
match self {
Peer::Dialing(_) => true,
Peer::Connected(peer) => peer.is_dialing(),
Peer::Disconnected(..) => false,
Peer::Local => false,
}
}
/// Checks whether the peer is currently disconnected.
///
/// Returns `true` iff [`Peer::into_disconnected`] returns `Some`.
pub fn is_disconnected(&self) -> bool {
matches!(self, Peer::Disconnected(..))
}
/// Converts the peer into a `ConnectedPeer`, if an established connection exists.
///
/// Succeeds if there is at least one established connection to the peer.
pub fn into_connected(self) -> Option<ConnectedPeer<'a, TTrans, THandler>> {
match self {
Peer::Connected(peer) => Some(peer),
Peer::Dialing(peer) => peer.into_connected(),
Peer::Disconnected(..) => None,
Peer::Local => None,
}
}
/// Converts the peer into a `DialingPeer`, if a dialing attempt exists.
///
/// Succeeds if there is at least one pending outgoing connection to the peer.
pub fn into_dialing(self) -> Option<DialingPeer<'a, TTrans, THandler>> {
match self {
Peer::Dialing(peer) => Some(peer),
Peer::Connected(peer) => peer.into_dialing(),
Peer::Disconnected(..) => None,
Peer::Local => None,
}
}
/// Converts the peer into a `DisconnectedPeer`, if neither an established connection
/// nor a dialing attempt exists.
pub fn into_disconnected(self) -> Option<DisconnectedPeer<'a, TTrans, THandler>> {
match self {
Peer::Disconnected(peer) => Some(peer),
_ => None,
}
}
}
/// The representation of a peer in a [`Network`] to whom at least
/// one established connection exists. There may also be additional ongoing
/// dialing attempts to the peer.
pub struct ConnectedPeer<'a, TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
network: &'a mut Network<TTrans, THandler>,
peer_id: PeerId,
}
impl<'a, TTrans, THandler> ConnectedPeer<'a, TTrans, THandler>
where
TTrans: Transport,
<TTrans as Transport>::Error: Send + 'static,
THandler: IntoConnectionHandler,
{
pub fn id(&self) -> &PeerId {
&self.peer_id
}
/// Converts the `ConnectedPeer` back into a `Peer`.
pub fn into_peer(self) -> Peer<'a, TTrans, THandler> {
Peer::Connected(self)
}
/// Obtains an established connection to the peer by ID.
pub fn connection(
&mut self,
id: ConnectionId,
) -> Option<EstablishedConnection<THandlerInEvent<THandler>>> {
self.network.pool.get_established(id)
}
/// The number of established connections to the peer.
pub fn num_connections(&self) -> u32 {
self.network.pool.num_peer_established(self.peer_id)
}
/// Checks whether there is an ongoing dialing attempt to the peer.
///
/// Returns `true` iff [`ConnectedPeer::into_dialing`] returns `Some`.
pub fn is_dialing(&self) -> bool {
self.network.is_dialing(&self.peer_id)
}
/// Converts this peer into a [`DialingPeer`], if there is an ongoing
/// dialing attempt, `None` otherwise.
pub fn into_dialing(self) -> Option<DialingPeer<'a, TTrans, THandler>> {
if self.network.is_dialing(&self.peer_id) {
Some(DialingPeer {
network: self.network,
peer_id: self.peer_id,
})
} else {
None
}
}
/// Gets an iterator over all established connections to the peer.
pub fn connections(
&mut self,
) -> EstablishedConnectionIter<impl Iterator<Item = ConnectionId>, THandlerInEvent<THandler>>
{
self.network.pool.iter_peer_established(&self.peer_id)
}
/// Obtains some established connection to the peer.
pub fn some_connection(&mut self) -> EstablishedConnection<THandlerInEvent<THandler>> {
self.connections()
.into_first()
.expect("By `Peer::new` and the definition of `ConnectedPeer`.")
}
/// Disconnects from the peer, closing all connections.
pub fn disconnect(self) -> DisconnectedPeer<'a, TTrans, THandler> {
self.network.disconnect(&self.peer_id);
DisconnectedPeer {
_network: self.network,
peer_id: self.peer_id,
}
}
}
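// A further sketch under the same assumptions: force-closing all connections to
// a connected peer and landing in the disconnected representation. The function
// name is illustrative only.
fn disconnect_peer<TTrans, THandler>(network: &mut Network<TTrans, THandler>, peer_id: PeerId)
where
    TTrans: Transport + Clone + 'static,
    TTrans::Error: Send + 'static,
    THandler: IntoConnectionHandler + Send + 'static,
{
    if let Peer::Connected(connected) = network.peer(peer_id) {
        // Closes all established connections to the peer.
        let disconnected = connected.disconnect();
        debug_assert_eq!(disconnected.id(), &peer_id);
    }
}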
impl<'a, TTrans, THandler> fmt::Debug for ConnectedPeer<'a, TTrans, THandler>
where
TTrans: Transport,
TTrans::Error: Send + 'static,
THandler: IntoConnectionHandler,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("ConnectedPeer")
.field("peer_id", &self.peer_id)
.field(
"established",
&self
.network
.pool
.iter_peer_established_info(&self.peer_id)
.collect::<Vec<_>>(),
)
.field("attempts", &self.network.is_dialing(&self.peer_id))
.finish()
}
}
/// The representation of a peer in a [`Network`] to whom a dialing
/// attempt is ongoing. There may already exist other established
/// connections to this peer.
pub struct DialingPeer<'a, TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
network: &'a mut Network<TTrans, THandler>,
peer_id: PeerId,
}
impl<'a, TTrans, THandler> DialingPeer<'a, TTrans, THandler>
where
TTrans: Transport,
TTrans::Error: Send + 'static,
THandler: IntoConnectionHandler,
{
pub fn id(&self) -> &PeerId {
&self.peer_id
}
/// Converts the `DialingPeer` back into a `Peer`.
pub fn into_peer(self) -> Peer<'a, TTrans, THandler> {
Peer::Dialing(self)
}
/// Disconnects from this peer, closing all established connections and
/// aborting all dialing attempts.
pub fn disconnect(self) -> DisconnectedPeer<'a, TTrans, THandler> {
self.network.disconnect(&self.peer_id);
DisconnectedPeer {
_network: self.network,
peer_id: self.peer_id,
}
}
/// Checks whether there is an established connection to the peer.
///
/// Returns `true` iff [`DialingPeer::into_connected`] returns `Some`.
pub fn is_connected(&self) -> bool {
self.network.pool.is_connected(&self.peer_id)
}
/// Converts the peer into a `ConnectedPeer`, if an established connection exists.
pub fn into_connected(self) -> Option<ConnectedPeer<'a, TTrans, THandler>> {
if self.is_connected() {
Some(ConnectedPeer {
peer_id: self.peer_id,
network: self.network,
})
} else {
None
}
}
/// Obtains a dialing attempt to the peer by connection ID of
/// the current connection attempt.
pub fn attempt(&mut self, id: ConnectionId) -> Option<DialingAttempt<'_, THandler>> {
Some(DialingAttempt {
peer_id: self.peer_id,
inner: self.network.pool.get_outgoing(id)?,
})
}
/// Gets an iterator over all dialing (i.e. pending outgoing) connections to the peer.
pub fn attempts(&mut self) -> DialingAttemptIter<'_, THandler, TTrans> {
DialingAttemptIter::new(&self.peer_id, &mut self.network)
}
/// Obtains some dialing connection to the peer.
///
/// At least one dialing connection is guaranteed to exist on a `DialingPeer`.
pub fn some_attempt(&mut self) -> DialingAttempt<'_, THandler> {
self.attempts()
.into_first()
.expect("By `Peer::new` and the definition of `DialingPeer`.")
}
}
impl<'a, TTrans, THandler> fmt::Debug for DialingPeer<'a, TTrans, THandler>
where
TTrans: Transport,
TTrans::Error: Send + 'static,
THandler: IntoConnectionHandler,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("DialingPeer")
.field("peer_id", &self.peer_id)
.field(
"established",
&self
.network
.pool
.iter_peer_established_info(&self.peer_id)
.collect::<Vec<_>>(),
)
.finish()
}
}
/// The representation of a peer to whom the `Network` has currently
/// neither an established connection, nor an ongoing dialing attempt
/// initiated by the local peer.
pub struct DisconnectedPeer<'a, TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
peer_id: PeerId,
_network: &'a mut Network<TTrans, THandler>,
}
impl<'a, TTrans, THandler> fmt::Debug for DisconnectedPeer<'a, TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("DisconnectedPeer")
.field("peer_id", &self.peer_id)
.finish()
}
}
impl<'a, TTrans, THandler> DisconnectedPeer<'a, TTrans, THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
pub fn id(&self) -> &PeerId {
&self.peer_id
}
/// Converts the `DisconnectedPeer` back into a `Peer`.
pub fn into_peer(self) -> Peer<'a, TTrans, THandler> {
Peer::Disconnected(self)
}
}
/// A [`DialingAttempt`] is a pending outgoing connection attempt to a known /
/// expected remote peer ID.
pub struct DialingAttempt<'a, THandler: IntoConnectionHandler> {
peer_id: PeerId,
/// The underlying pending connection in the `Pool`.
inner: PendingConnection<'a, THandler>,
}
impl<'a, THandler: IntoConnectionHandler> DialingAttempt<'a, THandler> {
/// Returns the ID of the current connection attempt.
pub fn id(&self) -> ConnectionId {
self.inner.id()
}
/// Returns the (expected) peer ID of the dialing attempt.
pub fn peer_id(&self) -> &PeerId {
&self.peer_id
}
/// Aborts the dialing attempt.
pub fn abort(self) {
self.inner.abort();
}
}
/// An iterator over the ongoing dialing attempts to a peer.
pub struct DialingAttemptIter<'a, THandler: IntoConnectionHandler, TTrans: Transport> {
/// The peer whose dialing attempts are being iterated.
peer_id: &'a PeerId,
/// The underlying connection `Pool` of the `Network`.
pool: &'a mut Pool<THandler, TTrans>,
/// [`ConnectionId`]s of the dialing attempts of the peer.
connections: VecDeque<ConnectionId>,
}
// Note: Ideally this would be an implementation of `Iterator`, but that
// requires GATs (cf. https://github.com/rust-lang/rust/issues/44265) and
// a different definition of `Iterator`.
impl<'a, THandler: IntoConnectionHandler, TTrans: Transport>
DialingAttemptIter<'a, THandler, TTrans>
{
fn new(peer_id: &'a PeerId, network: &'a mut Network<TTrans, THandler>) -> Self {
let connections = network.dialing_attempts(*peer_id).map(|id| *id).collect();
Self {
pool: &mut network.pool,
peer_id,
connections,
}
}
/// Obtains the next dialing connection, if any.
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> Option<DialingAttempt<'_, THandler>> {
let connection_id = self.connections.pop_front()?;
let inner = self.pool.get_outgoing(connection_id)?;
Some(DialingAttempt {
peer_id: *self.peer_id,
inner,
})
}
/// Returns the first connection, if any, consuming the iterator.
pub fn into_first<'b>(mut self) -> Option<DialingAttempt<'b, THandler>>
where
'a: 'b,
{
let connection_id = self.connections.pop_front()?;
let inner = self.pool.get_outgoing(connection_id)?;
Some(DialingAttempt {
peer_id: *self.peer_id,
inner,
})
}
}

View File

@ -46,7 +46,7 @@ use std::{
};
/// A `Builder` facilitates upgrading of a [`Transport`] for use with
/// a [`Network`].
/// a `Swarm`.
///
/// The upgrade process is defined by the following stages:
///
@ -61,11 +61,9 @@ use std::{
/// and [multiplexed](Authenticated::multiplex).
/// 2. Authentication must precede the negotiation of a multiplexer.
/// 3. Applying a multiplexer is the last step in the upgrade process.
/// 4. The [`Transport::Output`] conforms to the requirements of a [`Network`],
/// 4. The [`Transport::Output`] conforms to the requirements of a `Swarm`,
/// namely a tuple of a [`PeerId`] (from the authentication upgrade) and a
/// [`StreamMuxer`] (from the multiplexing upgrade).
///
/// [`Network`]: crate::Network
#[derive(Clone)]
pub struct Builder<T> {
inner: T,