Refactor transport into multiple modules. (#170)

Addresses #158.

Besides the refactoring, the type `EitherIncomingStream` has been
removed as it is not used.
This commit is contained in:
Toralf Wittner
2018-05-02 11:50:48 +02:00
committed by Pierre Krieger
parent 3f2b6a5238
commit d346a6f495
23 changed files with 1513 additions and 1185 deletions

View File

@ -32,7 +32,8 @@ extern crate tokio_io;
use futures::{Future, Sink, Stream};
use futures::sync::oneshot;
use std::env;
use swarm::{DeniedConnectionUpgrade, SimpleProtocol, Transport, UpgradeExt};
use swarm::Transport;
use swarm::upgrade::{self, DeniedConnectionUpgrade, SimpleProtocol, UpgradeExt};
use tcp::TcpConfig;
use tokio_core::reactor::Core;
use tokio_io::AsyncRead;
@ -62,7 +63,7 @@ fn main() {
// On top of TCP/IP, we will use either the plaintext protocol or the secio protocol,
// depending on which one the remote supports.
.with_upgrade({
let plain_text = swarm::PlainTextConfig;
let plain_text = upgrade::PlainTextConfig;
let secio = {
let private_key = include_bytes!("test-private-key.pk8");

View File

@ -32,7 +32,8 @@ extern crate tokio_io;
use futures::future::{loop_fn, Future, IntoFuture, Loop};
use futures::{Sink, Stream};
use std::env;
use swarm::{SimpleProtocol, Transport, UpgradeExt};
use swarm::Transport;
use swarm::upgrade::{self, SimpleProtocol, UpgradeExt};
use tcp::TcpConfig;
use tokio_core::reactor::Core;
use tokio_io::AsyncRead;
@ -61,7 +62,7 @@ fn main() {
// On top of TCP/IP, we will use either the plaintext protocol or the secio protocol,
// depending on which one the remote supports.
.with_upgrade({
let plain_text = swarm::PlainTextConfig;
let plain_text = upgrade::PlainTextConfig;
let secio = {
let private_key = include_bytes!("test-private-key.pk8");

View File

@ -37,7 +37,8 @@ use futures::future::Future;
use futures::Stream;
use peerstore::PeerId;
use std::{env, mem};
use swarm::{Multiaddr, Transport, UpgradeExt};
use swarm::{Multiaddr, Transport};
use swarm::upgrade::{self, UpgradeExt};
use tcp::TcpConfig;
use tokio_core::reactor::Core;
use websocket::WsConfig;
@ -64,7 +65,7 @@ fn main() {
// On top of TCP/IP, we will use either the plaintext protocol or the secio protocol,
// depending on which one the remote supports.
.with_upgrade({
let plain_text = swarm::PlainTextConfig;
let plain_text = upgrade::PlainTextConfig;
let secio = {
let private_key = include_bytes!("test-private-key.pk8");

View File

@ -39,7 +39,8 @@ use peerstore::PeerId;
use std::env;
use std::sync::Arc;
use std::time::Duration;
use swarm::{Transport, UpgradeExt};
use swarm::Transport;
use swarm::upgrade::{self, UpgradeExt};
use tcp::TcpConfig;
use tokio_core::reactor::Core;
@ -69,7 +70,7 @@ fn main() {
// On top of TCP/IP, we will use either the plaintext protocol or the secio protocol,
// depending on which one the remote supports.
.with_upgrade({
let plain_text = swarm::PlainTextConfig;
let plain_text = upgrade::PlainTextConfig;
let secio = {
let private_key = include_bytes!("test-private-key.pk8");

View File

@ -32,7 +32,8 @@ extern crate tokio_io;
use futures::Future;
use futures::sync::oneshot;
use std::env;
use swarm::{DeniedConnectionUpgrade, Transport, UpgradeExt};
use swarm::Transport;
use swarm::upgrade::{self, DeniedConnectionUpgrade, UpgradeExt};
use tcp::TcpConfig;
use tokio_core::reactor::Core;
@ -54,7 +55,7 @@ fn main() {
// On top of TCP/IP, we will use either the plaintext protocol or the secio protocol,
// depending on which one the remote supports.
.with_upgrade({
let plain_text = swarm::PlainTextConfig;
let plain_text = upgrade::PlainTextConfig;
let secio = {
let private_key = include_bytes!("test-private-key.pk8");

View File

@ -92,8 +92,7 @@ use bytes::{BufMut, Bytes, BytesMut};
use futures::{Future, Sink, Stream};
use futures::future::{loop_fn, FutureResult, IntoFuture, Loop};
use futures::sync::{mpsc, oneshot};
use libp2p_swarm::Multiaddr;
use libp2p_swarm::transport::{ConnectionUpgrade, Endpoint};
use libp2p_swarm::{ConnectionUpgrade, Endpoint, Multiaddr};
use log::Level;
use parking_lot::Mutex;
use rand::Rand;
@ -310,7 +309,7 @@ mod tests {
use futures::future::join_all;
use futures::Future;
use futures::Stream;
use libp2p_swarm::transport::{ConnectionUpgrade, Endpoint};
use libp2p_swarm::{ConnectionUpgrade, Endpoint};
#[test]
fn ping_pong() {

View File

@ -50,7 +50,8 @@ use muxing::StreamMuxer;
use parking_lot::Mutex;
use std::io::Error as IoError;
use std::sync::Arc;
use transport::{ConnectionUpgrade, MuxedTransport, Transport, UpgradedNode};
use transport::{MuxedTransport, Transport, UpgradedNode};
use upgrade::ConnectionUpgrade;
/// Allows reusing the same muxed connection multiple times.
///

216
swarm/src/either.rs Normal file
View File

@ -0,0 +1,216 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::prelude::*;
use multiaddr::Multiaddr;
use muxing::StreamMuxer;
use std::io::{Error as IoError, Read, Write};
use tokio_io::{AsyncRead, AsyncWrite};
/// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to
/// either `First` or `Second`.
#[derive(Debug, Copy, Clone)]
pub enum EitherSocket<A, B> {
First(A),
Second(B),
}
impl<A, B> AsyncRead for EitherSocket<A, B>
where
A: AsyncRead,
B: AsyncRead,
{
#[inline]
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
match self {
&EitherSocket::First(ref a) => a.prepare_uninitialized_buffer(buf),
&EitherSocket::Second(ref b) => b.prepare_uninitialized_buffer(buf),
}
}
}
impl<A, B> Read for EitherSocket<A, B>
where
A: Read,
B: Read,
{
#[inline]
fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
match self {
&mut EitherSocket::First(ref mut a) => a.read(buf),
&mut EitherSocket::Second(ref mut b) => b.read(buf),
}
}
}
impl<A, B> AsyncWrite for EitherSocket<A, B>
where
A: AsyncWrite,
B: AsyncWrite,
{
#[inline]
fn shutdown(&mut self) -> Poll<(), IoError> {
match self {
&mut EitherSocket::First(ref mut a) => a.shutdown(),
&mut EitherSocket::Second(ref mut b) => b.shutdown(),
}
}
}
impl<A, B> Write for EitherSocket<A, B>
where
A: Write,
B: Write,
{
#[inline]
fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> {
match self {
&mut EitherSocket::First(ref mut a) => a.write(buf),
&mut EitherSocket::Second(ref mut b) => b.write(buf),
}
}
#[inline]
fn flush(&mut self) -> Result<(), IoError> {
match self {
&mut EitherSocket::First(ref mut a) => a.flush(),
&mut EitherSocket::Second(ref mut b) => b.flush(),
}
}
}
impl<A, B> StreamMuxer for EitherSocket<A, B>
where
A: StreamMuxer,
B: StreamMuxer,
{
type Substream = EitherSocket<A::Substream, B::Substream>;
type InboundSubstream = EitherTransportFuture<A::InboundSubstream, B::InboundSubstream>;
type OutboundSubstream = EitherTransportFuture<A::OutboundSubstream, B::OutboundSubstream>;
#[inline]
fn inbound(self) -> Self::InboundSubstream {
match self {
EitherSocket::First(a) => EitherTransportFuture::First(a.inbound()),
EitherSocket::Second(b) => EitherTransportFuture::Second(b.inbound()),
}
}
#[inline]
fn outbound(self) -> Self::OutboundSubstream {
match self {
EitherSocket::First(a) => EitherTransportFuture::First(a.outbound()),
EitherSocket::Second(b) => EitherTransportFuture::Second(b.outbound()),
}
}
}
/// Implements `Future` and redirects calls to either `First` or `Second`.
///
/// Additionally, the output will be wrapped inside a `EitherSocket`.
// TODO: This type is needed because of the lack of `impl Trait` in stable Rust.
// If Rust had impl Trait we could use the Either enum from the futures crate and add some
// modifiers to it. This custom enum is a combination of Either and these modifiers.
#[derive(Debug, Copy, Clone)]
pub enum EitherTransportFuture<A, B> {
First(A),
Second(B),
}
impl<A, B> Future for EitherTransportFuture<A, B>
where
A: Future<Error = IoError>,
B: Future<Error = IoError>,
{
type Item = EitherSocket<A::Item, B::Item>;
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
&mut EitherTransportFuture::First(ref mut a) => {
let item = try_ready!(a.poll());
Ok(Async::Ready(EitherSocket::First(item)))
}
&mut EitherTransportFuture::Second(ref mut b) => {
let item = try_ready!(b.poll());
Ok(Async::Ready(EitherSocket::Second(item)))
}
}
}
}
/// Implements `Stream` and dispatches all method calls to either `First` or `Second`.
#[derive(Debug, Copy, Clone)]
pub enum EitherListenStream<A, B> {
First(A),
Second(B),
}
impl<AStream, BStream, AInner, BInner> Stream for EitherListenStream<AStream, BStream>
where
AStream: Stream<Item = AInner, Error = IoError>,
BStream: Stream<Item = BInner, Error = IoError>,
{
type Item = EitherListenUpgrade<AInner, BInner>;
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self {
&mut EitherListenStream::First(ref mut a) => a.poll()
.map(|i| i.map(|v| v.map(EitherListenUpgrade::First))),
&mut EitherListenStream::Second(ref mut a) => a.poll()
.map(|i| i.map(|v| v.map(EitherListenUpgrade::Second))),
}
}
}
// TODO: This type is needed because of the lack of `impl Trait` in stable Rust.
// If Rust had impl Trait we could use the Either enum from the futures crate and add some
// modifiers to it. This custom enum is a combination of Either and these modifiers.
#[derive(Debug, Copy, Clone)]
pub enum EitherListenUpgrade<A, B> {
First(A),
Second(B),
}
impl<A, B, Ao, Bo> Future for EitherListenUpgrade<A, B>
where
A: Future<Item = (Ao, Multiaddr), Error = IoError>,
B: Future<Item = (Bo, Multiaddr), Error = IoError>,
{
type Item = (EitherSocket<Ao, Bo>, Multiaddr);
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
&mut EitherListenUpgrade::First(ref mut a) => {
let (item, addr) = try_ready!(a.poll());
Ok(Async::Ready((EitherSocket::First(item), addr)))
}
&mut EitherListenUpgrade::Second(ref mut b) => {
let (item, addr) = try_ready!(b.poll());
Ok(Async::Ready((EitherSocket::Second(item), addr)))
}
}
}
}

View File

@ -94,7 +94,7 @@
//! # fn main() {
//! let tokio_core = tokio_core::reactor::Core::new().unwrap();
//! let tcp_transport = libp2p_tcp_transport::TcpConfig::new(tokio_core.handle());
//! let upgraded = tcp_transport.with_upgrade(libp2p_swarm::PlainTextConfig);
//! let upgraded = tcp_transport.with_upgrade(libp2p_swarm::upgrade::PlainTextConfig);
//!
//! // upgraded.dial(...) // automatically applies the plain text protocol on the socket
//! # }
@ -217,14 +217,16 @@ extern crate tokio_io;
pub extern crate multiaddr;
mod connection_reuse;
pub mod swarm;
mod either;
pub mod muxing;
pub mod swarm;
pub mod transport;
pub mod upgrade;
pub use self::connection_reuse::ConnectionReuse;
pub use self::multiaddr::{AddrComponent, Multiaddr};
pub use self::muxing::StreamMuxer;
pub use self::swarm::{swarm, SwarmController, SwarmFuture};
pub use self::transport::{ConnectionUpgrade, OrUpgrade, PlainTextConfig, Transport, UpgradedNode};
pub use self::transport::{Endpoint, MuxedTransport, SimpleProtocol, UpgradeExt};
pub use self::transport::DeniedConnectionUpgrade;
pub use self::transport::{MuxedTransport, Transport};
pub use self::upgrade::{ConnectionUpgrade, Endpoint};

View File

@ -23,7 +23,8 @@ use std::io::Error as IoError;
use futures::{future, Async, Future, IntoFuture, Poll, Stream};
use futures::stream::{FuturesUnordered, StreamFuture};
use futures::sync::mpsc;
use {ConnectionUpgrade, Multiaddr, MuxedTransport, UpgradedNode};
use transport::UpgradedNode;
use {ConnectionUpgrade, Multiaddr, MuxedTransport};
/// Creates a swarm.
///

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,111 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use either::{EitherListenStream, EitherListenUpgrade, EitherSocket};
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use transport::{MuxedTransport, Transport};
/// Struct returned by `or_transport()`.
#[derive(Debug, Copy, Clone)]
pub struct OrTransport<A, B>(A, B);
impl<A, B> OrTransport<A, B> {
pub fn new(a: A, b: B) -> OrTransport<A, B> {
OrTransport(a, b)
}
}
impl<A, B> Transport for OrTransport<A, B>
where
A: Transport,
B: Transport,
{
type RawConn = EitherSocket<A::RawConn, B::RawConn>;
type Listener = EitherListenStream<A::Listener, B::Listener>;
type ListenerUpgrade = EitherListenUpgrade<A::ListenerUpgrade, B::ListenerUpgrade>;
type Dial =
EitherListenUpgrade<<A::Dial as IntoFuture>::Future, <B::Dial as IntoFuture>::Future>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
let (first, addr) = match self.0.listen_on(addr) {
Ok((connec, addr)) => return Ok((EitherListenStream::First(connec), addr)),
Err(err) => err,
};
match self.1.listen_on(addr) {
Ok((connec, addr)) => Ok((EitherListenStream::Second(connec), addr)),
Err((second, addr)) => Err((OrTransport(first, second), addr)),
}
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
let (first, addr) = match self.0.dial(addr) {
Ok(connec) => return Ok(EitherListenUpgrade::First(connec.into_future())),
Err(err) => err,
};
match self.1.dial(addr) {
Ok(connec) => Ok(EitherListenUpgrade::Second(connec.into_future())),
Err((second, addr)) => Err((OrTransport(first, second), addr)),
}
}
#[inline]
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
let first = self.0.nat_traversal(server, observed);
if let Some(first) = first {
return Some(first);
}
self.1.nat_traversal(server, observed)
}
}
impl<A, B> MuxedTransport for OrTransport<A, B>
where
A: MuxedTransport,
B: MuxedTransport,
A::Incoming: 'static, // TODO: meh :-/
B::Incoming: 'static, // TODO: meh :-/
A::IncomingUpgrade: 'static, // TODO: meh :-/
B::IncomingUpgrade: 'static, // TODO: meh :-/
A::RawConn: 'static, // TODO: meh :-/
B::RawConn: 'static, // TODO: meh :-/
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade =
Box<Future<Item = (EitherSocket<A::RawConn, B::RawConn>, Multiaddr), Error = IoError>>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
let first = self.0.next_incoming().map(|out| {
let fut = out.map(move |(v, addr)| (EitherSocket::First(v), addr));
Box::new(fut) as Box<Future<Item = _, Error = _>>
});
let second = self.1.next_incoming().map(|out| {
let fut = out.map(move |(v, addr)| (EitherSocket::Second(v), addr));
Box::new(fut) as Box<Future<Item = _, Error = _>>
});
let future = first.select(second).map(|(i, _)| i).map_err(|(e, _)| e);
Box::new(future) as Box<_>
}
}

View File

@ -0,0 +1,63 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::future;
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::io::{self, Cursor};
use transport::Transport;
use transport::MuxedTransport;
/// Dummy implementation of `Transport` that just denies every single attempt.
#[derive(Debug, Copy, Clone)]
pub struct DeniedTransport;
impl Transport for DeniedTransport {
// TODO: could use `!` for associated types once stable
type RawConn = Cursor<Vec<u8>>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = io::Error>>;
type ListenerUpgrade = Box<Future<Item = (Self::RawConn, Multiaddr), Error = io::Error>>;
type Dial = Box<Future<Item = (Self::RawConn, Multiaddr), Error = io::Error>>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
Err((DeniedTransport, addr))
}
#[inline]
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
Err((DeniedTransport, addr))
}
#[inline]
fn nat_traversal(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> {
None
}
}
impl MuxedTransport for DeniedTransport {
type Incoming = future::Empty<Self::IncomingUpgrade, io::Error>;
type IncomingUpgrade = future::Empty<(Self::RawConn, Multiaddr), io::Error>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
future::empty()
}
}

View File

@ -0,0 +1,86 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::future;
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use transport::{MuxedTransport, Transport};
/// Dummy implementation of `MuxedTransport` that uses an inner `Transport`.
#[derive(Debug, Copy, Clone)]
pub struct DummyMuxing<T> {
inner: T,
}
impl<T> DummyMuxing<T> {
pub fn new(transport: T) -> DummyMuxing<T> {
DummyMuxing { inner: transport }
}
}
impl<T> MuxedTransport for DummyMuxing<T>
where
T: Transport,
{
type Incoming = future::Empty<Self::IncomingUpgrade, IoError>;
type IncomingUpgrade = future::Empty<(T::RawConn, Multiaddr), IoError>;
fn next_incoming(self) -> Self::Incoming
where
Self: Sized,
{
future::empty()
}
}
impl<T> Transport for DummyMuxing<T>
where
T: Transport,
{
type RawConn = T::RawConn;
type Listener = T::Listener;
type ListenerUpgrade = T::ListenerUpgrade;
type Dial = T::Dial;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
where
Self: Sized,
{
self.inner
.listen_on(addr)
.map_err(|(inner, addr)| (DummyMuxing { inner }, addr))
}
#[inline]
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)>
where
Self: Sized,
{
self.inner
.dial(addr)
.map_err(|(inner, addr)| (DummyMuxing { inner }, addr))
}
#[inline]
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.nat_traversal(server, observed)
}
}

149
swarm/src/transport/mod.rs Normal file
View File

@ -0,0 +1,149 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Handles entering a connection with a peer.
//!
//! The two main elements of this module are the `Transport` and `ConnectionUpgrade` traits.
//! `Transport` is implemented on objects that allow dialing and listening. `ConnectionUpgrade` is
//! implemented on objects that make it possible to upgrade a connection (for example by adding an
//! encryption middleware to the connection).
//!
//! Thanks to the `Transport::or_transport`, `Transport::with_upgrade` and
//! `UpgradedNode::or_upgrade` methods, you can combine multiple transports and/or upgrades
//! together in a complex chain of protocols negotiation.
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use tokio_io::{AsyncRead, AsyncWrite};
use upgrade::ConnectionUpgrade;
pub mod choice;
pub mod denied;
pub mod dummy;
pub mod muxed;
pub mod upgrade;
pub use self::choice::OrTransport;
pub use self::denied::DeniedTransport;
pub use self::dummy::DummyMuxing;
pub use self::muxed::MuxedTransport;
pub use self::upgrade::UpgradedNode;
/// A transport is an object that can be used to produce connections by listening or dialing a
/// peer.
///
/// This trait is implemented on concrete transports (eg. TCP, UDP, etc.), but also on wrappers
/// around them.
///
/// > **Note**: The methods of this trait use `self` and not `&self` or `&mut self`. In other
/// > words, listening or dialing consumes the transport object. This has been designed
/// > so that you would implement this trait on `&Foo` or `&mut Foo` instead of directly
/// > on `Foo`.
pub trait Transport {
/// The raw connection to a peer.
type RawConn: AsyncRead + AsyncWrite;
/// The listener produces incoming connections.
///
/// An item should be produced whenever a connection is received at the lowest level of the
/// transport stack. The item is a `Future` that is signalled once some pre-processing has
/// taken place, and that connection has been upgraded to the wanted protocols.
type Listener: Stream<Item = Self::ListenerUpgrade, Error = IoError>;
/// After a connection has been received, we may need to do some asynchronous pre-processing
/// on it (eg. an intermediary protocol negotiation). While this pre-processing takes place, we
/// want to be able to continue polling on the listener.
type ListenerUpgrade: Future<Item = (Self::RawConn, Multiaddr), Error = IoError>;
/// A future which indicates that we are currently dialing to a peer.
type Dial: IntoFuture<Item = (Self::RawConn, Multiaddr), Error = IoError>;
/// Listen on the given multiaddr. Returns a stream of incoming connections, plus a modified
/// version of the `Multiaddr`. This new `Multiaddr` is the one that that should be advertised
/// to other nodes, instead of the one passed as parameter.
///
/// Returns the address back if it isn't supported.
///
/// > **Note**: The reason why we need to change the `Multiaddr` on success is to handle
/// > situations such as turning `/ip4/127.0.0.1/tcp/0` into
/// > `/ip4/127.0.0.1/tcp/<actual port>`.
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
where
Self: Sized;
/// Dial to the given multi-addr.
///
/// Returns either a future which may resolve to a connection, or gives back the multiaddress.
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)>
where
Self: Sized;
/// Takes a multiaddress we're listening on (`server`), and tries to convert it to an
/// externally-visible multiaddress. In order to do so, we pass an `observed` address which
/// a remote node observes for one of our dialers.
///
/// For example, if `server` is `/ip4/0.0.0.0/tcp/3000` and `observed` is
/// `/ip4/80.81.82.83/tcp/29601`, then we should return `/ip4/80.81.82.83/tcp/3000`. Each
/// implementation of `Transport` is only responsible for handling the protocols it supports.
///
/// Returns `None` if nothing can be determined. This happens if this trait implementation
/// doesn't recognize the protocols, or if `server` and `observed` are related.
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;
/// Builds a new struct that implements `Transport` that contains both `self` and `other`.
///
/// The returned object will redirect its calls to `self`, except that if `listen_on` or `dial`
/// return an error then `other` will be tried.
#[inline]
fn or_transport<T>(self, other: T) -> OrTransport<Self, T>
where
Self: Sized,
{
OrTransport::new(self, other)
}
/// Wraps this transport inside an upgrade. Whenever a connection that uses this transport
/// is established, it is wrapped inside the upgrade.
///
/// > **Note**: The concept of an *upgrade* for example includes middlewares such *secio*
/// > (communication encryption), *multiplex*, but also a protocol handler.
#[inline]
fn with_upgrade<U>(self, upgrade: U) -> UpgradedNode<Self, U>
where
Self: Sized,
U: ConnectionUpgrade<Self::RawConn>,
{
UpgradedNode::new(self, upgrade)
}
/// Builds a dummy implementation of `MuxedTransport` that uses this transport.
///
/// The resulting object will not actually use muxing. This means that dialing the same node
/// twice will result in two different connections instead of two substreams on the same
/// connection.
#[inline]
fn with_dummy_muxing(self) -> DummyMuxing<Self>
where
Self: Sized,
{
DummyMuxing::new(self)
}
}

View File

@ -0,0 +1,53 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::prelude::*;
use futures::stream;
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use transport::Transport;
/// Extension trait for `Transport`. Implemented on structs that provide a `Transport` on which
/// the dialed node can dial you back.
pub trait MuxedTransport: Transport {
/// Future resolving to a future that will resolve to an incoming connection.
type Incoming: Future<Item = Self::IncomingUpgrade, Error = IoError>;
/// Future resolving to an incoming connection.
type IncomingUpgrade: Future<Item = (Self::RawConn, Multiaddr), Error = IoError>;
/// Returns the next incoming substream opened by a node that we dialed ourselves.
///
/// > **Note**: Doesn't produce incoming substreams coming from addresses we are listening on.
/// > This only concerns nodes that we dialed with `dial()`.
fn next_incoming(self) -> Self::Incoming
where
Self: Sized;
/// Returns a stream of incoming connections.
#[inline]
fn incoming(
self,
) -> stream::AndThen<stream::Repeat<Self, IoError>, fn(Self) -> Self::Incoming, Self::Incoming>
where
Self: Sized + Clone,
{
stream::repeat(self).and_then(|me| me.next_incoming())
}
}

View File

@ -0,0 +1,337 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::Bytes;
use connection_reuse::ConnectionReuse;
use futures::prelude::*;
use multiaddr::Multiaddr;
use multistream_select;
use muxing::StreamMuxer;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use tokio_io::{AsyncRead, AsyncWrite};
use transport::{MuxedTransport, Transport};
use upgrade::{ConnectionUpgrade, Endpoint};
/// Implements the `Transport` trait. Dials or listens, then upgrades any dialed or received
/// connection.
///
/// See the `Transport::with_upgrade` method.
#[derive(Debug, Clone)]
pub struct UpgradedNode<T, C> {
transports: T,
upgrade: C,
}
impl<T, C> UpgradedNode<T, C> {
pub fn new(transports: T, upgrade: C) -> UpgradedNode<T, C> {
UpgradedNode {
transports,
upgrade,
}
}
}
impl<'a, T, C> UpgradedNode<T, C>
where
T: Transport + 'a,
C: ConnectionUpgrade<T::RawConn> + 'a,
{
/// Turns this upgraded node into a `ConnectionReuse`. If the `Output` implements the
/// `StreamMuxer` trait, the returned object will implement `Transport` and `MuxedTransport`.
#[inline]
pub fn into_connection_reuse(self) -> ConnectionReuse<T, C>
where
C::Output: StreamMuxer,
{
From::from(self)
}
/// Returns a reference to the inner `Transport`.
#[inline]
pub fn transport(&self) -> &T {
&self.transports
}
/// Tries to dial on the `Multiaddr` using the transport that was passed to `new`, then upgrade
/// the connection.
///
/// Note that this does the same as `Transport::dial`, but with less restrictions on the trait
/// requirements.
#[inline]
pub fn dial(
self,
addr: Multiaddr,
) -> Result<Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>, (Self, Multiaddr)>
{
let upgrade = self.upgrade;
let dialed_fut = match self.transports.dial(addr.clone()) {
Ok(f) => f.into_future(),
Err((trans, addr)) => {
let builder = UpgradedNode {
transports: trans,
upgrade: upgrade,
};
return Err((builder, addr));
}
};
let future = dialed_fut
// Try to negotiate the protocol.
.and_then(move |(connection, client_addr)| {
let iter = upgrade.protocol_names()
.map(|(name, id)| (name, <Bytes as PartialEq>::eq, id));
debug!(target: "libp2p-swarm", "Starting protocol negotiation (dialer)");
let negotiated = multistream_select::dialer_select_proto(connection, iter)
.map_err(|err| IoError::new(IoErrorKind::Other, err));
negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, client_addr))
})
.then(|negotiated| {
match negotiated {
Ok((_, _, _, ref client_addr)) => {
debug!(target: "libp2p-swarm", "Successfully negotiated protocol \
upgrade with {}", client_addr)
},
Err(ref err) => {
debug!(target: "libp2p-swarm", "Error while negotiated protocol \
upgrade: {:?}", err)
},
};
negotiated
})
.and_then(move |(upgrade_id, connection, upgrade, client_addr)| {
let f = upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &client_addr);
debug!(target: "libp2p-swarm", "Trying to apply negotiated protocol with {}",
client_addr);
f.map(|v| (v, client_addr))
})
.then(|val| {
match val {
Ok(_) => debug!(target: "libp2p-swarm", "Successfully applied negotiated \
protocol"),
Err(_) => debug!(target: "libp2p-swarm", "Failed to apply negotiated protocol"),
}
val
});
Ok(Box::new(future))
}
/// If the underlying transport is a `MuxedTransport`, then after calling `dial` we may receive
/// substreams opened by the dialed nodes.
///
/// This function returns the next incoming substream. You are strongly encouraged to call it
/// if you have a muxed transport.
pub fn next_incoming(
self,
) -> Box<
Future<
Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>,
Error = IoError,
>
+ 'a,
>
where
T: MuxedTransport,
C::NamesIter: Clone, // TODO: not elegant
C: Clone,
{
let upgrade = self.upgrade;
let future = self.transports.next_incoming().map(|future| {
// Try to negotiate the protocol.
let future = future
.and_then(move |(connection, addr)| {
let iter = upgrade
.protocol_names()
.map::<_, fn(_) -> _>(|(name, id)| (name, <Bytes as PartialEq>::eq, id));
debug!(target: "libp2p-swarm", "Starting protocol negotiation (incoming)");
let negotiated = multistream_select::listener_select_proto(connection, iter)
.map_err(|err| IoError::new(IoErrorKind::Other, err));
negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr))
})
.then(|negotiated| {
match negotiated {
Ok((_, _, _, ref client_addr)) => {
debug!(target: "libp2p-swarm", "Successfully negotiated protocol \
upgrade with {}", client_addr)
}
Err(ref err) => {
debug!(target: "libp2p-swarm", "Error while negotiated protocol \
upgrade: {:?}", err)
}
};
negotiated
})
.and_then(move |(upgrade_id, connection, upgrade, addr)| {
let upg = upgrade.upgrade(connection, upgrade_id, Endpoint::Listener, &addr);
debug!(target: "libp2p-swarm", "Trying to apply negotiated protocol with {}",
addr);
upg.map(|u| (u, addr))
})
.then(|val| {
match val {
Ok(_) => debug!(target: "libp2p-swarm", "Successfully applied negotiated \
protocol"),
Err(_) => debug!(target: "libp2p-swarm", "Failed to apply negotiated \
protocol"),
}
val
});
Box::new(future) as Box<Future<Item = _, Error = _>>
});
Box::new(future) as Box<_>
}
/// Start listening on the multiaddr using the transport that was passed to `new`.
/// Then whenever a connection is opened, it is upgraded.
///
/// Note that this does the same as `Transport::listen_on`, but with less restrictions on the
/// trait requirements.
#[inline]
pub fn listen_on(
self,
addr: Multiaddr,
) -> Result<
(
Box<
Stream<
Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>,
Error = IoError,
>
+ 'a,
>,
Multiaddr,
),
(Self, Multiaddr),
>
where
C::NamesIter: Clone, // TODO: not elegant
C: Clone,
{
let upgrade = self.upgrade;
let (listening_stream, new_addr) = match self.transports.listen_on(addr) {
Ok((l, new_addr)) => (l, new_addr),
Err((trans, addr)) => {
let builder = UpgradedNode {
transports: trans,
upgrade: upgrade,
};
return Err((builder, addr));
}
};
// Try to negotiate the protocol.
// Note that failing to negotiate a protocol will never produce a future with an error.
// Instead the `stream` will produce `Ok(Err(...))`.
// `stream` can only produce an `Err` if `listening_stream` produces an `Err`.
let stream = listening_stream.map(move |connection| {
let upgrade = upgrade.clone();
let connection = connection
// Try to negotiate the protocol
.and_then(move |(connection, remote_addr)| {
let iter = upgrade.protocol_names()
.map::<_, fn(_) -> _>(|(n, t)| (n, <Bytes as PartialEq>::eq, t));
let remote_addr2 = remote_addr.clone();
debug!(target: "libp2p-swarm", "Starting protocol negotiation (listener)");
multistream_select::listener_select_proto(connection, iter)
.map_err(|err| IoError::new(IoErrorKind::Other, err))
.then(move |negotiated| {
match negotiated {
Ok(_) => {
debug!(target: "libp2p-swarm", "Successfully negotiated \
protocol upgrade with {}", remote_addr2)
},
Err(ref err) => {
debug!(target: "libp2p-swarm", "Error while negotiated \
protocol upgrade: {:?}", err)
},
};
negotiated
})
.and_then(move |(upgrade_id, connection)| {
let fut = upgrade.upgrade(
connection,
upgrade_id,
Endpoint::Listener,
&remote_addr,
);
fut.map(move |c| (c, remote_addr))
})
.into_future()
});
Box::new(connection) as Box<_>
});
Ok((Box::new(stream), new_addr))
}
}
impl<T, C> Transport for UpgradedNode<T, C>
where
T: Transport + 'static,
C: ConnectionUpgrade<T::RawConn> + 'static,
C::Output: AsyncRead + AsyncWrite,
C::NamesIter: Clone, // TODO: not elegant
C: Clone,
{
type RawConn = C::Output;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>;
type Dial = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
self.listen_on(addr)
}
#[inline]
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
self.dial(addr)
}
#[inline]
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transports.nat_traversal(server, observed)
}
}
impl<T, C> MuxedTransport for UpgradedNode<T, C>
where
T: MuxedTransport + 'static,
C: ConnectionUpgrade<T::RawConn> + 'static,
C::Output: AsyncRead + AsyncWrite,
C::NamesIter: Clone, // TODO: not elegant
C: Clone,
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
self.next_incoming()
}
}

