diff --git a/Cargo.toml b/Cargo.toml index 96e8a312..507ae53f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,10 +3,10 @@ members = [ "multihash", "multistream-select", "datastore", - "libp2p-host", + "example", "libp2p-peerstore", "libp2p-secio", - "libp2p-transport", + "libp2p-swarm", "libp2p-tcp-transport", "rw-stream-sink", ] diff --git a/README.md b/README.md index 9ae9934b..c7de3c20 100644 --- a/README.md +++ b/README.md @@ -13,12 +13,14 @@ Architecture of the crates of this repository: - `datastore`: Utility library whose API provides a key-value storage with multiple possible backends. Used by `peerstore`. -- `libp2p-host`: Stub. Will probably get reworked or removed. +- `example`: Example usages of this library. - `libp2p-peerstore`: Generic storage for information about remote peers (their multiaddresses and their public key), with multiple possible backends. Each multiaddress also has a time-to-live. -- `libp2p-secio`: Implementation of the `secio` protocol. Encrypts communications. -- `libp2p-tcp-transport`: Implementation of the `Transport` trait for TCP/IP. -- `libp2p-transport`: Contains the `Transport` trait. Will probably get reworked or removed. + Used by `libp2p-swarm`. +- `libp2p-secio`: Implementation of the `secio` protocol. Encrypts communications. Implements the + `ConnectionUpgrade` trait of `libp2p-swarm`. +- `libp2p-swarm`: Core library that contains all the traits of *libp2p* and plugs things together. +- `libp2p-tcp-transport`: Implementation of the `Transport` trait of `libp2p-swarm` for TCP/IP. - `multihash`: Utility library that allows one to represent and manipulate [*multihashes*](https://github.com/multiformats/multihash). A *multihash* is a combination of a hash and its hashing algorithm. diff --git a/example/Cargo.toml b/example/Cargo.toml new file mode 100644 index 00000000..9ac384c6 --- /dev/null +++ b/example/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "example" +version = "0.1.0" +authors = ["pierre "] + +[dependencies] +bytes = "0.4" +futures = "0.1" +libp2p-secio = { path = "../libp2p-secio" } +libp2p-swarm = { path = "../libp2p-swarm" } +libp2p-tcp-transport = { path = "../libp2p-tcp-transport" } +ring = { version = "0.12.1", features = ["rsa_signing"] } +tokio-core = "0.1" +tokio-io = "0.1" +untrusted = "0.6" diff --git a/example/examples/echo-dialer.rs b/example/examples/echo-dialer.rs new file mode 100644 index 00000000..afb928bf --- /dev/null +++ b/example/examples/echo-dialer.rs @@ -0,0 +1,105 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +extern crate bytes; +extern crate futures; +extern crate libp2p_secio as secio; +extern crate libp2p_swarm as swarm; +extern crate libp2p_tcp_transport as tcp; +extern crate ring; +extern crate tokio_core; +extern crate tokio_io; +extern crate untrusted; + +use bytes::Bytes; +use futures::future::{Future, FutureResult, IntoFuture}; +use futures::{Stream, Sink}; +use ring::signature::RSAKeyPair; +use std::io::Error as IoError; +use std::iter; +use std::sync::Arc; +use swarm::{Transport, ConnectionUpgrade}; +use tcp::Tcp; +use tokio_core::reactor::Core; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_io::codec::length_delimited; +use untrusted::Input; + +fn main() { + let mut core = Core::new().unwrap(); + let tcp = Tcp::new(core.handle()).unwrap(); + + let with_secio = tcp + .with_upgrade(swarm::PlainText) + .or_upgrade({ + let private_key = { + let pkcs8 = include_bytes!("test-private-key.pk8"); + Arc::new(RSAKeyPair::from_pkcs8(Input::from(&pkcs8[..])).unwrap()) + }; + let public_key = include_bytes!("test-public-key.der").to_vec(); + + secio::SecioConnUpgrade { + local_public_key: public_key, + local_private_key: private_key, + } + }); + + let with_echo = with_secio.with_upgrade(Echo); + + let dialer = with_echo.dial(swarm::multiaddr::Multiaddr::new("/ip4/127.0.0.1/tcp/10333").unwrap()) + .map_err(|_| panic!()) + .unwrap() + .and_then(|f| { + f.send("hello world".into()) + }) + .and_then(|f| { + f.into_future() + .map(|(msg, rest)| { + println!("received: {:?}", msg); + rest + }) + .map_err(|(err, _)| err) + }); + + core.run(dialer).unwrap(); +} + +// TODO: copy-pasted from echo-server +#[derive(Debug, Copy, Clone)] +pub struct Echo; +impl ConnectionUpgrade for Echo + where C: AsyncRead + AsyncWrite +{ + type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>; + type UpgradeIdentifier = (); + + #[inline] + fn protocol_names(&self) -> Self::NamesIter { + iter::once(("/echo/1.0.0".into(), ())) + } + + type Output = length_delimited::Framed; + type Future = FutureResult; + + #[inline] + fn upgrade(self, socket: C, _: Self::UpgradeIdentifier) -> Self::Future { + Ok(length_delimited::Framed::new(socket)).into_future() + } +} diff --git a/example/examples/echo-server.rs b/example/examples/echo-server.rs new file mode 100644 index 00000000..c4f3c336 --- /dev/null +++ b/example/examples/echo-server.rs @@ -0,0 +1,107 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +extern crate bytes; +extern crate futures; +extern crate libp2p_secio as secio; +extern crate libp2p_swarm as swarm; +extern crate libp2p_tcp_transport as tcp; +extern crate ring; +extern crate tokio_core; +extern crate tokio_io; +extern crate untrusted; + +use bytes::Bytes; +use futures::future::{Future, FutureResult, IntoFuture, loop_fn, Loop}; +use futures::{Stream, Sink}; +use ring::signature::RSAKeyPair; +use std::io::Error as IoError; +use std::iter; +use std::sync::Arc; +use swarm::{Transport, ConnectionUpgrade}; +use tcp::Tcp; +use tokio_core::reactor::Core; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_io::codec::length_delimited; +use untrusted::Input; + +fn main() { + let mut core = Core::new().unwrap(); + let tcp = Tcp::new(core.handle()).unwrap(); + + let with_secio = tcp + .with_upgrade(swarm::PlainText) + .or_upgrade({ + let private_key = { + let pkcs8 = include_bytes!("test-private-key.pk8"); + Arc::new(RSAKeyPair::from_pkcs8(Input::from(&pkcs8[..])).unwrap()) + }; + let public_key = include_bytes!("test-public-key.der").to_vec(); + + secio::SecioConnUpgrade { + local_public_key: public_key, + local_private_key: private_key, + } + }); + + let with_echo = with_secio.with_upgrade(Echo); + + let future = with_echo.listen_on(swarm::multiaddr::Multiaddr::new("/ip4/0.0.0.0/tcp/10333").unwrap()) + .map_err(|_| panic!()) + .unwrap() + .for_each(|socket| { + loop_fn(socket, |socket| { + socket.into_future() + .map_err(|(err, _)| err) + .and_then(|(msg, rest)| { + if let Some(msg) = msg { + Box::new(rest.send(msg).map(|m| Loop::Continue(m))) as Box> + } else { + Box::new(Ok(Loop::Break(())).into_future()) as Box> + } + }) + }) + }); + + core.run(future).unwrap(); +} + +// TODO: copy-pasted from echo-dialer +#[derive(Debug, Copy, Clone)] +pub struct Echo; +impl ConnectionUpgrade for Echo + where C: AsyncRead + AsyncWrite +{ + type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>; + type UpgradeIdentifier = (); + + #[inline] + fn protocol_names(&self) -> Self::NamesIter { + iter::once(("/echo/1.0.0".into(), ())) + } + + type Output = length_delimited::Framed; + type Future = FutureResult; + + #[inline] + fn upgrade(self, socket: C, _: Self::UpgradeIdentifier) -> Self::Future { + Ok(length_delimited::Framed::new(socket)).into_future() + } +} diff --git a/example/examples/test-private-key.pk8 b/example/examples/test-private-key.pk8 new file mode 100644 index 00000000..452b7af1 Binary files /dev/null and b/example/examples/test-private-key.pk8 differ diff --git a/example/examples/test-public-key.der b/example/examples/test-public-key.der new file mode 100644 index 00000000..9e62c93e Binary files /dev/null and b/example/examples/test-public-key.der differ diff --git a/example/examples/test-public-key.pem b/example/examples/test-public-key.pem new file mode 100644 index 00000000..072f84c3 --- /dev/null +++ b/example/examples/test-public-key.pem @@ -0,0 +1 @@ +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAw3Dq8sdKZ/Lq0AkCFRB+ywQXpubvHgscR+WyVV4tdQE+0OcJSC5hx5W+XLR/y21PTe/30f0oYP7oJv8rH2Mov1Gvops2l6efVqPA8ZggDRrAkotjLXXJggDimIGichRS9+izNi/Lit77H2bFLmlkTfrFOjibWrPP+XvoYRFN3B1gyUT5P1hARePlbb86dcd1e5l/x/lBDH7DJ+TxsY7li6HjgvlxK4jAXa9yzdkDvJOpScs+la7gGawwesDKoQ5dWyqlgT93cbXhwOHTUvownl0hwtYjiK9UGWW8ptn9/3ehYAyi6Kx/SqLJsXiJFlPg16KNunGBHL7VAFyYZ51NEwIDAQAB \ No newline at end of file diff --git a/example/src/lib.rs b/example/src/lib.rs new file mode 100644 index 00000000..e69de29b diff --git a/libp2p-host/src/lib.rs b/libp2p-host/src/lib.rs deleted file mode 100644 index 652a0697..00000000 --- a/libp2p-host/src/lib.rs +++ /dev/null @@ -1,96 +0,0 @@ -//! "Host" abstraction for transports, listener addresses, peer store. - -extern crate futures; -extern crate libp2p_transport as transport; - -use futures::{Future, IntoFuture}; -use transport::{ProtocolId, Socket}; -use transport::multiaddr::Multiaddr; - -/// Produces a future for each incoming `Socket`. -pub trait Handler { - type Future: IntoFuture; - - /// Handle the incoming socket, producing a future which should resolve - /// when the handler is finished. - fn handle(&self, socket: S) -> Self::Future; - fn boxed(self) -> BoxHandler - where - Self: Sized + Send + 'static, - ::Future: Send + 'static, - { - BoxHandler(Box::new(move |socket| { - Box::new(self.handle(socket).into_future()) as _ - })) - } -} - -impl Handler for F -where - F: Fn(S) -> U, - U: IntoFuture, -{ - type Future = U; - - fn handle(&self, socket: S) -> U { - (self)(socket) - } -} - -/// A boxed handler. -pub struct BoxHandler(Box>>>); - -impl Handler for BoxHandler { - type Future = Box>; - - fn handle(&self, socket: S) -> Self::Future { - self.0.handle(socket) - } -} - -/// Multiplexes sockets onto handlers by protocol-id. -pub trait Mux: Sync { - /// The socket type this manages. - type Socket: Socket; - - /// Attach an incoming socket. - fn push(&self, socket: Self::Socket); - - /// Set the socket handler for a given protocol id. - fn set_handler(&self, proto: ProtocolId, handler: BoxHandler); - - /// Remove the socket handler for the given protocol id, returning the old handler if it existed. - fn remove_handler(&self, proto: &ProtocolId) -> Option>; -} - -/// Unimplemented. Maps peer IDs to connected addresses, protocols, and data. -pub trait PeerStore {} - -/// This is a common abstraction over the low-level bits of libp2p. -/// -/// It handles connecting over, adding and removing transports, -/// wraps an arbitrary event loop, and manages protocol IDs. -pub trait Host { - type Socket: Socket; - type Mux: Mux; - type Multiaddrs: IntoIterator; - - /// Get a handle to the peer store. - fn peer_store(&self) -> &PeerStore; - - /// Get a handle to the underlying muxer. - fn mux(&self) -> &Self::Mux; - - /// Set the socket handler for a given protocol id. - fn set_handler(&self, proto: ProtocolId, handler: BoxHandler) { - self.mux().set_handler(proto, handler); - } - - /// Remove the socket handler for the given protocol id, returning the old handler if it existed. - fn remove_handler(&self, proto: &ProtocolId) -> Option> { - self.mux().remove_handler(proto) - } - - /// Addresses we're listening on. - fn listen_addrs(&self) -> Self::Multiaddrs; -} diff --git a/libp2p-secio/Cargo.toml b/libp2p-secio/Cargo.toml index 223ad513..21c85713 100644 --- a/libp2p-secio/Cargo.toml +++ b/libp2p-secio/Cargo.toml @@ -6,10 +6,12 @@ authors = ["Parity Technologies "] [dependencies] bytes = "0.4" futures = "0.1" +libp2p-swarm = { path = "../libp2p-swarm" } protobuf = "1.4.2" rand = "0.3.17" ring = { version = "0.12.1", features = ["rsa_signing"] } rust-crypto = "^0.2" +rw-stream-sink = { path = "../rw-stream-sink" } tokio-core = "0.1.6" tokio-io = "0.1.0" untrusted = "0.6.0" diff --git a/libp2p-secio/src/lib.rs b/libp2p-secio/src/lib.rs index 9ba1d9bf..94fcf2a2 100644 --- a/libp2p-secio/src/lib.rs +++ b/libp2p-secio/src/lib.rs @@ -27,23 +27,32 @@ //! method will perform a handshake with the host, and return a future that corresponds to the //! moment when the handshake succeeds or errored. On success, the future produces a //! `SecioMiddleware` that implements `Sink` and `Stream` and can be used to send packets of data. +//! +//! However for integration with the rest of `libp2p` you are encouraged to use the +//! `SecioConnUpgrade` struct instead. This struct implements the `ConnectionUpgrade` trait and +//! will automatically apply secio on any incoming or outgoing connection. extern crate bytes; extern crate crypto; extern crate futures; +extern crate libp2p_swarm; extern crate protobuf; extern crate rand; extern crate ring; +extern crate rw_stream_sink; extern crate tokio_core; extern crate tokio_io; extern crate untrusted; pub use self::error::SecioError; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use futures::{Future, Poll, StartSend, Sink, Stream}; +use futures::stream::MapErr as StreamMapErr; use ring::signature::RSAKeyPair; -use std::io::Error as IoError; +use rw_stream_sink::RwStreamSink; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use std::iter; use std::sync::Arc; use tokio_io::{AsyncRead, AsyncWrite}; @@ -54,6 +63,55 @@ mod keys_proto; mod handshake; mod structs_proto; +/// Implementation of the `ConnectionUpgrade` trait of `libp2p_swarm`. Automatically applies any +/// secio on any connection. +#[derive(Clone)] +pub struct SecioConnUpgrade { + /// Public key of the local node. Must match `local_private_key` or an error will happen during + /// the handshake. + pub local_public_key: Vec, + /// Private key that will be used to prove the identity of the local node. + pub local_private_key: Arc, +} + +impl libp2p_swarm::ConnectionUpgrade for SecioConnUpgrade + where S: AsyncRead + AsyncWrite + 'static +{ + type Output = RwStreamSink< + StreamMapErr< + SecioMiddleware, + fn(SecioError) -> IoError, + >, + >; + type Future = Box>; + type NamesIter = iter::Once<(Bytes, ())>; + type UpgradeIdentifier = (); + + #[inline] + fn protocol_names(&self) -> Self::NamesIter { + iter::once(("/secio/1.0.0".into(), ())) + } + + #[inline] + fn upgrade(self, incoming: S, _: ()) -> Self::Future { + let fut = SecioMiddleware::handshake( + incoming, + self.local_public_key.clone(), + self.local_private_key.clone(), + ); + let wrapped = fut.map(|stream_sink| { + let mapped = stream_sink.map_err(map_err as fn(_) -> _); + RwStreamSink::new(mapped) + }).map_err(map_err); + Box::new(wrapped) + } +} + +#[inline] +fn map_err(err: SecioError) -> IoError { + IoError::new(IoErrorKind::InvalidData, err) +} + /// Wraps around an object that implements `AsyncRead` and `AsyncWrite`. /// /// Implements `Sink` and `Stream` whose items are frames of data. Each frame is encoded diff --git a/libp2p-host/Cargo.toml b/libp2p-swarm/Cargo.toml similarity index 57% rename from libp2p-host/Cargo.toml rename to libp2p-swarm/Cargo.toml index 41348a5c..39615d84 100644 --- a/libp2p-host/Cargo.toml +++ b/libp2p-swarm/Cargo.toml @@ -1,10 +1,12 @@ [package] -name = "libp2p-host" +name = "libp2p-swarm" version = "0.1.0" authors = ["Parity Technologies "] [dependencies] +bytes = "0.4" multiaddr = "0.2.0" +multistream-select = { path = "../multistream-select" } futures = "0.1" +tokio-core = "0.1" tokio-io = "0.1" -libp2p-transport = { path = "../libp2p-transport" } \ No newline at end of file diff --git a/libp2p-swarm/src/lib.rs b/libp2p-swarm/src/lib.rs new file mode 100644 index 00000000..3602c643 --- /dev/null +++ b/libp2p-swarm/src/lib.rs @@ -0,0 +1,34 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Transport and I/O primitives for libp2p. + +extern crate bytes; +#[macro_use] +extern crate futures; +extern crate multistream_select; +extern crate tokio_io; + +/// Multi-address re-export. +pub extern crate multiaddr; + +pub mod transport; + +pub use self::transport::{ConnectionUpgrade, PlainText, Transport, UpgradedNode, OrUpgrade}; diff --git a/libp2p-swarm/src/transport.rs b/libp2p-swarm/src/transport.rs new file mode 100644 index 00000000..56d9ac85 --- /dev/null +++ b/libp2p-swarm/src/transport.rs @@ -0,0 +1,611 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Handles entering a connection with a peer. +//! +//! The two main elements of this module are the `Transport` and `ConnectionUpgrade` traits. +//! `Transport` is implemented on objects that allow dialing and listening. `ConnectionUpgrade` is +//! implemented on objects that make it possible to upgrade a connection (for example by adding an +//! encryption middleware to the connection). +//! +//! Thanks to the `Transport::or_transport`, `Transport::with_upgrade` and +//! `UpgradeNode::or_upgrade` methods, you can combine multiple transports and/or upgrades together +//! in a complex chain of protocols negotiation. + +use bytes::Bytes; +use futures::{Stream, Poll, Async}; +use futures::future::{IntoFuture, Future, ok as future_ok, FutureResult}; +use multiaddr::Multiaddr; +use multistream_select; +use std::io::{Cursor, Error as IoError, Read, Write}; +use std::iter; +use tokio_io::{AsyncRead, AsyncWrite}; + +/// A transport is an object that can be used to produce connections by listening or dialing a +/// peer. +/// +/// This trait is implemented on concrete transports (eg. TCP, UDP, etc.), but also on wrappers +/// around them. +/// +/// > **Note**: The methods of this trait use `self` and not `&self` or `&mut self`. In other +/// > words, listening or dialing consumes the transport object. This has been designed +/// > so that you would implement this trait on `&Foo` or `&mut Foo` instead of directly +/// > on `Foo`. +pub trait Transport { + /// The raw connection to a peer. + type RawConn: AsyncRead + AsyncWrite; + + /// The listener produces incoming connections. + type Listener: Stream; + + /// A future which indicates that we are currently dialing to a peer. + type Dial: IntoFuture; + + /// Listen on the given multi-addr. + /// + /// Returns the address back if it isn't supported. + fn listen_on(self, addr: Multiaddr) -> Result + where Self: Sized; + + /// Dial to the given multi-addr. + /// + /// Returns either a future which may resolve to a connection, or gives back the multiaddress. + fn dial(self, addr: Multiaddr) -> Result where Self: Sized; + + /// Builds a new struct that implements `Transport` that contains both `self` and `other`. + /// + /// The returned object will redirect its calls to `self`, except that if `listen_on` or `dial` + /// return an error then `other` will be tried. + #[inline] + fn or_transport(self, other: T) -> OrTransport + where Self: Sized + { + OrTransport(self, other) + } + + /// Wraps this transport inside an upgrade. Whenever a connection that uses this transport + /// is established, it is wrapped inside the upgrade. + /// + /// > **Note**: The concept of an *upgrade* for example includes middlewares such *secio* + /// > (communication encryption), *multiplex*, but also a protocol handler. + #[inline] + fn with_upgrade(self, upgrade: U) -> UpgradedNode + where Self: Sized, + U: ConnectionUpgrade + { + UpgradedNode { + transports: self, + upgrade: upgrade, + } + } +} + +/// Dummy implementation of `Transport` that just denies every single attempt. +#[derive(Debug, Copy, Clone)] +pub struct DeniedTransport; + +impl Transport for DeniedTransport { + // TODO: could use `!` for associated types once stable + type RawConn = Cursor>; + type Listener = Box>; + type Dial = Box>; + + #[inline] + fn listen_on(self, addr: Multiaddr) -> Result { + Err((DeniedTransport, addr)) + } + + #[inline] + fn dial(self, addr: Multiaddr) -> Result { + Err((DeniedTransport, addr)) + } +} + +/// Struct returned by `or_transport()`. +#[derive(Debug, Copy, Clone)] +pub struct OrTransport(A, B); + +impl Transport for OrTransport + where A: Transport, + B: Transport +{ + type RawConn = EitherSocket; + type Listener = EitherStream; + type Dial = EitherTransportFuture< + ::Future, + ::Future, + >; + + fn listen_on(self, addr: Multiaddr) -> Result { + let (first, addr) = match self.0.listen_on(addr) { + Ok(connec) => return Ok(EitherStream::First(connec)), + Err(err) => err, + }; + + match self.1.listen_on(addr) { + Ok(connec) => Ok(EitherStream::Second(connec)), + Err((second, addr)) => Err((OrTransport(first, second), addr)), + } + } + + fn dial(self, addr: Multiaddr) -> Result { + let (first, addr) = match self.0.dial(addr) { + Ok(connec) => return Ok(EitherTransportFuture::First(connec.into_future())), + Err(err) => err, + }; + + match self.1.dial(addr) { + Ok(connec) => Ok(EitherTransportFuture::Second(connec.into_future())), + Err((second, addr)) => Err((OrTransport(first, second), addr)), + } + } +} + +/// Implements `Stream` and dispatches all method calls to either `First` or `Second`. +#[derive(Debug, Copy, Clone)] +pub enum EitherStream { + First(A), + Second(B), +} + +impl Stream for EitherStream + where A: Stream, + B: Stream +{ + type Item = EitherSocket; + type Error = IoError; + + #[inline] + fn poll(&mut self) -> Poll, Self::Error> { + match self { + &mut EitherStream::First(ref mut a) => { + a.poll().map(|i| i.map(|v| v.map(EitherSocket::First))) + } + &mut EitherStream::Second(ref mut a) => { + a.poll().map(|i| i.map(|v| v.map(EitherSocket::Second))) + } + } + } +} + +/// Implements `Future` and redirects calls to either `First` or `Second`. +/// +/// Additionally, the output will be wrapped inside a `EitherSocket`. +/// +/// > **Note**: This type is needed because of the lack of `-> impl Trait` in Rust. It can be +/// > removed eventually. +#[derive(Debug, Copy, Clone)] +pub enum EitherTransportFuture { + First(A), + Second(B), +} + +impl Future for EitherTransportFuture + where A: Future, + B: Future +{ + type Item = EitherSocket; + type Error = IoError; + + #[inline] + fn poll(&mut self) -> Poll { + match self { + &mut EitherTransportFuture::First(ref mut a) => { + let item = try_ready!(a.poll()); + Ok(Async::Ready(EitherSocket::First(item))) + } + &mut EitherTransportFuture::Second(ref mut b) => { + let item = try_ready!(b.poll()); + Ok(Async::Ready(EitherSocket::Second(item))) + } + } + } +} + +/// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to either `First` or +/// `Second`. +#[derive(Debug, Copy, Clone)] +pub enum EitherSocket { + First(A), + Second(B), +} + +impl AsyncRead for EitherSocket + where A: AsyncRead, + B: AsyncRead +{ + #[inline] + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + match self { + &EitherSocket::First(ref a) => a.prepare_uninitialized_buffer(buf), + &EitherSocket::Second(ref b) => b.prepare_uninitialized_buffer(buf), + } + } +} + +impl Read for EitherSocket + where A: Read, + B: Read +{ + #[inline] + fn read(&mut self, buf: &mut [u8]) -> Result { + match self { + &mut EitherSocket::First(ref mut a) => a.read(buf), + &mut EitherSocket::Second(ref mut b) => b.read(buf), + } + } +} + +impl AsyncWrite for EitherSocket + where A: AsyncWrite, + B: AsyncWrite +{ + #[inline] + fn shutdown(&mut self) -> Poll<(), IoError> { + match self { + &mut EitherSocket::First(ref mut a) => a.shutdown(), + &mut EitherSocket::Second(ref mut b) => b.shutdown(), + } + } +} + +impl Write for EitherSocket + where A: Write, + B: Write +{ + #[inline] + fn write(&mut self, buf: &[u8]) -> Result { + match self { + &mut EitherSocket::First(ref mut a) => a.write(buf), + &mut EitherSocket::Second(ref mut b) => b.write(buf), + } + } + + #[inline] + fn flush(&mut self) -> Result<(), IoError> { + match self { + &mut EitherSocket::First(ref mut a) => a.flush(), + &mut EitherSocket::Second(ref mut b) => b.flush(), + } + } +} + +/// Implemented on structs that describe a possible upgrade to a connection between two peers. +/// +/// The generic `C` is the type of the incoming connection before it is upgraded. +/// +/// > **Note**: The `upgrade` method of this trait uses `self` and not `&self` or `&mut self`. +/// > This has been designed so that you would implement this trait on `&Foo` or +/// > `&mut Foo` instead of directly on `Foo`. +pub trait ConnectionUpgrade { + /// Iterator returned by `protocol_names`. + type NamesIter: Iterator; + /// Type that serves as an identifier for the protocol. This type only exists to be returned + /// by the `NamesIter` and then be passed to `upgrade`. + /// + /// This is only useful on implementations that dispatch between multiple possible upgrades. + /// Any basic implementation will probably just use the `()` type. + type UpgradeIdentifier; + + /// Returns the name of the protocols to advertise to the remote. + fn protocol_names(&self) -> Self::NamesIter; + + /// Type of the stream that has been upgraded. Generally wraps around `C` and `Self`. + /// + /// > **Note**: For upgrades that add an intermediary layer (such as `secio` or `multiplex`), + /// > this associated type must implement `AsyncRead + AsyncWrite`. + type Output; + /// Type of the future that will resolve to `Self::Output`. + type Future: Future; + + /// This method is called after protocol negotiation has been performed. + /// + /// Because performing the upgrade may not be instantaneous (eg. it may require a handshake), + /// this function returns a future instead of the direct output. + fn upgrade(self, socket: C, id: Self::UpgradeIdentifier) -> Self::Future; +} + +/// See `or_upgrade()`. +#[derive(Debug, Copy, Clone)] +pub struct OrUpgrade(A, B); + +impl ConnectionUpgrade for OrUpgrade + where C: AsyncRead + AsyncWrite, + A: ConnectionUpgrade, + B: ConnectionUpgrade +{ + type NamesIter = NamesIterChain; + type UpgradeIdentifier = EitherUpgradeIdentifier; + + #[inline] + fn protocol_names(&self) -> Self::NamesIter { + NamesIterChain { + first: self.0.protocol_names(), + second: self.1.protocol_names(), + } + } + + type Output = EitherSocket; + type Future = EitherConnUpgrFuture; + + #[inline] + fn upgrade(self, socket: C, id: Self::UpgradeIdentifier) -> Self::Future { + match id { + EitherUpgradeIdentifier::First(id) => { + EitherConnUpgrFuture::First(self.0.upgrade(socket, id)) + } + EitherUpgradeIdentifier::Second(id) => { + EitherConnUpgrFuture::Second(self.1.upgrade(socket, id)) + } + } + } +} + +/// Internal struct used by the `OrUpgrade` trait. +#[derive(Debug, Copy, Clone)] +pub enum EitherUpgradeIdentifier { + First(A), + Second(B), +} + +/// Implements `Future` and redirects calls to either `First` or `Second`. +/// +/// Additionally, the output will be wrapped inside a `EitherSocket`. +/// +/// > **Note**: This type is needed because of the lack of `-> impl Trait` in Rust. It can be +/// > removed eventually. +#[derive(Debug, Copy, Clone)] +pub enum EitherConnUpgrFuture { + First(A), + Second(B), +} + +impl Future for EitherConnUpgrFuture + where A: Future, + B: Future +{ + type Item = EitherSocket; + type Error = IoError; + + #[inline] + fn poll(&mut self) -> Poll { + match self { + &mut EitherConnUpgrFuture::First(ref mut a) => { + let item = try_ready!(a.poll()); + Ok(Async::Ready(EitherSocket::First(item))) + } + &mut EitherConnUpgrFuture::Second(ref mut b) => { + let item = try_ready!(b.poll()); + Ok(Async::Ready(EitherSocket::Second(item))) + } + } + } +} + +/// Internal type used by the `OrUpgrade` struct. +/// +/// > **Note**: This type is needed because of the lack of `-> impl Trait` in Rust. It can be +/// > removed eventually. +#[derive(Debug, Copy, Clone)] +pub struct NamesIterChain { + first: A, + second: B, +} + +impl Iterator for NamesIterChain + where A: Iterator, + B: Iterator +{ + type Item = (Bytes, EitherUpgradeIdentifier); + + #[inline] + fn next(&mut self) -> Option { + if let Some((name, id)) = self.first.next() { + return Some((name, EitherUpgradeIdentifier::First(id))); + } + if let Some((name, id)) = self.second.next() { + return Some((name, EitherUpgradeIdentifier::Second(id))); + } + None + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + let (min1, max1) = self.first.size_hint(); + let (min2, max2) = self.second.size_hint(); + let max = match (max1, max2) { + (Some(max1), Some(max2)) => max1.checked_add(max2), + _ => None, + }; + (min1.saturating_add(min2), max) + } +} + +/// Implementation of the `ConnectionUpgrade` that negotiates the `/plaintext/1.0.0` protocol and +/// simply passes communications through without doing anything more. +/// +/// > **Note**: Generally used as an alternative to `secio` if a security layer is not desirable. +// TODO: move `PlainText` to a separate crate? +#[derive(Debug, Copy, Clone)] +pub struct PlainText; + +impl ConnectionUpgrade for PlainText + where C: AsyncRead + AsyncWrite +{ + type Output = C; + type Future = FutureResult; + type UpgradeIdentifier = (); + type NamesIter = iter::Once<(Bytes, ())>; + + #[inline] + fn upgrade(self, i: C, _: ()) -> Self::Future { + future_ok(i) + } + + #[inline] + fn protocol_names(&self) -> Self::NamesIter { + iter::once((Bytes::from("/plaintext/1.0.0"), ())) + } +} + +/// Implements the `Transport` trait. Dials or listens, then upgrades any dialed or received +/// connection. +/// +/// See the `Transport::with_upgrade` method. +#[derive(Debug, Clone)] +pub struct UpgradedNode { + transports: T, + upgrade: C, +} + +impl<'a, T, C> UpgradedNode + where T: Transport + 'a, + C: ConnectionUpgrade + 'a +{ + /// Builds a new struct that implements `ConnectionUpgrade` that contains both `self` and + /// `other_upg`. + /// + /// The returned object will try to negotiate either the protocols of `self` or the protocols + /// of `other_upg`, then upgrade the connection to the negogiated protocol. + #[inline] + pub fn or_upgrade(self, other_upg: D) -> UpgradedNode> + where D: ConnectionUpgrade + 'a + { + UpgradedNode { + transports: self.transports, + upgrade: OrUpgrade(self.upgrade, other_upg), + } + } + + /// Tries to dial on the `Multiaddr` using the transport that was passed to `new`, then upgrade + /// the connection. + /// + /// Note that this does the same as `Transport::dial`, but with less restrictions on the trait + /// requirements. + #[inline] + pub fn dial( + self, + addr: Multiaddr, + ) -> Result + 'a>, (Self, Multiaddr)> { + let upgrade = self.upgrade; + + let dialed_fut = match self.transports.dial(addr) { + Ok(f) => f.into_future(), + Err((trans, addr)) => { + let builder = UpgradedNode { + transports: trans, + upgrade: upgrade, + }; + + return Err((builder, addr)); + } + }; + + let future = dialed_fut + // Try to negotiate the protocol. + .and_then(move |connection| { + let iter = upgrade.protocol_names() + .map(|(name, id)| (name, ::eq, id)); + let negotiated = multistream_select::dialer_select_proto(connection, iter) + .map_err(|err| panic!("{:?}", err)); // TODO: + negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade)) + }) + .and_then(|(upgrade_id, connection, upgrade)| { + upgrade.upgrade(connection, upgrade_id) + }); + + Ok(Box::new(future)) + } + + /// Start listening on the multiaddr using the transport that was passed to `new`. + /// Then whenever a connection is opened, it is upgraded. + /// + /// Note that this does the same as `Transport::listen_on`, but with less restrictions on the + /// trait requirements. + #[inline] + pub fn listen_on( + self, + addr: Multiaddr, + ) -> Result + 'a>, (Self, Multiaddr)> + where C::NamesIter: Clone, // TODO: not elegant + C: Clone + { + let upgrade = self.upgrade; + + let listening_stream = match self.transports.listen_on(addr) { + Ok(l) => l, + Err((trans, addr)) => { + let builder = UpgradedNode { + transports: trans, + upgrade: upgrade, + }; + + return Err((builder, addr)); + } + }; + + let stream = listening_stream + // Try to negotiate the protocol. + .and_then(move |connection| { + let upgrade = upgrade.clone(); + #[inline] + fn iter_map((n, t): (Bytes, T)) -> (Bytes, fn(&Bytes,&Bytes)->bool, T) { + (n, ::eq, t) + } + let iter = upgrade.protocol_names().map(iter_map); + let negotiated = multistream_select::listener_select_proto(connection, iter) + .map_err(|err| panic!("{:?}", err)); // TODO: + negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade)) + .map_err(|_| panic!()) // TODO: + }) + .map_err(|_| panic!()) // TODO: + .and_then(|(upgrade_id, connection, upgrade)| { + upgrade.upgrade(connection, upgrade_id) + }); + + Ok(Box::new(stream)) + } +} + +impl Transport for UpgradedNode + where T: Transport + 'static, + C: ConnectionUpgrade + 'static, + C::Output: AsyncRead + AsyncWrite, + C::NamesIter: Clone, // TODO: not elegant + C: Clone +{ + type RawConn = C::Output; + type Listener = Box>; + type Dial = Box>; + + #[inline] + fn listen_on(self, addr: Multiaddr) -> Result + where Self: Sized + { + self.listen_on(addr) + } + + #[inline] + fn dial(self, addr: Multiaddr) -> Result + where Self: Sized + { + self.dial(addr) + } +} diff --git a/libp2p-tcp-transport/Cargo.toml b/libp2p-tcp-transport/Cargo.toml index 0a198a0c..5a0f50b0 100644 --- a/libp2p-tcp-transport/Cargo.toml +++ b/libp2p-tcp-transport/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" authors = ["Parity Technologies "] [dependencies] -libp2p-transport = { path = "../libp2p-transport" } +libp2p-swarm = { path = "../libp2p-swarm" } futures = "0.1" multiaddr = "0.2.0" tokio-core = "0.1" diff --git a/libp2p-tcp-transport/src/lib.rs b/libp2p-tcp-transport/src/lib.rs index 33e8870f..15d2e7a0 100644 --- a/libp2p-tcp-transport/src/lib.rs +++ b/libp2p-tcp-transport/src/lib.rs @@ -1,4 +1,26 @@ -extern crate libp2p_transport as transport; +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Implementation of the libp2p `Transport` trait for TCP/IP. + +extern crate libp2p_swarm as swarm; extern crate tokio_core; extern crate tokio_io; extern crate multiaddr; @@ -6,20 +28,25 @@ extern crate futures; use std::io::Error as IoError; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use tokio_core::reactor::Core; +use tokio_core::reactor::Handle; use tokio_core::net::{TcpStream, TcpListener, TcpStreamNew}; use futures::Future; use futures::stream::Stream; use multiaddr::{Multiaddr, Protocol}; -use transport::Transport; +use swarm::Transport; +/// Represents a TCP/IP transport capability for libp2p. +/// +/// Each `Tcp` struct is tied to a tokio reactor. The TCP sockets created by libp2p will need to +/// be progressed by running the futures and streams obtained by libp2p through the tokio reactor. +#[derive(Debug, Clone)] pub struct Tcp { - pub event_loop: Core, + event_loop: Handle, } impl Tcp { - pub fn new() -> Result { - Ok(Tcp { event_loop: Core::new()? }) + pub fn new(handle: Handle) -> Result { + Ok(Tcp { event_loop: handle }) } } @@ -35,11 +62,11 @@ impl Transport for Tcp { /// Listen on the given multi-addr. /// Returns the address back if it isn't supported. - fn listen_on(&mut self, addr: Multiaddr) -> Result { + fn listen_on(self, addr: Multiaddr) -> Result { if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { Ok(Box::new( futures::future::result( - TcpListener::bind(&socket_addr, &self.event_loop.handle()), + TcpListener::bind(&socket_addr, &self.event_loop), ).map(|listener| { // Pull out a stream of sockets for incoming connections listener.incoming().map(|x| x.0) @@ -47,18 +74,18 @@ impl Transport for Tcp { .flatten_stream(), )) } else { - Err(addr) + Err((self, addr)) } } /// Dial to the given multi-addr. /// Returns either a future which may resolve to a connection, /// or gives back the multiaddress. - fn dial(&mut self, addr: Multiaddr) -> Result { + fn dial(self, addr: Multiaddr) -> Result { if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { - Ok(TcpStream::connect(&socket_addr, &self.event_loop.handle())) + Ok(TcpStream::connect(&socket_addr, &self.event_loop)) } else { - Err(addr) + Err((self, addr)) } } } @@ -98,11 +125,12 @@ mod tests { use super::{Tcp, multiaddr_to_socketaddr}; use std; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use tokio_core::reactor::Core; use tokio_io; use futures::Future; use futures::stream::Stream; use multiaddr::Multiaddr; - use transport::Transport; + use swarm::Transport; #[test] fn multiaddr_to_tcp_conversion() { @@ -156,9 +184,10 @@ mod tests { use std::io::Write; std::thread::spawn(move || { + let mut core = Core::new().unwrap(); let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap(); - let mut tcp = Tcp::new().unwrap(); - let handle = tcp.event_loop.handle(); + let tcp = Tcp::new(core.handle()).unwrap(); + let handle = core.handle(); let listener = tcp.listen_on(addr).unwrap().for_each(|sock| { // Define what to do with the socket that just connected to us // Which in this case is read 3 bytes @@ -172,11 +201,12 @@ mod tests { Ok(()) }); - tcp.event_loop.run(listener).unwrap(); + core.run(listener).unwrap(); }); std::thread::sleep(std::time::Duration::from_millis(100)); let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap(); - let mut tcp = Tcp::new().unwrap(); + let mut core = Core::new().unwrap(); + let tcp = Tcp::new(core.handle()).unwrap(); // Obtain a future socket through dialing let socket = tcp.dial(addr.clone()).unwrap(); // Define what to do with the socket once it's obtained @@ -188,7 +218,7 @@ mod tests { Err(x) => Err(x), }); // Execute the future in our event loop - tcp.event_loop.run(action).unwrap(); + core.run(action).unwrap(); std::thread::sleep(std::time::Duration::from_millis(100)); } } diff --git a/libp2p-transport/Cargo.toml b/libp2p-transport/Cargo.toml deleted file mode 100644 index 4e195088..00000000 --- a/libp2p-transport/Cargo.toml +++ /dev/null @@ -1,9 +0,0 @@ -[package] -name = "libp2p-transport" -version = "0.1.0" -authors = ["Parity Technologies "] - -[dependencies] -multiaddr = "0.2.0" -futures = "0.1" -tokio-io = "0.1" diff --git a/libp2p-transport/src/lib.rs b/libp2p-transport/src/lib.rs deleted file mode 100644 index 58dfe51f..00000000 --- a/libp2p-transport/src/lib.rs +++ /dev/null @@ -1,62 +0,0 @@ -//! Transport and I/O primitives for libp2p. - -extern crate futures; -extern crate tokio_io; - -/// Multi-address re-export. -pub extern crate multiaddr; - -use multiaddr::Multiaddr; -use futures::{IntoFuture, Future}; -use futures::stream::Stream; -use std::io::Error as IoError; -use tokio_io::{AsyncRead, AsyncWrite}; - -// Something more strongly-typed? -pub type ProtocolId = String; -pub type PeerId = String; - -/// A logical wire between us and a peer. We can read and write through this asynchronously. -/// -/// You can have multiple `Socket`s between you and any given peer. -pub trait Socket: AsyncRead + AsyncWrite + Sized { - type Conn: Conn; - - /// Get the protocol ID this socket uses. - fn protocol_id(&self) -> ProtocolId; - - /// Access the underlying connection. - fn conn(&self) -> &Self::Conn; -} - -/// A connection between you and a peer. -pub trait Conn { - /// The socket type this connection manages. - type Socket; - type SocketFuture: IntoFuture; - - /// Initiate a socket between you and the peer on the given protocol. - fn make_socket(&self, proto: ProtocolId) -> Self::SocketFuture; -} - -/// A transport is a stream producing incoming connections. -/// These are transports or wrappers around them. -pub trait Transport { - /// The raw connection. - type RawConn: AsyncRead + AsyncWrite; - - /// The listener produces incoming connections. - type Listener: Stream; - - /// A future which indicates currently dialing to a peer. - type Dial: IntoFuture; - - /// Listen on the given multi-addr. - /// Returns the address back if it isn't supported. - fn listen_on(&mut self, addr: Multiaddr) -> Result; - - /// Dial to the given multi-addr. - /// Returns either a future which may resolve to a connection, - /// or gives back the multiaddress. - fn dial(&mut self, addr: Multiaddr) -> Result; -}