Add MuxedTransport trait

This commit is contained in:
Pierre Krieger
2017-12-07 15:10:46 +01:00
committed by Vurich
parent 3bc4581eb5
commit e7cfc5ab44
5 changed files with 294 additions and 36 deletions

View File

@@ -7,7 +7,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
bytes = "0.4"
multiaddr = "0.2.0"
multistream-select = { path = "../multistream-select" }
futures = "0.1"
futures = { version = "0.1", features = ["use_std"] }
smallvec = "0.5"
tokio-io = "0.1"

View File

@@ -1,3 +1,5 @@
# libp2p-swarm
Transport and protocol upgrade system of *libp2p*.
This crate contains all the core traits and mechanisms of the transport system of *libp2p*.
@@ -20,6 +22,17 @@ multiple times in a row in order to chain as many implementations as you want.
// TODO: right now only tcp-transport exists, we need to add an example for chaining
// multiple transports once that makes sense
## The `MuxedTransport` trait
The `MuxedTransport` trait is an extension to the `Transport` trait, and is implemented on
transports that can receive incoming connections on streams that have been opened with `dial()`.
The trait provides the `dial_and_listen()` method, which returns both a dialer and a stream of
incoming connections.
> **Note**: This trait is mainly implemented for transports that provide stream muxing
> capabilities.
# Connection upgrades
Once a socket has been opened with a remote through a `Transport`, it can be *upgraded*. This
@@ -37,14 +50,16 @@ There exists three kinds of connection upgrades: middlewares, muxers, and actual
Examples of middleware connection upgrades include `PlainTextConfig` (dummy upgrade) or
`SecioConfig` (encyption layer, provided by the `secio` crate).
The output of a middleware connection upgrade must implement the `AsyncRead` and `AsyncWrite`
The output of a middleware connection upgrade implements the `AsyncRead` and `AsyncWrite`
traits, just like sockets do.
A middleware can be applied on a transport by using the `with_upgrade` method of the
`Transport` trait. The return value of this method also implements the `Transport` trait, which
means that you can call `dial()` and `listen_on()` on it in order to directly obtain an
upgraded connection or a listener that will yield upgraded connections. An error is produced if
the remote doesn't support the protocol corresponding to the connection upgrade.
upgraded connection or a listener that will yield upgraded connections. Similarly, the
`dial_and_listen()` method will automatically apply the upgrade on both the dialer and the
listener. An error is produced if the remote doesn't support the protocol corresponding to the
connection upgrade.
```rust
extern crate libp2p_swarm;
@@ -68,10 +83,12 @@ If the output of the connection upgrade instead implements the `StreamMuxer` and
traits, then you can turn the `UpgradedNode` struct into a `ConnectionReuse` struct by calling
`ConnectionReuse::from(upgraded_node)`.
The `ConnectionReuse` struct then implements the `Transport` trait, and can be used to dial or
listen to multiaddresses, just like any other transport. The only difference is that dialing
a node will try to open a new substream on an existing connection instead of opening a new
one every time.
The `ConnectionReuse` struct then implements the `Transport` and `MuxedTransport` traits, and
can be used to dial or listen to multiaddresses, just like any other transport. The only
difference is that dialing a node will try to open a new substream on an existing connection
instead of opening a new one every time.
> **Note**: Right now the `ConnectionReuse` struct is not fully implemented.
TODO: add an example once the multiplex pull request is merged
@@ -83,11 +100,11 @@ implement the `AsyncRead` and `AsyncWrite` traits. This means that that the retu
transport.
However the `UpgradedNode` struct returned by `with_upgrade` still provides methods named
`dial` and `listen_on`, which will yield you respectively a `Future` or a `Stream`, which you
can use to obtain the `Output`. This `Output` can then be used in a protocol-specific way to
use the protocol.
`dial`, `listen_on`, and `dial_and_listen`, which will yield you a `Future` or a `Stream`,
which you can use to obtain the `Output`. This `Output` can then be used in a protocol-specific
way to use the protocol.
```rust
```no_run
extern crate futures;
extern crate libp2p_ping;
extern crate libp2p_swarm;
@@ -98,6 +115,7 @@ use futures::Future;
use libp2p_ping::Ping;
use libp2p_swarm::Transport;
# fn main() {
let mut core = tokio_core::reactor::Core::new().unwrap();
let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle())
@@ -112,6 +130,7 @@ let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle())
// Runs until the ping arrives.
core.run(ping_finished_future).unwrap();
# }
```
## Grouping protocols

