Refine boxing during transport construction. (#1794)

* Rework boxing during transport construction.

* Cleanup

* Fix chat-tokio example.

* Update changelogs and versions.
This commit is contained in:
Roman Borschel
2020-10-16 16:53:02 +02:00
committed by GitHub
parent 2dbf834d10
commit dc56d44edb
51 changed files with 369 additions and 331 deletions

View File

@ -25,11 +25,10 @@
//! any desired protocols. The rest of the module defines combinators for
//! modifying a transport through composition with other transports or protocol upgrades.
use crate::{ConnectedPoint, ConnectionInfo, muxing::{StreamMuxer, StreamMuxerBox}};
use crate::ConnectedPoint;
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{error::Error, fmt};
use std::time::Duration;
pub mod and_then;
pub mod choice;
@ -129,24 +128,16 @@ pub trait Transport {
where
Self: Sized;
/// Boxes an authenticated, multiplexed transport, including the
/// `StreamMuxer` and transport errors.
fn boxed<I, M>(self) -> boxed::Boxed<(I, StreamMuxerBox), std::io::Error>
/// Boxes the transport, including custom transport errors.
fn boxed(self) -> boxed::Boxed<Self::Output>
where
Self: Transport<Output = (I, M)> + Sized + Clone + Send + Sync + 'static,
Self: Transport + Sized + Clone + Send + Sync + 'static,
Self::Dial: Send + 'static,
Self::Listener: Send + 'static,
Self::ListenerUpgrade: Send + 'static,
Self::Error: Send + Sync,
I: ConnectionInfo,
M: StreamMuxer + Send + Sync + 'static,
M::Substream: Send + 'static,
M::OutboundSubstream: Send + 'static
{
boxed::boxed(
self.map(|(i, m), _| (i, StreamMuxerBox::new(m)))
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)))
boxed::boxed(self)
}
/// Applies a function on the connections created by the transport.
@ -198,33 +189,6 @@ pub trait Transport {
and_then::AndThen::new(self, f)
}
/// Adds a timeout to the connection setup (including upgrades) for all
/// inbound and outbound connections established through the transport.
fn timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
where
Self: Sized
{
timeout::TransportTimeout::new(self, timeout)
}
/// Adds a timeout to the connection setup (including upgrades) for all outbound
/// connections established through the transport.
fn outbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
where
Self: Sized
{
timeout::TransportTimeout::with_outgoing_timeout(self, timeout)
}
/// Adds a timeout to the connection setup (including upgrades) for all inbound
/// connections established through the transport.
fn inbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
where
Self: Sized
{
timeout::TransportTimeout::with_ingoing_timeout(self, timeout)
}
/// Begins a series of protocol upgrades via an
/// [`upgrade::Builder`](upgrade::Builder).
fn upgrade(self, version: upgrade::Version) -> upgrade::Builder<Self>

View File