159
swarm/src/upgrade/choice.rs Normal file
View File

@ -0,0 +1,159 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::Bytes;
use either::EitherSocket;
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use tokio_io::{AsyncRead, AsyncWrite};
use upgrade::{ConnectionUpgrade, Endpoint};
/// See `transport::Transport::or_upgrade()`.
#[derive(Debug, Copy, Clone)]
pub struct OrUpgrade<A, B>(A, B);
impl<A, B> OrUpgrade<A, B> {
pub fn new(a: A, b: B) -> OrUpgrade<A, B> {
OrUpgrade(a, b)
}
}
impl<C, A, B> ConnectionUpgrade<C> for OrUpgrade<A, B>
where
C: AsyncRead + AsyncWrite,
A: ConnectionUpgrade<C>,
B: ConnectionUpgrade<C>,
{
type NamesIter = NamesIterChain<A::NamesIter, B::NamesIter>;
type UpgradeIdentifier = EitherUpgradeIdentifier<A::UpgradeIdentifier, B::UpgradeIdentifier>;
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
NamesIterChain {
first: self.0.protocol_names(),
second: self.1.protocol_names(),
}
}
type Output = EitherSocket<A::Output, B::Output>;
type Future = EitherConnUpgrFuture<A::Future, B::Future>;
#[inline]
fn upgrade(
self,
socket: C,
id: Self::UpgradeIdentifier,
ty: Endpoint,
remote_addr: &Multiaddr,
) -> Self::Future {
match id {
EitherUpgradeIdentifier::First(id) => {
EitherConnUpgrFuture::First(self.0.upgrade(socket, id, ty, remote_addr))
}
EitherUpgradeIdentifier::Second(id) => {
EitherConnUpgrFuture::Second(self.1.upgrade(socket, id, ty, remote_addr))
}
}
}
}
/// Internal struct used by the `OrUpgrade` trait.
#[derive(Debug, Copy, Clone)]
pub enum EitherUpgradeIdentifier<A, B> {
First(A),
Second(B),
}
/// Implements `Future` and redirects calls to either `First` or `Second`.
///
/// Additionally, the output will be wrapped inside a `EitherSocket`.
///
// TODO: This type is needed because of the lack of `impl Trait` in stable Rust.
// If Rust had impl Trait we could use the Either enum from the futures crate and add some
// modifiers to it. This custom enum is a combination of Either and these modifiers.
#[derive(Debug, Copy, Clone)]
pub enum EitherConnUpgrFuture<A, B> {
First(A),
Second(B),
}
impl<A, B> Future for EitherConnUpgrFuture<A, B>
where
A: Future<Error = IoError>,
B: Future<Error = IoError>,
{
type Item = EitherSocket<A::Item, B::Item>;
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
&mut EitherConnUpgrFuture::First(ref mut a) => {
let item = try_ready!(a.poll());
Ok(Async::Ready(EitherSocket::First(item)))
}
&mut EitherConnUpgrFuture::Second(ref mut b) => {
let item = try_ready!(b.poll());
Ok(Async::Ready(EitherSocket::Second(item)))
}
}
}
}
/// Internal type used by the `OrUpgrade` struct.
///
/// > **Note**: This type is needed because of the lack of `-> impl Trait` in Rust. It can be
/// > removed eventually.
#[derive(Debug, Copy, Clone)]
pub struct NamesIterChain<A, B> {
first: A,
second: B,
}
impl<A, B, AId, BId> Iterator for NamesIterChain<A, B>
where
A: Iterator<Item = (Bytes, AId)>,
B: Iterator<Item = (Bytes, BId)>,
{
type Item = (Bytes, EitherUpgradeIdentifier<AId, BId>);
#[inline]
fn next(&mut self) -> Option<Self::Item> {
if let Some((name, id)) = self.first.next() {
return Some((name, EitherUpgradeIdentifier::First(id)));
}
if let Some((name, id)) = self.second.next() {
return Some((name, EitherUpgradeIdentifier::Second(id)));
}
None
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
let (min1, max1) = self.first.size_hint();
let (min2, max2) = self.second.size_hint();
let max = match (max1, max2) {
(Some(max1), Some(max2)) => max1.checked_add(max2),
_ => None,
};
(min1.saturating_add(min2), max)
}
}

