Rename libp2p_swarm to libp2p_core (#189)

This commit is contained in:
Pierre Krieger
2018-05-16 12:59:36 +02:00
committed by GitHub
parent dc6d4df3f8
commit 5c1890e66a
62 changed files with 115 additions and 115 deletions

View File

@ -1,413 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Contains the `ConnectionReuse` struct. Stores open muxed connections to nodes so that dialing
//! a node reuses the same connection instead of opening a new one.
//!
//! A `ConnectionReuse` can only be created from an `UpgradedNode` whose `ConnectionUpgrade`
//! yields as `StreamMuxer`.
//!
//! # Behaviour
//!
//! The API exposed by the `ConnectionReuse` struct consists in the `Transport` trait
//! implementation, with the `dial` and `listen_on` methods.
//!
//! When called on a `ConnectionReuse`, the `listen_on` method will listen on the given
//! multiaddress (by using the underlying `Transport`), then will apply a `flat_map` on the
//! incoming connections so that we actually listen to the incoming substreams of each connection.
//!
//! When called on a `ConnectionReuse`, the `dial` method will try to use a connection that has
//! already been opened earlier, and open an outgoing substream on it. If none is available, it
//! will dial the given multiaddress. Dialed node can also spontaneously open new substreams with
//! us. In order to handle these new substreams you should use the `next_incoming` method of the
//! `MuxedTransport` trait.
use fnv::FnvHashMap;
use futures::future::{self, Either, FutureResult, IntoFuture};
use futures::{Async, Future, Poll, Stream};
use futures::stream::Fuse as StreamFuse;
use futures::stream::FuturesUnordered;
use futures::sync::mpsc;
use multiaddr::Multiaddr;
use muxing::StreamMuxer;
use parking_lot::Mutex;
use std::io::{self, Error as IoError};
use std::sync::Arc;
use tokio_io::{AsyncRead, AsyncWrite};
use transport::{MuxedTransport, Transport, UpgradedNode};
use upgrade::ConnectionUpgrade;
/// Allows reusing the same muxed connection multiple times.
///
/// Can be created from an `UpgradedNode` through the `From` trait.
///
/// Implements the `Transport` trait.
#[derive(Clone)]
pub struct ConnectionReuse<T, C>
where
T: Transport,
T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output>,
C::Output: StreamMuxer,
{
// Underlying transport and connection upgrade for when we need to dial or listen.
inner: UpgradedNode<T, C>,
// Struct shared between most of the `ConnectionReuse` infrastructure.
shared: Arc<Mutex<Shared<C::Output>>>,
}
struct Shared<M>
where
M: StreamMuxer,
{
// List of active muxers.
active_connections: FnvHashMap<Multiaddr, M>,
// List of pending inbound substreams from dialed nodes.
// Only add to this list elements received through `add_to_next_rx`.
next_incoming: Vec<(M, M::InboundSubstream, Multiaddr)>,
// New elements are not directly added to `next_incoming`. Instead they are sent to this
// channel. This is done so that we can wake up tasks whenever a new element is added.
add_to_next_rx: mpsc::UnboundedReceiver<(M, M::InboundSubstream, Multiaddr)>,
// Other side of `add_to_next_rx`.
add_to_next_tx: mpsc::UnboundedSender<(M, M::InboundSubstream, Multiaddr)>,
}
impl<T, C> From<UpgradedNode<T, C>> for ConnectionReuse<T, C>
where
T: Transport,
T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output>,
C::Output: StreamMuxer,
{
#[inline]
fn from(node: UpgradedNode<T, C>) -> ConnectionReuse<T, C> {
let (tx, rx) = mpsc::unbounded();
ConnectionReuse {
inner: node,
shared: Arc::new(Mutex::new(Shared {
active_connections: Default::default(),
next_incoming: Vec::new(),
add_to_next_rx: rx,
add_to_next_tx: tx,
})),
}
}
}
impl<T, C> Transport for ConnectionReuse<T, C>
where
T: Transport + 'static, // TODO: 'static :(
T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output> + 'static, // TODO: 'static :(
C: Clone,
C::Output: StreamMuxer + Clone,
C::NamesIter: Clone, // TODO: not elegant
{
type Output = <C::Output as StreamMuxer>::Substream;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = FutureResult<(Self::Output, Multiaddr), IoError>;
type Dial = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
let (listener, new_addr) = match self.inner.listen_on(addr.clone()) {
Ok((l, a)) => (l, a),
Err((inner, addr)) => {
return Err((
ConnectionReuse {
inner: inner,
shared: self.shared,
},
addr,
));
}
};
let listener = ConnectionReuseListener {
shared: self.shared.clone(),
listener: listener.fuse(),
current_upgrades: FuturesUnordered::new(),
connections: Vec::new(),
};
Ok((Box::new(listener) as Box<_>, new_addr))
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
// If we already have an active connection, use it!
let substream = if let Some(muxer) = self.shared
.lock()
.active_connections
.get(&addr)
.map(|muxer| muxer.clone())
{
let a = addr.clone();
Either::A(muxer.outbound().map(move |s| s.map(move |s| (s, a))))
} else {
Either::B(future::ok(None))
};
let shared = self.shared.clone();
let inner = self.inner;
let future = substream.and_then(move |outbound| {
if let Some(o) = outbound {
debug!(target: "libp2p-swarm", "Using existing multiplexed connection to {}", addr);
return Either::A(future::ok(o));
}
// The previous stream muxer did not yield a new substream => start new dial
debug!(target: "libp2p-swarm", "No existing connection to {}; dialing", addr);
match inner.dial(addr.clone()) {
Ok(dial) => {
let future = dial.into_future().and_then(move |(muxer, addr)| {
muxer.clone().outbound().and_then(move |substream| {
if let Some(s) = substream {
// Replace the active connection because we are the most recent.
let mut lock = shared.lock();
lock.active_connections.insert(addr.clone(), muxer.clone());
// TODO: doesn't need locking ; the sender could be extracted
let _ = lock.add_to_next_tx.unbounded_send((
muxer.clone(),
muxer.inbound(),
addr.clone(),
));
Ok((s, addr))
} else {
error!(target: "libp2p-swarm", "failed to dial to {}", addr);
shared.lock().active_connections.remove(&addr);
Err(io::Error::new(io::ErrorKind::Other, "dial failed"))
}
})
});
Either::B(Either::A(future))
}
Err(_) => {
let e = io::Error::new(io::ErrorKind::Other, "transport rejected dial");
Either::B(Either::B(future::err(e)))
}
}
});
Ok(Box::new(future) as Box<_>)
}
#[inline]
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.transport().nat_traversal(server, observed)
}
}
impl<T, C> MuxedTransport for ConnectionReuse<T, C>
where
T: Transport + 'static, // TODO: 'static :(
T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output> + 'static, // TODO: 'static :(
C: Clone,
C::Output: StreamMuxer + Clone,
C::NamesIter: Clone, // TODO: not elegant
{
type Incoming = ConnectionReuseIncoming<C::Output>;
type IncomingUpgrade =
future::FutureResult<(<C::Output as StreamMuxer>::Substream, Multiaddr), IoError>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
ConnectionReuseIncoming {
shared: self.shared.clone(),
}
}
}
/// Implementation of `Stream` for the connections incoming from listening on a specific address.
pub struct ConnectionReuseListener<S, F, M>
where
S: Stream<Item = F, Error = IoError>,
F: Future<Item = (M, Multiaddr), Error = IoError>,
M: StreamMuxer,
{
// The main listener. `S` is from the underlying transport.
listener: StreamFuse<S>,
current_upgrades: FuturesUnordered<F>,
connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>,
// Shared between the whole connection reuse mechanism.
shared: Arc<Mutex<Shared<M>>>,
}
impl<S, F, M> Stream for ConnectionReuseListener<S, F, M>
where
S: Stream<Item = F, Error = IoError>,
F: Future<Item = (M, Multiaddr), Error = IoError>,
M: StreamMuxer + Clone + 'static, // TODO: 'static :(
{
type Item = FutureResult<(M::Substream, Multiaddr), IoError>;
type Error = IoError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// Check for any incoming connection on the listening socket.
// Note that since `self.listener` is a `Fuse`, it's not a problem to continue polling even
// after it is finished or after it error'ed.
match self.listener.poll() {
Ok(Async::Ready(Some(upgrade))) => {
self.current_upgrades.push(upgrade);
}
Ok(Async::NotReady) => {}
Ok(Async::Ready(None)) => {
debug!(target: "libp2p-swarm", "listener has been closed");
if self.connections.is_empty() && self.current_upgrades.is_empty() {
return Ok(Async::Ready(None));
}
}
Err(err) => {
debug!(target: "libp2p-swarm", "error while polling listener: {:?}", err);
if self.connections.is_empty() && self.current_upgrades.is_empty() {
return Err(err);
}
}
};
// We extract everything at the start, then insert back the elements that we still want at
// the next iteration.
match self.current_upgrades.poll() {
Ok(Async::Ready(Some((muxer, client_addr)))) => {
let next_incoming = muxer.clone().inbound();
self.connections
.push((muxer.clone(), next_incoming, client_addr.clone()));
}
Err(err) => {
// Insert the rest of the pending upgrades, but not the current one.
debug!(target: "libp2p-swarm", "error while upgrading listener connection: \
{:?}", err);
return Ok(Async::Ready(Some(future::err(err))));
}
_ => {}
}
// Check whether any incoming substream is ready.
for n in (0..self.connections.len()).rev() {
let (muxer, mut next_incoming, client_addr) = self.connections.swap_remove(n);
match next_incoming.poll() {
Ok(Async::Ready(None)) => {
// stream muxer gave us a `None` => connection should be considered closed
debug!(target: "libp2p-swarm", "no more inbound substreams on {}", client_addr);
self.shared.lock().active_connections.remove(&client_addr);
}
Ok(Async::Ready(Some(incoming))) => {
// We overwrite any current active connection to that multiaddr because we
// are the freshest possible connection.
self.shared
.lock()
.active_connections
.insert(client_addr.clone(), muxer.clone());
// A new substream is ready.
let mut new_next = muxer.clone().inbound();
self.connections
.push((muxer, new_next, client_addr.clone()));
return Ok(Async::Ready(Some(
Ok((incoming, client_addr)).into_future(),
)));
}
Ok(Async::NotReady) => {
self.connections.push((muxer, next_incoming, client_addr));
}
Err(err) => {
debug!(target: "libp2p-swarm", "error while upgrading the \
multiplexed incoming connection: {:?}", err);
// Insert the rest of the pending connections, but not the current one.
return Ok(Async::Ready(Some(future::err(err))));
}
}
}
// Nothing is ready, return `NotReady`.
Ok(Async::NotReady)
}
}
/// Implementation of `Future` that yields the next incoming substream from a dialed connection.
pub struct ConnectionReuseIncoming<M>
where
M: StreamMuxer,
{
// Shared between the whole connection reuse system.
shared: Arc<Mutex<Shared<M>>>,
}
impl<M> Future for ConnectionReuseIncoming<M>
where
M: Clone + StreamMuxer,
{
type Item = future::FutureResult<(M::Substream, Multiaddr), IoError>;
type Error = IoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut lock = self.shared.lock();
// Try to get any new muxer from `add_to_next_rx`.
// We push the new muxers to a channel instead of adding them to `next_incoming`, so that
// tasks are notified when something is pushed.
loop {
match lock.add_to_next_rx.poll() {
Ok(Async::Ready(Some(elem))) => {
lock.next_incoming.push(elem);
}
Ok(Async::NotReady) => break,
Ok(Async::Ready(None)) | Err(_) => unreachable!(
"the sender and receiver are both in the same struct, therefore \
the link can never break"
),
}
}
// Check whether any incoming substream is ready.
for n in (0..lock.next_incoming.len()).rev() {
let (muxer, mut future, addr) = lock.next_incoming.swap_remove(n);
match future.poll() {
Ok(Async::Ready(None)) => {
debug!(target: "libp2p-swarm", "no inbound substream for {}", addr);
lock.active_connections.remove(&addr);
}
Ok(Async::Ready(Some(value))) => {
// A substream is ready ; push back the muxer for the next time this function
// is called, then return.
debug!(target: "libp2p-swarm", "New incoming substream");
let next = muxer.clone().inbound();
lock.next_incoming.push((muxer, next, addr.clone()));
return Ok(Async::Ready(future::ok((value, addr))));
}
Ok(Async::NotReady) => {
lock.next_incoming.push((muxer, future, addr));
}
Err(err) => {
// In case of error, we just not push back the element, which drops it.
debug!(target: "libp2p-swarm", "ConnectionReuse incoming: one of the \
multiplexed substreams produced an error: {:?}",
err);
}
}
}
// Nothing is ready.
Ok(Async::NotReady)
}
}

View File

@ -1,239 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::prelude::*;
use multiaddr::Multiaddr;
use muxing::StreamMuxer;
use std::io::{Error as IoError, Read, Write};
use tokio_io::{AsyncRead, AsyncWrite};
/// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to
/// either `First` or `Second`.
#[derive(Debug, Copy, Clone)]
pub enum EitherSocket<A, B> {
First(A),
Second(B),
}
impl<A, B> AsyncRead for EitherSocket<A, B>
where
A: AsyncRead,
B: AsyncRead,
{
#[inline]
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
match self {
&EitherSocket::First(ref a) => a.prepare_uninitialized_buffer(buf),
&EitherSocket::Second(ref b) => b.prepare_uninitialized_buffer(buf),
}
}
}
impl<A, B> Read for EitherSocket<A, B>
where
A: Read,
B: Read,
{
#[inline]
fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
match self {
&mut EitherSocket::First(ref mut a) => a.read(buf),
&mut EitherSocket::Second(ref mut b) => b.read(buf),
}
}
}
impl<A, B> AsyncWrite for EitherSocket<A, B>
where
A: AsyncWrite,
B: AsyncWrite,
{
#[inline]
fn shutdown(&mut self) -> Poll<(), IoError> {
match self {
&mut EitherSocket::First(ref mut a) => a.shutdown(),
&mut EitherSocket::Second(ref mut b) => b.shutdown(),
}
}
}
impl<A, B> Write for EitherSocket<A, B>
where
A: Write,
B: Write,
{
#[inline]
fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> {
match self {
&mut EitherSocket::First(ref mut a) => a.write(buf),
&mut EitherSocket::Second(ref mut b) => b.write(buf),
}
}
#[inline]
fn flush(&mut self) -> Result<(), IoError> {
match self {
&mut EitherSocket::First(ref mut a) => a.flush(),
&mut EitherSocket::Second(ref mut b) => b.flush(),
}
}
}
impl<A, B> StreamMuxer for EitherSocket<A, B>
where
A: StreamMuxer,
B: StreamMuxer,
{
type Substream = EitherSocket<A::Substream, B::Substream>;
type InboundSubstream = EitherInbound<A, B>;
type OutboundSubstream = EitherOutbound<A, B>;
#[inline]
fn inbound(self) -> Self::InboundSubstream {
match self {
EitherSocket::First(a) => EitherInbound::A(a.inbound()),
EitherSocket::Second(b) => EitherInbound::B(b.inbound()),
}
}
#[inline]
fn outbound(self) -> Self::OutboundSubstream {
match self {
EitherSocket::First(a) => EitherOutbound::A(a.outbound()),
EitherSocket::Second(b) => EitherOutbound::B(b.outbound()),
}
}
}
#[derive(Debug, Copy, Clone)]
pub enum EitherInbound<A: StreamMuxer, B: StreamMuxer> {
A(A::InboundSubstream),
B(B::InboundSubstream),
}
impl<A, B> Future for EitherInbound<A, B>
where
A: StreamMuxer,
B: StreamMuxer,
{
type Item = Option<EitherSocket<A::Substream, B::Substream>>;
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match *self {
EitherInbound::A(ref mut a) => {
let item = try_ready!(a.poll());
Ok(Async::Ready(item.map(EitherSocket::First)))
}
EitherInbound::B(ref mut b) => {
let item = try_ready!(b.poll());
Ok(Async::Ready(item.map(EitherSocket::Second)))
}
}
}
}
#[derive(Debug, Copy, Clone)]
pub enum EitherOutbound<A: StreamMuxer, B: StreamMuxer> {
A(A::OutboundSubstream),
B(B::OutboundSubstream),
}
impl<A, B> Future for EitherOutbound<A, B>
where
A: StreamMuxer,
B: StreamMuxer,
{
type Item = Option<EitherSocket<A::Substream, B::Substream>>;
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match *self {
EitherOutbound::A(ref mut a) => {
let item = try_ready!(a.poll());
Ok(Async::Ready(item.map(EitherSocket::First)))
}
EitherOutbound::B(ref mut b) => {
let item = try_ready!(b.poll());
Ok(Async::Ready(item.map(EitherSocket::Second)))
}
}
}
}
/// Implements `Stream` and dispatches all method calls to either `First` or `Second`.
#[derive(Debug, Copy, Clone)]
pub enum EitherListenStream<A, B> {
First(A),
Second(B),
}
impl<AStream, BStream, AInner, BInner> Stream for EitherListenStream<AStream, BStream>
where
AStream: Stream<Item = AInner, Error = IoError>,
BStream: Stream<Item = BInner, Error = IoError>,
{
type Item = EitherListenUpgrade<AInner, BInner>;
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self {
&mut EitherListenStream::First(ref mut a) => a.poll()
.map(|i| i.map(|v| v.map(EitherListenUpgrade::First))),
&mut EitherListenStream::Second(ref mut a) => a.poll()
.map(|i| i.map(|v| v.map(EitherListenUpgrade::Second))),
}
}
}
// TODO: This type is needed because of the lack of `impl Trait` in stable Rust.
// If Rust had impl Trait we could use the Either enum from the futures crate and add some
// modifiers to it. This custom enum is a combination of Either and these modifiers.
#[derive(Debug, Copy, Clone)]
pub enum EitherListenUpgrade<A, B> {
First(A),
Second(B),
}
impl<A, B, Ao, Bo> Future for EitherListenUpgrade<A, B>
where
A: Future<Item = (Ao, Multiaddr), Error = IoError>,
B: Future<Item = (Bo, Multiaddr), Error = IoError>,
{
type Item = (EitherSocket<Ao, Bo>, Multiaddr);
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
&mut EitherListenUpgrade::First(ref mut a) => {
let (item, addr) = try_ready!(a.poll());
Ok(Async::Ready((EitherSocket::First(item), addr)))
}
&mut EitherListenUpgrade::Second(ref mut b) => {
let (item, addr) = try_ready!(b.poll());
Ok(Async::Ready((EitherSocket::Second(item), addr)))
}
}
}
}

