diff --git a/example/examples/echo-dialer.rs b/example/examples/echo-dialer.rs index 8cbbd0f3..fb4e8854 100644 --- a/example/examples/echo-dialer.rs +++ b/example/examples/echo-dialer.rs @@ -32,7 +32,8 @@ extern crate tokio_io; use futures::{Future, Sink, Stream}; use futures::sync::oneshot; use std::env; -use swarm::{DeniedConnectionUpgrade, SimpleProtocol, Transport, UpgradeExt}; +use swarm::Transport; +use swarm::upgrade::{self, DeniedConnectionUpgrade, SimpleProtocol, UpgradeExt}; use tcp::TcpConfig; use tokio_core::reactor::Core; use tokio_io::AsyncRead; @@ -62,7 +63,7 @@ fn main() { // On top of TCP/IP, we will use either the plaintext protocol or the secio protocol, // depending on which one the remote supports. .with_upgrade({ - let plain_text = swarm::PlainTextConfig; + let plain_text = upgrade::PlainTextConfig; let secio = { let private_key = include_bytes!("test-private-key.pk8"); diff --git a/example/examples/echo-server.rs b/example/examples/echo-server.rs index aa0fca83..4c11f66f 100644 --- a/example/examples/echo-server.rs +++ b/example/examples/echo-server.rs @@ -32,7 +32,8 @@ extern crate tokio_io; use futures::future::{loop_fn, Future, IntoFuture, Loop}; use futures::{Sink, Stream}; use std::env; -use swarm::{SimpleProtocol, Transport, UpgradeExt}; +use swarm::Transport; +use swarm::upgrade::{self, SimpleProtocol, UpgradeExt}; use tcp::TcpConfig; use tokio_core::reactor::Core; use tokio_io::AsyncRead; @@ -61,7 +62,7 @@ fn main() { // On top of TCP/IP, we will use either the plaintext protocol or the secio protocol, // depending on which one the remote supports. .with_upgrade({ - let plain_text = swarm::PlainTextConfig; + let plain_text = upgrade::PlainTextConfig; let secio = { let private_key = include_bytes!("test-private-key.pk8"); diff --git a/example/examples/floodsub.rs b/example/examples/floodsub.rs index 3da19aa2..54edd4fc 100644 --- a/example/examples/floodsub.rs +++ b/example/examples/floodsub.rs @@ -37,7 +37,8 @@ use futures::future::Future; use futures::Stream; use peerstore::PeerId; use std::{env, mem}; -use swarm::{Multiaddr, Transport, UpgradeExt}; +use swarm::{Multiaddr, Transport}; +use swarm::upgrade::{self, UpgradeExt}; use tcp::TcpConfig; use tokio_core::reactor::Core; use websocket::WsConfig; @@ -64,7 +65,7 @@ fn main() { // On top of TCP/IP, we will use either the plaintext protocol or the secio protocol, // depending on which one the remote supports. .with_upgrade({ - let plain_text = swarm::PlainTextConfig; + let plain_text = upgrade::PlainTextConfig; let secio = { let private_key = include_bytes!("test-private-key.pk8"); diff --git a/example/examples/kademlia.rs b/example/examples/kademlia.rs index cfb3b763..6ed14742 100644 --- a/example/examples/kademlia.rs +++ b/example/examples/kademlia.rs @@ -39,7 +39,8 @@ use peerstore::PeerId; use std::env; use std::sync::Arc; use std::time::Duration; -use swarm::{Transport, UpgradeExt}; +use swarm::Transport; +use swarm::upgrade::{self, UpgradeExt}; use tcp::TcpConfig; use tokio_core::reactor::Core; @@ -69,7 +70,7 @@ fn main() { // On top of TCP/IP, we will use either the plaintext protocol or the secio protocol, // depending on which one the remote supports. .with_upgrade({ - let plain_text = swarm::PlainTextConfig; + let plain_text = upgrade::PlainTextConfig; let secio = { let private_key = include_bytes!("test-private-key.pk8"); diff --git a/example/examples/ping-client.rs b/example/examples/ping-client.rs index 2a721408..5e7a5abc 100644 --- a/example/examples/ping-client.rs +++ b/example/examples/ping-client.rs @@ -32,7 +32,8 @@ extern crate tokio_io; use futures::Future; use futures::sync::oneshot; use std::env; -use swarm::{DeniedConnectionUpgrade, Transport, UpgradeExt}; +use swarm::Transport; +use swarm::upgrade::{self, DeniedConnectionUpgrade, UpgradeExt}; use tcp::TcpConfig; use tokio_core::reactor::Core; @@ -54,7 +55,7 @@ fn main() { // On top of TCP/IP, we will use either the plaintext protocol or the secio protocol, // depending on which one the remote supports. .with_upgrade({ - let plain_text = swarm::PlainTextConfig; + let plain_text = upgrade::PlainTextConfig; let secio = { let private_key = include_bytes!("test-private-key.pk8"); diff --git a/ping/src/lib.rs b/ping/src/lib.rs index daea084e..4b6daf5b 100644 --- a/ping/src/lib.rs +++ b/ping/src/lib.rs @@ -92,8 +92,7 @@ use bytes::{BufMut, Bytes, BytesMut}; use futures::{Future, Sink, Stream}; use futures::future::{loop_fn, FutureResult, IntoFuture, Loop}; use futures::sync::{mpsc, oneshot}; -use libp2p_swarm::Multiaddr; -use libp2p_swarm::transport::{ConnectionUpgrade, Endpoint}; +use libp2p_swarm::{ConnectionUpgrade, Endpoint, Multiaddr}; use log::Level; use parking_lot::Mutex; use rand::Rand; @@ -310,7 +309,7 @@ mod tests { use futures::future::join_all; use futures::Future; use futures::Stream; - use libp2p_swarm::transport::{ConnectionUpgrade, Endpoint}; + use libp2p_swarm::{ConnectionUpgrade, Endpoint}; #[test] fn ping_pong() { diff --git a/swarm/src/connection_reuse.rs b/swarm/src/connection_reuse.rs index b3e822ee..a771257e 100644 --- a/swarm/src/connection_reuse.rs +++ b/swarm/src/connection_reuse.rs @@ -50,7 +50,8 @@ use muxing::StreamMuxer; use parking_lot::Mutex; use std::io::Error as IoError; use std::sync::Arc; -use transport::{ConnectionUpgrade, MuxedTransport, Transport, UpgradedNode}; +use transport::{MuxedTransport, Transport, UpgradedNode}; +use upgrade::ConnectionUpgrade; /// Allows reusing the same muxed connection multiple times. /// diff --git a/swarm/src/either.rs b/swarm/src/either.rs new file mode 100644 index 00000000..e143b368 --- /dev/null +++ b/swarm/src/either.rs @@ -0,0 +1,216 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::prelude::*; +use multiaddr::Multiaddr; +use muxing::StreamMuxer; +use std::io::{Error as IoError, Read, Write}; +use tokio_io::{AsyncRead, AsyncWrite}; + +/// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to +/// either `First` or `Second`. +#[derive(Debug, Copy, Clone)] +pub enum EitherSocket { + First(A), + Second(B), +} + +impl AsyncRead for EitherSocket +where + A: AsyncRead, + B: AsyncRead, +{ + #[inline] + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + match self { + &EitherSocket::First(ref a) => a.prepare_uninitialized_buffer(buf), + &EitherSocket::Second(ref b) => b.prepare_uninitialized_buffer(buf), + } + } +} + +impl Read for EitherSocket +where + A: Read, + B: Read, +{ + #[inline] + fn read(&mut self, buf: &mut [u8]) -> Result { + match self { + &mut EitherSocket::First(ref mut a) => a.read(buf), + &mut EitherSocket::Second(ref mut b) => b.read(buf), + } + } +} + +impl AsyncWrite for EitherSocket +where + A: AsyncWrite, + B: AsyncWrite, +{ + #[inline] + fn shutdown(&mut self) -> Poll<(), IoError> { + match self { + &mut EitherSocket::First(ref mut a) => a.shutdown(), + &mut EitherSocket::Second(ref mut b) => b.shutdown(), + } + } +} + +impl Write for EitherSocket +where + A: Write, + B: Write, +{ + #[inline] + fn write(&mut self, buf: &[u8]) -> Result { + match self { + &mut EitherSocket::First(ref mut a) => a.write(buf), + &mut EitherSocket::Second(ref mut b) => b.write(buf), + } + } + + #[inline] + fn flush(&mut self) -> Result<(), IoError> { + match self { + &mut EitherSocket::First(ref mut a) => a.flush(), + &mut EitherSocket::Second(ref mut b) => b.flush(), + } + } +} + +impl StreamMuxer for EitherSocket +where + A: StreamMuxer, + B: StreamMuxer, +{ + type Substream = EitherSocket; + type InboundSubstream = EitherTransportFuture; + type OutboundSubstream = EitherTransportFuture; + + #[inline] + fn inbound(self) -> Self::InboundSubstream { + match self { + EitherSocket::First(a) => EitherTransportFuture::First(a.inbound()), + EitherSocket::Second(b) => EitherTransportFuture::Second(b.inbound()), + } + } + + #[inline] + fn outbound(self) -> Self::OutboundSubstream { + match self { + EitherSocket::First(a) => EitherTransportFuture::First(a.outbound()), + EitherSocket::Second(b) => EitherTransportFuture::Second(b.outbound()), + } + } +} + +/// Implements `Future` and redirects calls to either `First` or `Second`. +/// +/// Additionally, the output will be wrapped inside a `EitherSocket`. +// TODO: This type is needed because of the lack of `impl Trait` in stable Rust. +// If Rust had impl Trait we could use the Either enum from the futures crate and add some +// modifiers to it. This custom enum is a combination of Either and these modifiers. +#[derive(Debug, Copy, Clone)] +pub enum EitherTransportFuture { + First(A), + Second(B), +} + +impl Future for EitherTransportFuture +where + A: Future, + B: Future, +{ + type Item = EitherSocket; + type Error = IoError; + + #[inline] + fn poll(&mut self) -> Poll { + match self { + &mut EitherTransportFuture::First(ref mut a) => { + let item = try_ready!(a.poll()); + Ok(Async::Ready(EitherSocket::First(item))) + } + &mut EitherTransportFuture::Second(ref mut b) => { + let item = try_ready!(b.poll()); + Ok(Async::Ready(EitherSocket::Second(item))) + } + } + } +} + +/// Implements `Stream` and dispatches all method calls to either `First` or `Second`. +#[derive(Debug, Copy, Clone)] +pub enum EitherListenStream { + First(A), + Second(B), +} + +impl Stream for EitherListenStream +where + AStream: Stream, + BStream: Stream, +{ + type Item = EitherListenUpgrade; + type Error = IoError; + + #[inline] + fn poll(&mut self) -> Poll, Self::Error> { + match self { + &mut EitherListenStream::First(ref mut a) => a.poll() + .map(|i| i.map(|v| v.map(EitherListenUpgrade::First))), + &mut EitherListenStream::Second(ref mut a) => a.poll() + .map(|i| i.map(|v| v.map(EitherListenUpgrade::Second))), + } + } +} + +// TODO: This type is needed because of the lack of `impl Trait` in stable Rust. +// If Rust had impl Trait we could use the Either enum from the futures crate and add some +// modifiers to it. This custom enum is a combination of Either and these modifiers. +#[derive(Debug, Copy, Clone)] +pub enum EitherListenUpgrade { + First(A), + Second(B), +} + +impl Future for EitherListenUpgrade +where + A: Future, + B: Future, +{ + type Item = (EitherSocket, Multiaddr); + type Error = IoError; + + #[inline] + fn poll(&mut self) -> Poll { + match self { + &mut EitherListenUpgrade::First(ref mut a) => { + let (item, addr) = try_ready!(a.poll()); + Ok(Async::Ready((EitherSocket::First(item), addr))) + } + &mut EitherListenUpgrade::Second(ref mut b) => { + let (item, addr) = try_ready!(b.poll()); + Ok(Async::Ready((EitherSocket::Second(item), addr))) + } + } + } +} diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index b042ddeb..730a8f79 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -94,7 +94,7 @@ //! # fn main() { //! let tokio_core = tokio_core::reactor::Core::new().unwrap(); //! let tcp_transport = libp2p_tcp_transport::TcpConfig::new(tokio_core.handle()); -//! let upgraded = tcp_transport.with_upgrade(libp2p_swarm::PlainTextConfig); +//! let upgraded = tcp_transport.with_upgrade(libp2p_swarm::upgrade::PlainTextConfig); //! //! // upgraded.dial(...) // automatically applies the plain text protocol on the socket //! # } @@ -217,14 +217,16 @@ extern crate tokio_io; pub extern crate multiaddr; mod connection_reuse; -pub mod swarm; +mod either; + pub mod muxing; +pub mod swarm; pub mod transport; +pub mod upgrade; pub use self::connection_reuse::ConnectionReuse; pub use self::multiaddr::{AddrComponent, Multiaddr}; pub use self::muxing::StreamMuxer; pub use self::swarm::{swarm, SwarmController, SwarmFuture}; -pub use self::transport::{ConnectionUpgrade, OrUpgrade, PlainTextConfig, Transport, UpgradedNode}; -pub use self::transport::{Endpoint, MuxedTransport, SimpleProtocol, UpgradeExt}; -pub use self::transport::DeniedConnectionUpgrade; +pub use self::transport::{MuxedTransport, Transport}; +pub use self::upgrade::{ConnectionUpgrade, Endpoint}; diff --git a/swarm/src/swarm.rs b/swarm/src/swarm.rs index 34962790..3a8242d4 100644 --- a/swarm/src/swarm.rs +++ b/swarm/src/swarm.rs @@ -23,7 +23,8 @@ use std::io::Error as IoError; use futures::{future, Async, Future, IntoFuture, Poll, Stream}; use futures::stream::{FuturesUnordered, StreamFuture}; use futures::sync::mpsc; -use {ConnectionUpgrade, Multiaddr, MuxedTransport, UpgradedNode}; +use transport::UpgradedNode; +use {ConnectionUpgrade, Multiaddr, MuxedTransport}; /// Creates a swarm. /// diff --git a/swarm/src/transport.rs b/swarm/src/transport.rs deleted file mode 100644 index 1a733c08..00000000 --- a/swarm/src/transport.rs +++ /dev/null @@ -1,1165 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! Handles entering a connection with a peer. -//! -//! The two main elements of this module are the `Transport` and `ConnectionUpgrade` traits. -//! `Transport` is implemented on objects that allow dialing and listening. `ConnectionUpgrade` is -//! implemented on objects that make it possible to upgrade a connection (for example by adding an -//! encryption middleware to the connection). -//! -//! Thanks to the `Transport::or_transport`, `Transport::with_upgrade` and -//! `UpgradedNode::or_upgrade` methods, you can combine multiple transports and/or upgrades -//! together in a complex chain of protocols negotiation. - -use bytes::Bytes; -use connection_reuse::ConnectionReuse; -use futures::{stream, Async, Poll, Stream}; -use futures::future::{self, FromErr, Future, FutureResult, IntoFuture}; -use multiaddr::Multiaddr; -use multistream_select; -use muxing::StreamMuxer; -use std::io::{Cursor, Error as IoError, ErrorKind as IoErrorKind, Read, Write}; -use std::iter; -use std::sync::Arc; -use tokio_io::{AsyncRead, AsyncWrite}; - -/// A transport is an object that can be used to produce connections by listening or dialing a -/// peer. -/// -/// This trait is implemented on concrete transports (eg. TCP, UDP, etc.), but also on wrappers -/// around them. -/// -/// > **Note**: The methods of this trait use `self` and not `&self` or `&mut self`. In other -/// > words, listening or dialing consumes the transport object. This has been designed -/// > so that you would implement this trait on `&Foo` or `&mut Foo` instead of directly -/// > on `Foo`. -pub trait Transport { - /// The raw connection to a peer. - type RawConn: AsyncRead + AsyncWrite; - - /// The listener produces incoming connections. - /// - /// An item should be produced whenever a connection is received at the lowest level of the - /// transport stack. The item is a `Future` that is signalled once some pre-processing has - /// taken place, and that connection has been upgraded to the wanted protocols. - type Listener: Stream; - - /// After a connection has been received, we may need to do some asynchronous pre-processing - /// on it (eg. an intermediary protocol negotiation). While this pre-processing takes place, we - /// want to be able to continue polling on the listener. - type ListenerUpgrade: Future; - - /// A future which indicates that we are currently dialing to a peer. - type Dial: IntoFuture; - - /// Listen on the given multiaddr. Returns a stream of incoming connections, plus a modified - /// version of the `Multiaddr`. This new `Multiaddr` is the one that that should be advertised - /// to other nodes, instead of the one passed as parameter. - /// - /// Returns the address back if it isn't supported. - /// - /// > **Note**: The reason why we need to change the `Multiaddr` on success is to handle - /// > situations such as turning `/ip4/127.0.0.1/tcp/0` into - /// > `/ip4/127.0.0.1/tcp/`. - fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> - where - Self: Sized; - - /// Dial to the given multi-addr. - /// - /// Returns either a future which may resolve to a connection, or gives back the multiaddress. - fn dial(self, addr: Multiaddr) -> Result - where - Self: Sized; - - /// Takes a multiaddress we're listening on (`server`), and tries to convert it to an - /// externally-visible multiaddress. In order to do so, we pass an `observed` address which - /// a remote node observes for one of our dialers. - /// - /// For example, if `server` is `/ip4/0.0.0.0/tcp/3000` and `observed` is - /// `/ip4/80.81.82.83/tcp/29601`, then we should return `/ip4/80.81.82.83/tcp/3000`. Each - /// implementation of `Transport` is only responsible for handling the protocols it supports. - /// - /// Returns `None` if nothing can be determined. This happens if this trait implementation - /// doesn't recognize the protocols, or if `server` and `observed` are related. - fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option; - - /// Builds a new struct that implements `Transport` that contains both `self` and `other`. - /// - /// The returned object will redirect its calls to `self`, except that if `listen_on` or `dial` - /// return an error then `other` will be tried. - #[inline] - fn or_transport(self, other: T) -> OrTransport - where - Self: Sized, - { - OrTransport(self, other) - } - - /// Wraps this transport inside an upgrade. Whenever a connection that uses this transport - /// is established, it is wrapped inside the upgrade. - /// - /// > **Note**: The concept of an *upgrade* for example includes middlewares such *secio* - /// > (communication encryption), *multiplex*, but also a protocol handler. - #[inline] - fn with_upgrade(self, upgrade: U) -> UpgradedNode - where - Self: Sized, - U: ConnectionUpgrade, - { - UpgradedNode { - transports: self, - upgrade: upgrade, - } - } - - /// Builds a dummy implementation of `MuxedTransport` that uses this transport. - /// - /// The resulting object will not actually use muxing. This means that dialing the same node - /// twice will result in two different connections instead of two substreams on the same - /// connection. - #[inline] - fn with_dummy_muxing(self) -> DummyMuxing - where - Self: Sized, - { - DummyMuxing { inner: self } - } -} - -/// Extension trait for `Transport`. Implemented on structs that provide a `Transport` on which -/// the dialed node can dial you back. -pub trait MuxedTransport: Transport { - /// Future resolving to a future that will resolve to an incoming connection. - type Incoming: Future; - /// Future resolving to an incoming connection. - type IncomingUpgrade: Future; - - /// Returns the next incoming substream opened by a node that we dialed ourselves. - /// - /// > **Note**: Doesn't produce incoming substreams coming from addresses we are listening on. - /// > This only concerns nodes that we dialed with `dial()`. - fn next_incoming(self) -> Self::Incoming - where - Self: Sized; - - /// Returns a stream of incoming connections. - #[inline] - fn incoming( - self, - ) -> stream::AndThen, fn(Self) -> Self::Incoming, Self::Incoming> - where - Self: Sized + Clone, - { - stream::repeat(self).and_then(|me| me.next_incoming()) - } -} - -/// Dummy implementation of `Transport` that just denies every single attempt. -#[derive(Debug, Copy, Clone)] -pub struct DeniedTransport; - -impl Transport for DeniedTransport { - // TODO: could use `!` for associated types once stable - type RawConn = Cursor>; - type Listener = Box>; - type ListenerUpgrade = Box>; - type Dial = Box>; - - #[inline] - fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - Err((DeniedTransport, addr)) - } - - #[inline] - fn dial(self, addr: Multiaddr) -> Result { - Err((DeniedTransport, addr)) - } - - #[inline] - fn nat_traversal(&self, _: &Multiaddr, _: &Multiaddr) -> Option { - None - } -} - -impl MuxedTransport for DeniedTransport { - type Incoming = future::Empty; - type IncomingUpgrade = future::Empty<(Self::RawConn, Multiaddr), IoError>; - - #[inline] - fn next_incoming(self) -> Self::Incoming { - future::empty() - } -} - -/// Struct returned by `or_transport()`. -#[derive(Debug, Copy, Clone)] -pub struct OrTransport(A, B); - -impl Transport for OrTransport -where - A: Transport, - B: Transport, -{ - type RawConn = EitherSocket; - type Listener = EitherListenStream; - type ListenerUpgrade = EitherListenUpgrade; - type Dial = - EitherListenUpgrade<::Future, ::Future>; - - fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - let (first, addr) = match self.0.listen_on(addr) { - Ok((connec, addr)) => return Ok((EitherListenStream::First(connec), addr)), - Err(err) => err, - }; - - match self.1.listen_on(addr) { - Ok((connec, addr)) => Ok((EitherListenStream::Second(connec), addr)), - Err((second, addr)) => Err((OrTransport(first, second), addr)), - } - } - - fn dial(self, addr: Multiaddr) -> Result { - let (first, addr) = match self.0.dial(addr) { - Ok(connec) => return Ok(EitherListenUpgrade::First(connec.into_future())), - Err(err) => err, - }; - - match self.1.dial(addr) { - Ok(connec) => Ok(EitherListenUpgrade::Second(connec.into_future())), - Err((second, addr)) => Err((OrTransport(first, second), addr)), - } - } - - #[inline] - fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - let first = self.0.nat_traversal(server, observed); - if let Some(first) = first { - return Some(first); - } - - self.1.nat_traversal(server, observed) - } -} - -/// Implementation of `ConnectionUpgrade`. Convenient to use with small protocols. -#[derive(Debug)] -pub struct SimpleProtocol { - name: Bytes, - // Note: we put the closure `F` in an `Arc` because Rust closures aren't automatically clonable - // yet. - upgrade: Arc, -} - -impl SimpleProtocol { - /// Builds a `SimpleProtocol`. - #[inline] - pub fn new(name: N, upgrade: F) -> SimpleProtocol - where - N: Into, - { - SimpleProtocol { - name: name.into(), - upgrade: Arc::new(upgrade), - } - } -} - -impl Clone for SimpleProtocol { - #[inline] - fn clone(&self) -> Self { - SimpleProtocol { - name: self.name.clone(), - upgrade: self.upgrade.clone(), - } - } -} - -impl MuxedTransport for OrTransport -where - A: MuxedTransport, - B: MuxedTransport, - A::Incoming: 'static, // TODO: meh :-/ - B::Incoming: 'static, // TODO: meh :-/ - A::IncomingUpgrade: 'static, // TODO: meh :-/ - B::IncomingUpgrade: 'static, // TODO: meh :-/ - A::RawConn: 'static, // TODO: meh :-/ - B::RawConn: 'static, // TODO: meh :-/ -{ - type Incoming = Box>; - type IncomingUpgrade = - Box, Multiaddr), Error = IoError>>; - - #[inline] - fn next_incoming(self) -> Self::Incoming { - let first = self.0.next_incoming().map(|out| { - let fut = out.map(move |(v, addr)| (EitherSocket::First(v), addr)); - Box::new(fut) as Box> - }); - let second = self.1.next_incoming().map(|out| { - let fut = out.map(move |(v, addr)| (EitherSocket::Second(v), addr)); - Box::new(fut) as Box> - }); - let future = first.select(second).map(|(i, _)| i).map_err(|(e, _)| e); - Box::new(future) as Box<_> - } -} - -impl ConnectionUpgrade for SimpleProtocol -where - C: AsyncRead + AsyncWrite, - F: Fn(C) -> O, - O: IntoFuture, -{ - type NamesIter = iter::Once<(Bytes, ())>; - type UpgradeIdentifier = (); - - #[inline] - fn protocol_names(&self) -> Self::NamesIter { - iter::once((self.name.clone(), ())) - } - - type Output = O::Item; - type Future = FromErr; - - #[inline] - fn upgrade(self, socket: C, _: (), _: Endpoint, _: &Multiaddr) -> Self::Future { - let upgrade = &self.upgrade; - upgrade(socket).into_future().from_err() - } -} - -/// Implements `Stream` and dispatches all method calls to either `First` or `Second`. -#[derive(Debug, Copy, Clone)] -pub enum EitherListenStream { - First(A), - Second(B), -} - -impl Stream for EitherListenStream -where - AStream: Stream, - BStream: Stream, -{ - type Item = EitherListenUpgrade; - type Error = IoError; - - #[inline] - fn poll(&mut self) -> Poll, Self::Error> { - match self { - &mut EitherListenStream::First(ref mut a) => a.poll() - .map(|i| i.map(|v| v.map(EitherListenUpgrade::First))), - &mut EitherListenStream::Second(ref mut a) => a.poll() - .map(|i| i.map(|v| v.map(EitherListenUpgrade::Second))), - } - } -} - -/// Implements `Stream` and dispatches all method calls to either `First` or `Second`. -#[derive(Debug, Copy, Clone)] -pub enum EitherIncomingStream { - First(A), - Second(B), -} - -impl Stream for EitherIncomingStream -where - A: Stream, - B: Stream, -{ - type Item = EitherSocket; - type Error = IoError; - - #[inline] - fn poll(&mut self) -> Poll, Self::Error> { - match self { - &mut EitherIncomingStream::First(ref mut a) => { - a.poll().map(|i| i.map(|v| v.map(EitherSocket::First))) - } - &mut EitherIncomingStream::Second(ref mut a) => { - a.poll().map(|i| i.map(|v| v.map(EitherSocket::Second))) - } - } - } -} - -// TODO: This type is needed because of the lack of `impl Trait` in stable Rust. -// If Rust had impl Trait we could use the Either enum from the futures crate and add some -// modifiers to it. This custom enum is a combination of Either and these modifiers. -#[derive(Debug, Copy, Clone)] -pub enum EitherListenUpgrade { - First(A), - Second(B), -} - -impl Future for EitherListenUpgrade -where - A: Future, - B: Future, -{ - type Item = (EitherSocket, Multiaddr); - type Error = IoError; - - #[inline] - fn poll(&mut self) -> Poll { - match self { - &mut EitherListenUpgrade::First(ref mut a) => { - let (item, addr) = try_ready!(a.poll()); - Ok(Async::Ready((EitherSocket::First(item), addr))) - } - &mut EitherListenUpgrade::Second(ref mut b) => { - let (item, addr) = try_ready!(b.poll()); - Ok(Async::Ready((EitherSocket::Second(item), addr))) - } - } - } -} - -/// Implements `Future` and redirects calls to either `First` or `Second`. -/// -/// Additionally, the output will be wrapped inside a `EitherSocket`. -// TODO: This type is needed because of the lack of `impl Trait` in stable Rust. -// If Rust had impl Trait we could use the Either enum from the futures crate and add some -// modifiers to it. This custom enum is a combination of Either and these modifiers. -#[derive(Debug, Copy, Clone)] -pub enum EitherTransportFuture { - First(A), - Second(B), -} - -impl Future for EitherTransportFuture -where - A: Future, - B: Future, -{ - type Item = EitherSocket; - type Error = IoError; - - #[inline] - fn poll(&mut self) -> Poll { - match self { - &mut EitherTransportFuture::First(ref mut a) => { - let item = try_ready!(a.poll()); - Ok(Async::Ready(EitherSocket::First(item))) - } - &mut EitherTransportFuture::Second(ref mut b) => { - let item = try_ready!(b.poll()); - Ok(Async::Ready(EitherSocket::Second(item))) - } - } - } -} - -/// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to either `First` or -/// `Second`. -#[derive(Debug, Copy, Clone)] -pub enum EitherSocket { - First(A), - Second(B), -} - -impl AsyncRead for EitherSocket -where - A: AsyncRead, - B: AsyncRead, -{ - #[inline] - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - match self { - &EitherSocket::First(ref a) => a.prepare_uninitialized_buffer(buf), - &EitherSocket::Second(ref b) => b.prepare_uninitialized_buffer(buf), - } - } -} - -impl Read for EitherSocket -where - A: Read, - B: Read, -{ - #[inline] - fn read(&mut self, buf: &mut [u8]) -> Result { - match self { - &mut EitherSocket::First(ref mut a) => a.read(buf), - &mut EitherSocket::Second(ref mut b) => b.read(buf), - } - } -} - -impl AsyncWrite for EitherSocket -where - A: AsyncWrite, - B: AsyncWrite, -{ - #[inline] - fn shutdown(&mut self) -> Poll<(), IoError> { - match self { - &mut EitherSocket::First(ref mut a) => a.shutdown(), - &mut EitherSocket::Second(ref mut b) => b.shutdown(), - } - } -} - -impl Write for EitherSocket -where - A: Write, - B: Write, -{ - #[inline] - fn write(&mut self, buf: &[u8]) -> Result { - match self { - &mut EitherSocket::First(ref mut a) => a.write(buf), - &mut EitherSocket::Second(ref mut b) => b.write(buf), - } - } - - #[inline] - fn flush(&mut self) -> Result<(), IoError> { - match self { - &mut EitherSocket::First(ref mut a) => a.flush(), - &mut EitherSocket::Second(ref mut b) => b.flush(), - } - } -} - -impl StreamMuxer for EitherSocket -where - A: StreamMuxer, - B: StreamMuxer, -{ - type Substream = EitherSocket; - type InboundSubstream = EitherTransportFuture; - type OutboundSubstream = EitherTransportFuture; - - #[inline] - fn inbound(self) -> Self::InboundSubstream { - match self { - EitherSocket::First(a) => EitherTransportFuture::First(a.inbound()), - EitherSocket::Second(b) => EitherTransportFuture::Second(b.inbound()), - } - } - - #[inline] - fn outbound(self) -> Self::OutboundSubstream { - match self { - EitherSocket::First(a) => EitherTransportFuture::First(a.outbound()), - EitherSocket::Second(b) => EitherTransportFuture::Second(b.outbound()), - } - } -} - -/// Implemented on structs that describe a possible upgrade to a connection between two peers. -/// -/// The generic `C` is the type of the incoming connection before it is upgraded. -/// -/// > **Note**: The `upgrade` method of this trait uses `self` and not `&self` or `&mut self`. -/// > This has been designed so that you would implement this trait on `&Foo` or -/// > `&mut Foo` instead of directly on `Foo`. -pub trait ConnectionUpgrade { - /// Iterator returned by `protocol_names`. - type NamesIter: Iterator; - /// Type that serves as an identifier for the protocol. This type only exists to be returned - /// by the `NamesIter` and then be passed to `upgrade`. - /// - /// This is only useful on implementations that dispatch between multiple possible upgrades. - /// Any basic implementation will probably just use the `()` type. - type UpgradeIdentifier; - - /// Returns the name of the protocols to advertise to the remote. - fn protocol_names(&self) -> Self::NamesIter; - - /// Type of the stream that has been upgraded. Generally wraps around `C` and `Self`. - /// - /// > **Note**: For upgrades that add an intermediary layer (such as `secio` or `multiplex`), - /// > this associated type must implement `AsyncRead + AsyncWrite`. - type Output; - /// Type of the future that will resolve to `Self::Output`. - type Future: Future; - - /// This method is called after protocol negotiation has been performed. - /// - /// Because performing the upgrade may not be instantaneous (eg. it may require a handshake), - /// this function returns a future instead of the direct output. - fn upgrade( - self, - socket: C, - id: Self::UpgradeIdentifier, - ty: Endpoint, - remote_addr: &Multiaddr, - ) -> Self::Future; -} - -/// Type of connection for the upgrade. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum Endpoint { - /// The socket comes from a dialer. - Dialer, - /// The socket comes from a listener. - Listener, -} - -/// Implementation of `ConnectionUpgrade` that always fails to negotiate. -#[derive(Debug, Copy, Clone)] -pub struct DeniedConnectionUpgrade; - -impl ConnectionUpgrade for DeniedConnectionUpgrade -where - C: AsyncRead + AsyncWrite, -{ - type NamesIter = iter::Empty<(Bytes, ())>; - type UpgradeIdentifier = (); // TODO: could use `!` - type Output = (); // TODO: could use `!` - type Future = Box>; // TODO: could use `!` - - #[inline] - fn protocol_names(&self) -> Self::NamesIter { - iter::empty() - } - - #[inline] - fn upgrade(self, _: C, _: Self::UpgradeIdentifier, _: Endpoint, _: &Multiaddr) -> Self::Future { - unreachable!("the denied connection upgrade always fails to negotiate") - } -} - -/// Extension trait for `ConnectionUpgrade`. Automatically implemented on everything. -pub trait UpgradeExt { - /// Builds a struct that will choose an upgrade between `self` and `other`, depending on what - /// the remote supports. - fn or_upgrade(self, other: T) -> OrUpgrade - where - Self: Sized; -} - -impl UpgradeExt for T { - #[inline] - fn or_upgrade(self, other: U) -> OrUpgrade { - OrUpgrade(self, other) - } -} - -/// See `or_upgrade()`. -#[derive(Debug, Copy, Clone)] -pub struct OrUpgrade(A, B); - -impl ConnectionUpgrade for OrUpgrade -where - C: AsyncRead + AsyncWrite, - A: ConnectionUpgrade, - B: ConnectionUpgrade, -{ - type NamesIter = NamesIterChain; - type UpgradeIdentifier = EitherUpgradeIdentifier; - - #[inline] - fn protocol_names(&self) -> Self::NamesIter { - NamesIterChain { - first: self.0.protocol_names(), - second: self.1.protocol_names(), - } - } - - type Output = EitherSocket; - type Future = EitherConnUpgrFuture; - - #[inline] - fn upgrade( - self, - socket: C, - id: Self::UpgradeIdentifier, - ty: Endpoint, - remote_addr: &Multiaddr, - ) -> Self::Future { - match id { - EitherUpgradeIdentifier::First(id) => { - EitherConnUpgrFuture::First(self.0.upgrade(socket, id, ty, remote_addr)) - } - EitherUpgradeIdentifier::Second(id) => { - EitherConnUpgrFuture::Second(self.1.upgrade(socket, id, ty, remote_addr)) - } - } - } -} - -/// Internal struct used by the `OrUpgrade` trait. -#[derive(Debug, Copy, Clone)] -pub enum EitherUpgradeIdentifier { - First(A), - Second(B), -} - -/// Implements `Future` and redirects calls to either `First` or `Second`. -/// -/// Additionally, the output will be wrapped inside a `EitherSocket`. -/// -// TODO: This type is needed because of the lack of `impl Trait` in stable Rust. -// If Rust had impl Trait we could use the Either enum from the futures crate and add some -// modifiers to it. This custom enum is a combination of Either and these modifiers. -#[derive(Debug, Copy, Clone)] -pub enum EitherConnUpgrFuture { - First(A), - Second(B), -} - -impl Future for EitherConnUpgrFuture -where - A: Future, - B: Future, -{ - type Item = EitherSocket; - type Error = IoError; - - #[inline] - fn poll(&mut self) -> Poll { - match self { - &mut EitherConnUpgrFuture::First(ref mut a) => { - let item = try_ready!(a.poll()); - Ok(Async::Ready(EitherSocket::First(item))) - } - &mut EitherConnUpgrFuture::Second(ref mut b) => { - let item = try_ready!(b.poll()); - Ok(Async::Ready(EitherSocket::Second(item))) - } - } - } -} - -/// Internal type used by the `OrUpgrade` struct. -/// -/// > **Note**: This type is needed because of the lack of `-> impl Trait` in Rust. It can be -/// > removed eventually. -#[derive(Debug, Copy, Clone)] -pub struct NamesIterChain { - first: A, - second: B, -} - -impl Iterator for NamesIterChain -where - A: Iterator, - B: Iterator, -{ - type Item = (Bytes, EitherUpgradeIdentifier); - - #[inline] - fn next(&mut self) -> Option { - if let Some((name, id)) = self.first.next() { - return Some((name, EitherUpgradeIdentifier::First(id))); - } - if let Some((name, id)) = self.second.next() { - return Some((name, EitherUpgradeIdentifier::Second(id))); - } - None - } - - #[inline] - fn size_hint(&self) -> (usize, Option) { - let (min1, max1) = self.first.size_hint(); - let (min2, max2) = self.second.size_hint(); - let max = match (max1, max2) { - (Some(max1), Some(max2)) => max1.checked_add(max2), - _ => None, - }; - (min1.saturating_add(min2), max) - } -} - -/// Implementation of the `ConnectionUpgrade` that negotiates the `/plaintext/1.0.0` protocol and -/// simply passes communications through without doing anything more. -/// -/// > **Note**: Generally used as an alternative to `secio` if a security layer is not desirable. -// TODO: move `PlainTextConfig` to a separate crate? -#[derive(Debug, Copy, Clone)] -pub struct PlainTextConfig; - -impl ConnectionUpgrade for PlainTextConfig -where - C: AsyncRead + AsyncWrite, -{ - type Output = C; - type Future = FutureResult; - type UpgradeIdentifier = (); - type NamesIter = iter::Once<(Bytes, ())>; - - #[inline] - fn upgrade(self, i: C, _: (), _: Endpoint, _: &Multiaddr) -> Self::Future { - future::ok(i) - } - - #[inline] - fn protocol_names(&self) -> Self::NamesIter { - iter::once((Bytes::from("/plaintext/1.0.0"), ())) - } -} - -/// Dummy implementation of `MuxedTransport` that uses an inner `Transport`. -#[derive(Debug, Copy, Clone)] -pub struct DummyMuxing { - inner: T, -} - -impl MuxedTransport for DummyMuxing -where - T: Transport, -{ - type Incoming = future::Empty; - type IncomingUpgrade = future::Empty<(T::RawConn, Multiaddr), IoError>; - - fn next_incoming(self) -> Self::Incoming - where - Self: Sized, - { - future::empty() - } -} - -impl Transport for DummyMuxing -where - T: Transport, -{ - type RawConn = T::RawConn; - type Listener = T::Listener; - type ListenerUpgrade = T::ListenerUpgrade; - type Dial = T::Dial; - - #[inline] - fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> - where - Self: Sized, - { - self.inner - .listen_on(addr) - .map_err(|(inner, addr)| (DummyMuxing { inner }, addr)) - } - - #[inline] - fn dial(self, addr: Multiaddr) -> Result - where - Self: Sized, - { - self.inner - .dial(addr) - .map_err(|(inner, addr)| (DummyMuxing { inner }, addr)) - } - - #[inline] - fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.inner.nat_traversal(server, observed) - } -} - -/// Implements the `Transport` trait. Dials or listens, then upgrades any dialed or received -/// connection. -/// -/// See the `Transport::with_upgrade` method. -#[derive(Debug, Clone)] -pub struct UpgradedNode { - transports: T, - upgrade: C, -} - -impl<'a, T, C> UpgradedNode -where - T: Transport + 'a, - C: ConnectionUpgrade + 'a, -{ - /// Turns this upgraded node into a `ConnectionReuse`. If the `Output` implements the - /// `StreamMuxer` trait, the returned object will implement `Transport` and `MuxedTransport`. - #[inline] - pub fn into_connection_reuse(self) -> ConnectionReuse - where - C::Output: StreamMuxer, - { - From::from(self) - } - - /// Returns a reference to the inner `Transport`. - #[inline] - pub fn transport(&self) -> &T { - &self.transports - } - - /// Tries to dial on the `Multiaddr` using the transport that was passed to `new`, then upgrade - /// the connection. - /// - /// Note that this does the same as `Transport::dial`, but with less restrictions on the trait - /// requirements. - #[inline] - pub fn dial( - self, - addr: Multiaddr, - ) -> Result + 'a>, (Self, Multiaddr)> - { - let upgrade = self.upgrade; - - let dialed_fut = match self.transports.dial(addr.clone()) { - Ok(f) => f.into_future(), - Err((trans, addr)) => { - let builder = UpgradedNode { - transports: trans, - upgrade: upgrade, - }; - - return Err((builder, addr)); - } - }; - - let future = dialed_fut - // Try to negotiate the protocol. - .and_then(move |(connection, client_addr)| { - let iter = upgrade.protocol_names() - .map(|(name, id)| (name, ::eq, id)); - debug!(target: "libp2p-swarm", "Starting protocol negotiation (dialer)"); - let negotiated = multistream_select::dialer_select_proto(connection, iter) - .map_err(|err| IoError::new(IoErrorKind::Other, err)); - negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, client_addr)) - }) - .then(|negotiated| { - match negotiated { - Ok((_, _, _, ref client_addr)) => { - debug!(target: "libp2p-swarm", "Successfully negotiated protocol \ - upgrade with {}", client_addr) - }, - Err(ref err) => { - debug!(target: "libp2p-swarm", "Error while negotiated protocol \ - upgrade: {:?}", err) - }, - }; - negotiated - }) - .and_then(move |(upgrade_id, connection, upgrade, client_addr)| { - let f = upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &client_addr); - debug!(target: "libp2p-swarm", "Trying to apply negotiated protocol with {}", - client_addr); - f.map(|v| (v, client_addr)) - }) - .then(|val| { - match val { - Ok(_) => debug!(target: "libp2p-swarm", "Successfully applied negotiated \ - protocol"), - Err(_) => debug!(target: "libp2p-swarm", "Failed to apply negotiated protocol"), - } - val - }); - - Ok(Box::new(future)) - } - - /// If the underlying transport is a `MuxedTransport`, then after calling `dial` we may receive - /// substreams opened by the dialed nodes. - /// - /// This function returns the next incoming substream. You are strongly encouraged to call it - /// if you have a muxed transport. - pub fn next_incoming( - self, - ) -> Box< - Future< - Item = Box + 'a>, - Error = IoError, - > - + 'a, - > - where - T: MuxedTransport, - C::NamesIter: Clone, // TODO: not elegant - C: Clone, - { - let upgrade = self.upgrade; - - let future = self.transports.next_incoming().map(|future| { - // Try to negotiate the protocol. - let future = future - .and_then(move |(connection, addr)| { - let iter = upgrade - .protocol_names() - .map::<_, fn(_) -> _>(|(name, id)| (name, ::eq, id)); - debug!(target: "libp2p-swarm", "Starting protocol negotiation (incoming)"); - let negotiated = multistream_select::listener_select_proto(connection, iter) - .map_err(|err| IoError::new(IoErrorKind::Other, err)); - negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr)) - }) - .then(|negotiated| { - match negotiated { - Ok((_, _, _, ref client_addr)) => { - debug!(target: "libp2p-swarm", "Successfully negotiated protocol \ - upgrade with {}", client_addr) - } - Err(ref err) => { - debug!(target: "libp2p-swarm", "Error while negotiated protocol \ - upgrade: {:?}", err) - } - }; - negotiated - }) - .and_then(move |(upgrade_id, connection, upgrade, addr)| { - let upg = upgrade.upgrade(connection, upgrade_id, Endpoint::Listener, &addr); - debug!(target: "libp2p-swarm", "Trying to apply negotiated protocol with {}", - addr); - upg.map(|u| (u, addr)) - }) - .then(|val| { - match val { - Ok(_) => debug!(target: "libp2p-swarm", "Successfully applied negotiated \ - protocol"), - Err(_) => debug!(target: "libp2p-swarm", "Failed to apply negotiated \ - protocol"), - } - val - }); - - Box::new(future) as Box> - }); - - Box::new(future) as Box<_> - } - - /// Start listening on the multiaddr using the transport that was passed to `new`. - /// Then whenever a connection is opened, it is upgraded. - /// - /// Note that this does the same as `Transport::listen_on`, but with less restrictions on the - /// trait requirements. - #[inline] - pub fn listen_on( - self, - addr: Multiaddr, - ) -> Result< - ( - Box< - Stream< - Item = Box + 'a>, - Error = IoError, - > - + 'a, - >, - Multiaddr, - ), - (Self, Multiaddr), - > - where - C::NamesIter: Clone, // TODO: not elegant - C: Clone, - { - let upgrade = self.upgrade; - - let (listening_stream, new_addr) = match self.transports.listen_on(addr) { - Ok((l, new_addr)) => (l, new_addr), - Err((trans, addr)) => { - let builder = UpgradedNode { - transports: trans, - upgrade: upgrade, - }; - - return Err((builder, addr)); - } - }; - - // Try to negotiate the protocol. - // Note that failing to negotiate a protocol will never produce a future with an error. - // Instead the `stream` will produce `Ok(Err(...))`. - // `stream` can only produce an `Err` if `listening_stream` produces an `Err`. - let stream = listening_stream.map(move |connection| { - let upgrade = upgrade.clone(); - let connection = connection - // Try to negotiate the protocol - .and_then(move |(connection, remote_addr)| { - let iter = upgrade.protocol_names() - .map::<_, fn(_) -> _>(|(n, t)| (n, ::eq, t)); - let remote_addr2 = remote_addr.clone(); - debug!(target: "libp2p-swarm", "Starting protocol negotiation (listener)"); - multistream_select::listener_select_proto(connection, iter) - .map_err(|err| IoError::new(IoErrorKind::Other, err)) - .then(move |negotiated| { - match negotiated { - Ok(_) => { - debug!(target: "libp2p-swarm", "Successfully negotiated \ - protocol upgrade with {}", remote_addr2) - }, - Err(ref err) => { - debug!(target: "libp2p-swarm", "Error while negotiated \ - protocol upgrade: {:?}", err) - }, - }; - negotiated - }) - .and_then(move |(upgrade_id, connection)| { - let fut = upgrade.upgrade( - connection, - upgrade_id, - Endpoint::Listener, - &remote_addr, - ); - fut.map(move |c| (c, remote_addr)) - }) - .into_future() - }); - - Box::new(connection) as Box<_> - }); - - Ok((Box::new(stream), new_addr)) - } -} - -impl Transport for UpgradedNode -where - T: Transport + 'static, - C: ConnectionUpgrade + 'static, - C::Output: AsyncRead + AsyncWrite, - C::NamesIter: Clone, // TODO: not elegant - C: Clone, -{ - type RawConn = C::Output; - type Listener = Box>; - type ListenerUpgrade = Box>; - type Dial = Box>; - - #[inline] - fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - self.listen_on(addr) - } - - #[inline] - fn dial(self, addr: Multiaddr) -> Result { - self.dial(addr) - } - - #[inline] - fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.transports.nat_traversal(server, observed) - } -} - -impl MuxedTransport for UpgradedNode -where - T: MuxedTransport + 'static, - C: ConnectionUpgrade + 'static, - C::Output: AsyncRead + AsyncWrite, - C::NamesIter: Clone, // TODO: not elegant - C: Clone, -{ - type Incoming = Box>; - type IncomingUpgrade = Box>; - - #[inline] - fn next_incoming(self) -> Self::Incoming { - self.next_incoming() - } -} diff --git a/swarm/src/transport/choice.rs b/swarm/src/transport/choice.rs new file mode 100644 index 00000000..dd37a121 --- /dev/null +++ b/swarm/src/transport/choice.rs @@ -0,0 +1,111 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use either::{EitherListenStream, EitherListenUpgrade, EitherSocket}; +use futures::prelude::*; +use multiaddr::Multiaddr; +use std::io::Error as IoError; +use transport::{MuxedTransport, Transport}; + +/// Struct returned by `or_transport()`. +#[derive(Debug, Copy, Clone)] +pub struct OrTransport(A, B); + +impl OrTransport { + pub fn new(a: A, b: B) -> OrTransport { + OrTransport(a, b) + } +} + +impl Transport for OrTransport +where + A: Transport, + B: Transport, +{ + type RawConn = EitherSocket; + type Listener = EitherListenStream; + type ListenerUpgrade = EitherListenUpgrade; + type Dial = + EitherListenUpgrade<::Future, ::Future>; + + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { + let (first, addr) = match self.0.listen_on(addr) { + Ok((connec, addr)) => return Ok((EitherListenStream::First(connec), addr)), + Err(err) => err, + }; + + match self.1.listen_on(addr) { + Ok((connec, addr)) => Ok((EitherListenStream::Second(connec), addr)), + Err((second, addr)) => Err((OrTransport(first, second), addr)), + } + } + + fn dial(self, addr: Multiaddr) -> Result { + let (first, addr) = match self.0.dial(addr) { + Ok(connec) => return Ok(EitherListenUpgrade::First(connec.into_future())), + Err(err) => err, + }; + + match self.1.dial(addr) { + Ok(connec) => Ok(EitherListenUpgrade::Second(connec.into_future())), + Err((second, addr)) => Err((OrTransport(first, second), addr)), + } + } + + #[inline] + fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + let first = self.0.nat_traversal(server, observed); + if let Some(first) = first { + return Some(first); + } + + self.1.nat_traversal(server, observed) + } +} + +impl MuxedTransport for OrTransport +where + A: MuxedTransport, + B: MuxedTransport, + A::Incoming: 'static, // TODO: meh :-/ + B::Incoming: 'static, // TODO: meh :-/ + A::IncomingUpgrade: 'static, // TODO: meh :-/ + B::IncomingUpgrade: 'static, // TODO: meh :-/ + A::RawConn: 'static, // TODO: meh :-/ + B::RawConn: 'static, // TODO: meh :-/ +{ + type Incoming = Box>; + type IncomingUpgrade = + Box, Multiaddr), Error = IoError>>; + + #[inline] + fn next_incoming(self) -> Self::Incoming { + let first = self.0.next_incoming().map(|out| { + let fut = out.map(move |(v, addr)| (EitherSocket::First(v), addr)); + Box::new(fut) as Box> + }); + let second = self.1.next_incoming().map(|out| { + let fut = out.map(move |(v, addr)| (EitherSocket::Second(v), addr)); + Box::new(fut) as Box> + }); + let future = first.select(second).map(|(i, _)| i).map_err(|(e, _)| e); + Box::new(future) as Box<_> + } +} diff --git a/swarm/src/transport/denied.rs b/swarm/src/transport/denied.rs new file mode 100644 index 00000000..b59df8bd --- /dev/null +++ b/swarm/src/transport/denied.rs @@ -0,0 +1,63 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::future; +use futures::prelude::*; +use multiaddr::Multiaddr; +use std::io::{self, Cursor}; +use transport::Transport; +use transport::MuxedTransport; + +/// Dummy implementation of `Transport` that just denies every single attempt. +#[derive(Debug, Copy, Clone)] +pub struct DeniedTransport; + +impl Transport for DeniedTransport { + // TODO: could use `!` for associated types once stable + type RawConn = Cursor>; + type Listener = Box>; + type ListenerUpgrade = Box>; + type Dial = Box>; + + #[inline] + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { + Err((DeniedTransport, addr)) + } + + #[inline] + fn dial(self, addr: Multiaddr) -> Result { + Err((DeniedTransport, addr)) + } + + #[inline] + fn nat_traversal(&self, _: &Multiaddr, _: &Multiaddr) -> Option { + None + } +} + +impl MuxedTransport for DeniedTransport { + type Incoming = future::Empty; + type IncomingUpgrade = future::Empty<(Self::RawConn, Multiaddr), io::Error>; + + #[inline] + fn next_incoming(self) -> Self::Incoming { + future::empty() + } +} diff --git a/swarm/src/transport/dummy.rs b/swarm/src/transport/dummy.rs new file mode 100644 index 00000000..8a3940fa --- /dev/null +++ b/swarm/src/transport/dummy.rs @@ -0,0 +1,86 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::future; +use multiaddr::Multiaddr; +use std::io::Error as IoError; +use transport::{MuxedTransport, Transport}; + +/// Dummy implementation of `MuxedTransport` that uses an inner `Transport`. +#[derive(Debug, Copy, Clone)] +pub struct DummyMuxing { + inner: T, +} + +impl DummyMuxing { + pub fn new(transport: T) -> DummyMuxing { + DummyMuxing { inner: transport } + } +} + +impl MuxedTransport for DummyMuxing +where + T: Transport, +{ + type Incoming = future::Empty; + type IncomingUpgrade = future::Empty<(T::RawConn, Multiaddr), IoError>; + + fn next_incoming(self) -> Self::Incoming + where + Self: Sized, + { + future::empty() + } +} + +impl Transport for DummyMuxing +where + T: Transport, +{ + type RawConn = T::RawConn; + type Listener = T::Listener; + type ListenerUpgrade = T::ListenerUpgrade; + type Dial = T::Dial; + + #[inline] + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> + where + Self: Sized, + { + self.inner + .listen_on(addr) + .map_err(|(inner, addr)| (DummyMuxing { inner }, addr)) + } + + #[inline] + fn dial(self, addr: Multiaddr) -> Result + where + Self: Sized, + { + self.inner + .dial(addr) + .map_err(|(inner, addr)| (DummyMuxing { inner }, addr)) + } + + #[inline] + fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner.nat_traversal(server, observed) + } +} diff --git a/swarm/src/transport/mod.rs b/swarm/src/transport/mod.rs new file mode 100644 index 00000000..865ea389 --- /dev/null +++ b/swarm/src/transport/mod.rs @@ -0,0 +1,149 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Handles entering a connection with a peer. +//! +//! The two main elements of this module are the `Transport` and `ConnectionUpgrade` traits. +//! `Transport` is implemented on objects that allow dialing and listening. `ConnectionUpgrade` is +//! implemented on objects that make it possible to upgrade a connection (for example by adding an +//! encryption middleware to the connection). +//! +//! Thanks to the `Transport::or_transport`, `Transport::with_upgrade` and +//! `UpgradedNode::or_upgrade` methods, you can combine multiple transports and/or upgrades +//! together in a complex chain of protocols negotiation. + +use futures::prelude::*; +use multiaddr::Multiaddr; +use std::io::Error as IoError; +use tokio_io::{AsyncRead, AsyncWrite}; +use upgrade::ConnectionUpgrade; + +pub mod choice; +pub mod denied; +pub mod dummy; +pub mod muxed; +pub mod upgrade; + +pub use self::choice::OrTransport; +pub use self::denied::DeniedTransport; +pub use self::dummy::DummyMuxing; +pub use self::muxed::MuxedTransport; +pub use self::upgrade::UpgradedNode; + +/// A transport is an object that can be used to produce connections by listening or dialing a +/// peer. +/// +/// This trait is implemented on concrete transports (eg. TCP, UDP, etc.), but also on wrappers +/// around them. +/// +/// > **Note**: The methods of this trait use `self` and not `&self` or `&mut self`. In other +/// > words, listening or dialing consumes the transport object. This has been designed +/// > so that you would implement this trait on `&Foo` or `&mut Foo` instead of directly +/// > on `Foo`. +pub trait Transport { + /// The raw connection to a peer. + type RawConn: AsyncRead + AsyncWrite; + + /// The listener produces incoming connections. + /// + /// An item should be produced whenever a connection is received at the lowest level of the + /// transport stack. The item is a `Future` that is signalled once some pre-processing has + /// taken place, and that connection has been upgraded to the wanted protocols. + type Listener: Stream; + + /// After a connection has been received, we may need to do some asynchronous pre-processing + /// on it (eg. an intermediary protocol negotiation). While this pre-processing takes place, we + /// want to be able to continue polling on the listener. + type ListenerUpgrade: Future; + + /// A future which indicates that we are currently dialing to a peer. + type Dial: IntoFuture; + + /// Listen on the given multiaddr. Returns a stream of incoming connections, plus a modified + /// version of the `Multiaddr`. This new `Multiaddr` is the one that that should be advertised + /// to other nodes, instead of the one passed as parameter. + /// + /// Returns the address back if it isn't supported. + /// + /// > **Note**: The reason why we need to change the `Multiaddr` on success is to handle + /// > situations such as turning `/ip4/127.0.0.1/tcp/0` into + /// > `/ip4/127.0.0.1/tcp/`. + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> + where + Self: Sized; + + /// Dial to the given multi-addr. + /// + /// Returns either a future which may resolve to a connection, or gives back the multiaddress. + fn dial(self, addr: Multiaddr) -> Result + where + Self: Sized; + + /// Takes a multiaddress we're listening on (`server`), and tries to convert it to an + /// externally-visible multiaddress. In order to do so, we pass an `observed` address which + /// a remote node observes for one of our dialers. + /// + /// For example, if `server` is `/ip4/0.0.0.0/tcp/3000` and `observed` is + /// `/ip4/80.81.82.83/tcp/29601`, then we should return `/ip4/80.81.82.83/tcp/3000`. Each + /// implementation of `Transport` is only responsible for handling the protocols it supports. + /// + /// Returns `None` if nothing can be determined. This happens if this trait implementation + /// doesn't recognize the protocols, or if `server` and `observed` are related. + fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option; + + /// Builds a new struct that implements `Transport` that contains both `self` and `other`. + /// + /// The returned object will redirect its calls to `self`, except that if `listen_on` or `dial` + /// return an error then `other` will be tried. + #[inline] + fn or_transport(self, other: T) -> OrTransport + where + Self: Sized, + { + OrTransport::new(self, other) + } + + /// Wraps this transport inside an upgrade. Whenever a connection that uses this transport + /// is established, it is wrapped inside the upgrade. + /// + /// > **Note**: The concept of an *upgrade* for example includes middlewares such *secio* + /// > (communication encryption), *multiplex*, but also a protocol handler. + #[inline] + fn with_upgrade(self, upgrade: U) -> UpgradedNode + where + Self: Sized, + U: ConnectionUpgrade, + { + UpgradedNode::new(self, upgrade) + } + + /// Builds a dummy implementation of `MuxedTransport` that uses this transport. + /// + /// The resulting object will not actually use muxing. This means that dialing the same node + /// twice will result in two different connections instead of two substreams on the same + /// connection. + #[inline] + fn with_dummy_muxing(self) -> DummyMuxing + where + Self: Sized, + { + DummyMuxing::new(self) + } +} diff --git a/swarm/src/transport/muxed.rs b/swarm/src/transport/muxed.rs new file mode 100644 index 00000000..42798d92 --- /dev/null +++ b/swarm/src/transport/muxed.rs @@ -0,0 +1,53 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::prelude::*; +use futures::stream; +use multiaddr::Multiaddr; +use std::io::Error as IoError; +use transport::Transport; + +/// Extension trait for `Transport`. Implemented on structs that provide a `Transport` on which +/// the dialed node can dial you back. +pub trait MuxedTransport: Transport { + /// Future resolving to a future that will resolve to an incoming connection. + type Incoming: Future; + /// Future resolving to an incoming connection. + type IncomingUpgrade: Future; + + /// Returns the next incoming substream opened by a node that we dialed ourselves. + /// + /// > **Note**: Doesn't produce incoming substreams coming from addresses we are listening on. + /// > This only concerns nodes that we dialed with `dial()`. + fn next_incoming(self) -> Self::Incoming + where + Self: Sized; + + /// Returns a stream of incoming connections. + #[inline] + fn incoming( + self, + ) -> stream::AndThen, fn(Self) -> Self::Incoming, Self::Incoming> + where + Self: Sized + Clone, + { + stream::repeat(self).and_then(|me| me.next_incoming()) + } +} diff --git a/swarm/src/transport/upgrade.rs b/swarm/src/transport/upgrade.rs new file mode 100644 index 00000000..63aade23 --- /dev/null +++ b/swarm/src/transport/upgrade.rs @@ -0,0 +1,337 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use bytes::Bytes; +use connection_reuse::ConnectionReuse; +use futures::prelude::*; +use multiaddr::Multiaddr; +use multistream_select; +use muxing::StreamMuxer; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use tokio_io::{AsyncRead, AsyncWrite}; +use transport::{MuxedTransport, Transport}; +use upgrade::{ConnectionUpgrade, Endpoint}; + +/// Implements the `Transport` trait. Dials or listens, then upgrades any dialed or received +/// connection. +/// +/// See the `Transport::with_upgrade` method. +#[derive(Debug, Clone)] +pub struct UpgradedNode { + transports: T, + upgrade: C, +} + +impl UpgradedNode { + pub fn new(transports: T, upgrade: C) -> UpgradedNode { + UpgradedNode { + transports, + upgrade, + } + } +} + +impl<'a, T, C> UpgradedNode +where + T: Transport + 'a, + C: ConnectionUpgrade + 'a, +{ + /// Turns this upgraded node into a `ConnectionReuse`. If the `Output` implements the + /// `StreamMuxer` trait, the returned object will implement `Transport` and `MuxedTransport`. + #[inline] + pub fn into_connection_reuse(self) -> ConnectionReuse + where + C::Output: StreamMuxer, + { + From::from(self) + } + + /// Returns a reference to the inner `Transport`. + #[inline] + pub fn transport(&self) -> &T { + &self.transports + } + + /// Tries to dial on the `Multiaddr` using the transport that was passed to `new`, then upgrade + /// the connection. + /// + /// Note that this does the same as `Transport::dial`, but with less restrictions on the trait + /// requirements. + #[inline] + pub fn dial( + self, + addr: Multiaddr, + ) -> Result + 'a>, (Self, Multiaddr)> + { + let upgrade = self.upgrade; + + let dialed_fut = match self.transports.dial(addr.clone()) { + Ok(f) => f.into_future(), + Err((trans, addr)) => { + let builder = UpgradedNode { + transports: trans, + upgrade: upgrade, + }; + + return Err((builder, addr)); + } + }; + + let future = dialed_fut + // Try to negotiate the protocol. + .and_then(move |(connection, client_addr)| { + let iter = upgrade.protocol_names() + .map(|(name, id)| (name, ::eq, id)); + debug!(target: "libp2p-swarm", "Starting protocol negotiation (dialer)"); + let negotiated = multistream_select::dialer_select_proto(connection, iter) + .map_err(|err| IoError::new(IoErrorKind::Other, err)); + negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, client_addr)) + }) + .then(|negotiated| { + match negotiated { + Ok((_, _, _, ref client_addr)) => { + debug!(target: "libp2p-swarm", "Successfully negotiated protocol \ + upgrade with {}", client_addr) + }, + Err(ref err) => { + debug!(target: "libp2p-swarm", "Error while negotiated protocol \ + upgrade: {:?}", err) + }, + }; + negotiated + }) + .and_then(move |(upgrade_id, connection, upgrade, client_addr)| { + let f = upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &client_addr); + debug!(target: "libp2p-swarm", "Trying to apply negotiated protocol with {}", + client_addr); + f.map(|v| (v, client_addr)) + }) + .then(|val| { + match val { + Ok(_) => debug!(target: "libp2p-swarm", "Successfully applied negotiated \ + protocol"), + Err(_) => debug!(target: "libp2p-swarm", "Failed to apply negotiated protocol"), + } + val + }); + + Ok(Box::new(future)) + } + + /// If the underlying transport is a `MuxedTransport`, then after calling `dial` we may receive + /// substreams opened by the dialed nodes. + /// + /// This function returns the next incoming substream. You are strongly encouraged to call it + /// if you have a muxed transport. + pub fn next_incoming( + self, + ) -> Box< + Future< + Item = Box + 'a>, + Error = IoError, + > + + 'a, + > + where + T: MuxedTransport, + C::NamesIter: Clone, // TODO: not elegant + C: Clone, + { + let upgrade = self.upgrade; + + let future = self.transports.next_incoming().map(|future| { + // Try to negotiate the protocol. + let future = future + .and_then(move |(connection, addr)| { + let iter = upgrade + .protocol_names() + .map::<_, fn(_) -> _>(|(name, id)| (name, ::eq, id)); + debug!(target: "libp2p-swarm", "Starting protocol negotiation (incoming)"); + let negotiated = multistream_select::listener_select_proto(connection, iter) + .map_err(|err| IoError::new(IoErrorKind::Other, err)); + negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr)) + }) + .then(|negotiated| { + match negotiated { + Ok((_, _, _, ref client_addr)) => { + debug!(target: "libp2p-swarm", "Successfully negotiated protocol \ + upgrade with {}", client_addr) + } + Err(ref err) => { + debug!(target: "libp2p-swarm", "Error while negotiated protocol \ + upgrade: {:?}", err) + } + }; + negotiated + }) + .and_then(move |(upgrade_id, connection, upgrade, addr)| { + let upg = upgrade.upgrade(connection, upgrade_id, Endpoint::Listener, &addr); + debug!(target: "libp2p-swarm", "Trying to apply negotiated protocol with {}", + addr); + upg.map(|u| (u, addr)) + }) + .then(|val| { + match val { + Ok(_) => debug!(target: "libp2p-swarm", "Successfully applied negotiated \ + protocol"), + Err(_) => debug!(target: "libp2p-swarm", "Failed to apply negotiated \ + protocol"), + } + val + }); + + Box::new(future) as Box> + }); + + Box::new(future) as Box<_> + } + + /// Start listening on the multiaddr using the transport that was passed to `new`. + /// Then whenever a connection is opened, it is upgraded. + /// + /// Note that this does the same as `Transport::listen_on`, but with less restrictions on the + /// trait requirements. + #[inline] + pub fn listen_on( + self, + addr: Multiaddr, + ) -> Result< + ( + Box< + Stream< + Item = Box + 'a>, + Error = IoError, + > + + 'a, + >, + Multiaddr, + ), + (Self, Multiaddr), + > + where + C::NamesIter: Clone, // TODO: not elegant + C: Clone, + { + let upgrade = self.upgrade; + + let (listening_stream, new_addr) = match self.transports.listen_on(addr) { + Ok((l, new_addr)) => (l, new_addr), + Err((trans, addr)) => { + let builder = UpgradedNode { + transports: trans, + upgrade: upgrade, + }; + + return Err((builder, addr)); + } + }; + + // Try to negotiate the protocol. + // Note that failing to negotiate a protocol will never produce a future with an error. + // Instead the `stream` will produce `Ok(Err(...))`. + // `stream` can only produce an `Err` if `listening_stream` produces an `Err`. + let stream = listening_stream.map(move |connection| { + let upgrade = upgrade.clone(); + let connection = connection + // Try to negotiate the protocol + .and_then(move |(connection, remote_addr)| { + let iter = upgrade.protocol_names() + .map::<_, fn(_) -> _>(|(n, t)| (n, ::eq, t)); + let remote_addr2 = remote_addr.clone(); + debug!(target: "libp2p-swarm", "Starting protocol negotiation (listener)"); + multistream_select::listener_select_proto(connection, iter) + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + .then(move |negotiated| { + match negotiated { + Ok(_) => { + debug!(target: "libp2p-swarm", "Successfully negotiated \ + protocol upgrade with {}", remote_addr2) + }, + Err(ref err) => { + debug!(target: "libp2p-swarm", "Error while negotiated \ + protocol upgrade: {:?}", err) + }, + }; + negotiated + }) + .and_then(move |(upgrade_id, connection)| { + let fut = upgrade.upgrade( + connection, + upgrade_id, + Endpoint::Listener, + &remote_addr, + ); + fut.map(move |c| (c, remote_addr)) + }) + .into_future() + }); + + Box::new(connection) as Box<_> + }); + + Ok((Box::new(stream), new_addr)) + } +} + +impl Transport for UpgradedNode +where + T: Transport + 'static, + C: ConnectionUpgrade + 'static, + C::Output: AsyncRead + AsyncWrite, + C::NamesIter: Clone, // TODO: not elegant + C: Clone, +{ + type RawConn = C::Output; + type Listener = Box>; + type ListenerUpgrade = Box>; + type Dial = Box>; + + #[inline] + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { + self.listen_on(addr) + } + + #[inline] + fn dial(self, addr: Multiaddr) -> Result { + self.dial(addr) + } + + #[inline] + fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.transports.nat_traversal(server, observed) + } +} + +impl MuxedTransport for UpgradedNode +where + T: MuxedTransport + 'static, + C: ConnectionUpgrade + 'static, + C::Output: AsyncRead + AsyncWrite, + C::NamesIter: Clone, // TODO: not elegant + C: Clone, +{ + type Incoming = Box>; + type IncomingUpgrade = Box>; + + #[inline] + fn next_incoming(self) -> Self::Incoming { + self.next_incoming() + } +} diff --git a/swarm/src/upgrade/choice.rs b/swarm/src/upgrade/choice.rs new file mode 100644 index 00000000..1fd5aaaf --- /dev/null +++ b/swarm/src/upgrade/choice.rs @@ -0,0 +1,159 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use bytes::Bytes; +use either::EitherSocket; +use futures::prelude::*; +use multiaddr::Multiaddr; +use std::io::Error as IoError; +use tokio_io::{AsyncRead, AsyncWrite}; +use upgrade::{ConnectionUpgrade, Endpoint}; + +/// See `transport::Transport::or_upgrade()`. +#[derive(Debug, Copy, Clone)] +pub struct OrUpgrade(A, B); + +impl OrUpgrade { + pub fn new(a: A, b: B) -> OrUpgrade { + OrUpgrade(a, b) + } +} + +impl ConnectionUpgrade for OrUpgrade +where + C: AsyncRead + AsyncWrite, + A: ConnectionUpgrade, + B: ConnectionUpgrade, +{ + type NamesIter = NamesIterChain; + type UpgradeIdentifier = EitherUpgradeIdentifier; + + #[inline] + fn protocol_names(&self) -> Self::NamesIter { + NamesIterChain { + first: self.0.protocol_names(), + second: self.1.protocol_names(), + } + } + + type Output = EitherSocket; + type Future = EitherConnUpgrFuture; + + #[inline] + fn upgrade( + self, + socket: C, + id: Self::UpgradeIdentifier, + ty: Endpoint, + remote_addr: &Multiaddr, + ) -> Self::Future { + match id { + EitherUpgradeIdentifier::First(id) => { + EitherConnUpgrFuture::First(self.0.upgrade(socket, id, ty, remote_addr)) + } + EitherUpgradeIdentifier::Second(id) => { + EitherConnUpgrFuture::Second(self.1.upgrade(socket, id, ty, remote_addr)) + } + } + } +} + +/// Internal struct used by the `OrUpgrade` trait. +#[derive(Debug, Copy, Clone)] +pub enum EitherUpgradeIdentifier { + First(A), + Second(B), +} + +/// Implements `Future` and redirects calls to either `First` or `Second`. +/// +/// Additionally, the output will be wrapped inside a `EitherSocket`. +/// +// TODO: This type is needed because of the lack of `impl Trait` in stable Rust. +// If Rust had impl Trait we could use the Either enum from the futures crate and add some +// modifiers to it. This custom enum is a combination of Either and these modifiers. +#[derive(Debug, Copy, Clone)] +pub enum EitherConnUpgrFuture { + First(A), + Second(B), +} + +impl Future for EitherConnUpgrFuture +where + A: Future, + B: Future, +{ + type Item = EitherSocket; + type Error = IoError; + + #[inline] + fn poll(&mut self) -> Poll { + match self { + &mut EitherConnUpgrFuture::First(ref mut a) => { + let item = try_ready!(a.poll()); + Ok(Async::Ready(EitherSocket::First(item))) + } + &mut EitherConnUpgrFuture::Second(ref mut b) => { + let item = try_ready!(b.poll()); + Ok(Async::Ready(EitherSocket::Second(item))) + } + } + } +} + +/// Internal type used by the `OrUpgrade` struct. +/// +/// > **Note**: This type is needed because of the lack of `-> impl Trait` in Rust. It can be +/// > removed eventually. +#[derive(Debug, Copy, Clone)] +pub struct NamesIterChain { + first: A, + second: B, +} + +impl Iterator for NamesIterChain +where + A: Iterator, + B: Iterator, +{ + type Item = (Bytes, EitherUpgradeIdentifier); + + #[inline] + fn next(&mut self) -> Option { + if let Some((name, id)) = self.first.next() { + return Some((name, EitherUpgradeIdentifier::First(id))); + } + if let Some((name, id)) = self.second.next() { + return Some((name, EitherUpgradeIdentifier::Second(id))); + } + None + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + let (min1, max1) = self.first.size_hint(); + let (min2, max2) = self.second.size_hint(); + let max = match (max1, max2) { + (Some(max1), Some(max2)) => max1.checked_add(max2), + _ => None, + }; + (min1.saturating_add(min2), max) + } +} diff --git a/swarm/src/upgrade/denied.rs b/swarm/src/upgrade/denied.rs new file mode 100644 index 00000000..b5938965 --- /dev/null +++ b/swarm/src/upgrade/denied.rs @@ -0,0 +1,50 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use bytes::Bytes; +use upgrade::{ConnectionUpgrade, Endpoint}; +use futures::prelude::*; +use multiaddr::Multiaddr; +use std::{io, iter}; +use tokio_io::{AsyncRead, AsyncWrite}; + +/// Implementation of `ConnectionUpgrade` that always fails to negotiate. +#[derive(Debug, Copy, Clone)] +pub struct DeniedConnectionUpgrade; + +impl ConnectionUpgrade for DeniedConnectionUpgrade +where + C: AsyncRead + AsyncWrite, +{ + type NamesIter = iter::Empty<(Bytes, ())>; + type UpgradeIdentifier = (); // TODO: could use `!` + type Output = (); // TODO: could use `!` + type Future = Box>; // TODO: could use `!` + + #[inline] + fn protocol_names(&self) -> Self::NamesIter { + iter::empty() + } + + #[inline] + fn upgrade(self, _: C, _: Self::UpgradeIdentifier, _: Endpoint, _: &Multiaddr) -> Self::Future { + unreachable!("the denied connection upgrade always fails to negotiate") + } +} diff --git a/swarm/src/upgrade/mod.rs b/swarm/src/upgrade/mod.rs new file mode 100644 index 00000000..2db5df18 --- /dev/null +++ b/swarm/src/upgrade/mod.rs @@ -0,0 +1,31 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +pub mod denied; +pub mod traits; +pub mod choice; +pub mod plaintext; +pub mod simple; + +pub use self::choice::OrUpgrade; +pub use self::denied::DeniedConnectionUpgrade; +pub use self::plaintext::PlainTextConfig; +pub use self::simple::SimpleProtocol; +pub use self::traits::{ConnectionUpgrade, Endpoint, UpgradeExt}; diff --git a/swarm/src/upgrade/plaintext.rs b/swarm/src/upgrade/plaintext.rs new file mode 100644 index 00000000..7250759d --- /dev/null +++ b/swarm/src/upgrade/plaintext.rs @@ -0,0 +1,54 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use bytes::Bytes; +use futures::future::{self, FutureResult}; +use multiaddr::Multiaddr; +use std::{iter, io::Error as IoError}; +use tokio_io::{AsyncRead, AsyncWrite}; +use upgrade::{ConnectionUpgrade, Endpoint}; + +/// Implementation of the `ConnectionUpgrade` that negotiates the `/plaintext/1.0.0` protocol and +/// simply passes communications through without doing anything more. +/// +/// > **Note**: Generally used as an alternative to `secio` if a security layer is not desirable. +// TODO: move to a separate crate? +#[derive(Debug, Copy, Clone)] +pub struct PlainTextConfig; + +impl ConnectionUpgrade for PlainTextConfig +where + C: AsyncRead + AsyncWrite, +{ + type Output = C; + type Future = FutureResult; + type UpgradeIdentifier = (); + type NamesIter = iter::Once<(Bytes, ())>; + + #[inline] + fn upgrade(self, i: C, _: (), _: Endpoint, _: &Multiaddr) -> Self::Future { + future::ok(i) + } + + #[inline] + fn protocol_names(&self) -> Self::NamesIter { + iter::once((Bytes::from("/plaintext/1.0.0"), ())) + } +} diff --git a/swarm/src/upgrade/simple.rs b/swarm/src/upgrade/simple.rs new file mode 100644 index 00000000..2d75a671 --- /dev/null +++ b/swarm/src/upgrade/simple.rs @@ -0,0 +1,84 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use bytes::Bytes; +use futures::future::FromErr; +use futures::prelude::*; +use multiaddr::Multiaddr; +use std::{iter, io::Error as IoError, sync::Arc}; +use tokio_io::{AsyncRead, AsyncWrite}; +use upgrade::{ConnectionUpgrade, Endpoint}; + +/// Implementation of `ConnectionUpgrade`. Convenient to use with small protocols. +#[derive(Debug)] +pub struct SimpleProtocol { + name: Bytes, + // Note: we put the closure `F` in an `Arc` because Rust closures aren't automatically clonable + // yet. + upgrade: Arc, +} + +impl SimpleProtocol { + /// Builds a `SimpleProtocol`. + #[inline] + pub fn new(name: N, upgrade: F) -> SimpleProtocol + where + N: Into, + { + SimpleProtocol { + name: name.into(), + upgrade: Arc::new(upgrade), + } + } +} + +impl Clone for SimpleProtocol { + #[inline] + fn clone(&self) -> Self { + SimpleProtocol { + name: self.name.clone(), + upgrade: self.upgrade.clone(), + } + } +} + +impl ConnectionUpgrade for SimpleProtocol +where + C: AsyncRead + AsyncWrite, + F: Fn(C) -> O, + O: IntoFuture, +{ + type NamesIter = iter::Once<(Bytes, ())>; + type UpgradeIdentifier = (); + + #[inline] + fn protocol_names(&self) -> Self::NamesIter { + iter::once((self.name.clone(), ())) + } + + type Output = O::Item; + type Future = FromErr; + + #[inline] + fn upgrade(self, socket: C, _: (), _: Endpoint, _: &Multiaddr) -> Self::Future { + let upgrade = &self.upgrade; + upgrade(socket).into_future().from_err() + } +} diff --git a/swarm/src/upgrade/traits.rs b/swarm/src/upgrade/traits.rs new file mode 100644 index 00000000..465d1964 --- /dev/null +++ b/swarm/src/upgrade/traits.rs @@ -0,0 +1,92 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use bytes::Bytes; +use futures::future::Future; +use multiaddr::Multiaddr; +use std::io::Error as IoError; +use tokio_io::{AsyncRead, AsyncWrite}; +use upgrade::choice::OrUpgrade; + +/// Type of connection for the upgrade. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum Endpoint { + /// The socket comes from a dialer. + Dialer, + /// The socket comes from a listener. + Listener, +} + +/// Implemented on structs that describe a possible upgrade to a connection between two peers. +/// +/// The generic `C` is the type of the incoming connection before it is upgraded. +/// +/// > **Note**: The `upgrade` method of this trait uses `self` and not `&self` or `&mut self`. +/// > This has been designed so that you would implement this trait on `&Foo` or +/// > `&mut Foo` instead of directly on `Foo`. +pub trait ConnectionUpgrade { + /// Iterator returned by `protocol_names`. + type NamesIter: Iterator; + /// Type that serves as an identifier for the protocol. This type only exists to be returned + /// by the `NamesIter` and then be passed to `upgrade`. + /// + /// This is only useful on implementations that dispatch between multiple possible upgrades. + /// Any basic implementation will probably just use the `()` type. + type UpgradeIdentifier; + + /// Returns the name of the protocols to advertise to the remote. + fn protocol_names(&self) -> Self::NamesIter; + + /// Type of the stream that has been upgraded. Generally wraps around `C` and `Self`. + /// + /// > **Note**: For upgrades that add an intermediary layer (such as `secio` or `multiplex`), + /// > this associated type must implement `AsyncRead + AsyncWrite`. + type Output; + /// Type of the future that will resolve to `Self::Output`. + type Future: Future; + + /// This method is called after protocol negotiation has been performed. + /// + /// Because performing the upgrade may not be instantaneous (eg. it may require a handshake), + /// this function returns a future instead of the direct output. + fn upgrade( + self, + socket: C, + id: Self::UpgradeIdentifier, + ty: Endpoint, + remote_addr: &Multiaddr, + ) -> Self::Future; +} + +/// Extension trait for `ConnectionUpgrade`. Automatically implemented on everything. +pub trait UpgradeExt { + /// Builds a struct that will choose an upgrade between `self` and `other`, depending on what + /// the remote supports. + fn or_upgrade(self, other: T) -> OrUpgrade + where + Self: Sized; +} + +impl UpgradeExt for T { + #[inline] + fn or_upgrade(self, other: U) -> OrUpgrade { + OrUpgrade::new(self, other) + } +}