View File

@ -0,0 +1,50 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::Bytes;
use upgrade::{ConnectionUpgrade, Endpoint};
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{io, iter};
use tokio_io::{AsyncRead, AsyncWrite};
/// Implementation of `ConnectionUpgrade` that always fails to negotiate.
#[derive(Debug, Copy, Clone)]
pub struct DeniedConnectionUpgrade;
impl<C> ConnectionUpgrade<C> for DeniedConnectionUpgrade
where
C: AsyncRead + AsyncWrite,
{
type NamesIter = iter::Empty<(Bytes, ())>;
type UpgradeIdentifier = (); // TODO: could use `!`
type Output = (); // TODO: could use `!`
type Future = Box<Future<Item = (), Error = io::Error>>; // TODO: could use `!`
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
iter::empty()
}
#[inline]
fn upgrade(self, _: C, _: Self::UpgradeIdentifier, _: Endpoint, _: &Multiaddr) -> Self::Future {
unreachable!("the denied connection upgrade always fails to negotiate")
}
}

31
swarm/src/upgrade/mod.rs Normal file
View File

@ -0,0 +1,31 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
pub mod denied;
pub mod traits;
pub mod choice;
pub mod plaintext;
pub mod simple;
pub use self::choice::OrUpgrade;
pub use self::denied::DeniedConnectionUpgrade;
pub use self::plaintext::PlainTextConfig;
pub use self::simple::SimpleProtocol;
pub use self::traits::{ConnectionUpgrade, Endpoint, UpgradeExt};