View File

@ -1,233 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
// TODO: use this once stable ; for now we just copy-paste the content of the README.md
//#![doc(include = "../README.md")]
//! Transport, protocol upgrade and swarm systems of *libp2p*.
//!
//! This crate contains all the core traits and mechanisms of the transport and swarm systems
//! of *libp2p*.
//!
//! # The `Transport` trait
//!
//! The main trait that this crate provides is `Transport`, which provides the `dial` and
//! `listen_on` methods and can be used to dial or listen on a multiaddress. The `swarm` crate
//! itself does not provide any concrete (ie. non-dummy, non-adapter) implementation of this trait.
//! It is implemented on structs that are provided by external crates, such as `TcpConfig` from
//! `tcp-transport`, `UdpConfig`, or `WebsocketConfig` (note: as of the writing of this
//! documentation, the last two structs don't exist yet).
//!
//! Each implementation of `Transport` only supports *some* multiaddress protocols, for example
//! the `TcpConfig` struct only supports multiaddresses that look like `/ip*/*.*.*.*/tcp/*`. It is
//! possible to group two implementations of `Transport` with the `or_transport` method, in order
//! to obtain a single object that supports the protocols of both objects at once. This can be done
//! multiple times in a row in order to chain as many implementations as you want.
//!
//! // TODO: right now only tcp-transport exists, we need to add an example for chaining
//! // multiple transports once that makes sense
//!
//! ## The `MuxedTransport` trait
//!
//! The `MuxedTransport` trait is an extension to the `Transport` trait, and is implemented on
//! transports that can receive incoming connections on streams that have been opened with `dial()`.
//!
//! The trait provides the `next_incoming()` method, which returns a future that will resolve to
//! the next substream that arrives from a dialed node.
//!
//! > **Note**: This trait is mainly implemented for transports that provide stream muxing
//! > capabilities, but it can also be implemented in a dummy way by returning an empty
//! > iterator.
//!
//! # Connection upgrades
//!
//! Once a socket has been opened with a remote through a `Transport`, it can be *upgraded*. This
//! consists in negotiating a protocol with the remote (through `multistream-select`), and applying
//! that protocol on the socket.
//!
//! A potential connection upgrade is represented with the `ConnectionUpgrade` trait. The trait
//! consists in a protocol name plus a method that turns the socket into an `Output` object whose
//! nature and type is specific to each upgrade.
//!
//! There exists three kinds of connection upgrades: middlewares, muxers, and actual protocols.
//!
//! ## Middlewares
//!
//! Examples of middleware connection upgrades include `PlainTextConfig` (dummy upgrade) or
//! `SecioConfig` (encyption layer, provided by the `secio` crate).
//!
//! The output of a middleware connection upgrade implements the `AsyncRead` and `AsyncWrite`
//! traits, just like sockets do.
//!
//! A middleware can be applied on a transport by using the `with_upgrade` method of the
//! `Transport` trait. The return value of this method also implements the `Transport` trait, which
//! means that you can call `dial()` and `listen_on()` on it in order to directly obtain an
//! upgraded connection or a listener that will yield upgraded connections. Similarly, the
//! `next_incoming()` method will automatically apply the upgrade on both the dialer and the
//! listener. An error is produced if the remote doesn't support the protocol corresponding to the
//! connection upgrade.
//!
//! ```
//! extern crate libp2p_swarm;
//! extern crate libp2p_tcp_transport;
//! extern crate tokio_core;
//!
//! use libp2p_swarm::Transport;
//!
//! # fn main() {
//! let tokio_core = tokio_core::reactor::Core::new().unwrap();
//! let tcp_transport = libp2p_tcp_transport::TcpConfig::new(tokio_core.handle());
//! let upgraded = tcp_transport.with_upgrade(libp2p_swarm::upgrade::PlainTextConfig);
//!
//! // upgraded.dial(...) // automatically applies the plain text protocol on the socket
//! # }
//! ```
//!
//! ## Muxers
//!
//! The concept of *muxing* consists in using a single stream as if it was multiple substreams.
//!
//! If the output of the connection upgrade instead implements the `StreamMuxer` and `Clone`
//! traits, then you can turn the `UpgradedNode` struct into a `ConnectionReuse` struct by calling
//! `ConnectionReuse::from(upgraded_node)`.
//!
//! The `ConnectionReuse` struct then implements the `Transport` and `MuxedTransport` traits, and
//! can be used to dial or listen to multiaddresses, just like any other transport. The only
//! difference is that dialing a node will try to open a new substream on an existing connection
//! instead of opening a new one every time.
//!
//! > **Note**: Right now the `ConnectionReuse` struct is not fully implemented.
//!
//! TODO: add an example once the multiplex pull request is merged
//!
//! ## Actual protocols
//!
//! *Actual protocols* work the same way as middlewares, except that their `Output` doesn't
//! implement the `AsyncRead` and `AsyncWrite` traits. This means that that the return value of
//! `with_upgrade` does **not** implement the `Transport` trait and thus cannot be used as a
//! transport.
//!
//! However the `UpgradedNode` struct returned by `with_upgrade` still provides methods named
//! `dial`, `listen_on`, and `next_incoming`, which will yield you a `Future` or a `Stream`,
//! which you can use to obtain the `Output`. This `Output` can then be used in a protocol-specific
//! way to use the protocol.
//!
//! ```no_run
//! extern crate futures;
//! extern crate libp2p_ping;
//! extern crate libp2p_swarm;
//! extern crate libp2p_tcp_transport;
//! extern crate tokio_core;
//!
//! use futures::Future;
//! use libp2p_ping::Ping;
//! use libp2p_swarm::Transport;
//!
//! # fn main() {
//! let mut core = tokio_core::reactor::Core::new().unwrap();
//!
//! let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle())
//! // We have a `TcpConfig` struct that implements `Transport`, and apply a `Ping` upgrade on it.
//! .with_upgrade(Ping)
//! // TODO: right now the only available protocol is ping, but we want to replace it with
//! // something that is more simple to use
//! .dial("127.0.0.1:12345".parse::<libp2p_swarm::Multiaddr>().unwrap()).unwrap_or_else(|_| panic!())
//! .and_then(|((mut pinger, service), _)| {
//! pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!())
//! });
//!
//! // Runs until the ping arrives.
//! core.run(ping_finished_future).unwrap();
//! # }
//! ```
//!
//! ## Grouping protocols
//!
//! You can use the `.or_upgrade()` method to group multiple upgrades together. The return value
//! also implements the `ConnectionUpgrade` trait and will choose one of the protocols amongst the
//! ones supported.
//!
//! # Swarm
//!
//! Once you have created an object that implements the `Transport` trait, you can put it in a
//! *swarm*. This is done by calling the `swarm()` freestanding function with the transport
//! alongside with a function or a closure that will turn the output of the upgrade (usually an
//! actual protocol, as explained above) into a `Future` producing `()`.
//!
//! ```no_run
//! extern crate futures;
//! extern crate libp2p_ping;
//! extern crate libp2p_swarm;
//! extern crate libp2p_tcp_transport;
//! extern crate tokio_core;
//!
//! use futures::Future;
//! use libp2p_ping::Ping;
//! use libp2p_swarm::Transport;
//!
//! # fn main() {
//! let mut core = tokio_core::reactor::Core::new().unwrap();
//!
//! let transport = libp2p_tcp_transport::TcpConfig::new(core.handle())
//! .with_dummy_muxing();
//!
//! let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport.with_upgrade(Ping),
//! |(mut pinger, service), client_addr| {
//! pinger.ping().map_err(|_| panic!())
//! .select(service).map_err(|_| panic!())
//! .map(|_| ())
//! });
//!
//! // The `swarm_controller` can then be used to do some operations.
//! swarm_controller.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap());
//!
//! // Runs until everything is finished.
//! core.run(swarm_future).unwrap();
//! # }
//! ```
extern crate bytes;
extern crate fnv;
#[macro_use]
extern crate futures;
#[macro_use]
extern crate log;
extern crate multistream_select;
extern crate parking_lot;
extern crate smallvec;
extern crate tokio_io;
/// Multi-address re-export.
pub extern crate multiaddr;
mod connection_reuse;
mod either;
pub mod muxing;
pub mod swarm;
pub mod transport;
pub mod upgrade;
pub use self::connection_reuse::ConnectionReuse;
pub use self::multiaddr::{AddrComponent, Multiaddr};
pub use self::muxing::StreamMuxer;
pub use self::swarm::{swarm, SwarmController, SwarmFuture};
pub use self::transport::{MuxedTransport, Transport};
pub use self::upgrade::{ConnectionUpgrade, Endpoint};

