From b311d66b588f8985f2d6d70e635e8e3a9fc93c3c Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 28 Nov 2017 12:20:28 +0100 Subject: [PATCH] Add muxing trait and architecture --- Cargo.toml | 2 + example/examples/echo-server.rs | 2 +- libp2p-swarm/Cargo.toml | 1 + libp2p-swarm/src/connection_reuse.rs | 185 +++++++++++++++++++++++++++ libp2p-swarm/src/lib.rs | 5 + libp2p-swarm/src/muxing.rs | 59 +++++++++ libp2p-swarm/src/transport.rs | 30 ++--- libp2p-tcp-transport/src/lib.rs | 12 +- 8 files changed, 276 insertions(+), 20 deletions(-) create mode 100644 libp2p-swarm/src/connection_reuse.rs create mode 100644 libp2p-swarm/src/muxing.rs diff --git a/Cargo.toml b/Cargo.toml index 42db7aa5..6f1accf0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,3 +14,5 @@ members = [ [replace] # Ring has a feature merged in master that hasn't been published yet ; remove this after 0.12.2 "ring:0.12.1" = { git = "https://github.com/briansmith/ring" } +# Multiaddr has a feature merged in master that hasn't been published yet ; remove this after 0.2.1 +"multiaddr:0.2.0" = { git = "https://github.com/multiformats/rust-multiaddr" } diff --git a/example/examples/echo-server.rs b/example/examples/echo-server.rs index c4f3c336..904919a6 100644 --- a/example/examples/echo-server.rs +++ b/example/examples/echo-server.rs @@ -66,7 +66,7 @@ fn main() { let future = with_echo.listen_on(swarm::multiaddr::Multiaddr::new("/ip4/0.0.0.0/tcp/10333").unwrap()) .map_err(|_| panic!()) .unwrap() - .for_each(|socket| { + .for_each(|(socket, _)| { loop_fn(socket, |socket| { socket.into_future() .map_err(|(err, _)| err) diff --git a/libp2p-swarm/Cargo.toml b/libp2p-swarm/Cargo.toml index 39615d84..26dd3023 100644 --- a/libp2p-swarm/Cargo.toml +++ b/libp2p-swarm/Cargo.toml @@ -8,5 +8,6 @@ bytes = "0.4" multiaddr = "0.2.0" multistream-select = { path = "../multistream-select" } futures = "0.1" +smallvec = "0.5" tokio-core = "0.1" tokio-io = "0.1" diff --git a/libp2p-swarm/src/connection_reuse.rs b/libp2p-swarm/src/connection_reuse.rs new file mode 100644 index 00000000..df3c3f79 --- /dev/null +++ b/libp2p-swarm/src/connection_reuse.rs @@ -0,0 +1,185 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Contains the `ConnectionReuse` struct. Stores open muxed connections to nodes so that dialing +//! a node reuses the same connection instead of opening a new one. +//! +//! A `ConnectionReuse` can only be created from an `UpgradedNode` whose `ConnectionUpgrade` +//! yields as `StreamMuxer`. +//! +//! # Behaviour +//! +//! The API exposed by the `ConnectionReuse` struct consists in the `Transport` trait +//! implementation, with the `dial` and `listen_on` methods. +//! +//! When called on a `ConnectionReuse`, the `listen_on` method will listen on the given +//! multiaddress (by using the underlying `Transport`), then will apply a `flat_map` on the +//! incoming connections so that we actually listen to the incoming substreams of each connection. +//! TODO: design issue ; we need a way to handle the substreams that are opened by remotes on +//! connections opened by us +//! +//! When called on a `ConnectionReuse`, the `dial` method will try to use a connection that has +//! already been opened earlier, and open an outgoing substream on it. If none is available, it +//! will dial the given multiaddress. +//! TODO: this raises several questions ^ +//! +//! TODO: this whole code is a dummy and should be rewritten after the design has been properly +//! figured out. + +use futures::{Future, Stream, Async, Poll}; +use futures::stream::Fuse as StreamFuse; +use multiaddr::Multiaddr; +use smallvec::SmallVec; +use std::io::Error as IoError; +use muxing::StreamMuxer; +use transport::{Transport, ConnectionUpgrade, UpgradedNode}; + +/// Allows reusing the same muxed connection multiple times. +/// +/// Can be created from an `UpgradedNode` through the `From` trait. +/// +/// Implements the `Transport` trait. +#[derive(Clone)] +pub struct ConnectionReuse + where T: Transport, + C: ConnectionUpgrade, +{ + // Underlying transport and connection upgrade for when we need to dial or listen. + inner: UpgradedNode, +} + +impl From> for ConnectionReuse + where T: Transport, + C: ConnectionUpgrade, +{ + #[inline] + fn from(node: UpgradedNode) -> ConnectionReuse { + ConnectionReuse { + inner: node, + } + } +} + +impl Transport for ConnectionReuse + where T: Transport + 'static, // TODO: 'static :( + C: ConnectionUpgrade + 'static, // TODO: 'static :( + C: Clone, + C::Output: StreamMuxer + Clone, + C::NamesIter: Clone, // TODO: not elegant +{ + type RawConn = ::Substream; + type Listener = ConnectionReuseListener>, C::Output>; + type Dial = Box>; + + fn listen_on(self, addr: Multiaddr) -> Result { + let listener = match self.inner.listen_on(addr.clone()) { + Ok(l) => l, + Err((inner, addr)) => { + return Err((ConnectionReuse { + inner: inner, + }, addr)); + } + }; + + Ok(ConnectionReuseListener { + listener: listener.fuse(), + connections: Vec::new(), + }) + } + + fn dial(self, addr: Multiaddr) -> Result { + let dial = match self.inner.dial(addr) { + Ok(l) => l, + Err((inner, addr)) => { + return Err((ConnectionReuse { + inner: inner, + }, addr)); + } + }; + + let future = dial + .and_then(|dial| { + dial.outbound() + }); + Ok(Box::new(future) as Box<_>) + } +} + +/// Implementation of `Stream + where S: Stream, + M: StreamMuxer, +{ + listener: StreamFuse, + connections: Vec<(M, ::InboundSubstream, Multiaddr)>, +} + +impl Stream for ConnectionReuseListener + where S: Stream, + M: StreamMuxer + Clone + 'static, // TODO: 'static :( +{ + type Item = (M::Substream, Multiaddr); + type Error = IoError; + + fn poll(&mut self) -> Poll, Self::Error> { + match self.listener.poll() { + Ok(Async::Ready(Some((upgrade, client_addr)))) => { + let next_incoming = upgrade.clone().inbound(); + self.connections.push((upgrade, next_incoming, client_addr)); + }, + Ok(Async::NotReady) => (), + Ok(Async::Ready(None)) => { + if self.connections.is_empty() { + return Ok(Async::Ready(None)); + } + }, + Err(err) => { + if self.connections.is_empty() { + return Err(err); + } + }, + }; + + let mut connections_to_drop: SmallVec<[_; 8]> = SmallVec::new(); + + for (index, &mut (ref mut muxer, ref mut next_incoming, ref client_addr)) in + self.connections.iter_mut().enumerate() + { + match next_incoming.poll() { + Ok(Async::Ready(incoming)) => { + let mut new_next = muxer.clone().inbound(); + *next_incoming = new_next; + return Ok(Async::Ready(Some((incoming, client_addr.clone())))); + }, + Ok(Async::NotReady) => (), + Err(_) => { + connections_to_drop.push(index); + }, + }; + } + + for &index in connections_to_drop.iter().rev() { + self.connections.remove(index); + } + + Ok(Async::NotReady) + } +} diff --git a/libp2p-swarm/src/lib.rs b/libp2p-swarm/src/lib.rs index 3602c643..968c1937 100644 --- a/libp2p-swarm/src/lib.rs +++ b/libp2p-swarm/src/lib.rs @@ -24,11 +24,16 @@ extern crate bytes; #[macro_use] extern crate futures; extern crate multistream_select; +extern crate smallvec; extern crate tokio_io; /// Multi-address re-export. pub extern crate multiaddr; +mod connection_reuse; +pub mod muxing; pub mod transport; +pub use self::connection_reuse::ConnectionReuse; +pub use self::muxing::StreamMuxer; pub use self::transport::{ConnectionUpgrade, PlainText, Transport, UpgradedNode, OrUpgrade}; diff --git a/libp2p-swarm/src/muxing.rs b/libp2p-swarm/src/muxing.rs new file mode 100644 index 00000000..d38576d0 --- /dev/null +++ b/libp2p-swarm/src/muxing.rs @@ -0,0 +1,59 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::future::{Future, FutureResult, ok}; +use std::io::Error as IoError; +use tokio_io::{AsyncRead, AsyncWrite}; + +/// Implemented on objects that can be turned into a substream. +/// +/// > **Note**: The methods of this trait consume the object, but if the object implements `Clone` +/// > then you can clone it and keep the original in order to open additional substreams. +pub trait StreamMuxer { + /// Type of the object that represents the raw substream where data can be read and written. + type Substream: AsyncRead + AsyncWrite; + /// Future that will be resolved when a new incoming substream is open. + type InboundSubstream: Future; + /// Future that will be resolved when the outgoing substream is open. + type OutboundSubstream: Future; + + /// Produces a future that will be resolved when a new incoming substream arrives. + fn inbound(self) -> Self::InboundSubstream; + + /// Opens a new outgoing substream, and produces a future that will be resolved when it becomes + /// available. + fn outbound(self) -> Self::OutboundSubstream; +} + +impl StreamMuxer for T where T: AsyncRead + AsyncWrite { + type Substream = Self; + type InboundSubstream = FutureResult; // TODO: use ! + type OutboundSubstream = FutureResult; // TODO: use ! + + #[inline] + fn inbound(self) -> Self::InboundSubstream { + ok(self) + } + + #[inline] + fn outbound(self) -> Self::OutboundSubstream { + ok(self) + } +} diff --git a/libp2p-swarm/src/transport.rs b/libp2p-swarm/src/transport.rs index 56d9ac85..c7711922 100644 --- a/libp2p-swarm/src/transport.rs +++ b/libp2p-swarm/src/transport.rs @@ -53,7 +53,7 @@ pub trait Transport { type RawConn: AsyncRead + AsyncWrite; /// The listener produces incoming connections. - type Listener: Stream; + type Listener: Stream; /// A future which indicates that we are currently dialing to a peer. type Dial: IntoFuture; @@ -104,7 +104,7 @@ pub struct DeniedTransport; impl Transport for DeniedTransport { // TODO: could use `!` for associated types once stable type RawConn = Cursor>; - type Listener = Box>; + type Listener = Box>; type Dial = Box>; #[inline] @@ -165,21 +165,21 @@ pub enum EitherStream { Second(B), } -impl Stream for EitherStream - where A: Stream, - B: Stream +impl Stream for EitherStream + where A: Stream, + B: Stream { - type Item = EitherSocket; + type Item = (EitherSocket, Multiaddr); type Error = IoError; #[inline] fn poll(&mut self) -> Poll, Self::Error> { match self { &mut EitherStream::First(ref mut a) => { - a.poll().map(|i| i.map(|v| v.map(EitherSocket::First))) + a.poll().map(|i| i.map(|v| v.map(|(s, a)| (EitherSocket::First(s), a)))) } &mut EitherStream::Second(ref mut a) => { - a.poll().map(|i| i.map(|v| v.map(EitherSocket::Second))) + a.poll().map(|i| i.map(|v| v.map(|(s, a)| (EitherSocket::Second(s), a)))) } } } @@ -543,7 +543,7 @@ impl<'a, T, C> UpgradedNode pub fn listen_on( self, addr: Multiaddr, - ) -> Result + 'a>, (Self, Multiaddr)> + ) -> Result + 'a>, (Self, Multiaddr)> where C::NamesIter: Clone, // TODO: not elegant C: Clone { @@ -563,7 +563,7 @@ impl<'a, T, C> UpgradedNode let stream = listening_stream // Try to negotiate the protocol. - .and_then(move |connection| { + .and_then(move |(connection, client_addr)| { let upgrade = upgrade.clone(); #[inline] fn iter_map((n, t): (Bytes, T)) -> (Bytes, fn(&Bytes,&Bytes)->bool, T) { @@ -572,12 +572,12 @@ impl<'a, T, C> UpgradedNode let iter = upgrade.protocol_names().map(iter_map); let negotiated = multistream_select::listener_select_proto(connection, iter) .map_err(|err| panic!("{:?}", err)); // TODO: - negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade)) + negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade, client_addr)) .map_err(|_| panic!()) // TODO: }) - .map_err(|_| panic!()) // TODO: - .and_then(|(upgrade_id, connection, upgrade)| { - upgrade.upgrade(connection, upgrade_id) + .map_err(|err| panic!("{:?}", err)) // TODO: + .and_then(|(upgrade_id, connection, upgrade, client_addr)| { + upgrade.upgrade(connection, upgrade_id).map(|c| (c, client_addr)) }); Ok(Box::new(stream)) @@ -592,7 +592,7 @@ impl Transport for UpgradedNode C: Clone { type RawConn = C::Output; - type Listener = Box>; + type Listener = Box>; type Dial = Box>; #[inline] diff --git a/libp2p-tcp-transport/src/lib.rs b/libp2p-tcp-transport/src/lib.rs index 15d2e7a0..6f7e819b 100644 --- a/libp2p-tcp-transport/src/lib.rs +++ b/libp2p-tcp-transport/src/lib.rs @@ -32,7 +32,7 @@ use tokio_core::reactor::Handle; use tokio_core::net::{TcpStream, TcpListener, TcpStreamNew}; use futures::Future; use futures::stream::Stream; -use multiaddr::{Multiaddr, Protocol}; +use multiaddr::{Multiaddr, Protocol, ToMultiaddr}; use swarm::Transport; /// Represents a TCP/IP transport capability for libp2p. @@ -55,7 +55,7 @@ impl Transport for Tcp { type RawConn = TcpStream; /// The listener produces incoming connections. - type Listener = Box>; + type Listener = Box>; /// A future which indicates currently dialing to a peer. type Dial = TcpStreamNew; @@ -69,7 +69,11 @@ impl Transport for Tcp { TcpListener::bind(&socket_addr, &self.event_loop), ).map(|listener| { // Pull out a stream of sockets for incoming connections - listener.incoming().map(|x| x.0) + listener.incoming().map(|(sock, addr)| { + let addr = addr.to_multiaddr() + .expect("generating a multiaddr from a socket addr never fails"); + (sock, addr) + }) }) .flatten_stream(), )) @@ -188,7 +192,7 @@ mod tests { let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap(); let tcp = Tcp::new(core.handle()).unwrap(); let handle = core.handle(); - let listener = tcp.listen_on(addr).unwrap().for_each(|sock| { + let listener = tcp.listen_on(addr).unwrap().for_each(|(sock, _)| { // Define what to do with the socket that just connected to us // Which in this case is read 3 bytes let handle_conn = tokio_io::io::read_exact(sock, [0; 3])