View File

@ -0,0 +1,54 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::Bytes;
use futures::future::{self, FutureResult};
use multiaddr::Multiaddr;
use std::{iter, io::Error as IoError};
use tokio_io::{AsyncRead, AsyncWrite};
use upgrade::{ConnectionUpgrade, Endpoint};
/// Implementation of the `ConnectionUpgrade` that negotiates the `/plaintext/1.0.0` protocol and
/// simply passes communications through without doing anything more.
///
/// > **Note**: Generally used as an alternative to `secio` if a security layer is not desirable.
// TODO: move to a separate crate?
#[derive(Debug, Copy, Clone)]
pub struct PlainTextConfig;
impl<C> ConnectionUpgrade<C> for PlainTextConfig
where
C: AsyncRead + AsyncWrite,
{
type Output = C;
type Future = FutureResult<C, IoError>;
type UpgradeIdentifier = ();
type NamesIter = iter::Once<(Bytes, ())>;
#[inline]
fn upgrade(self, i: C, _: (), _: Endpoint, _: &Multiaddr) -> Self::Future {
future::ok(i)
}
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
iter::once((Bytes::from("/plaintext/1.0.0"), ()))
}
}

View File

@ -0,0 +1,84 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::Bytes;
use futures::future::FromErr;
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{iter, io::Error as IoError, sync::Arc};
use tokio_io::{AsyncRead, AsyncWrite};
use upgrade::{ConnectionUpgrade, Endpoint};
/// Implementation of `ConnectionUpgrade`. Convenient to use with small protocols.
#[derive(Debug)]
pub struct SimpleProtocol<F> {
name: Bytes,
// Note: we put the closure `F` in an `Arc` because Rust closures aren't automatically clonable
// yet.
upgrade: Arc<F>,
}
impl<F> SimpleProtocol<F> {
/// Builds a `SimpleProtocol`.
#[inline]
pub fn new<N>(name: N, upgrade: F) -> SimpleProtocol<F>
where
N: Into<Bytes>,
{
SimpleProtocol {
name: name.into(),
upgrade: Arc::new(upgrade),
}
}
}
impl<F> Clone for SimpleProtocol<F> {
#[inline]
fn clone(&self) -> Self {
SimpleProtocol {
name: self.name.clone(),
upgrade: self.upgrade.clone(),
}
}
}
impl<C, F, O> ConnectionUpgrade<C> for SimpleProtocol<F>
where
C: AsyncRead + AsyncWrite,
F: Fn(C) -> O,
O: IntoFuture<Error = IoError>,
{
type NamesIter = iter::Once<(Bytes, ())>;
type UpgradeIdentifier = ();
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
iter::once((self.name.clone(), ()))
}
type Output = O::Item;
type Future = FromErr<O::Future, IoError>;
#[inline]
fn upgrade(self, socket: C, _: (), _: Endpoint, _: &Multiaddr) -> Self::Future {
let upgrade = &self.upgrade;
upgrade(socket).into_future().from_err()
}
}