View File

@ -1,51 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::future::Future;
use std::io::Error as IoError;
use tokio_io::{AsyncRead, AsyncWrite};
/// Implemented on objects that can be turned into a substream.
///
/// > **Note**: The methods of this trait consume the object, but if the object implements `Clone`
/// > then you can clone it and keep the original in order to open additional substreams.
pub trait StreamMuxer {
/// Type of the object that represents the raw substream where data can be read and written.
type Substream: AsyncRead + AsyncWrite;
/// Future that will be resolved when a new incoming substream is open.
///
/// A `None` item signals that the underlying resource has been exhausted and
/// no more substreams can be created.
type InboundSubstream: Future<Item = Option<Self::Substream>, Error = IoError>;
/// Future that will be resolved when the outgoing substream is open.
///
/// A `None` item signals that the underlying resource has been exhausted and
/// no more substreams can be created.
type OutboundSubstream: Future<Item = Option<Self::Substream>, Error = IoError>;
/// Produces a future that will be resolved when a new incoming substream arrives.
fn inbound(self) -> Self::InboundSubstream;
/// Opens a new outgoing substream, and produces a future that will be resolved when it becomes
/// available.
fn outbound(self) -> Self::OutboundSubstream;
}

View File

@ -1,328 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::stream::{FuturesUnordered, StreamFuture};
use futures::sync::mpsc;
use futures::{future, Async, Future, IntoFuture, Poll, Stream};
use std::fmt;
use std::io::Error as IoError;
use {Multiaddr, MuxedTransport, Transport};
/// Creates a swarm.
///
/// Requires an upgraded transport, and a function or closure that will turn the upgrade into a
/// `Future` that produces a `()`.
///
/// Produces a `SwarmController` and an implementation of `Future`. The controller can be used to
/// control, and the `Future` must be driven to completion in order for things to work.
///
pub fn swarm<T, H, F>(
transport: T,
handler: H,
) -> (SwarmController<T>, SwarmFuture<T, H, F::Future>)
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
H: FnMut(T::Output, Multiaddr) -> F,
F: IntoFuture<Item = (), Error = IoError>,
{
let (new_dialers_tx, new_dialers_rx) = mpsc::unbounded();
let (new_listeners_tx, new_listeners_rx) = mpsc::unbounded();
let (new_toprocess_tx, new_toprocess_rx) = mpsc::unbounded();
let future = SwarmFuture {
transport: transport.clone(),
handler: handler,
new_listeners: new_listeners_rx,
next_incoming: transport.clone().next_incoming(),
listeners: FuturesUnordered::new(),
listeners_upgrade: FuturesUnordered::new(),
dialers: FuturesUnordered::new(),
new_dialers: new_dialers_rx,
to_process: FuturesUnordered::new(),
new_toprocess: new_toprocess_rx,
};
let controller = SwarmController {
transport: transport,
new_listeners: new_listeners_tx,
new_dialers: new_dialers_tx,
new_toprocess: new_toprocess_tx,
};
(controller, future)
}
/// Allows control of what the swarm is doing.
pub struct SwarmController<T>
where
T: MuxedTransport + 'static, // TODO: 'static :-/
{
transport: T,
new_listeners: mpsc::UnboundedSender<T::Listener>,
new_dialers: mpsc::UnboundedSender<Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>>,
new_toprocess: mpsc::UnboundedSender<Box<Future<Item = (), Error = IoError>>>,
}
impl<T> fmt::Debug for SwarmController<T>
where
T: fmt::Debug + MuxedTransport + 'static, // TODO: 'static :-/
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_tuple("SwarmController")
.field(&self.transport)
.finish()
}
}
impl<T> Clone for SwarmController<T>
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
{
fn clone(&self) -> SwarmController<T> {
SwarmController {
transport: self.transport.clone(),
new_listeners: self.new_listeners.clone(),
new_dialers: self.new_dialers.clone(),
new_toprocess: self.new_toprocess.clone(),
}
}
}
impl<T> SwarmController<T>
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
{
/// Asks the swarm to dial the node with the given multiaddress. The connection is then
/// upgraded using the `upgrade`, and the output is sent to the handler that was passed when
/// calling `swarm`.
// TODO: consider returning a future so that errors can be processed?
pub fn dial_to_handler<Du>(&self, multiaddr: Multiaddr, transport: Du) -> Result<(), Multiaddr>
where
Du: Transport + 'static, // TODO: 'static :-/
Du::Output: Into<T::Output>,
{
trace!(target: "libp2p-swarm", "Swarm dialing {}", multiaddr);
match transport.dial(multiaddr.clone()) {
Ok(dial) => {
let dial = Box::new(
dial.into_future()
.map(|(d, client_addr)| (d.into(), client_addr)),
) as Box<Future<Item = _, Error = _>>;
// Ignoring errors if the receiver has been closed, because in that situation
// nothing is going to be processed anyway.
let _ = self.new_dialers.unbounded_send(dial);
Ok(())
}
Err((_, multiaddr)) => Err(multiaddr),
}
}
/// Asks the swarm to dial the node with the given multiaddress. The connection is then
/// upgraded using the `upgrade`, and the output is then passed to `and_then`.
///
/// Contrary to `dial_to_handler`, the output of the upgrade is not given to the handler that
/// was passed at initialization.
// TODO: consider returning a future so that errors can be processed?
pub fn dial_custom_handler<Du, Df, Dfu>(
&self,
multiaddr: Multiaddr,
transport: Du,
and_then: Df,
) -> Result<(), Multiaddr>
where
Du: Transport + 'static, // TODO: 'static :-/
Df: FnOnce(Du::Output, Multiaddr) -> Dfu + 'static, // TODO: 'static :-/
Dfu: IntoFuture<Item = (), Error = IoError> + 'static, // TODO: 'static :-/
{
trace!(target: "libp2p-swarm", "Swarm dialing {} with custom handler", multiaddr);
match transport.dial(multiaddr) {
Ok(dial) => {
let dial = Box::new(dial.into_future().and_then(|(d, m)| and_then(d, m))) as Box<_>;
// Ignoring errors if the receiver has been closed, because in that situation
// nothing is going to be processed anyway.
let _ = self.new_toprocess.unbounded_send(dial);
Ok(())
}
Err((_, multiaddr)) => Err(multiaddr),
}
}
/// Adds a multiaddr to listen on. All the incoming connections will use the `upgrade` that
/// was passed to `swarm`.
pub fn listen_on(&self, multiaddr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
match self.transport.clone().listen_on(multiaddr) {
Ok((listener, new_addr)) => {
trace!(target: "libp2p-swarm", "Swarm listening on {}", new_addr);
// Ignoring errors if the receiver has been closed, because in that situation
// nothing is going to be processed anyway.
let _ = self.new_listeners.unbounded_send(listener);
Ok(new_addr)
}
Err((_, multiaddr)) => Err(multiaddr),
}
}
}
/// Future that must be driven to completion in order for the swarm to work.
pub struct SwarmFuture<T, H, F>
where
T: MuxedTransport + 'static, // TODO: 'static :-/
{
transport: T,
handler: H,
new_listeners: mpsc::UnboundedReceiver<T::Listener>,
next_incoming: T::Incoming,
listeners: FuturesUnordered<
StreamFuture<
Box<
Stream<
Item = Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>,
Error = IoError,
>,
>,
>,
>,
listeners_upgrade:
FuturesUnordered<Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>>,
dialers: FuturesUnordered<Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>>,
new_dialers:
mpsc::UnboundedReceiver<Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>>,
to_process: FuturesUnordered<future::Either<F, Box<Future<Item = (), Error = IoError>>>>,
new_toprocess: mpsc::UnboundedReceiver<Box<Future<Item = (), Error = IoError>>>,
}
impl<T, H, If, F> Future for SwarmFuture<T, H, F>
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/,
H: FnMut(T::Output, Multiaddr) -> If,
If: IntoFuture<Future = F, Item = (), Error = IoError>,
F: Future<Item = (), Error = IoError>,
{
type Item = ();
type Error = IoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let handler = &mut self.handler;
match self.next_incoming.poll() {
Ok(Async::Ready(connec)) => {
debug!(target: "libp2p-swarm", "Swarm received new multiplexed \
incoming connection");
self.next_incoming = self.transport.clone().next_incoming();
self.listeners_upgrade.push(Box::new(connec) as Box<_>);
}
Ok(Async::NotReady) => {}
Err(err) => {
debug!(target: "libp2p-swarm", "Error in multiplexed incoming \
connection: {:?}", err);
self.next_incoming = self.transport.clone().next_incoming();
}
};
match self.new_listeners.poll() {
Ok(Async::Ready(Some(new_listener))) => {
let new_listener = Box::new(
new_listener.map(|f| Box::new(f) as Box<Future<Item = _, Error = _>>),
) as Box<Stream<Item = _, Error = _>>;
self.listeners.push(new_listener.into_future());
}
Ok(Async::Ready(None)) | Err(_) => {
// New listener sender has been closed.
}
Ok(Async::NotReady) => {}
};
match self.new_dialers.poll() {
Ok(Async::Ready(Some(new_dialer))) => {
self.dialers.push(new_dialer);
}
Ok(Async::Ready(None)) | Err(_) => {
// New dialers sender has been closed.
}
Ok(Async::NotReady) => {}
};
match self.new_toprocess.poll() {
Ok(Async::Ready(Some(new_toprocess))) => {
self.to_process.push(future::Either::B(new_toprocess));
}
Ok(Async::Ready(None)) | Err(_) => {
// New to-process sender has been closed.
}
Ok(Async::NotReady) => {}
};
match self.listeners.poll() {
Ok(Async::Ready(Some((Some(upgrade), remaining)))) => {
trace!(target: "libp2p-swarm", "Swarm received new connection on listener socket");
self.listeners_upgrade.push(upgrade);
self.listeners.push(remaining.into_future());
}
Err((err, _)) => {
warn!(target: "libp2p-swarm", "Error in listener: {:?}", err);
}
_ => {}
}
match self.listeners_upgrade.poll() {
Ok(Async::Ready(Some((output, client_addr)))) => {
debug!(
"Successfully upgraded incoming connection with {}",
client_addr
);
self.to_process.push(future::Either::A(
handler(output, client_addr).into_future(),
));
}
Err(err) => {
warn!(target: "libp2p-swarm", "Error in listener upgrade: {:?}", err);
}
_ => {}
}
match self.dialers.poll() {
Ok(Async::Ready(Some((output, addr)))) => {
trace!("Successfully upgraded dialed connection with {}", addr);
self.to_process
.push(future::Either::A(handler(output, addr).into_future()));
}
Err(err) => {
warn!(target: "libp2p-swarm", "Error in dialer upgrade: {:?}", err);
}
_ => {}
}
match self.to_process.poll() {
Ok(Async::Ready(Some(()))) => {
trace!(target: "libp2p-swarm", "Future returned by swarm handler driven to completion");
}
Err(err) => {
warn!(target: "libp2p-swarm", "Error in processing: {:?}", err);
}
_ => {}
}
// TODO: we never return `Ok(Ready)` because there's no way to know whether
// `next_incoming()` can produce anything more in the future
Ok(Async::NotReady)
}
}