View File

@@ -45,11 +45,12 @@
use futures::{Future, Stream, Async, Poll};
use futures::stream::Fuse as StreamFuse;
use futures::stream;
use multiaddr::Multiaddr;
use muxing::StreamMuxer;
use smallvec::SmallVec;
use std::io::Error as IoError;
use transport::{Transport, ConnectionUpgrade, UpgradedNode};
use transport::{Transport, ConnectionUpgrade, UpgradedNode, MuxedTransport};
/// Allows reusing the same muxed connection multiple times.
///
@@ -123,6 +124,39 @@ impl<T, C> Transport for ConnectionReuse<T, C>
}
}
impl<T, C> MuxedTransport for ConnectionReuse<T, C>
where T: Transport + 'static, // TODO: 'static :(
C: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :(
C: Clone,
C::Output: StreamMuxer + Clone,
C::NamesIter: Clone // TODO: not elegant
{
type Incoming = Box<Stream<Item = <C::Output as StreamMuxer>::Substream, Error = IoError>>;
type DialAndListen = Box<Future<Item = (Self::RawConn, Self::Incoming), Error = IoError>>;
fn dial_and_listen(self, addr: Multiaddr) -> Result<Self::DialAndListen, (Self, Multiaddr)> {
let muxer_dial = match self.inner.dial(addr) {
Ok(l) => l,
Err((inner, addr)) => {
return Err((ConnectionReuse { inner: inner }, addr));
}
};
let future = muxer_dial
.and_then(|muxer| {
let dial = muxer.clone().outbound();
dial.map(|d| (d, muxer))
})
.and_then(|(dial, muxer)| {
let listener = stream::repeat(muxer).and_then(|muxer| muxer.inbound());
let listener = Box::new(listener) as Box<Stream<Item = _, Error = _>>;
Ok((dial, listener))
});
Ok(Box::new(future) as Box<_>)
}
}
/// Implementation of `Stream<Item = (impl AsyncRead + AsyncWrite, Multiaddr)` for the
/// `ConnectionReuse` struct.
pub struct ConnectionReuseListener<S, M>

View File

@@ -43,6 +43,17 @@
//! // TODO: right now only tcp-transport exists, we need to add an example for chaining
//! // multiple transports once that makes sense
//!
//! ## The `MuxedTransport` trait
//!
//! The `MuxedTransport` trait is an extension to the `Transport` trait, and is implemented on
//! transports that can receive incoming connections on streams that have been opened with `dial()`.
//!
//! The trait provides the `dial_and_listen()` method, which returns both a dialer and a stream of
//! incoming connections.
//!
//! > **Note**: This trait is mainly implemented for transports that provide stream muxing
//! > capabilities.
//!
//! # Connection upgrades
//!
//! Once a socket has been opened with a remote through a `Transport`, it can be *upgraded*. This
@@ -60,14 +71,16 @@
//! Examples of middleware connection upgrades include `PlainTextConfig` (dummy upgrade) or
//! `SecioConfig` (encyption layer, provided by the `secio` crate).
//!
//! The output of a middleware connection upgrade must implement the `AsyncRead` and `AsyncWrite`
//! The output of a middleware connection upgrade implements the `AsyncRead` and `AsyncWrite`
//! traits, just like sockets do.
//!
//! A middleware can be applied on a transport by using the `with_upgrade` method of the
//! `Transport` trait. The return value of this method also implements the `Transport` trait, which
//! means that you can call `dial()` and `listen_on()` on it in order to directly obtain an
//! upgraded connection or a listener that will yield upgraded connections. An error is produced if
//! the remote doesn't support the protocol corresponding to the connection upgrade.
//! upgraded connection or a listener that will yield upgraded connections. Similarly, the
//! `dial_and_listen()` method will automatically apply the upgrade on both the dialer and the
//! listener. An error is produced if the remote doesn't support the protocol corresponding to the
//! connection upgrade.
//!
//! ```
//! extern crate libp2p_swarm;
@@ -93,10 +106,12 @@
//! traits, then you can turn the `UpgradedNode` struct into a `ConnectionReuse` struct by calling
//! `ConnectionReuse::from(upgraded_node)`.
//!
//! The `ConnectionReuse` struct then implements the `Transport` trait, and can be used to dial or
//! listen to multiaddresses, just like any other transport. The only difference is that dialing
//! a node will try to open a new substream on an existing connection instead of opening a new
//! one every time.
//! The `ConnectionReuse` struct then implements the `Transport` and `MuxedTransport` traits, and
//! can be used to dial or listen to multiaddresses, just like any other transport. The only
//! difference is that dialing a node will try to open a new substream on an existing connection
//! instead of opening a new one every time.
//!
//! > **Note**: Right now the `ConnectionReuse` struct is not fully implemented.
//!
//! TODO: add an example once the multiplex pull request is merged
//!
@@ -108,9 +123,9 @@
//! transport.
//!
//! However the `UpgradedNode` struct returned by `with_upgrade` still provides methods named
//! `dial` and `listen_on`, which will yield you respectively a `Future` or a `Stream`, which you
//! can use to obtain the `Output`. This `Output` can then be used in a protocol-specific way to
//! use the protocol.
//! `dial`, `listen_on`, and `dial_and_listen`, which will yield you a `Future` or a `Stream`,
//! which you can use to obtain the `Output`. This `Output` can then be used in a protocol-specific
//! way to use the protocol.
//!
//! ```no_run
//! extern crate futures;

View File

@@ -105,6 +105,22 @@ pub trait Transport {
}
}
/// Extension trait for `Transport`. Implemented on structs that provide a `Transport` on which
/// the dialed node can dial you back.
pub trait MuxedTransport: Transport {
/// Produces substreams on the dialed connection.
type Incoming: Stream<Item = Self::RawConn, Error = IoError>;
/// Future indicating when dialing succeeded.
type DialAndListen: Future<Item = (Self::RawConn, Self::Incoming), Error = IoError>;
/// Dial to the given multi-addr, and listen to incoming substreams on the dialed connection.
///
/// Returns either a future which may resolve to a connection, or gives back the multiaddress.
fn dial_and_listen(self, addr: Multiaddr) -> Result<Self::DialAndListen, (Self, Multiaddr)>
where Self: Sized;
}
/// Dummy implementation of `Transport` that just denies every single attempt.
#[derive(Debug, Copy, Clone)]
pub struct DeniedTransport;
@@ -126,6 +142,17 @@ impl Transport for DeniedTransport {
}
}
impl MuxedTransport for DeniedTransport {
// TODO: could use `!` once stable
type Incoming = Box<Stream<Item = Self::RawConn, Error = IoError>>;
type DialAndListen = Box<Future<Item = (Self::RawConn, Self::Incoming), Error = IoError>>;
#[inline]
fn dial_and_listen(self, addr: Multiaddr) -> Result<Self::DialAndListen, (Self, Multiaddr)> {
Err((DeniedTransport, addr))
}
}
/// Struct returned by `or_transport()`.
#[derive(Debug, Copy, Clone)]
pub struct OrTransport<A, B>(A, B);
@@ -135,7 +162,7 @@ impl<A, B> Transport for OrTransport<A, B>
B: Transport
{
type RawConn = EitherSocket<A::RawConn, B::RawConn>;
type Listener = EitherStream<A::Listener, B::Listener>;
type Listener = EitherListenStream<A::Listener, B::Listener>;
type Dial = EitherTransportFuture<
<A::Dial as IntoFuture>::Future,
<B::Dial as IntoFuture>::Future,
@@ -143,12 +170,12 @@ impl<A, B> Transport for OrTransport<A, B>
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
let (first, addr) = match self.0.listen_on(addr) {
Ok((connec, addr)) => return Ok((EitherStream::First(connec), addr)),
Ok((connec, addr)) => return Ok((EitherListenStream::First(connec), addr)),
Err(err) => err,
};
match self.1.listen_on(addr) {
Ok((connec, addr)) => Ok((EitherStream::Second(connec), addr)),
Ok((connec, addr)) => Ok((EitherListenStream::Second(connec), addr)),
Err((second, addr)) => Err((OrTransport(first, second), addr)),
}
}
@@ -198,6 +225,32 @@ impl<F> Clone for SimpleProtocol<F> {
}
}
impl<A, B> MuxedTransport for OrTransport<A, B>
where A: MuxedTransport,
B: MuxedTransport,
{
type Incoming = EitherIncomingStream<A::Incoming, B::Incoming>;
type DialAndListen = EitherMuxedTransportFuture<A::DialAndListen, B::DialAndListen>;
fn dial_and_listen(self, addr: Multiaddr)
-> Result<Self::DialAndListen, (Self, Multiaddr)>
{
let (first, addr) = match self.0.dial_and_listen(addr) {
Ok(connec) => {
return Ok(EitherMuxedTransportFuture::First(connec));
},
Err(err) => err,
};
match self.1.dial_and_listen(addr) {
Ok(connec) => {
Ok(EitherMuxedTransportFuture::Second(connec))
},
Err((second, addr)) => Err((OrTransport(first, second), addr)),
}
}
}
impl<C, F, O> ConnectionUpgrade<C> for SimpleProtocol<F>
where C: AsyncRead + AsyncWrite,
F: Fn(C) -> O,
@@ -223,12 +276,12 @@ impl<C, F, O> ConnectionUpgrade<C> for SimpleProtocol<F>
/// Implements `Stream` and dispatches all method calls to either `First` or `Second`.
#[derive(Debug, Copy, Clone)]
pub enum EitherStream<A, B> {
pub enum EitherListenStream<A, B> {
First(A),
Second(B),
}
impl<A, B, Sa, Sb> Stream for EitherStream<A, B>
impl<A, B, Sa, Sb> Stream for EitherListenStream<A, B>
where A: Stream<Item = (Sa, Multiaddr), Error = IoError>,
B: Stream<Item = (Sb, Multiaddr), Error = IoError>
{
@@ -238,16 +291,43 @@ impl<A, B, Sa, Sb> Stream for EitherStream<A, B>
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self {
&mut EitherStream::First(ref mut a) => {
&mut EitherListenStream::First(ref mut a) => {
a.poll().map(|i| i.map(|v| v.map(|(s, a)| (EitherSocket::First(s), a))))
}
&mut EitherStream::Second(ref mut a) => {
&mut EitherListenStream::Second(ref mut a) => {
a.poll().map(|i| i.map(|v| v.map(|(s, a)| (EitherSocket::Second(s), a))))
}
}
}
}
/// Implements `Stream` and dispatches all method calls to either `First` or `Second`.
#[derive(Debug, Copy, Clone)]
pub enum EitherIncomingStream<A, B> {
First(A),
Second(B),
}
impl<A, B, Sa, Sb> Stream for EitherIncomingStream<A, B>
where A: Stream<Item = Sa, Error = IoError>,
B: Stream<Item = Sb, Error = IoError>
{
type Item = EitherSocket<Sa, Sb>;
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self {
&mut EitherIncomingStream::First(ref mut a) => {
a.poll().map(|i| i.map(|v| v.map(EitherSocket::First)))
}
&mut EitherIncomingStream::Second(ref mut a) => {
a.poll().map(|i| i.map(|v| v.map(EitherSocket::Second)))
}
}
}
}
/// Implements `Future` and redirects calls to either `First` or `Second`.
///
/// Additionally, the output will be wrapped inside a `EitherSocket`.
@@ -282,6 +362,40 @@ impl<A, B> Future for EitherTransportFuture<A, B>
}
}
/// Implements `Future` and redirects calls to either `First` or `Second`.
///
/// Additionally, the output will be wrapped inside a `EitherSocket` and a `EitherIncomingStream`.
///
/// > **Note**: This type is needed because of the lack of `-> impl Trait` in Rust. It can be
/// > removed eventually.
#[derive(Debug, Copy, Clone)]
pub enum EitherMuxedTransportFuture<A, B> {
First(A),
Second(B),
}
impl<A, B, Da, Db, Sa, Sb> Future for EitherMuxedTransportFuture<A, B>
where A: Future<Item = (Da, Sa), Error = IoError>,
B: Future<Item = (Db, Sb), Error = IoError>
{
type Item = (EitherSocket<Da, Db>, EitherIncomingStream<Sa, Sb>);
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
&mut EitherMuxedTransportFuture::First(ref mut a) => {
let (dial, listen) = try_ready!(a.poll());
Ok(Async::Ready((EitherSocket::First(dial), EitherIncomingStream::First(listen))))
}
&mut EitherMuxedTransportFuture::Second(ref mut b) => {
let (dial, listen) = try_ready!(b.poll());
Ok(Async::Ready((EitherSocket::Second(dial), EitherIncomingStream::Second(listen))))
}
}
}
}
/// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to either `First` or
/// `Second`.
#[derive(Debug, Copy, Clone)]
@@ -634,6 +748,70 @@ impl<'a, T, C> UpgradedNode<T, C>
Ok(Box::new(future))
}
/// Tries to dial on the `Multiaddr` using the transport that was passed to `new`, then upgrade
/// the connection. Also listens to incoming substream requires on that dialed connection, and
/// automatically upgrades the incoming substreams.
///
/// Note that this does the same as `MuxedTransport::dial_and_listen`, but with less
/// restrictions on the trait requirements.
pub fn dial_and_listen(self, addr: Multiaddr)
-> Result<Box<Future<Item = (C::Output, Box<Stream<Item = C::Output, Error = IoError> + 'a>), Error = IoError> + 'a>, (Self, Multiaddr)>
where T: MuxedTransport,
C::NamesIter: Clone, // TODO: not elegant
C: Clone
{
let upgrade = self.upgrade;
let dialed_fut = match self.transports.dial_and_listen(addr) {
Ok(f) => f,
Err((trans, addr)) => {
let builder = UpgradedNode {
transports: trans,
upgrade: upgrade,
};
return Err((builder, addr));
}
};
let upgrade2 = upgrade.clone();
let dialed_fut = dialed_fut
// Try to negotiate the protocol.
.and_then(move |(dialer, in_stream)| {
let dialer = {
let iter = upgrade2.protocol_names()
.map(|(name, id)| (name, <Bytes as PartialEq>::eq, id));
let negotiated = multistream_select::dialer_select_proto(dialer, iter)
.map_err(|err| IoError::new(IoErrorKind::Other, err));
negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade2))
}
.and_then(|(upgrade_id, connection, upgrade)| {
upgrade.upgrade(connection, upgrade_id)
});
let in_stream = in_stream
// Try to negotiate the protocol.
.and_then(move |connection| {
let upgrade = upgrade.clone();
#[inline]
fn iter_map<T>((n, t): (Bytes, T)) -> (Bytes, fn(&Bytes,&Bytes)->bool, T) {
(n, <Bytes as PartialEq>::eq, t)
}
let iter = upgrade.protocol_names().map(iter_map);
let negotiated = multistream_select::listener_select_proto(connection, iter);
negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade))
.map_err(|err| IoError::new(IoErrorKind::Other, err))
})
.and_then(|(upgrade_id, connection, upgrade)| {
upgrade.upgrade(connection, upgrade_id)
});
dialer.map(|d| (d, Box::new(in_stream) as Box<Stream<Item = _, Error = _>>))
});
Ok(Box::new(dialed_fut) as Box<_>)
}
/// Start listening on the multiaddr using the transport that was passed to `new`.
/// Then whenever a connection is opened, it is upgraded.
///
@@ -698,16 +876,28 @@ impl<T, C> Transport for UpgradedNode<T, C>
type Dial = Box<Future<Item = C::Output, Error = IoError>>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
where Self: Sized
{
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
self.listen_on(addr)
}
#[inline]
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)>
where Self: Sized
{
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
self.dial(addr)
}
}
impl<T, C> MuxedTransport for UpgradedNode<T, C>
where T: MuxedTransport + 'static,
C: ConnectionUpgrade<T::RawConn> + 'static,
C::Output: AsyncRead + AsyncWrite,
C::NamesIter: Clone, // TODO: not elegant
C: Clone
{
type Incoming = Box<Stream<Item = C::Output, Error = IoError>>;
type DialAndListen = Box<Future<Item = (Self::RawConn, Self::Incoming), Error = IoError>>;
#[inline]
fn dial_and_listen(self, addr: Multiaddr) -> Result<Self::DialAndListen, (Self, Multiaddr)> {
self.dial_and_listen(addr)
}
}