@ -21,12 +21,13 @@
use crate::transport::{ListenerEvent, Transport, TransportError};
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{error, fmt, pin::Pin, sync::Arc};
use std::{error::Error, fmt, io, pin::Pin, sync::Arc};
/// See the `Transport::boxed` method.
pub fn boxed<T>(transport: T) -> Boxed<T::Output, T::Error>
/// Creates a new [`Boxed`] transport from the given transport.
pub fn boxed<T>(transport: T) -> Boxed<T::Output>
where
T: Transport + Clone + Send + Sync + 'static,
T::Error: Send + Sync,
T::Dial: Send + 'static,
T::Listener: Send + 'static,
T::ListenerUpgrade: Send + 'static,
@ -36,49 +37,56 @@ where
}
}
/// See the `Transport::boxed` method.
pub struct Boxed<O, E> {
inner: Arc<dyn Abstract<O, E> + Send + Sync>,
/// A `Boxed` transport is a `Transport` whose `Dial`, `Listener`
/// and `ListenerUpgrade` futures are `Box`ed and only the `Output`
/// and `Error` types are captured in type variables.
pub struct Boxed<O> {
inner: Arc<dyn Abstract<O> + Send + Sync>,
}
type Dial<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;
type Listener<O, E> = Pin<Box<dyn Stream<Item = Result<ListenerEvent<ListenerUpgrade<O, E>, E>, E>> + Send>>;
type ListenerUpgrade<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;
type Dial<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;
type Listener<O> = Pin<Box<dyn Stream<Item = io::Result<ListenerEvent<ListenerUpgrade<O>, io::Error>>> + Send>>;
type ListenerUpgrade<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;
trait Abstract<O, E> {
fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O, E>, TransportError<E>>;
fn dial(&self, addr: Multiaddr) -> Result<Dial<O, E>, TransportError<E>>;
trait Abstract<O> {
fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O>, TransportError<io::Error>>;
fn dial(&self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>>;
}
impl<T, O, E> Abstract<O, E> for T
impl<T, O> Abstract<O> for T
where
T: Transport<Output = O, Error = E> + Clone + 'static,
E: error::Error,
T: Transport<Output = O> + Clone + 'static,
T::Error: Send + Sync,
T::Dial: Send + 'static,
T::Listener: Send + 'static,
T::ListenerUpgrade: Send + 'static,
{
fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O, E>, TransportError<E>> {
let listener = Transport::listen_on(self.clone(), addr)?;
let fut = listener.map_ok(|event| event.map(|upgrade| {
Box::pin(upgrade) as ListenerUpgrade<O, E>
}));
fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O>, TransportError<io::Error>> {
let listener = Transport::listen_on(self.clone(), addr).map_err(|e| e.map(box_err))?;
let fut = listener.map_ok(|event|
event.map(|upgrade| {
let up = upgrade.map_err(box_err);
Box::pin(up) as ListenerUpgrade<O>
}).map_err(box_err)
).map_err(box_err);
Ok(Box::pin(fut))
}
fn dial(&self, addr: Multiaddr) -> Result<Dial<O, E>, TransportError<E>> {
let fut = Transport::dial(self.clone(), addr)?;
Ok(Box::pin(fut) as Dial<_, _>)
fn dial(&self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>> {
let fut = Transport::dial(self.clone(), addr)
.map(|r| r.map_err(box_err))
.map_err(|e| e.map(box_err))?;
Ok(Box::pin(fut) as Dial<_>)
}
}
impl<O, E> fmt::Debug for Boxed<O, E> {
impl<O> fmt::Debug for Boxed<O> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "BoxedTransport")
}
}
impl<O, E> Clone for Boxed<O, E> {
impl<O> Clone for Boxed<O> {
fn clone(&self) -> Self {
Boxed {
inner: self.inner.clone(),
@ -86,14 +94,12 @@ impl<O, E> Clone for Boxed<O, E> {
}
}
impl<O, E> Transport for Boxed<O, E>
where E: error::Error,
{
impl<O> Transport for Boxed<O> {
type Output = O;
type Error = E;
type Listener = Listener<O, E>;
type ListenerUpgrade = ListenerUpgrade<O, E>;
type Dial = Dial<O, E>;
type Error = io::Error;
type Listener = Listener<O>;
type ListenerUpgrade = ListenerUpgrade<O>;
type Dial = Dial<O>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
self.inner.listen_on(addr)
@ -103,3 +109,7 @@ where E: error::Error,
self.inner.dial(addr)
}
}
fn box_err<E: Error + Send + Sync + 'static>(e: E) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}

View File

@ -31,8 +31,10 @@ use crate::{
TransportError,
ListenerEvent,
and_then::AndThen,
boxed::boxed,
timeout::TransportTimeout,
},
muxing::StreamMuxer,
muxing::{StreamMuxer, StreamMuxerBox},
upgrade::{
self,
OutboundUpgrade,
@ -46,7 +48,13 @@ use crate::{
};
use futures::{prelude::*, ready};
use multiaddr::Multiaddr;
use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll};
use std::{
error::Error,
fmt,
pin::Pin,
task::{Context, Poll},
time::Duration
};
/// A `Builder` facilitates upgrading of a [`Transport`] for use with
/// a [`Network`].
@ -54,14 +62,14 @@ use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll};
/// The upgrade process is defined by the following stages:
///
/// [`authenticate`](Builder::authenticate)`{1}`
/// -> [`apply`](Builder::apply)`{*}`
/// -> [`multiplex`](Builder::multiplex)`{1}`
/// -> [`apply`](Authenticated::apply)`{*}`
/// -> [`multiplex`](Authenticated::multiplex)`{1}`
///
/// It thus enforces the following invariants on every transport
/// obtained from [`multiplex`](Builder::multiplex):
/// obtained from [`multiplex`](Authenticated::multiplex):
///
/// 1. The transport must be [authenticated](Builder::authenticate)
/// and [multiplexed](Builder::multiplex).
/// and [multiplexed](Authenticated::multiplex).
/// 2. Authentication must precede the negotiation of a multiplexer.
/// 3. Applying a multiplexer is the last step in the upgrade process.
/// 4. The [`Transport::Output`] conforms to the requirements of a [`Network`],
@ -69,6 +77,7 @@ use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll};
/// [`StreamMuxer`] (from the multiplexing upgrade).
///
/// [`Network`]: crate::Network
#[derive(Clone)]
pub struct Builder<T> {
inner: T,
version: upgrade::Version,
@ -97,7 +106,7 @@ where
///
/// * I/O upgrade: `C -> (I, D)`.
/// * Transport output: `C -> (I, D)`
pub fn authenticate<C, D, U, I, E>(self, upgrade: U) -> Builder<
pub fn authenticate<C, D, U, I, E>(self, upgrade: U) -> Authenticated<
AndThen<T, impl FnOnce(C, ConnectedPoint) -> Authenticate<C, U> + Clone>
> where
T: Transport<Output = C>,
@ -109,95 +118,11 @@ where
E: Error + 'static,
{
let version = self.version;
Builder::new(self.inner.and_then(move |conn, endpoint| {
Authenticated(Builder::new(self.inner.and_then(move |conn, endpoint| {
Authenticate {
inner: upgrade::apply(conn, upgrade, endpoint, version)
}
}), version)
}
/// Applies an arbitrary upgrade on an authenticated, non-multiplexed
/// transport.
///
/// The upgrade receives the I/O resource (i.e. connection) `C` and
/// must produce a new I/O resource `D`. Any number of such upgrades
/// can be performed.
///
/// ## Transitions
///
/// * I/O upgrade: `C -> D`.
/// * Transport output: `(I, C) -> (I, D)`.
pub fn apply<C, D, U, I, E>(self, upgrade: U) -> Builder<Upgrade<T, U>>
where
T: Transport<Output = (I, C)>,
C: AsyncRead + AsyncWrite + Unpin,
D: AsyncRead + AsyncWrite + Unpin,
I: ConnectionInfo,
U: InboundUpgrade<Negotiated<C>, Output = D, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
E: Error + 'static,
{
Builder::new(Upgrade::new(self.inner, upgrade), self.version)
}
/// Upgrades the transport with a (sub)stream multiplexer.
///
/// The supplied upgrade receives the I/O resource `C` and must
/// produce a [`StreamMuxer`] `M`. The transport must already be authenticated.
/// This ends the (regular) transport upgrade process, yielding the underlying,
/// configured transport.
///
/// ## Transitions
///
/// * I/O upgrade: `C -> M`.
/// * Transport output: `(I, C) -> (I, M)`.
pub fn multiplex<C, M, U, I, E>(self, upgrade: U)
-> AndThen<T, impl FnOnce((I, C), ConnectedPoint) -> Multiplex<C, U, I> + Clone>
where
T: Transport<Output = (I, C)>,
C: AsyncRead + AsyncWrite + Unpin,
M: StreamMuxer,
I: ConnectionInfo,
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
E: Error + 'static,
{
let version = self.version;
self.inner.and_then(move |(i, c), endpoint| {
let upgrade = upgrade::apply(c, upgrade, endpoint, version);
Multiplex { info: Some(i), upgrade }
})
}
/// Like [`Builder::multiplex`] but accepts a function which returns the upgrade.
///
/// The supplied function is applied to [`ConnectionInfo`] and [`ConnectedPoint`]
/// and returns an upgrade which receives the I/O resource `C` and must
/// produce a [`StreamMuxer`] `M`. The transport must already be authenticated.
/// This ends the (regular) transport upgrade process, yielding the underlying,
/// configured transport.
///
/// ## Transitions
///
/// * I/O upgrade: `C -> M`.
/// * Transport output: `(I, C) -> (I, M)`.
pub fn multiplex_ext<C, M, U, I, E, F>(self, up: F)
-> AndThen<T, impl FnOnce((I, C), ConnectedPoint) -> Multiplex<C, U, I> + Clone>
where
T: Transport<Output = (I, C)>,
C: AsyncRead + AsyncWrite + Unpin,
M: StreamMuxer,
I: ConnectionInfo,
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
E: Error + 'static,
F: for<'a> FnOnce(&'a I, &'a ConnectedPoint) -> U + Clone
{
let version = self.version;
self.inner.and_then(move |(i, c), endpoint| {
let upgrade = upgrade::apply(c, up(&i, &endpoint), endpoint, version);
Multiplex { info: Some(i), upgrade }
})
}), version))
}
}
@ -234,7 +159,7 @@ where
/// An upgrade that negotiates a (sub)stream multiplexer on
/// top of an authenticated transport.
///
/// Configured through [`Builder::multiplex`].
/// Configured through [`Authenticated::multiplex`].
#[pin_project::pin_project]
pub struct Multiplex<C, U, I>
where
@ -265,10 +190,162 @@ where
}
}
/// An transport with peer authentication, obtained from [`Builder::authenticate`].
#[derive(Clone)]
pub struct Authenticated<T>(Builder<T>);
impl<T> Authenticated<T>
where
T: Transport,
T::Error: 'static
{
/// Applies an arbitrary upgrade.
///
/// The upgrade receives the I/O resource (i.e. connection) `C` and
/// must produce a new I/O resource `D`. Any number of such upgrades
/// can be performed.
///
/// ## Transitions
///
/// * I/O upgrade: `C -> D`.
/// * Transport output: `(I, C) -> (I, D)`.
pub fn apply<C, D, U, I, E>(self, upgrade: U) -> Authenticated<Upgrade<T, U>>
where
T: Transport<Output = (I, C)>,
C: AsyncRead + AsyncWrite + Unpin,
D: AsyncRead + AsyncWrite + Unpin,
I: ConnectionInfo,
U: InboundUpgrade<Negotiated<C>, Output = D, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = D, Error = E> + Clone,
E: Error + 'static,
{
Authenticated(Builder::new(Upgrade::new(self.0.inner, upgrade), self.0.version))
}
/// Upgrades the transport with a (sub)stream multiplexer.
///
/// The supplied upgrade receives the I/O resource `C` and must
/// produce a [`StreamMuxer`] `M`. The transport must already be authenticated.
/// This ends the (regular) transport upgrade process.
///
/// ## Transitions
///
/// * I/O upgrade: `C -> M`.
/// * Transport output: `(I, C) -> (I, M)`.
pub fn multiplex<C, M, U, I, E>(self, upgrade: U) -> Multiplexed<
AndThen<T, impl FnOnce((I, C), ConnectedPoint) -> Multiplex<C, U, I> + Clone>
> where
T: Transport<Output = (I, C)>,
C: AsyncRead + AsyncWrite + Unpin,
M: StreamMuxer,
I: ConnectionInfo,
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
E: Error + 'static,
{
let version = self.0.version;
Multiplexed(self.0.inner.and_then(move |(i, c), endpoint| {
let upgrade = upgrade::apply(c, upgrade, endpoint, version);
Multiplex { info: Some(i), upgrade }
}))
}
/// Like [`Authenticated::multiplex`] but accepts a function which returns the upgrade.
///
/// The supplied function is applied to [`ConnectionInfo`] and [`ConnectedPoint`]
/// and returns an upgrade which receives the I/O resource `C` and must
/// produce a [`StreamMuxer`] `M`. The transport must already be authenticated.
/// This ends the (regular) transport upgrade process.
///
/// ## Transitions
///
/// * I/O upgrade: `C -> M`.
/// * Transport output: `(I, C) -> (I, M)`.
pub fn multiplex_ext<C, M, U, I, E, F>(self, up: F) -> Multiplexed<
AndThen<T, impl FnOnce((I, C), ConnectedPoint) -> Multiplex<C, U, I> + Clone>
> where
T: Transport<Output = (I, C)>,
C: AsyncRead + AsyncWrite + Unpin,
M: StreamMuxer,
I: ConnectionInfo,
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
E: Error + 'static,
F: for<'a> FnOnce(&'a I, &'a ConnectedPoint) -> U + Clone
{
let version = self.0.version;
Multiplexed(self.0.inner.and_then(move |(i, c), endpoint| {
let upgrade = upgrade::apply(c, up(&i, &endpoint), endpoint, version);
Multiplex { info: Some(i), upgrade }
}))
}
}
/// A authenticated and multiplexed transport, obtained from
/// [`Authenticated::multiplex`].
#[derive(Clone)]
pub struct Multiplexed<T>(T);
impl<T> Multiplexed<T> {
/// Boxes the authenticated, multiplexed transport, including
/// the [`StreamMuxer`] and custom transport errors.
pub fn boxed<I, M>(self) -> super::Boxed<(I, StreamMuxerBox)>
where
T: Transport<Output = (I, M)> + Sized + Clone + Send + Sync + 'static,
T::Dial: Send + 'static,
T::Listener: Send + 'static,
T::ListenerUpgrade: Send + 'static,
T::Error: Send + Sync,
I: ConnectionInfo,
M: StreamMuxer + Send + Sync + 'static,
M::Substream: Send + 'static,
M::OutboundSubstream: Send + 'static
{
boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m))))
}
/// Adds a timeout to the setup and protocol upgrade process for all
/// inbound and outbound connections established through the transport.
pub fn timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
Multiplexed(TransportTimeout::new(self.0, timeout))
}
/// Adds a timeout to the setup and protocol upgrade process for all
/// outbound connections established through the transport.
pub fn outbound_timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
Multiplexed(TransportTimeout::with_outgoing_timeout(self.0, timeout))
}
/// Adds a timeout to the setup and protocol upgrade process for all
/// inbound connections established through the transport.
pub fn inbound_timeout(self, timeout: Duration) -> Multiplexed<TransportTimeout<T>> {
Multiplexed(TransportTimeout::with_ingoing_timeout(self.0, timeout))
}
}
impl<T> Transport for Multiplexed<T>
where
T: Transport,
{
type Output = T::Output;
type Error = T::Error;
type Listener = T::Listener;
type ListenerUpgrade = T::ListenerUpgrade;
type Dial = T::Dial;
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.0.dial(addr)
}
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
self.0.listen_on(addr)
}
}
/// An inbound or outbound upgrade.
type EitherUpgrade<C, U> = future::Either<InboundUpgradeApply<C, U>, OutboundUpgradeApply<C, U>>;
/// An upgrade on an authenticated, non-multiplexed [`Transport`].
/// A custom upgrade on an [`Authenticated`] transport.
///
/// See [`Transport::upgrade`]
#[derive(Debug, Copy, Clone)]