View File

@ -1,141 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use transport::{MuxedTransport, Transport};
use upgrade::Endpoint;
/// See the `Transport::and_then` method.
#[inline]
pub fn and_then<T, C>(transport: T, upgrade: C) -> AndThen<T, C> {
AndThen { transport, upgrade }
}
/// See the `Transport::and_then` method.
#[derive(Debug, Clone)]
pub struct AndThen<T, C> {
transport: T,
upgrade: C,
}
impl<T, C, F, O> Transport for AndThen<T, C>
where
T: Transport + 'static,
C: FnOnce(T::Output, Endpoint, Multiaddr) -> F + Clone + 'static,
F: Future<Item = O, Error = IoError> + 'static,
{
type Output = O;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (O, Multiaddr), Error = IoError>>;
type Dial = Box<Future<Item = (O, Multiaddr), Error = IoError>>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
let upgrade = self.upgrade;
let (listening_stream, new_addr) = match self.transport.listen_on(addr) {
Ok((l, new_addr)) => (l, new_addr),
Err((trans, addr)) => {
let builder = AndThen {
transport: trans,
upgrade: upgrade,
};
return Err((builder, addr));
}
};
// Try to negotiate the protocol.
// Note that failing to negotiate a protocol will never produce a future with an error.
// Instead the `stream` will produce `Ok(Err(...))`.
// `stream` can only produce an `Err` if `listening_stream` produces an `Err`.
let stream = listening_stream.map(move |connection| {
let upgrade = upgrade.clone();
let future = connection.and_then(move |(stream, client_addr)| {
upgrade(stream, Endpoint::Listener, client_addr.clone()).map(|o| (o, client_addr))
});
Box::new(future) as Box<_>
});
Ok((Box::new(stream), new_addr))
}
#[inline]
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
let upgrade = self.upgrade;
let dialed_fut = match self.transport.dial(addr.clone()) {
Ok(f) => f.into_future(),
Err((trans, addr)) => {
let builder = AndThen {
transport: trans,
upgrade: upgrade,
};
return Err((builder, addr));
}
};
let future = dialed_fut
// Try to negotiate the protocol.
.and_then(move |(connection, client_addr)| {
upgrade(connection, Endpoint::Dialer, client_addr.clone())
.map(|o| (o, client_addr))
});
Ok(Box::new(future))
}
#[inline]
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.nat_traversal(server, observed)
}
}
impl<T, C, F, O> MuxedTransport for AndThen<T, C>
where
T: MuxedTransport + 'static,
C: FnOnce(T::Output, Endpoint, Multiaddr) -> F + Clone + 'static,
F: Future<Item = O, Error = IoError> + 'static,
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (O, Multiaddr), Error = IoError>>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
let upgrade = self.upgrade;
let future = self.transport.next_incoming().map(|future| {
// Try to negotiate the protocol.
let future = future.and_then(move |(connection, client_addr)| {
let upgrade = upgrade.clone();
upgrade(connection, Endpoint::Listener, client_addr.clone())
.map(|o| (o, client_addr))
});
Box::new(future) as Box<Future<Item = _, Error = _>>
});
Box::new(future) as Box<_>
}
}

