Add muxing trait and architecture

This commit is contained in:
Pierre Krieger 2017-11-28 12:20:28 +01:00
parent 710dec3110
commit b311d66b58
8 changed files with 276 additions and 20 deletions

View File

@ -14,3 +14,5 @@ members = [
[replace]
# Ring has a feature merged in master that hasn't been published yet ; remove this after 0.12.2
"ring:0.12.1" = { git = "https://github.com/briansmith/ring" }
# Multiaddr has a feature merged in master that hasn't been published yet ; remove this after 0.2.1
"multiaddr:0.2.0" = { git = "https://github.com/multiformats/rust-multiaddr" }

View File

@ -66,7 +66,7 @@ fn main() {
let future = with_echo.listen_on(swarm::multiaddr::Multiaddr::new("/ip4/0.0.0.0/tcp/10333").unwrap())
.map_err(|_| panic!())
.unwrap()
.for_each(|socket| {
.for_each(|(socket, _)| {
loop_fn(socket, |socket| {
socket.into_future()
.map_err(|(err, _)| err)

View File

@ -8,5 +8,6 @@ bytes = "0.4"
multiaddr = "0.2.0"
multistream-select = { path = "../multistream-select" }
futures = "0.1"
smallvec = "0.5"
tokio-core = "0.1"
tokio-io = "0.1"

View File

@ -0,0 +1,185 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Contains the `ConnectionReuse` struct. Stores open muxed connections to nodes so that dialing
//! a node reuses the same connection instead of opening a new one.
//!
//! A `ConnectionReuse` can only be created from an `UpgradedNode` whose `ConnectionUpgrade`
//! yields as `StreamMuxer`.
//!
//! # Behaviour
//!
//! The API exposed by the `ConnectionReuse` struct consists in the `Transport` trait
//! implementation, with the `dial` and `listen_on` methods.
//!
//! When called on a `ConnectionReuse`, the `listen_on` method will listen on the given
//! multiaddress (by using the underlying `Transport`), then will apply a `flat_map` on the
//! incoming connections so that we actually listen to the incoming substreams of each connection.
//! TODO: design issue ; we need a way to handle the substreams that are opened by remotes on
//! connections opened by us
//!
//! When called on a `ConnectionReuse`, the `dial` method will try to use a connection that has
//! already been opened earlier, and open an outgoing substream on it. If none is available, it
//! will dial the given multiaddress.
//! TODO: this raises several questions ^
//!
//! TODO: this whole code is a dummy and should be rewritten after the design has been properly
//! figured out.
use futures::{Future, Stream, Async, Poll};
use futures::stream::Fuse as StreamFuse;
use multiaddr::Multiaddr;
use smallvec::SmallVec;
use std::io::Error as IoError;
use muxing::StreamMuxer;
use transport::{Transport, ConnectionUpgrade, UpgradedNode};
/// Allows reusing the same muxed connection multiple times.
///
/// Can be created from an `UpgradedNode` through the `From` trait.
///
/// Implements the `Transport` trait.
#[derive(Clone)]
pub struct ConnectionReuse<T, C>
where T: Transport,
C: ConnectionUpgrade<T::RawConn>,
{
// Underlying transport and connection upgrade for when we need to dial or listen.
inner: UpgradedNode<T, C>,
}
impl<T, C> From<UpgradedNode<T, C>> for ConnectionReuse<T, C>
where T: Transport,
C: ConnectionUpgrade<T::RawConn>,
{
#[inline]
fn from(node: UpgradedNode<T, C>) -> ConnectionReuse<T, C> {
ConnectionReuse {
inner: node,
}
}
}
impl<T, C> Transport for ConnectionReuse<T, C>
where T: Transport + 'static, // TODO: 'static :(
C: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :(
C: Clone,
C::Output: StreamMuxer + Clone,
C::NamesIter: Clone, // TODO: not elegant
{
type RawConn = <C::Output as StreamMuxer>::Substream;
type Listener = ConnectionReuseListener<Box<Stream<Item = (C::Output, Multiaddr), Error = IoError>>, C::Output>;
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, (Self, Multiaddr)> {
let listener = match self.inner.listen_on(addr.clone()) {
Ok(l) => l,
Err((inner, addr)) => {
return Err((ConnectionReuse {
inner: inner,
}, addr));
}
};
Ok(ConnectionReuseListener {
listener: listener.fuse(),
connections: Vec::new(),
})
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
let dial = match self.inner.dial(addr) {
Ok(l) => l,
Err((inner, addr)) => {
return Err((ConnectionReuse {
inner: inner,
}, addr));
}
};
let future = dial
.and_then(|dial| {
dial.outbound()
});
Ok(Box::new(future) as Box<_>)
}
}
/// Implementation of `Stream<Item = (impl AsyncRead + AsyncWrite, Multiaddr)` for the
/// `ConnectionReuse` struct.
pub struct ConnectionReuseListener<S, M>
where S: Stream<Item = (M, Multiaddr), Error = IoError>,
M: StreamMuxer,
{
listener: StreamFuse<S>,
connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>,
}
impl<S, M> Stream for ConnectionReuseListener<S, M>
where S: Stream<Item = (M, Multiaddr), Error = IoError>,
M: StreamMuxer + Clone + 'static, // TODO: 'static :(
{
type Item = (M::Substream, Multiaddr);
type Error = IoError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.listener.poll() {
Ok(Async::Ready(Some((upgrade, client_addr)))) => {
let next_incoming = upgrade.clone().inbound();
self.connections.push((upgrade, next_incoming, client_addr));
},
Ok(Async::NotReady) => (),
Ok(Async::Ready(None)) => {
if self.connections.is_empty() {
return Ok(Async::Ready(None));
}
},
Err(err) => {
if self.connections.is_empty() {
return Err(err);
}
},
};
let mut connections_to_drop: SmallVec<[_; 8]> = SmallVec::new();
for (index, &mut (ref mut muxer, ref mut next_incoming, ref client_addr)) in
self.connections.iter_mut().enumerate()
{
match next_incoming.poll() {
Ok(Async::Ready(incoming)) => {
let mut new_next = muxer.clone().inbound();
*next_incoming = new_next;
return Ok(Async::Ready(Some((incoming, client_addr.clone()))));
},
Ok(Async::NotReady) => (),
Err(_) => {
connections_to_drop.push(index);
},
};
}
for &index in connections_to_drop.iter().rev() {
self.connections.remove(index);
}
Ok(Async::NotReady)
}
}

View File

@ -24,11 +24,16 @@ extern crate bytes;
#[macro_use]
extern crate futures;
extern crate multistream_select;
extern crate smallvec;
extern crate tokio_io;
/// Multi-address re-export.
pub extern crate multiaddr;
mod connection_reuse;
pub mod muxing;
pub mod transport;
pub use self::connection_reuse::ConnectionReuse;
pub use self::muxing::StreamMuxer;
pub use self::transport::{ConnectionUpgrade, PlainText, Transport, UpgradedNode, OrUpgrade};

View File

@ -0,0 +1,59 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::future::{Future, FutureResult, ok};
use std::io::Error as IoError;
use tokio_io::{AsyncRead, AsyncWrite};
/// Implemented on objects that can be turned into a substream.
///
/// > **Note**: The methods of this trait consume the object, but if the object implements `Clone`
/// > then you can clone it and keep the original in order to open additional substreams.
pub trait StreamMuxer {
/// Type of the object that represents the raw substream where data can be read and written.
type Substream: AsyncRead + AsyncWrite;
/// Future that will be resolved when a new incoming substream is open.
type InboundSubstream: Future<Item = Self::Substream, Error = IoError>;
/// Future that will be resolved when the outgoing substream is open.
type OutboundSubstream: Future<Item = Self::Substream, Error = IoError>;
/// Produces a future that will be resolved when a new incoming substream arrives.
fn inbound(self) -> Self::InboundSubstream;
/// Opens a new outgoing substream, and produces a future that will be resolved when it becomes
/// available.
fn outbound(self) -> Self::OutboundSubstream;
}
impl<T> StreamMuxer for T where T: AsyncRead + AsyncWrite {
type Substream = Self;
type InboundSubstream = FutureResult<Self, IoError>; // TODO: use !
type OutboundSubstream = FutureResult<Self, IoError>; // TODO: use !
#[inline]
fn inbound(self) -> Self::InboundSubstream {
ok(self)
}
#[inline]
fn outbound(self) -> Self::OutboundSubstream {
ok(self)
}
}

View File

@ -53,7 +53,7 @@ pub trait Transport {
type RawConn: AsyncRead + AsyncWrite;
/// The listener produces incoming connections.
type Listener: Stream<Item = Self::RawConn, Error = IoError>;
type Listener: Stream<Item = (Self::RawConn, Multiaddr), Error = IoError>;
/// A future which indicates that we are currently dialing to a peer.
type Dial: IntoFuture<Item = Self::RawConn, Error = IoError>;
@ -104,7 +104,7 @@ pub struct DeniedTransport;
impl Transport for DeniedTransport {
// TODO: could use `!` for associated types once stable
type RawConn = Cursor<Vec<u8>>;
type Listener = Box<Stream<Item = Self::RawConn, Error = IoError>>;
type Listener = Box<Stream<Item = (Self::RawConn, Multiaddr), Error = IoError>>;
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
#[inline]
@ -165,21 +165,21 @@ pub enum EitherStream<A, B> {
Second(B),
}
impl<A, B> Stream for EitherStream<A, B>
where A: Stream<Error = IoError>,
B: Stream<Error = IoError>
impl<A, B, Sa, Sb> Stream for EitherStream<A, B>
where A: Stream<Item = (Sa, Multiaddr), Error = IoError>,
B: Stream<Item = (Sb, Multiaddr), Error = IoError>
{
type Item = EitherSocket<A::Item, B::Item>;
type Item = (EitherSocket<Sa, Sb>, Multiaddr);
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self {
&mut EitherStream::First(ref mut a) => {
a.poll().map(|i| i.map(|v| v.map(EitherSocket::First)))
a.poll().map(|i| i.map(|v| v.map(|(s, a)| (EitherSocket::First(s), a))))
}
&mut EitherStream::Second(ref mut a) => {
a.poll().map(|i| i.map(|v| v.map(EitherSocket::Second)))
a.poll().map(|i| i.map(|v| v.map(|(s, a)| (EitherSocket::Second(s), a))))
}
}
}
@ -543,7 +543,7 @@ impl<'a, T, C> UpgradedNode<T, C>
pub fn listen_on(
self,
addr: Multiaddr,
) -> Result<Box<Stream<Item = C::Output, Error = IoError> + 'a>, (Self, Multiaddr)>
) -> Result<Box<Stream<Item = (C::Output, Multiaddr), Error = IoError> + 'a>, (Self, Multiaddr)>
where C::NamesIter: Clone, // TODO: not elegant
C: Clone
{
@ -563,7 +563,7 @@ impl<'a, T, C> UpgradedNode<T, C>
let stream = listening_stream
// Try to negotiate the protocol.
.and_then(move |connection| {
.and_then(move |(connection, client_addr)| {
let upgrade = upgrade.clone();
#[inline]
fn iter_map<T>((n, t): (Bytes, T)) -> (Bytes, fn(&Bytes,&Bytes)->bool, T) {
@ -572,12 +572,12 @@ impl<'a, T, C> UpgradedNode<T, C>
let iter = upgrade.protocol_names().map(iter_map);
let negotiated = multistream_select::listener_select_proto(connection, iter)
.map_err(|err| panic!("{:?}", err)); // TODO:
negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade))
negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade, client_addr))
.map_err(|_| panic!()) // TODO:
})
.map_err(|_| panic!()) // TODO:
.and_then(|(upgrade_id, connection, upgrade)| {
upgrade.upgrade(connection, upgrade_id)
.map_err(|err| panic!("{:?}", err)) // TODO:
.and_then(|(upgrade_id, connection, upgrade, client_addr)| {
upgrade.upgrade(connection, upgrade_id).map(|c| (c, client_addr))
});
Ok(Box::new(stream))
@ -592,7 +592,7 @@ impl<T, C> Transport for UpgradedNode<T, C>
C: Clone
{
type RawConn = C::Output;
type Listener = Box<Stream<Item = C::Output, Error = IoError>>;
type Listener = Box<Stream<Item = (C::Output, Multiaddr), Error = IoError>>;
type Dial = Box<Future<Item = C::Output, Error = IoError>>;
#[inline]

View File

@ -32,7 +32,7 @@ use tokio_core::reactor::Handle;
use tokio_core::net::{TcpStream, TcpListener, TcpStreamNew};
use futures::Future;
use futures::stream::Stream;
use multiaddr::{Multiaddr, Protocol};
use multiaddr::{Multiaddr, Protocol, ToMultiaddr};
use swarm::Transport;
/// Represents a TCP/IP transport capability for libp2p.
@ -55,7 +55,7 @@ impl Transport for Tcp {
type RawConn = TcpStream;
/// The listener produces incoming connections.
type Listener = Box<Stream<Item = Self::RawConn, Error = IoError>>;
type Listener = Box<Stream<Item = (Self::RawConn, Multiaddr), Error = IoError>>;
/// A future which indicates currently dialing to a peer.
type Dial = TcpStreamNew;
@ -69,7 +69,11 @@ impl Transport for Tcp {
TcpListener::bind(&socket_addr, &self.event_loop),
).map(|listener| {
// Pull out a stream of sockets for incoming connections
listener.incoming().map(|x| x.0)
listener.incoming().map(|(sock, addr)| {
let addr = addr.to_multiaddr()
.expect("generating a multiaddr from a socket addr never fails");
(sock, addr)
})
})
.flatten_stream(),
))
@ -188,7 +192,7 @@ mod tests {
let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap();
let tcp = Tcp::new(core.handle()).unwrap();
let handle = core.handle();
let listener = tcp.listen_on(addr).unwrap().for_each(|sock| {
let listener = tcp.listen_on(addr).unwrap().for_each(|(sock, _)| {
// Define what to do with the socket that just connected to us
// Which in this case is read 3 bytes
let handle_conn = tokio_io::io::read_exact(sock, [0; 3])