View File

@ -0,0 +1,92 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::Bytes;
use futures::future::Future;
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use tokio_io::{AsyncRead, AsyncWrite};
use upgrade::choice::OrUpgrade;
/// Type of connection for the upgrade.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Endpoint {
/// The socket comes from a dialer.
Dialer,
/// The socket comes from a listener.
Listener,
}
/// Implemented on structs that describe a possible upgrade to a connection between two peers.
///
/// The generic `C` is the type of the incoming connection before it is upgraded.
///
/// > **Note**: The `upgrade` method of this trait uses `self` and not `&self` or `&mut self`.
/// > This has been designed so that you would implement this trait on `&Foo` or
/// > `&mut Foo` instead of directly on `Foo`.
pub trait ConnectionUpgrade<C: AsyncRead + AsyncWrite> {
/// Iterator returned by `protocol_names`.
type NamesIter: Iterator<Item = (Bytes, Self::UpgradeIdentifier)>;
/// Type that serves as an identifier for the protocol. This type only exists to be returned
/// by the `NamesIter` and then be passed to `upgrade`.
///
/// This is only useful on implementations that dispatch between multiple possible upgrades.
/// Any basic implementation will probably just use the `()` type.
type UpgradeIdentifier;
/// Returns the name of the protocols to advertise to the remote.
fn protocol_names(&self) -> Self::NamesIter;
/// Type of the stream that has been upgraded. Generally wraps around `C` and `Self`.
///
/// > **Note**: For upgrades that add an intermediary layer (such as `secio` or `multiplex`),
/// > this associated type must implement `AsyncRead + AsyncWrite`.
type Output;
/// Type of the future that will resolve to `Self::Output`.
type Future: Future<Item = Self::Output, Error = IoError>;
/// This method is called after protocol negotiation has been performed.
///
/// Because performing the upgrade may not be instantaneous (eg. it may require a handshake),
/// this function returns a future instead of the direct output.
fn upgrade(
self,
socket: C,
id: Self::UpgradeIdentifier,
ty: Endpoint,
remote_addr: &Multiaddr,
) -> Self::Future;
}
/// Extension trait for `ConnectionUpgrade`. Automatically implemented on everything.
pub trait UpgradeExt {
/// Builds a struct that will choose an upgrade between `self` and `other`, depending on what
/// the remote supports.
fn or_upgrade<T>(self, other: T) -> OrUpgrade<Self, T>
where
Self: Sized;
}
impl<T> UpgradeExt for T {
#[inline]
fn or_upgrade<U>(self, other: U) -> OrUpgrade<Self, U> {
OrUpgrade::new(self, other)
}
}