View File

@ -1,111 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use either::{EitherListenStream, EitherListenUpgrade, EitherSocket};
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use transport::{MuxedTransport, Transport};
/// Struct returned by `or_transport()`.
#[derive(Debug, Copy, Clone)]
pub struct OrTransport<A, B>(A, B);
impl<A, B> OrTransport<A, B> {
pub fn new(a: A, b: B) -> OrTransport<A, B> {
OrTransport(a, b)
}
}
impl<A, B> Transport for OrTransport<A, B>
where
A: Transport,
B: Transport,
{
type Output = EitherSocket<A::Output, B::Output>;
type Listener = EitherListenStream<A::Listener, B::Listener>;
type ListenerUpgrade = EitherListenUpgrade<A::ListenerUpgrade, B::ListenerUpgrade>;
type Dial =
EitherListenUpgrade<<A::Dial as IntoFuture>::Future, <B::Dial as IntoFuture>::Future>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
let (first, addr) = match self.0.listen_on(addr) {
Ok((connec, addr)) => return Ok((EitherListenStream::First(connec), addr)),
Err(err) => err,
};
match self.1.listen_on(addr) {
Ok((connec, addr)) => Ok((EitherListenStream::Second(connec), addr)),
Err((second, addr)) => Err((OrTransport(first, second), addr)),
}
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
let (first, addr) = match self.0.dial(addr) {
Ok(connec) => return Ok(EitherListenUpgrade::First(connec.into_future())),
Err(err) => err,
};
match self.1.dial(addr) {
Ok(connec) => Ok(EitherListenUpgrade::Second(connec.into_future())),
Err((second, addr)) => Err((OrTransport(first, second), addr)),
}
}
#[inline]
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
let first = self.0.nat_traversal(server, observed);
if let Some(first) = first {
return Some(first);
}
self.1.nat_traversal(server, observed)
}
}
impl<A, B> MuxedTransport for OrTransport<A, B>
where
A: MuxedTransport,
B: MuxedTransport,
A::Incoming: 'static, // TODO: meh :-/
B::Incoming: 'static, // TODO: meh :-/
A::IncomingUpgrade: 'static, // TODO: meh :-/
B::IncomingUpgrade: 'static, // TODO: meh :-/
A::Output: 'static, // TODO: meh :-/
B::Output: 'static, // TODO: meh :-/
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade =
Box<Future<Item = (EitherSocket<A::Output, B::Output>, Multiaddr), Error = IoError>>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
let first = self.0.next_incoming().map(|out| {
let fut = out.map(move |(v, addr)| (EitherSocket::First(v), addr));
Box::new(fut) as Box<Future<Item = _, Error = _>>
});
let second = self.1.next_incoming().map(|out| {
let fut = out.map(move |(v, addr)| (EitherSocket::Second(v), addr));
Box::new(fut) as Box<Future<Item = _, Error = _>>
});
let future = first.select(second).map(|(i, _)| i).map_err(|(e, _)| e);
Box::new(future) as Box<_>
}
}

View File

@ -1,63 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::future;
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::io::{self, Cursor};
use transport::MuxedTransport;
use transport::Transport;
/// Dummy implementation of `Transport` that just denies every single attempt.
#[derive(Debug, Copy, Clone)]
pub struct DeniedTransport;
impl Transport for DeniedTransport {
// TODO: could use `!` for associated types once stable
type Output = Cursor<Vec<u8>>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = io::Error>>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Multiaddr), Error = io::Error>>;
type Dial = Box<Future<Item = (Self::Output, Multiaddr), Error = io::Error>>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
Err((DeniedTransport, addr))
}
#[inline]
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
Err((DeniedTransport, addr))
}
#[inline]
fn nat_traversal(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> {
None
}
}
impl MuxedTransport for DeniedTransport {
type Incoming = future::Empty<Self::IncomingUpgrade, io::Error>;
type IncomingUpgrade = future::Empty<(Self::Output, Multiaddr), io::Error>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
future::empty()
}
}

View File

@ -1,86 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::future;
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use transport::{MuxedTransport, Transport};
/// Dummy implementation of `MuxedTransport` that uses an inner `Transport`.
#[derive(Debug, Copy, Clone)]
pub struct DummyMuxing<T> {
inner: T,
}
impl<T> DummyMuxing<T> {
pub fn new(transport: T) -> DummyMuxing<T> {
DummyMuxing { inner: transport }
}
}
impl<T> MuxedTransport for DummyMuxing<T>
where
T: Transport,
{
type Incoming = future::Empty<Self::IncomingUpgrade, IoError>;
type IncomingUpgrade = future::Empty<(T::Output, Multiaddr), IoError>;
fn next_incoming(self) -> Self::Incoming
where
Self: Sized,
{
future::empty()
}
}
impl<T> Transport for DummyMuxing<T>
where
T: Transport,
{
type Output = T::Output;
type Listener = T::Listener;
type ListenerUpgrade = T::ListenerUpgrade;
type Dial = T::Dial;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
where
Self: Sized,
{
self.inner
.listen_on(addr)
.map_err(|(inner, addr)| (DummyMuxing { inner }, addr))
}
#[inline]
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)>
where
Self: Sized,
{
self.inner
.dial(addr)
.map_err(|(inner, addr)| (DummyMuxing { inner }, addr))
}
#[inline]
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.nat_traversal(server, observed)
}
}

View File

@ -1,108 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use transport::{MuxedTransport, Transport};
use Endpoint;
/// See `Transport::map`.
#[derive(Debug, Copy, Clone)]
pub struct Map<T, F> {
transport: T,
map: F,
}
impl<T, F> Map<T, F> {
/// Internal function that builds a `Map`.
#[inline]
pub(crate) fn new(transport: T, map: F) -> Map<T, F> {
Map { transport, map }
}
}
impl<T, F, D> Transport for Map<T, F>
where
T: Transport + 'static, // TODO: 'static :-/
F: FnOnce(T::Output, Endpoint, Multiaddr) -> D + Clone + 'static, // TODO: 'static :-/
{
type Output = D;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
type Dial = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
let map = self.map;
match self.transport.listen_on(addr) {
Ok((stream, listen_addr)) => {
let stream = stream.map(move |future| {
let map = map.clone();
let future = future
.into_future()
.map(move |(output, addr)| (map(output, Endpoint::Listener, addr.clone()), addr));
Box::new(future) as Box<_>
});
Ok((Box::new(stream), listen_addr))
}
Err((transport, addr)) => Err((Map { transport, map }, addr)),
}
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
let map = self.map;
match self.transport.dial(addr) {
Ok(future) => {
let future = future
.into_future()
.map(move |(output, addr)| (map(output, Endpoint::Dialer, addr.clone()), addr));
Ok(Box::new(future))
}
Err((transport, addr)) => Err((Map { transport, map }, addr)),
}
}
#[inline]
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.nat_traversal(server, observed)
}
}
impl<T, F, D> MuxedTransport for Map<T, F>
where
T: MuxedTransport + 'static, // TODO: 'static :-/
F: FnOnce(T::Output, Endpoint, Multiaddr) -> D + Clone + 'static, // TODO: 'static :-/
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
fn next_incoming(self) -> Self::Incoming {
let map = self.map;
let future = self.transport.next_incoming().map(move |upgrade| {
let future = upgrade.map(move |(output, addr)| {
(map(output, Endpoint::Listener, addr.clone()), addr)
});
Box::new(future) as Box<_>
});
Box::new(future)
}
}

View File

@ -1,177 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Handles entering a connection with a peer.
//!
//! The two main elements of this module are the `Transport` and `ConnectionUpgrade` traits.
//! `Transport` is implemented on objects that allow dialing and listening. `ConnectionUpgrade` is
//! implemented on objects that make it possible to upgrade a connection (for example by adding an
//! encryption middleware to the connection).
//!
//! Thanks to the `Transport::or_transport`, `Transport::with_upgrade` and
//! `UpgradedNode::or_upgrade` methods, you can combine multiple transports and/or upgrades
//! together in a complex chain of protocols negotiation.
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use tokio_io::{AsyncRead, AsyncWrite};
use upgrade::{ConnectionUpgrade, Endpoint};
pub mod and_then;
pub mod choice;
pub mod denied;
pub mod dummy;
pub mod map;
pub mod muxed;
pub mod upgrade;
pub use self::choice::OrTransport;
pub use self::denied::DeniedTransport;
pub use self::dummy::DummyMuxing;
pub use self::muxed::MuxedTransport;
pub use self::upgrade::UpgradedNode;
/// A transport is an object that can be used to produce connections by listening or dialing a
/// peer.
///
/// This trait is implemented on concrete transports (eg. TCP, UDP, etc.), but also on wrappers
/// around them.
///
/// > **Note**: The methods of this trait use `self` and not `&self` or `&mut self`. In other
/// > words, listening or dialing consumes the transport object. This has been designed
/// > so that you would implement this trait on `&Foo` or `&mut Foo` instead of directly
/// > on `Foo`.
pub trait Transport {
/// The raw connection to a peer.
type Output;
/// The listener produces incoming connections.
///
/// An item should be produced whenever a connection is received at the lowest level of the
/// transport stack. The item is a `Future` that is signalled once some pre-processing has
/// taken place, and that connection has been upgraded to the wanted protocols.
type Listener: Stream<Item = Self::ListenerUpgrade, Error = IoError>;
/// After a connection has been received, we may need to do some asynchronous pre-processing
/// on it (eg. an intermediary protocol negotiation). While this pre-processing takes place, we
/// want to be able to continue polling on the listener.
type ListenerUpgrade: Future<Item = (Self::Output, Multiaddr), Error = IoError>;
/// A future which indicates that we are currently dialing to a peer.
type Dial: IntoFuture<Item = (Self::Output, Multiaddr), Error = IoError>;
/// Listen on the given multiaddr. Returns a stream of incoming connections, plus a modified
/// version of the `Multiaddr`. This new `Multiaddr` is the one that that should be advertised
/// to other nodes, instead of the one passed as parameter.
///
/// Returns the address back if it isn't supported.
///
/// > **Note**: The reason why we need to change the `Multiaddr` on success is to handle
/// > situations such as turning `/ip4/127.0.0.1/tcp/0` into
/// > `/ip4/127.0.0.1/tcp/<actual port>`.
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
where
Self: Sized;
/// Dial to the given multi-addr.
///
/// Returns either a future which may resolve to a connection, or gives back the multiaddress.
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)>
where
Self: Sized;
/// Takes a multiaddress we're listening on (`server`), and tries to convert it to an
/// externally-visible multiaddress. In order to do so, we pass an `observed` address which
/// a remote node observes for one of our dialers.
///
/// For example, if `server` is `/ip4/0.0.0.0/tcp/3000` and `observed` is
/// `/ip4/80.81.82.83/tcp/29601`, then we should return `/ip4/80.81.82.83/tcp/3000`. Each
/// implementation of `Transport` is only responsible for handling the protocols it supports.
///
/// Returns `None` if nothing can be determined. This happens if this trait implementation
/// doesn't recognize the protocols, or if `server` and `observed` are related.
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;
/// Applies a function on the output of the `Transport`.
#[inline]
fn map<F, O>(self, map: F) -> map::Map<Self, F>
where
Self: Sized,
F: FnOnce(Self::Output, Endpoint, Multiaddr) -> O + Clone + 'static, // TODO: 'static :-/
{
map::Map::new(self, map)
}
/// Builds a new struct that implements `Transport` that contains both `self` and `other`.
///
/// The returned object will redirect its calls to `self`, except that if `listen_on` or `dial`
/// return an error then `other` will be tried.
#[inline]
fn or_transport<T>(self, other: T) -> OrTransport<Self, T>
where
Self: Sized,
{
OrTransport::new(self, other)
}
/// Wraps this transport inside an upgrade. Whenever a connection that uses this transport
/// is established, it is wrapped inside the upgrade.
///
/// > **Note**: The concept of an *upgrade* for example includes middlewares such *secio*
/// > (communication encryption), *multiplex*, but also a protocol handler.
#[inline]
fn with_upgrade<U>(self, upgrade: U) -> UpgradedNode<Self, U>
where
Self: Sized,
Self::Output: AsyncRead + AsyncWrite,
U: ConnectionUpgrade<Self::Output>,
{
UpgradedNode::new(self, upgrade)
}
/// Wraps this transport inside an upgrade. Whenever a connection that uses this transport
/// is established, it is wrapped inside the upgrade.
///
/// > **Note**: The concept of an *upgrade* for example includes middlewares such *secio*
/// > (communication encryption), *multiplex*, but also a protocol handler.
#[inline]
fn and_then<C, F>(self, upgrade: C) -> and_then::AndThen<Self, C>
where
Self: Sized,
C: FnOnce(Self::Output, Endpoint, Multiaddr) -> F + Clone + 'static,
F: Future<Error = IoError> + 'static,
{
and_then::and_then(self, upgrade)
}
/// Builds a dummy implementation of `MuxedTransport` that uses this transport.
///
/// The resulting object will not actually use muxing. This means that dialing the same node
/// twice will result in two different connections instead of two substreams on the same
/// connection.
#[inline]
fn with_dummy_muxing(self) -> DummyMuxing<Self>
where
Self: Sized,
{
DummyMuxing::new(self)
}
}

View File

@ -1,53 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::prelude::*;
use futures::stream;
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use transport::Transport;
/// Extension trait for `Transport`. Implemented on structs that provide a `Transport` on which
/// the dialed node can dial you back.
pub trait MuxedTransport: Transport {
/// Future resolving to a future that will resolve to an incoming connection.
type Incoming: Future<Item = Self::IncomingUpgrade, Error = IoError>;
/// Future resolving to an incoming connection.
type IncomingUpgrade: Future<Item = (Self::Output, Multiaddr), Error = IoError>;
/// Returns the next incoming substream opened by a node that we dialed ourselves.
///
/// > **Note**: Doesn't produce incoming substreams coming from addresses we are listening on.
/// > This only concerns nodes that we dialed with `dial()`.
fn next_incoming(self) -> Self::Incoming
where
Self: Sized;
/// Returns a stream of incoming connections.
#[inline]
fn incoming(
self,
) -> stream::AndThen<stream::Repeat<Self, IoError>, fn(Self) -> Self::Incoming, Self::Incoming>
where
Self: Sized + Clone,
{
stream::repeat(self).and_then(|me| me.next_incoming())
}
}

View File

@ -1,243 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use connection_reuse::ConnectionReuse;
use futures::prelude::*;
use multiaddr::Multiaddr;
use muxing::StreamMuxer;
use std::io::Error as IoError;
use tokio_io::{AsyncRead, AsyncWrite};
use transport::{MuxedTransport, Transport};
use upgrade::{apply, ConnectionUpgrade, Endpoint};
/// Implements the `Transport` trait. Dials or listens, then upgrades any dialed or received
/// connection.
///
/// See the `Transport::with_upgrade` method.
#[derive(Debug, Clone)]
pub struct UpgradedNode<T, C> {
transports: T,
upgrade: C,
}
impl<T, C> UpgradedNode<T, C> {
pub fn new(transports: T, upgrade: C) -> UpgradedNode<T, C> {
UpgradedNode {
transports,
upgrade,
}
}
}
impl<'a, T, C> UpgradedNode<T, C>
where
T: Transport + 'a,
T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output> + 'a,
{
/// Turns this upgraded node into a `ConnectionReuse`. If the `Output` implements the
/// `StreamMuxer` trait, the returned object will implement `Transport` and `MuxedTransport`.
#[inline]
pub fn into_connection_reuse(self) -> ConnectionReuse<T, C>
where
C::Output: StreamMuxer,
{
From::from(self)
}
/// Returns a reference to the inner `Transport`.
#[inline]
pub fn transport(&self) -> &T {
&self.transports
}
/// Tries to dial on the `Multiaddr` using the transport that was passed to `new`, then upgrade
/// the connection.
///
/// Note that this does the same as `Transport::dial`, but with less restrictions on the trait
/// requirements.
#[inline]
pub fn dial(
self,
addr: Multiaddr,
) -> Result<Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>, (Self, Multiaddr)>
where
C::NamesIter: Clone, // TODO: not elegant
{
let upgrade = self.upgrade;
let dialed_fut = match self.transports.dial(addr.clone()) {
Ok(f) => f.into_future(),
Err((trans, addr)) => {
let builder = UpgradedNode {
transports: trans,
upgrade: upgrade,
};
return Err((builder, addr));
}
};
let future = dialed_fut
// Try to negotiate the protocol.
.and_then(move |(connection, client_addr)| {
apply(connection, upgrade, Endpoint::Dialer, client_addr)
});
Ok(Box::new(future))
}
/// If the underlying transport is a `MuxedTransport`, then after calling `dial` we may receive
/// substreams opened by the dialed nodes.
///
/// This function returns the next incoming substream. You are strongly encouraged to call it
/// if you have a muxed transport.
pub fn next_incoming(
self,
) -> Box<
Future<
Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>,
Error = IoError,
>
+ 'a,
>
where
T: MuxedTransport,
C::NamesIter: Clone, // TODO: not elegant
C: Clone,
{
let upgrade = self.upgrade;
let future = self.transports.next_incoming().map(|future| {
// Try to negotiate the protocol.
let future = future.and_then(move |(connection, client_addr)| {
apply(connection, upgrade, Endpoint::Listener, client_addr)
});
Box::new(future) as Box<Future<Item = _, Error = _>>
});
Box::new(future) as Box<_>
}
/// Start listening on the multiaddr using the transport that was passed to `new`.
/// Then whenever a connection is opened, it is upgraded.
///
/// Note that this does the same as `Transport::listen_on`, but with less restrictions on the
/// trait requirements.
#[inline]
pub fn listen_on(
self,
addr: Multiaddr,
) -> Result<
(
Box<
Stream<
Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>,
Error = IoError,
>
+ 'a,
>,
Multiaddr,
),
(Self, Multiaddr),
>
where
C::NamesIter: Clone, // TODO: not elegant
C: Clone,
{
let upgrade = self.upgrade;
let (listening_stream, new_addr) = match self.transports.listen_on(addr) {
Ok((l, new_addr)) => (l, new_addr),
Err((trans, addr)) => {
let builder = UpgradedNode {
transports: trans,
upgrade: upgrade,
};
return Err((builder, addr));
}
};
// Try to negotiate the protocol.
// Note that failing to negotiate a protocol will never produce a future with an error.
// Instead the `stream` will produce `Ok(Err(...))`.
// `stream` can only produce an `Err` if `listening_stream` produces an `Err`.
let stream = listening_stream.map(move |connection| {
let upgrade = upgrade.clone();
let connection = connection
// Try to negotiate the protocol.
.and_then(move |(connection, client_addr)| {
apply(connection, upgrade, Endpoint::Listener, client_addr)
});
Box::new(connection) as Box<_>
});
Ok((Box::new(stream), new_addr))
}
}
impl<T, C> Transport for UpgradedNode<T, C>
where
T: Transport + 'static,
T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output> + 'static,
C::NamesIter: Clone, // TODO: not elegant
C: Clone,
{
type Output = C::Output;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>;
type Dial = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
self.listen_on(addr)
}
#[inline]
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
self.dial(addr)
}
#[inline]
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transports.nat_traversal(server, observed)
}
}
impl<T, C> MuxedTransport for UpgradedNode<T, C>
where
T: MuxedTransport + 'static,
T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output> + 'static,
C::NamesIter: Clone, // TODO: not elegant
C: Clone,
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
self.next_incoming()
}
}

View File

@ -1,81 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::Bytes;
use futures::prelude::*;
use multiaddr::Multiaddr;
use multistream_select;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use tokio_io::{AsyncRead, AsyncWrite};
use upgrade::{ConnectionUpgrade, Endpoint};
/// Applies a connection upgrade on a socket.
///
/// Returns a `Future` that returns the outcome of the connection upgrade.
#[inline]
pub fn apply<'a, C, U>(
connection: C,
upgrade: U,
endpoint: Endpoint,
remote_addr: Multiaddr,
) -> Box<Future<Item = (U::Output, Multiaddr), Error = IoError> + 'a>
where
U: ConnectionUpgrade<C> + 'a,
U::NamesIter: Clone, // TODO: not elegant
C: AsyncRead + AsyncWrite + 'a,
{
let iter = upgrade
.protocol_names()
.map::<_, fn(_) -> _>(|(n, t)| (n, <Bytes as PartialEq>::eq, t));
let remote_addr2 = remote_addr.clone();
debug!(target: "libp2p-swarm", "Starting protocol negotiation");
let negotiation = match endpoint {
Endpoint::Listener => multistream_select::listener_select_proto(connection, iter),
Endpoint::Dialer => multistream_select::dialer_select_proto(connection, iter),
};
let future = negotiation
.map_err(|err| IoError::new(IoErrorKind::Other, err))
.then(move |negotiated| {
match negotiated {
Ok(_) => debug!(target: "libp2p-swarm", "Successfully negotiated \
protocol upgrade with {}", remote_addr2),
Err(ref err) => debug!(target: "libp2p-swarm", "Error while negotiated \
protocol upgrade: {:?}", err),
};
negotiated
})
.and_then(move |(upgrade_id, connection)| {
let fut = upgrade.upgrade(connection, upgrade_id, endpoint, &remote_addr);
fut.map(move |c| (c, remote_addr))
})
.into_future()
.then(|val| {
match val {
Ok(_) => debug!(target: "libp2p-swarm", "Successfully applied negotiated \
protocol"),
Err(_) => debug!(target: "libp2p-swarm", "Failed to apply negotiated protocol"),
}
val
});
Box::new(future)
}

View File

@ -1,162 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::Bytes;
use either::EitherSocket;
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use tokio_io::{AsyncRead, AsyncWrite};
use upgrade::{ConnectionUpgrade, Endpoint};
/// Builds a new `ConnectionUpgrade` that chooses between `A` and `B`.
///
/// If both `A` and `B` are supported by the remote, then `A` will be chosen.
// TODO: write a test for this ^
#[inline]
pub fn or<A, B>(me: A, other: B) -> OrUpgrade<A, B> {
OrUpgrade(me, other)
}
/// See `upgrade::or`.
#[derive(Debug, Copy, Clone)]
pub struct OrUpgrade<A, B>(A, B);
impl<C, A, B> ConnectionUpgrade<C> for OrUpgrade<A, B>
where
C: AsyncRead + AsyncWrite,
A: ConnectionUpgrade<C>,
B: ConnectionUpgrade<C>,
{
type NamesIter = NamesIterChain<A::NamesIter, B::NamesIter>;
type UpgradeIdentifier = EitherUpgradeIdentifier<A::UpgradeIdentifier, B::UpgradeIdentifier>;
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
NamesIterChain {
first: self.0.protocol_names(),
second: self.1.protocol_names(),
}
}
type Output = EitherSocket<A::Output, B::Output>;
type Future = EitherConnUpgrFuture<A::Future, B::Future>;
#[inline]
fn upgrade(
self,
socket: C,
id: Self::UpgradeIdentifier,
ty: Endpoint,
remote_addr: &Multiaddr,
) -> Self::Future {
match id {
EitherUpgradeIdentifier::First(id) => {
EitherConnUpgrFuture::First(self.0.upgrade(socket, id, ty, remote_addr))
}
EitherUpgradeIdentifier::Second(id) => {
EitherConnUpgrFuture::Second(self.1.upgrade(socket, id, ty, remote_addr))
}
}
}
}
/// Internal struct used by the `OrUpgrade` trait.
#[derive(Debug, Copy, Clone)]
pub enum EitherUpgradeIdentifier<A, B> {
First(A),
Second(B),
}
/// Implements `Future` and redirects calls to either `First` or `Second`.
///
/// Additionally, the output will be wrapped inside a `EitherSocket`.
///
// TODO: This type is needed because of the lack of `impl Trait` in stable Rust.
// If Rust had impl Trait we could use the Either enum from the futures crate and add some
// modifiers to it. This custom enum is a combination of Either and these modifiers.
#[derive(Debug, Copy, Clone)]
pub enum EitherConnUpgrFuture<A, B> {
First(A),
Second(B),
}
impl<A, B> Future for EitherConnUpgrFuture<A, B>
where
A: Future<Error = IoError>,
B: Future<Error = IoError>,
{
type Item = EitherSocket<A::Item, B::Item>;
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
&mut EitherConnUpgrFuture::First(ref mut a) => {
let item = try_ready!(a.poll());
Ok(Async::Ready(EitherSocket::First(item)))
}
&mut EitherConnUpgrFuture::Second(ref mut b) => {
let item = try_ready!(b.poll());
Ok(Async::Ready(EitherSocket::Second(item)))
}
}
}
}
/// Internal type used by the `OrUpgrade` struct.
///
/// > **Note**: This type is needed because of the lack of `-> impl Trait` in Rust. It can be
/// > removed eventually.
#[derive(Debug, Copy, Clone)]
pub struct NamesIterChain<A, B> {
first: A,
second: B,
}
impl<A, B, AId, BId> Iterator for NamesIterChain<A, B>
where
A: Iterator<Item = (Bytes, AId)>,
B: Iterator<Item = (Bytes, BId)>,
{
type Item = (Bytes, EitherUpgradeIdentifier<AId, BId>);
#[inline]
fn next(&mut self) -> Option<Self::Item> {
if let Some((name, id)) = self.first.next() {
return Some((name, EitherUpgradeIdentifier::First(id)));
}
if let Some((name, id)) = self.second.next() {
return Some((name, EitherUpgradeIdentifier::Second(id)));
}
None
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
let (min1, max1) = self.first.size_hint();
let (min2, max2) = self.second.size_hint();
let max = match (max1, max2) {
(Some(max1), Some(max2)) => max1.checked_add(max2),
_ => None,
};
(min1.saturating_add(min2), max)
}
}

View File

@ -1,50 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::Bytes;
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{io, iter};
use tokio_io::{AsyncRead, AsyncWrite};
use upgrade::{ConnectionUpgrade, Endpoint};
/// Implementation of `ConnectionUpgrade` that always fails to negotiate.
#[derive(Debug, Copy, Clone)]
pub struct DeniedConnectionUpgrade;
impl<C> ConnectionUpgrade<C> for DeniedConnectionUpgrade
where
C: AsyncRead + AsyncWrite,
{
type NamesIter = iter::Empty<(Bytes, ())>;
type UpgradeIdentifier = (); // TODO: could use `!`
type Output = (); // TODO: could use `!`
type Future = Box<Future<Item = (), Error = io::Error>>; // TODO: could use `!`
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
iter::empty()
}
#[inline]
fn upgrade(self, _: C, _: Self::UpgradeIdentifier, _: Endpoint, _: &Multiaddr) -> Self::Future {
unreachable!("the denied connection upgrade always fails to negotiate")
}
}

View File

@ -1,66 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::{future, prelude::*};
use multiaddr::Multiaddr;
use tokio_io::{AsyncRead, AsyncWrite};
use upgrade::{ConnectionUpgrade, Endpoint};
/// Applies a closure on the output of a connection upgrade.
#[inline]
pub fn map<U, F>(upgrade: U, map: F) -> Map<U, F> {
Map { upgrade, map }
}
/// Application of a closure on the output of a connection upgrade.
#[derive(Debug, Copy, Clone)]
pub struct Map<U, F> {
upgrade: U,
map: F,
}
impl<C, U, F, O> ConnectionUpgrade<C> for Map<U, F>
where
U: ConnectionUpgrade<C>,
C: AsyncRead + AsyncWrite,
F: FnOnce(U::Output) -> O,
{
type NamesIter = U::NamesIter;
type UpgradeIdentifier = U::UpgradeIdentifier;
fn protocol_names(&self) -> Self::NamesIter {
self.upgrade.protocol_names()
}
type Output = O;
type Future = future::Map<U::Future, F>;
fn upgrade(
self,
socket: C,
id: Self::UpgradeIdentifier,
ty: Endpoint,
remote_addr: &Multiaddr,
) -> Self::Future {
self.upgrade
.upgrade(socket, id, ty, remote_addr)
.map(self.map)
}
}

View File

@ -1,35 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
pub mod apply;
pub mod choice;
pub mod denied;
pub mod map;
pub mod plaintext;
pub mod simple;
pub mod traits;
pub use self::apply::apply;
pub use self::choice::{or, OrUpgrade};
pub use self::denied::DeniedConnectionUpgrade;
pub use self::map::map;
pub use self::plaintext::PlainTextConfig;
pub use self::simple::SimpleProtocol;
pub use self::traits::{ConnectionUpgrade, Endpoint};

View File

@ -1,54 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::Bytes;
use futures::future::{self, FutureResult};
use multiaddr::Multiaddr;
use std::{iter, io::Error as IoError};
use tokio_io::{AsyncRead, AsyncWrite};
use upgrade::{ConnectionUpgrade, Endpoint};
/// Implementation of the `ConnectionUpgrade` that negotiates the `/plaintext/1.0.0` protocol and
/// simply passes communications through without doing anything more.
///
/// > **Note**: Generally used as an alternative to `secio` if a security layer is not desirable.
// TODO: move to a separate crate?
#[derive(Debug, Copy, Clone)]
pub struct PlainTextConfig;
impl<C> ConnectionUpgrade<C> for PlainTextConfig
where
C: AsyncRead + AsyncWrite,
{
type Output = C;
type Future = FutureResult<C, IoError>;
type UpgradeIdentifier = ();
type NamesIter = iter::Once<(Bytes, ())>;
#[inline]
fn upgrade(self, i: C, _: (), _: Endpoint, _: &Multiaddr) -> Self::Future {
future::ok(i)
}
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
iter::once((Bytes::from("/plaintext/1.0.0"), ()))
}
}

View File

@ -1,84 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::Bytes;
use futures::future::FromErr;
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{iter, io::Error as IoError, sync::Arc};
use tokio_io::{AsyncRead, AsyncWrite};
use upgrade::{ConnectionUpgrade, Endpoint};
/// Implementation of `ConnectionUpgrade`. Convenient to use with small protocols.
#[derive(Debug)]
pub struct SimpleProtocol<F> {
name: Bytes,
// Note: we put the closure `F` in an `Arc` because Rust closures aren't automatically clonable
// yet.
upgrade: Arc<F>,
}
impl<F> SimpleProtocol<F> {
/// Builds a `SimpleProtocol`.
#[inline]
pub fn new<N>(name: N, upgrade: F) -> SimpleProtocol<F>
where
N: Into<Bytes>,
{
SimpleProtocol {
name: name.into(),
upgrade: Arc::new(upgrade),
}
}
}
impl<F> Clone for SimpleProtocol<F> {
#[inline]
fn clone(&self) -> Self {
SimpleProtocol {
name: self.name.clone(),
upgrade: self.upgrade.clone(),
}
}
}
impl<C, F, O> ConnectionUpgrade<C> for SimpleProtocol<F>
where
C: AsyncRead + AsyncWrite,
F: Fn(C) -> O,
O: IntoFuture<Error = IoError>,
{
type NamesIter = iter::Once<(Bytes, ())>;
type UpgradeIdentifier = ();
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
iter::once((self.name.clone(), ()))
}
type Output = O::Item;
type Future = FromErr<O::Future, IoError>;
#[inline]
fn upgrade(self, socket: C, _: (), _: Endpoint, _: &Multiaddr) -> Self::Future {
let upgrade = &self.upgrade;
upgrade(socket).into_future().from_err()
}
}

View File

@ -1,75 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::Bytes;
use futures::future::Future;
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use tokio_io::{AsyncRead, AsyncWrite};
/// Type of connection for the upgrade.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Endpoint {
/// The socket comes from a dialer.
Dialer,
/// The socket comes from a listener.
Listener,
}
/// Implemented on structs that describe a possible upgrade to a connection between two peers.
///
/// The generic `C` is the type of the incoming connection before it is upgraded.
///
/// > **Note**: The `upgrade` method of this trait uses `self` and not `&self` or `&mut self`.
/// > This has been designed so that you would implement this trait on `&Foo` or
/// > `&mut Foo` instead of directly on `Foo`.
pub trait ConnectionUpgrade<C: AsyncRead + AsyncWrite> {
/// Iterator returned by `protocol_names`.
type NamesIter: Iterator<Item = (Bytes, Self::UpgradeIdentifier)>;
/// Type that serves as an identifier for the protocol. This type only exists to be returned
/// by the `NamesIter` and then be passed to `upgrade`.
///
/// This is only useful on implementations that dispatch between multiple possible upgrades.
/// Any basic implementation will probably just use the `()` type.
type UpgradeIdentifier;
/// Returns the name of the protocols to advertise to the remote.
fn protocol_names(&self) -> Self::NamesIter;
/// Type of the stream that has been upgraded. Generally wraps around `C` and `Self`.
///
/// > **Note**: For upgrades that add an intermediary layer (such as `secio` or `multiplex`),
/// > this associated type must implement `AsyncRead + AsyncWrite`.
type Output;
/// Type of the future that will resolve to `Self::Output`.
type Future: Future<Item = Self::Output, Error = IoError>;
/// This method is called after protocol negotiation has been performed.
///
/// Because performing the upgrade may not be instantaneous (eg. it may require a handshake),
/// this function returns a future instead of the direct output.
fn upgrade(
self,
socket: C,
id: Self::UpgradeIdentifier,
ty: Endpoint,
remote_addr: &Multiaddr,
) -> Self::Future;
}