mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-25 15:51:34 +00:00
Swarm rework (#182)
* Rename Transport::RawConn to Output * Remove AsyncRead + AsyncWrite bound on Transport::Output * UpgradedNode now always implements Transport * Add and tweak modifiers for Transport and ConnectionUpgrade * Secio upgrade now returns the pubkey in its output * Add upgrade::apply * Add Transport::and_then * Rework the swarm * Rustfmt * Fix concerns
This commit is contained in:
@ -42,14 +42,15 @@
|
||||
use fnv::FnvHashMap;
|
||||
use futures::future::{self, Either, FutureResult, IntoFuture};
|
||||
use futures::{Async, Future, Poll, Stream};
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::stream::Fuse as StreamFuse;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::sync::mpsc;
|
||||
use multiaddr::Multiaddr;
|
||||
use muxing::StreamMuxer;
|
||||
use parking_lot::Mutex;
|
||||
use std::io::{self, Error as IoError};
|
||||
use std::sync::Arc;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use transport::{MuxedTransport, Transport, UpgradedNode};
|
||||
use upgrade::ConnectionUpgrade;
|
||||
|
||||
@ -62,7 +63,8 @@ use upgrade::ConnectionUpgrade;
|
||||
pub struct ConnectionReuse<T, C>
|
||||
where
|
||||
T: Transport,
|
||||
C: ConnectionUpgrade<T::RawConn>,
|
||||
T::Output: AsyncRead + AsyncWrite,
|
||||
C: ConnectionUpgrade<T::Output>,
|
||||
C::Output: StreamMuxer,
|
||||
{
|
||||
// Underlying transport and connection upgrade for when we need to dial or listen.
|
||||
@ -94,7 +96,8 @@ where
|
||||
impl<T, C> From<UpgradedNode<T, C>> for ConnectionReuse<T, C>
|
||||
where
|
||||
T: Transport,
|
||||
C: ConnectionUpgrade<T::RawConn>,
|
||||
T::Output: AsyncRead + AsyncWrite,
|
||||
C: ConnectionUpgrade<T::Output>,
|
||||
C::Output: StreamMuxer,
|
||||
{
|
||||
#[inline]
|
||||
@ -115,16 +118,17 @@ where
|
||||
|
||||
impl<T, C> Transport for ConnectionReuse<T, C>
|
||||
where
|
||||
T: Transport + 'static, // TODO: 'static :(
|
||||
C: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :(
|
||||
T: Transport + 'static, // TODO: 'static :(
|
||||
T::Output: AsyncRead + AsyncWrite,
|
||||
C: ConnectionUpgrade<T::Output> + 'static, // TODO: 'static :(
|
||||
C: Clone,
|
||||
C::Output: StreamMuxer + Clone,
|
||||
C::NamesIter: Clone, // TODO: not elegant
|
||||
{
|
||||
type RawConn = <C::Output as StreamMuxer>::Substream;
|
||||
type Output = <C::Output as StreamMuxer>::Substream;
|
||||
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
|
||||
type ListenerUpgrade = FutureResult<(Self::RawConn, Multiaddr), IoError>;
|
||||
type Dial = Box<Future<Item = (Self::RawConn, Multiaddr), Error = IoError>>;
|
||||
type ListenerUpgrade = FutureResult<(Self::Output, Multiaddr), IoError>;
|
||||
type Dial = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
|
||||
|
||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||
let (listener, new_addr) = match self.inner.listen_on(addr.clone()) {
|
||||
@ -215,8 +219,9 @@ where
|
||||
|
||||
impl<T, C> MuxedTransport for ConnectionReuse<T, C>
|
||||
where
|
||||
T: Transport + 'static, // TODO: 'static :(
|
||||
C: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :(
|
||||
T: Transport + 'static, // TODO: 'static :(
|
||||
T::Output: AsyncRead + AsyncWrite,
|
||||
C: ConnectionUpgrade<T::Output> + 'static, // TODO: 'static :(
|
||||
C: Clone,
|
||||
C::Output: StreamMuxer + Clone,
|
||||
C::NamesIter: Clone, // TODO: not elegant
|
||||
|
@ -188,11 +188,12 @@
|
||||
//! let transport = libp2p_tcp_transport::TcpConfig::new(core.handle())
|
||||
//! .with_dummy_muxing();
|
||||
//!
|
||||
//! let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport, Ping, |(mut pinger, service), client_addr| {
|
||||
//! pinger.ping().map_err(|_| panic!())
|
||||
//! .select(service).map_err(|_| panic!())
|
||||
//! .map(|_| ())
|
||||
//! });
|
||||
//! let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport.with_upgrade(Ping),
|
||||
//! |(mut pinger, service), client_addr| {
|
||||
//! pinger.ping().map_err(|_| panic!())
|
||||
//! .select(service).map_err(|_| panic!())
|
||||
//! .map(|_| ())
|
||||
//! });
|
||||
//!
|
||||
//! // The `swarm_controller` can then be used to do some operations.
|
||||
//! swarm_controller.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap());
|
||||
|
@ -18,13 +18,12 @@
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use std::fmt;
|
||||
use std::io::Error as IoError;
|
||||
use futures::{future, Async, Future, IntoFuture, Poll, Stream};
|
||||
use futures::stream::{FuturesUnordered, StreamFuture};
|
||||
use futures::sync::mpsc;
|
||||
use transport::UpgradedNode;
|
||||
use {ConnectionUpgrade, Multiaddr, MuxedTransport};
|
||||
use futures::{future, Async, Future, IntoFuture, Poll, Stream};
|
||||
use std::fmt;
|
||||
use std::io::Error as IoError;
|
||||
use {Multiaddr, MuxedTransport, Transport};
|
||||
|
||||
/// Creates a swarm.
|
||||
///
|
||||
@ -34,29 +33,24 @@ use {ConnectionUpgrade, Multiaddr, MuxedTransport};
|
||||
/// Produces a `SwarmController` and an implementation of `Future`. The controller can be used to
|
||||
/// control, and the `Future` must be driven to completion in order for things to work.
|
||||
///
|
||||
pub fn swarm<T, C, H, F>(
|
||||
pub fn swarm<T, H, F>(
|
||||
transport: T,
|
||||
upgrade: C,
|
||||
handler: H,
|
||||
) -> (SwarmController<T, C>, SwarmFuture<T, C, H, F::Future>)
|
||||
) -> (SwarmController<T>, SwarmFuture<T, H, F::Future>)
|
||||
where
|
||||
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
|
||||
C: ConnectionUpgrade<T::RawConn> + Clone + 'static, // TODO: 'static :-/
|
||||
C::NamesIter: Clone, // TODO: not elegant
|
||||
H: FnMut(C::Output, Multiaddr) -> F,
|
||||
H: FnMut(T::Output, Multiaddr) -> F,
|
||||
F: IntoFuture<Item = (), Error = IoError>,
|
||||
{
|
||||
let (new_dialers_tx, new_dialers_rx) = mpsc::unbounded();
|
||||
let (new_listeners_tx, new_listeners_rx) = mpsc::unbounded();
|
||||
let (new_toprocess_tx, new_toprocess_rx) = mpsc::unbounded();
|
||||
|
||||
let upgraded = transport.clone().with_upgrade(upgrade);
|
||||
|
||||
let future = SwarmFuture {
|
||||
upgraded: upgraded.clone(),
|
||||
transport: transport.clone(),
|
||||
handler: handler,
|
||||
new_listeners: new_listeners_rx,
|
||||
next_incoming: upgraded.clone().next_incoming(),
|
||||
next_incoming: transport.clone().next_incoming(),
|
||||
listeners: FuturesUnordered::new(),
|
||||
listeners_upgrade: FuturesUnordered::new(),
|
||||
dialers: FuturesUnordered::new(),
|
||||
@ -67,7 +61,6 @@ where
|
||||
|
||||
let controller = SwarmController {
|
||||
transport: transport,
|
||||
upgraded: upgraded,
|
||||
new_listeners: new_listeners_tx,
|
||||
new_dialers: new_dialers_tx,
|
||||
new_toprocess: new_toprocess_tx,
|
||||
@ -77,46 +70,34 @@ where
|
||||
}
|
||||
|
||||
/// Allows control of what the swarm is doing.
|
||||
pub struct SwarmController<T, C>
|
||||
pub struct SwarmController<T>
|
||||
where
|
||||
T: MuxedTransport + 'static, // TODO: 'static :-/
|
||||
C: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :-/
|
||||
T: MuxedTransport + 'static, // TODO: 'static :-/
|
||||
{
|
||||
transport: T,
|
||||
upgraded: UpgradedNode<T, C>,
|
||||
new_listeners: mpsc::UnboundedSender<
|
||||
Box<
|
||||
Stream<
|
||||
Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>,
|
||||
Error = IoError,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
new_dialers: mpsc::UnboundedSender<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>,
|
||||
new_listeners: mpsc::UnboundedSender<T::Listener>,
|
||||
new_dialers: mpsc::UnboundedSender<Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>>,
|
||||
new_toprocess: mpsc::UnboundedSender<Box<Future<Item = (), Error = IoError>>>,
|
||||
}
|
||||
|
||||
impl<T, C> fmt::Debug for SwarmController<T, C>
|
||||
impl<T> fmt::Debug for SwarmController<T>
|
||||
where
|
||||
T: fmt::Debug + MuxedTransport + 'static, // TODO: 'static :-/
|
||||
C: fmt::Debug + ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :-/
|
||||
{
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt.debug_tuple("SwarmController")
|
||||
.field(&self.upgraded)
|
||||
.field(&self.transport)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, C> Clone for SwarmController<T, C>
|
||||
impl<T> Clone for SwarmController<T>
|
||||
where
|
||||
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
|
||||
C: ConnectionUpgrade<T::RawConn> + 'static + Clone, // TODO: 'static :-/
|
||||
{
|
||||
fn clone(&self) -> SwarmController<T, C> {
|
||||
fn clone(&self) -> SwarmController<T> {
|
||||
SwarmController {
|
||||
transport: self.transport.clone(),
|
||||
upgraded: self.upgraded.clone(),
|
||||
new_listeners: self.new_listeners.clone(),
|
||||
new_dialers: self.new_dialers.clone(),
|
||||
new_toprocess: self.new_toprocess.clone(),
|
||||
@ -124,31 +105,27 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, C> SwarmController<T, C>
|
||||
impl<T> SwarmController<T>
|
||||
where
|
||||
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
|
||||
C: ConnectionUpgrade<T::RawConn> + Clone + 'static, // TODO: 'static :-/
|
||||
C::NamesIter: Clone, // TODO: not elegant
|
||||
{
|
||||
/// Asks the swarm to dial the node with the given multiaddress. The connection is then
|
||||
/// upgraded using the `upgrade`, and the output is sent to the handler that was passed when
|
||||
/// calling `swarm`.
|
||||
// TODO: consider returning a future so that errors can be processed?
|
||||
pub fn dial_to_handler<Du>(&self, multiaddr: Multiaddr, upgrade: Du) -> Result<(), Multiaddr>
|
||||
pub fn dial_to_handler<Du>(&self, multiaddr: Multiaddr, transport: Du) -> Result<(), Multiaddr>
|
||||
where
|
||||
Du: ConnectionUpgrade<T::RawConn> + Clone + 'static, // TODO: 'static :-/
|
||||
Du::Output: Into<C::Output>,
|
||||
Du: Transport + 'static, // TODO: 'static :-/
|
||||
Du::Output: Into<T::Output>,
|
||||
{
|
||||
trace!(target: "libp2p-swarm", "Swarm dialing {}", multiaddr);
|
||||
|
||||
match self.transport
|
||||
.clone()
|
||||
.with_upgrade(upgrade)
|
||||
.dial(multiaddr.clone())
|
||||
{
|
||||
match transport.dial(multiaddr.clone()) {
|
||||
Ok(dial) => {
|
||||
let dial = Box::new(dial.map(|(d, client_addr)| (d.into(), client_addr)))
|
||||
as Box<Future<Item = _, Error = _>>;
|
||||
let dial = Box::new(
|
||||
dial.into_future()
|
||||
.map(|(d, client_addr)| (d.into(), client_addr)),
|
||||
) as Box<Future<Item = _, Error = _>>;
|
||||
// Ignoring errors if the receiver has been closed, because in that situation
|
||||
// nothing is going to be processed anyway.
|
||||
let _ = self.new_dialers.unbounded_send(dial);
|
||||
@ -167,19 +144,19 @@ where
|
||||
pub fn dial_custom_handler<Du, Df, Dfu>(
|
||||
&self,
|
||||
multiaddr: Multiaddr,
|
||||
upgrade: Du,
|
||||
transport: Du,
|
||||
and_then: Df,
|
||||
) -> Result<(), Multiaddr>
|
||||
where
|
||||
Du: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :-/
|
||||
Df: FnOnce(Du::Output, Multiaddr) -> Dfu + 'static, // TODO: 'static :-/
|
||||
Du: Transport + 'static, // TODO: 'static :-/
|
||||
Df: FnOnce(Du::Output, Multiaddr) -> Dfu + 'static, // TODO: 'static :-/
|
||||
Dfu: IntoFuture<Item = (), Error = IoError> + 'static, // TODO: 'static :-/
|
||||
{
|
||||
trace!(target: "libp2p-swarm", "Swarm dialing {} with custom handler", multiaddr);
|
||||
|
||||
match self.transport.clone().with_upgrade(upgrade).dial(multiaddr) {
|
||||
match transport.dial(multiaddr) {
|
||||
Ok(dial) => {
|
||||
let dial = Box::new(dial.and_then(|(d, m)| and_then(d, m))) as Box<_>;
|
||||
let dial = Box::new(dial.into_future().and_then(|(d, m)| and_then(d, m))) as Box<_>;
|
||||
// Ignoring errors if the receiver has been closed, because in that situation
|
||||
// nothing is going to be processed anyway.
|
||||
let _ = self.new_toprocess.unbounded_send(dial);
|
||||
@ -192,7 +169,7 @@ where
|
||||
/// Adds a multiaddr to listen on. All the incoming connections will use the `upgrade` that
|
||||
/// was passed to `swarm`.
|
||||
pub fn listen_on(&self, multiaddr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
|
||||
match self.upgraded.clone().listen_on(multiaddr) {
|
||||
match self.transport.clone().listen_on(multiaddr) {
|
||||
Ok((listener, new_addr)) => {
|
||||
trace!(target: "libp2p-swarm", "Swarm listening on {}", new_addr);
|
||||
// Ignoring errors if the receiver has been closed, because in that situation
|
||||
@ -206,49 +183,37 @@ where
|
||||
}
|
||||
|
||||
/// Future that must be driven to completion in order for the swarm to work.
|
||||
pub struct SwarmFuture<T, C, H, F>
|
||||
pub struct SwarmFuture<T, H, F>
|
||||
where
|
||||
T: MuxedTransport + 'static, // TODO: 'static :-/
|
||||
C: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :-/
|
||||
T: MuxedTransport + 'static, // TODO: 'static :-/
|
||||
{
|
||||
upgraded: UpgradedNode<T, C>,
|
||||
transport: T,
|
||||
handler: H,
|
||||
new_listeners: mpsc::UnboundedReceiver<
|
||||
Box<
|
||||
Stream<
|
||||
Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>,
|
||||
Error = IoError,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
next_incoming: Box<
|
||||
Future<Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>, Error = IoError>,
|
||||
>,
|
||||
new_listeners: mpsc::UnboundedReceiver<T::Listener>,
|
||||
next_incoming: T::Incoming,
|
||||
listeners: FuturesUnordered<
|
||||
StreamFuture<
|
||||
Box<
|
||||
Stream<
|
||||
Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>,
|
||||
Item = Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>,
|
||||
Error = IoError,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
listeners_upgrade:
|
||||
FuturesUnordered<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>,
|
||||
dialers: FuturesUnordered<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>,
|
||||
FuturesUnordered<Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>>,
|
||||
dialers: FuturesUnordered<Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>>,
|
||||
new_dialers:
|
||||
mpsc::UnboundedReceiver<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>,
|
||||
mpsc::UnboundedReceiver<Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>>,
|
||||
to_process: FuturesUnordered<future::Either<F, Box<Future<Item = (), Error = IoError>>>>,
|
||||
new_toprocess: mpsc::UnboundedReceiver<Box<Future<Item = (), Error = IoError>>>,
|
||||
}
|
||||
|
||||
impl<T, C, H, If, F> Future for SwarmFuture<T, C, H, F>
|
||||
impl<T, H, If, F> Future for SwarmFuture<T, H, F>
|
||||
where
|
||||
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/,
|
||||
C: ConnectionUpgrade<T::RawConn> + Clone + 'static, // TODO: 'static :-/
|
||||
C::NamesIter: Clone, // TODO: not elegant
|
||||
H: FnMut(C::Output, Multiaddr) -> If,
|
||||
H: FnMut(T::Output, Multiaddr) -> If,
|
||||
If: IntoFuture<Future = F, Item = (), Error = IoError>,
|
||||
F: Future<Item = (), Error = IoError>,
|
||||
{
|
||||
@ -262,19 +227,22 @@ where
|
||||
Ok(Async::Ready(connec)) => {
|
||||
debug!(target: "libp2p-swarm", "Swarm received new multiplexed \
|
||||
incoming connection");
|
||||
self.next_incoming = self.upgraded.clone().next_incoming();
|
||||
self.listeners_upgrade.push(connec);
|
||||
self.next_incoming = self.transport.clone().next_incoming();
|
||||
self.listeners_upgrade.push(Box::new(connec) as Box<_>);
|
||||
}
|
||||
Ok(Async::NotReady) => {}
|
||||
Err(err) => {
|
||||
debug!(target: "libp2p-swarm", "Error in multiplexed incoming \
|
||||
connection: {:?}", err);
|
||||
self.next_incoming = self.upgraded.clone().next_incoming();
|
||||
self.next_incoming = self.transport.clone().next_incoming();
|
||||
}
|
||||
};
|
||||
|
||||
match self.new_listeners.poll() {
|
||||
Ok(Async::Ready(Some(new_listener))) => {
|
||||
let new_listener = Box::new(
|
||||
new_listener.map(|f| Box::new(f) as Box<Future<Item = _, Error = _>>),
|
||||
) as Box<Stream<Item = _, Error = _>>;
|
||||
self.listeners.push(new_listener.into_future());
|
||||
}
|
||||
Ok(Async::Ready(None)) | Err(_) => {
|
||||
|
141
swarm/src/transport/and_then.rs
Normal file
141
swarm/src/transport/and_then.rs
Normal file
@ -0,0 +1,141 @@
|
||||
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use futures::prelude::*;
|
||||
use multiaddr::Multiaddr;
|
||||
use std::io::Error as IoError;
|
||||
use transport::{MuxedTransport, Transport};
|
||||
use upgrade::Endpoint;
|
||||
|
||||
/// See the `Transport::and_then` method.
|
||||
#[inline]
|
||||
pub fn and_then<T, C>(transport: T, upgrade: C) -> AndThen<T, C> {
|
||||
AndThen { transport, upgrade }
|
||||
}
|
||||
|
||||
/// See the `Transport::and_then` method.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AndThen<T, C> {
|
||||
transport: T,
|
||||
upgrade: C,
|
||||
}
|
||||
|
||||
impl<T, C, F, O> Transport for AndThen<T, C>
|
||||
where
|
||||
T: Transport + 'static,
|
||||
C: FnOnce(T::Output, Endpoint, Multiaddr) -> F + Clone + 'static,
|
||||
F: Future<Item = O, Error = IoError> + 'static,
|
||||
{
|
||||
type Output = O;
|
||||
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
|
||||
type ListenerUpgrade = Box<Future<Item = (O, Multiaddr), Error = IoError>>;
|
||||
type Dial = Box<Future<Item = (O, Multiaddr), Error = IoError>>;
|
||||
|
||||
#[inline]
|
||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||
let upgrade = self.upgrade;
|
||||
|
||||
let (listening_stream, new_addr) = match self.transport.listen_on(addr) {
|
||||
Ok((l, new_addr)) => (l, new_addr),
|
||||
Err((trans, addr)) => {
|
||||
let builder = AndThen {
|
||||
transport: trans,
|
||||
upgrade: upgrade,
|
||||
};
|
||||
|
||||
return Err((builder, addr));
|
||||
}
|
||||
};
|
||||
|
||||
// Try to negotiate the protocol.
|
||||
// Note that failing to negotiate a protocol will never produce a future with an error.
|
||||
// Instead the `stream` will produce `Ok(Err(...))`.
|
||||
// `stream` can only produce an `Err` if `listening_stream` produces an `Err`.
|
||||
let stream = listening_stream.map(move |connection| {
|
||||
let upgrade = upgrade.clone();
|
||||
let future = connection.and_then(move |(stream, client_addr)| {
|
||||
upgrade(stream, Endpoint::Listener, client_addr.clone()).map(|o| (o, client_addr))
|
||||
});
|
||||
|
||||
Box::new(future) as Box<_>
|
||||
});
|
||||
|
||||
Ok((Box::new(stream), new_addr))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
||||
let upgrade = self.upgrade;
|
||||
|
||||
let dialed_fut = match self.transport.dial(addr.clone()) {
|
||||
Ok(f) => f.into_future(),
|
||||
Err((trans, addr)) => {
|
||||
let builder = AndThen {
|
||||
transport: trans,
|
||||
upgrade: upgrade,
|
||||
};
|
||||
|
||||
return Err((builder, addr));
|
||||
}
|
||||
};
|
||||
|
||||
let future = dialed_fut
|
||||
// Try to negotiate the protocol.
|
||||
.and_then(move |(connection, client_addr)| {
|
||||
upgrade(connection, Endpoint::Dialer, client_addr.clone())
|
||||
.map(|o| (o, client_addr))
|
||||
});
|
||||
|
||||
Ok(Box::new(future))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
|
||||
self.transport.nat_traversal(server, observed)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, C, F, O> MuxedTransport for AndThen<T, C>
|
||||
where
|
||||
T: MuxedTransport + 'static,
|
||||
C: FnOnce(T::Output, Endpoint, Multiaddr) -> F + Clone + 'static,
|
||||
F: Future<Item = O, Error = IoError> + 'static,
|
||||
{
|
||||
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
|
||||
type IncomingUpgrade = Box<Future<Item = (O, Multiaddr), Error = IoError>>;
|
||||
|
||||
#[inline]
|
||||
fn next_incoming(self) -> Self::Incoming {
|
||||
let upgrade = self.upgrade;
|
||||
|
||||
let future = self.transport.next_incoming().map(|future| {
|
||||
// Try to negotiate the protocol.
|
||||
let future = future.and_then(move |(connection, client_addr)| {
|
||||
let upgrade = upgrade.clone();
|
||||
upgrade(connection, Endpoint::Listener, client_addr.clone())
|
||||
.map(|o| (o, client_addr))
|
||||
});
|
||||
|
||||
Box::new(future) as Box<Future<Item = _, Error = _>>
|
||||
});
|
||||
|
||||
Box::new(future) as Box<_>
|
||||
}
|
||||
}
|
@ -39,7 +39,7 @@ where
|
||||
A: Transport,
|
||||
B: Transport,
|
||||
{
|
||||
type RawConn = EitherSocket<A::RawConn, B::RawConn>;
|
||||
type Output = EitherSocket<A::Output, B::Output>;
|
||||
type Listener = EitherListenStream<A::Listener, B::Listener>;
|
||||
type ListenerUpgrade = EitherListenUpgrade<A::ListenerUpgrade, B::ListenerUpgrade>;
|
||||
type Dial =
|
||||
@ -88,12 +88,12 @@ where
|
||||
B::Incoming: 'static, // TODO: meh :-/
|
||||
A::IncomingUpgrade: 'static, // TODO: meh :-/
|
||||
B::IncomingUpgrade: 'static, // TODO: meh :-/
|
||||
A::RawConn: 'static, // TODO: meh :-/
|
||||
B::RawConn: 'static, // TODO: meh :-/
|
||||
A::Output: 'static, // TODO: meh :-/
|
||||
B::Output: 'static, // TODO: meh :-/
|
||||
{
|
||||
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
|
||||
type IncomingUpgrade =
|
||||
Box<Future<Item = (EitherSocket<A::RawConn, B::RawConn>, Multiaddr), Error = IoError>>;
|
||||
Box<Future<Item = (EitherSocket<A::Output, B::Output>, Multiaddr), Error = IoError>>;
|
||||
|
||||
#[inline]
|
||||
fn next_incoming(self) -> Self::Incoming {
|
||||
|
@ -22,8 +22,8 @@ use futures::future;
|
||||
use futures::prelude::*;
|
||||
use multiaddr::Multiaddr;
|
||||
use std::io::{self, Cursor};
|
||||
use transport::Transport;
|
||||
use transport::MuxedTransport;
|
||||
use transport::Transport;
|
||||
|
||||
/// Dummy implementation of `Transport` that just denies every single attempt.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
@ -31,10 +31,10 @@ pub struct DeniedTransport;
|
||||
|
||||
impl Transport for DeniedTransport {
|
||||
// TODO: could use `!` for associated types once stable
|
||||
type RawConn = Cursor<Vec<u8>>;
|
||||
type Output = Cursor<Vec<u8>>;
|
||||
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = io::Error>>;
|
||||
type ListenerUpgrade = Box<Future<Item = (Self::RawConn, Multiaddr), Error = io::Error>>;
|
||||
type Dial = Box<Future<Item = (Self::RawConn, Multiaddr), Error = io::Error>>;
|
||||
type ListenerUpgrade = Box<Future<Item = (Self::Output, Multiaddr), Error = io::Error>>;
|
||||
type Dial = Box<Future<Item = (Self::Output, Multiaddr), Error = io::Error>>;
|
||||
|
||||
#[inline]
|
||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||
@ -54,7 +54,7 @@ impl Transport for DeniedTransport {
|
||||
|
||||
impl MuxedTransport for DeniedTransport {
|
||||
type Incoming = future::Empty<Self::IncomingUpgrade, io::Error>;
|
||||
type IncomingUpgrade = future::Empty<(Self::RawConn, Multiaddr), io::Error>;
|
||||
type IncomingUpgrade = future::Empty<(Self::Output, Multiaddr), io::Error>;
|
||||
|
||||
#[inline]
|
||||
fn next_incoming(self) -> Self::Incoming {
|
||||
|
@ -40,7 +40,7 @@ where
|
||||
T: Transport,
|
||||
{
|
||||
type Incoming = future::Empty<Self::IncomingUpgrade, IoError>;
|
||||
type IncomingUpgrade = future::Empty<(T::RawConn, Multiaddr), IoError>;
|
||||
type IncomingUpgrade = future::Empty<(T::Output, Multiaddr), IoError>;
|
||||
|
||||
fn next_incoming(self) -> Self::Incoming
|
||||
where
|
||||
@ -54,7 +54,7 @@ impl<T> Transport for DummyMuxing<T>
|
||||
where
|
||||
T: Transport,
|
||||
{
|
||||
type RawConn = T::RawConn;
|
||||
type Output = T::Output;
|
||||
type Listener = T::Listener;
|
||||
type ListenerUpgrade = T::ListenerUpgrade;
|
||||
type Dial = T::Dial;
|
||||
|
108
swarm/src/transport/map.rs
Normal file
108
swarm/src/transport/map.rs
Normal file
@ -0,0 +1,108 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use futures::prelude::*;
|
||||
use multiaddr::Multiaddr;
|
||||
use std::io::Error as IoError;
|
||||
use transport::{MuxedTransport, Transport};
|
||||
use Endpoint;
|
||||
|
||||
/// See `Transport::map`.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct Map<T, F> {
|
||||
transport: T,
|
||||
map: F,
|
||||
}
|
||||
|
||||
impl<T, F> Map<T, F> {
|
||||
/// Internal function that builds a `Map`.
|
||||
#[inline]
|
||||
pub(crate) fn new(transport: T, map: F) -> Map<T, F> {
|
||||
Map { transport, map }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, F, D> Transport for Map<T, F>
|
||||
where
|
||||
T: Transport + 'static, // TODO: 'static :-/
|
||||
F: FnOnce(T::Output, Endpoint, Multiaddr) -> D + Clone + 'static, // TODO: 'static :-/
|
||||
{
|
||||
type Output = D;
|
||||
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
|
||||
type ListenerUpgrade = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
|
||||
type Dial = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
|
||||
|
||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||
let map = self.map;
|
||||
|
||||
match self.transport.listen_on(addr) {
|
||||
Ok((stream, listen_addr)) => {
|
||||
let stream = stream.map(move |future| {
|
||||
let map = map.clone();
|
||||
let future = future
|
||||
.into_future()
|
||||
.map(move |(output, addr)| (map(output, Endpoint::Listener, addr.clone()), addr));
|
||||
Box::new(future) as Box<_>
|
||||
});
|
||||
Ok((Box::new(stream), listen_addr))
|
||||
}
|
||||
Err((transport, addr)) => Err((Map { transport, map }, addr)),
|
||||
}
|
||||
}
|
||||
|
||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
||||
let map = self.map;
|
||||
|
||||
match self.transport.dial(addr) {
|
||||
Ok(future) => {
|
||||
let future = future
|
||||
.into_future()
|
||||
.map(move |(output, addr)| (map(output, Endpoint::Dialer, addr.clone()), addr));
|
||||
Ok(Box::new(future))
|
||||
}
|
||||
Err((transport, addr)) => Err((Map { transport, map }, addr)),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
|
||||
self.transport.nat_traversal(server, observed)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, F, D> MuxedTransport for Map<T, F>
|
||||
where
|
||||
T: MuxedTransport + 'static, // TODO: 'static :-/
|
||||
F: FnOnce(T::Output, Endpoint, Multiaddr) -> D + Clone + 'static, // TODO: 'static :-/
|
||||
{
|
||||
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
|
||||
type IncomingUpgrade = Box<Future<Item = (Self::Output, Multiaddr), Error = IoError>>;
|
||||
|
||||
fn next_incoming(self) -> Self::Incoming {
|
||||
let map = self.map;
|
||||
let future = self.transport.next_incoming().map(move |upgrade| {
|
||||
let future = upgrade.map(move |(output, addr)| {
|
||||
(map(output, Endpoint::Listener, addr.clone()), addr)
|
||||
});
|
||||
Box::new(future) as Box<_>
|
||||
});
|
||||
Box::new(future)
|
||||
}
|
||||
}
|
@ -33,11 +33,13 @@ use futures::prelude::*;
|
||||
use multiaddr::Multiaddr;
|
||||
use std::io::Error as IoError;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use upgrade::ConnectionUpgrade;
|
||||
use upgrade::{ConnectionUpgrade, Endpoint};
|
||||
|
||||
pub mod and_then;
|
||||
pub mod choice;
|
||||
pub mod denied;
|
||||
pub mod dummy;
|
||||
pub mod map;
|
||||
pub mod muxed;
|
||||
pub mod upgrade;
|
||||
|
||||
@ -59,7 +61,7 @@ pub use self::upgrade::UpgradedNode;
|
||||
/// > on `Foo`.
|
||||
pub trait Transport {
|
||||
/// The raw connection to a peer.
|
||||
type RawConn: AsyncRead + AsyncWrite;
|
||||
type Output;
|
||||
|
||||
/// The listener produces incoming connections.
|
||||
///
|
||||
@ -71,10 +73,10 @@ pub trait Transport {
|
||||
/// After a connection has been received, we may need to do some asynchronous pre-processing
|
||||
/// on it (eg. an intermediary protocol negotiation). While this pre-processing takes place, we
|
||||
/// want to be able to continue polling on the listener.
|
||||
type ListenerUpgrade: Future<Item = (Self::RawConn, Multiaddr), Error = IoError>;
|
||||
type ListenerUpgrade: Future<Item = (Self::Output, Multiaddr), Error = IoError>;
|
||||
|
||||
/// A future which indicates that we are currently dialing to a peer.
|
||||
type Dial: IntoFuture<Item = (Self::RawConn, Multiaddr), Error = IoError>;
|
||||
type Dial: IntoFuture<Item = (Self::Output, Multiaddr), Error = IoError>;
|
||||
|
||||
/// Listen on the given multiaddr. Returns a stream of incoming connections, plus a modified
|
||||
/// version of the `Multiaddr`. This new `Multiaddr` is the one that that should be advertised
|
||||
@ -108,6 +110,16 @@ pub trait Transport {
|
||||
/// doesn't recognize the protocols, or if `server` and `observed` are related.
|
||||
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;
|
||||
|
||||
/// Applies a function on the output of the `Transport`.
|
||||
#[inline]
|
||||
fn map<F, O>(self, map: F) -> map::Map<Self, F>
|
||||
where
|
||||
Self: Sized,
|
||||
F: FnOnce(Self::Output, Endpoint, Multiaddr) -> O + Clone + 'static, // TODO: 'static :-/
|
||||
{
|
||||
map::Map::new(self, map)
|
||||
}
|
||||
|
||||
/// Builds a new struct that implements `Transport` that contains both `self` and `other`.
|
||||
///
|
||||
/// The returned object will redirect its calls to `self`, except that if `listen_on` or `dial`
|
||||
@ -129,11 +141,27 @@ pub trait Transport {
|
||||
fn with_upgrade<U>(self, upgrade: U) -> UpgradedNode<Self, U>
|
||||
where
|
||||
Self: Sized,
|
||||
U: ConnectionUpgrade<Self::RawConn>,
|
||||
Self::Output: AsyncRead + AsyncWrite,
|
||||
U: ConnectionUpgrade<Self::Output>,
|
||||
{
|
||||
UpgradedNode::new(self, upgrade)
|
||||
}
|
||||
|
||||
/// Wraps this transport inside an upgrade. Whenever a connection that uses this transport
|
||||
/// is established, it is wrapped inside the upgrade.
|
||||
///
|
||||
/// > **Note**: The concept of an *upgrade* for example includes middlewares such *secio*
|
||||
/// > (communication encryption), *multiplex*, but also a protocol handler.
|
||||
#[inline]
|
||||
fn and_then<C, F>(self, upgrade: C) -> and_then::AndThen<Self, C>
|
||||
where
|
||||
Self: Sized,
|
||||
C: FnOnce(Self::Output, Endpoint, Multiaddr) -> F + Clone + 'static,
|
||||
F: Future<Error = IoError> + 'static,
|
||||
{
|
||||
and_then::and_then(self, upgrade)
|
||||
}
|
||||
|
||||
/// Builds a dummy implementation of `MuxedTransport` that uses this transport.
|
||||
///
|
||||
/// The resulting object will not actually use muxing. This means that dialing the same node
|
||||
|
@ -30,7 +30,7 @@ pub trait MuxedTransport: Transport {
|
||||
/// Future resolving to a future that will resolve to an incoming connection.
|
||||
type Incoming: Future<Item = Self::IncomingUpgrade, Error = IoError>;
|
||||
/// Future resolving to an incoming connection.
|
||||
type IncomingUpgrade: Future<Item = (Self::RawConn, Multiaddr), Error = IoError>;
|
||||
type IncomingUpgrade: Future<Item = (Self::Output, Multiaddr), Error = IoError>;
|
||||
|
||||
/// Returns the next incoming substream opened by a node that we dialed ourselves.
|
||||
///
|
||||
|
@ -18,16 +18,14 @@
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use bytes::Bytes;
|
||||
use connection_reuse::ConnectionReuse;
|
||||
use futures::prelude::*;
|
||||
use multiaddr::Multiaddr;
|
||||
use multistream_select;
|
||||
use muxing::StreamMuxer;
|
||||
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||
use std::io::Error as IoError;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use transport::{MuxedTransport, Transport};
|
||||
use upgrade::{ConnectionUpgrade, Endpoint};
|
||||
use upgrade::{apply, ConnectionUpgrade, Endpoint};
|
||||
|
||||
/// Implements the `Transport` trait. Dials or listens, then upgrades any dialed or received
|
||||
/// connection.
|
||||
@ -51,7 +49,8 @@ impl<T, C> UpgradedNode<T, C> {
|
||||
impl<'a, T, C> UpgradedNode<T, C>
|
||||
where
|
||||
T: Transport + 'a,
|
||||
C: ConnectionUpgrade<T::RawConn> + 'a,
|
||||
T::Output: AsyncRead + AsyncWrite,
|
||||
C: ConnectionUpgrade<T::Output> + 'a,
|
||||
{
|
||||
/// Turns this upgraded node into a `ConnectionReuse`. If the `Output` implements the
|
||||
/// `StreamMuxer` trait, the returned object will implement `Transport` and `MuxedTransport`.
|
||||
@ -79,6 +78,8 @@ where
|
||||
self,
|
||||
addr: Multiaddr,
|
||||
) -> Result<Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>, (Self, Multiaddr)>
|
||||
where
|
||||
C::NamesIter: Clone, // TODO: not elegant
|
||||
{
|
||||
let upgrade = self.upgrade;
|
||||
|
||||
@ -97,39 +98,7 @@ where
|
||||
let future = dialed_fut
|
||||
// Try to negotiate the protocol.
|
||||
.and_then(move |(connection, client_addr)| {
|
||||
let iter = upgrade.protocol_names()
|
||||
.map(|(name, id)| (name, <Bytes as PartialEq>::eq, id));
|
||||
debug!(target: "libp2p-swarm", "Starting protocol negotiation (dialer)");
|
||||
let negotiated = multistream_select::dialer_select_proto(connection, iter)
|
||||
.map_err(|err| IoError::new(IoErrorKind::Other, err));
|
||||
negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, client_addr))
|
||||
})
|
||||
.then(|negotiated| {
|
||||
match negotiated {
|
||||
Ok((_, _, _, ref client_addr)) => {
|
||||
debug!(target: "libp2p-swarm", "Successfully negotiated protocol \
|
||||
upgrade with {}", client_addr)
|
||||
},
|
||||
Err(ref err) => {
|
||||
debug!(target: "libp2p-swarm", "Error while negotiated protocol \
|
||||
upgrade: {:?}", err)
|
||||
},
|
||||
};
|
||||
negotiated
|
||||
})
|
||||
.and_then(move |(upgrade_id, connection, upgrade, client_addr)| {
|
||||
let f = upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &client_addr);
|
||||
debug!(target: "libp2p-swarm", "Trying to apply negotiated protocol with {}",
|
||||
client_addr);
|
||||
f.map(|v| (v, client_addr))
|
||||
})
|
||||
.then(|val| {
|
||||
match val {
|
||||
Ok(_) => debug!(target: "libp2p-swarm", "Successfully applied negotiated \
|
||||
protocol"),
|
||||
Err(_) => debug!(target: "libp2p-swarm", "Failed to apply negotiated protocol"),
|
||||
}
|
||||
val
|
||||
apply(connection, upgrade, Endpoint::Dialer, client_addr)
|
||||
});
|
||||
|
||||
Ok(Box::new(future))
|
||||
@ -144,9 +113,9 @@ where
|
||||
self,
|
||||
) -> Box<
|
||||
Future<
|
||||
Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>,
|
||||
Error = IoError,
|
||||
>
|
||||
Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>,
|
||||
Error = IoError,
|
||||
>
|
||||
+ 'a,
|
||||
>
|
||||
where
|
||||
@ -158,44 +127,9 @@ where
|
||||
|
||||
let future = self.transports.next_incoming().map(|future| {
|
||||
// Try to negotiate the protocol.
|
||||
let future = future
|
||||
.and_then(move |(connection, addr)| {
|
||||
let iter = upgrade
|
||||
.protocol_names()
|
||||
.map::<_, fn(_) -> _>(|(name, id)| (name, <Bytes as PartialEq>::eq, id));
|
||||
debug!(target: "libp2p-swarm", "Starting protocol negotiation (incoming)");
|
||||
let negotiated = multistream_select::listener_select_proto(connection, iter)
|
||||
.map_err(|err| IoError::new(IoErrorKind::Other, err));
|
||||
negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr))
|
||||
})
|
||||
.then(|negotiated| {
|
||||
match negotiated {
|
||||
Ok((_, _, _, ref client_addr)) => {
|
||||
debug!(target: "libp2p-swarm", "Successfully negotiated protocol \
|
||||
upgrade with {}", client_addr)
|
||||
}
|
||||
Err(ref err) => {
|
||||
debug!(target: "libp2p-swarm", "Error while negotiated protocol \
|
||||
upgrade: {:?}", err)
|
||||
}
|
||||
};
|
||||
negotiated
|
||||
})
|
||||
.and_then(move |(upgrade_id, connection, upgrade, addr)| {
|
||||
let upg = upgrade.upgrade(connection, upgrade_id, Endpoint::Listener, &addr);
|
||||
debug!(target: "libp2p-swarm", "Trying to apply negotiated protocol with {}",
|
||||
addr);
|
||||
upg.map(|u| (u, addr))
|
||||
})
|
||||
.then(|val| {
|
||||
match val {
|
||||
Ok(_) => debug!(target: "libp2p-swarm", "Successfully applied negotiated \
|
||||
protocol"),
|
||||
Err(_) => debug!(target: "libp2p-swarm", "Failed to apply negotiated \
|
||||
protocol"),
|
||||
}
|
||||
val
|
||||
});
|
||||
let future = future.and_then(move |(connection, client_addr)| {
|
||||
apply(connection, upgrade, Endpoint::Listener, client_addr)
|
||||
});
|
||||
|
||||
Box::new(future) as Box<Future<Item = _, Error = _>>
|
||||
});
|
||||
@ -216,9 +150,9 @@ where
|
||||
(
|
||||
Box<
|
||||
Stream<
|
||||
Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>,
|
||||
Error = IoError,
|
||||
>
|
||||
Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>,
|
||||
Error = IoError,
|
||||
>
|
||||
+ 'a,
|
||||
>,
|
||||
Multiaddr,
|
||||
@ -250,38 +184,10 @@ where
|
||||
let stream = listening_stream.map(move |connection| {
|
||||
let upgrade = upgrade.clone();
|
||||
let connection = connection
|
||||
// Try to negotiate the protocol
|
||||
.and_then(move |(connection, remote_addr)| {
|
||||
let iter = upgrade.protocol_names()
|
||||
.map::<_, fn(_) -> _>(|(n, t)| (n, <Bytes as PartialEq>::eq, t));
|
||||
let remote_addr2 = remote_addr.clone();
|
||||
debug!(target: "libp2p-swarm", "Starting protocol negotiation (listener)");
|
||||
multistream_select::listener_select_proto(connection, iter)
|
||||
.map_err(|err| IoError::new(IoErrorKind::Other, err))
|
||||
.then(move |negotiated| {
|
||||
match negotiated {
|
||||
Ok(_) => {
|
||||
debug!(target: "libp2p-swarm", "Successfully negotiated \
|
||||
protocol upgrade with {}", remote_addr2)
|
||||
},
|
||||
Err(ref err) => {
|
||||
debug!(target: "libp2p-swarm", "Error while negotiated \
|
||||
protocol upgrade: {:?}", err)
|
||||
},
|
||||
};
|
||||
negotiated
|
||||
})
|
||||
.and_then(move |(upgrade_id, connection)| {
|
||||
let fut = upgrade.upgrade(
|
||||
connection,
|
||||
upgrade_id,
|
||||
Endpoint::Listener,
|
||||
&remote_addr,
|
||||
);
|
||||
fut.map(move |c| (c, remote_addr))
|
||||
})
|
||||
.into_future()
|
||||
});
|
||||
// Try to negotiate the protocol.
|
||||
.and_then(move |(connection, client_addr)| {
|
||||
apply(connection, upgrade, Endpoint::Listener, client_addr)
|
||||
});
|
||||
|
||||
Box::new(connection) as Box<_>
|
||||
});
|
||||
@ -293,12 +199,12 @@ where
|
||||
impl<T, C> Transport for UpgradedNode<T, C>
|
||||
where
|
||||
T: Transport + 'static,
|
||||
C: ConnectionUpgrade<T::RawConn> + 'static,
|
||||
C::Output: AsyncRead + AsyncWrite,
|
||||
T::Output: AsyncRead + AsyncWrite,
|
||||
C: ConnectionUpgrade<T::Output> + 'static,
|
||||
C::NamesIter: Clone, // TODO: not elegant
|
||||
C: Clone,
|
||||
{
|
||||
type RawConn = C::Output;
|
||||
type Output = C::Output;
|
||||
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
|
||||
type ListenerUpgrade = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>;
|
||||
type Dial = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>;
|
||||
@ -322,8 +228,8 @@ where
|
||||
impl<T, C> MuxedTransport for UpgradedNode<T, C>
|
||||
where
|
||||
T: MuxedTransport + 'static,
|
||||
C: ConnectionUpgrade<T::RawConn> + 'static,
|
||||
C::Output: AsyncRead + AsyncWrite,
|
||||
T::Output: AsyncRead + AsyncWrite,
|
||||
C: ConnectionUpgrade<T::Output> + 'static,
|
||||
C::NamesIter: Clone, // TODO: not elegant
|
||||
C: Clone,
|
||||
{
|
||||
|
81
swarm/src/upgrade/apply.rs
Normal file
81
swarm/src/upgrade/apply.rs
Normal file
@ -0,0 +1,81 @@
|
||||
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::prelude::*;
|
||||
use multiaddr::Multiaddr;
|
||||
use multistream_select;
|
||||
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use upgrade::{ConnectionUpgrade, Endpoint};
|
||||
|
||||
/// Applies a connection upgrade on a socket.
|
||||
///
|
||||
/// Returns a `Future` that returns the outcome of the connection upgrade.
|
||||
#[inline]
|
||||
pub fn apply<'a, C, U>(
|
||||
connection: C,
|
||||
upgrade: U,
|
||||
endpoint: Endpoint,
|
||||
remote_addr: Multiaddr,
|
||||
) -> Box<Future<Item = (U::Output, Multiaddr), Error = IoError> + 'a>
|
||||
where
|
||||
U: ConnectionUpgrade<C> + 'a,
|
||||
U::NamesIter: Clone, // TODO: not elegant
|
||||
C: AsyncRead + AsyncWrite + 'a,
|
||||
{
|
||||
let iter = upgrade
|
||||
.protocol_names()
|
||||
.map::<_, fn(_) -> _>(|(n, t)| (n, <Bytes as PartialEq>::eq, t));
|
||||
let remote_addr2 = remote_addr.clone();
|
||||
debug!(target: "libp2p-swarm", "Starting protocol negotiation");
|
||||
|
||||
let negotiation = match endpoint {
|
||||
Endpoint::Listener => multistream_select::listener_select_proto(connection, iter),
|
||||
Endpoint::Dialer => multistream_select::dialer_select_proto(connection, iter),
|
||||
};
|
||||
|
||||
let future = negotiation
|
||||
.map_err(|err| IoError::new(IoErrorKind::Other, err))
|
||||
.then(move |negotiated| {
|
||||
match negotiated {
|
||||
Ok(_) => debug!(target: "libp2p-swarm", "Successfully negotiated \
|
||||
protocol upgrade with {}", remote_addr2),
|
||||
Err(ref err) => debug!(target: "libp2p-swarm", "Error while negotiated \
|
||||
protocol upgrade: {:?}", err),
|
||||
};
|
||||
negotiated
|
||||
})
|
||||
.and_then(move |(upgrade_id, connection)| {
|
||||
let fut = upgrade.upgrade(connection, upgrade_id, endpoint, &remote_addr);
|
||||
fut.map(move |c| (c, remote_addr))
|
||||
})
|
||||
.into_future()
|
||||
.then(|val| {
|
||||
match val {
|
||||
Ok(_) => debug!(target: "libp2p-swarm", "Successfully applied negotiated \
|
||||
protocol"),
|
||||
Err(_) => debug!(target: "libp2p-swarm", "Failed to apply negotiated protocol"),
|
||||
}
|
||||
val
|
||||
});
|
||||
|
||||
Box::new(future)
|
||||
}
|
@ -26,16 +26,19 @@ use std::io::Error as IoError;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use upgrade::{ConnectionUpgrade, Endpoint};
|
||||
|
||||
/// See `transport::Transport::or_upgrade()`.
|
||||
/// Builds a new `ConnectionUpgrade` that chooses between `A` and `B`.
|
||||
///
|
||||
/// If both `A` and `B` are supported by the remote, then `A` will be chosen.
|
||||
// TODO: write a test for this ^
|
||||
#[inline]
|
||||
pub fn or<A, B>(me: A, other: B) -> OrUpgrade<A, B> {
|
||||
OrUpgrade(me, other)
|
||||
}
|
||||
|
||||
/// See `upgrade::or`.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct OrUpgrade<A, B>(A, B);
|
||||
|
||||
impl<A, B> OrUpgrade<A, B> {
|
||||
pub fn new(a: A, b: B) -> OrUpgrade<A, B> {
|
||||
OrUpgrade(a, b)
|
||||
}
|
||||
}
|
||||
|
||||
impl<C, A, B> ConnectionUpgrade<C> for OrUpgrade<A, B>
|
||||
where
|
||||
C: AsyncRead + AsyncWrite,
|
||||
|
@ -19,11 +19,11 @@
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use bytes::Bytes;
|
||||
use upgrade::{ConnectionUpgrade, Endpoint};
|
||||
use futures::prelude::*;
|
||||
use multiaddr::Multiaddr;
|
||||
use std::{io, iter};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use upgrade::{ConnectionUpgrade, Endpoint};
|
||||
|
||||
/// Implementation of `ConnectionUpgrade` that always fails to negotiate.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
|
66
swarm/src/upgrade/map.rs
Normal file
66
swarm/src/upgrade/map.rs
Normal file
@ -0,0 +1,66 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use futures::{future, prelude::*};
|
||||
use multiaddr::Multiaddr;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use upgrade::{ConnectionUpgrade, Endpoint};
|
||||
|
||||
/// Applies a closure on the output of a connection upgrade.
|
||||
#[inline]
|
||||
pub fn map<U, F>(upgrade: U, map: F) -> Map<U, F> {
|
||||
Map { upgrade, map }
|
||||
}
|
||||
|
||||
/// Application of a closure on the output of a connection upgrade.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct Map<U, F> {
|
||||
upgrade: U,
|
||||
map: F,
|
||||
}
|
||||
|
||||
impl<C, U, F, O> ConnectionUpgrade<C> for Map<U, F>
|
||||
where
|
||||
U: ConnectionUpgrade<C>,
|
||||
C: AsyncRead + AsyncWrite,
|
||||
F: FnOnce(U::Output) -> O,
|
||||
{
|
||||
type NamesIter = U::NamesIter;
|
||||
type UpgradeIdentifier = U::UpgradeIdentifier;
|
||||
|
||||
fn protocol_names(&self) -> Self::NamesIter {
|
||||
self.upgrade.protocol_names()
|
||||
}
|
||||
|
||||
type Output = O;
|
||||
type Future = future::Map<U::Future, F>;
|
||||
|
||||
fn upgrade(
|
||||
self,
|
||||
socket: C,
|
||||
id: Self::UpgradeIdentifier,
|
||||
ty: Endpoint,
|
||||
remote_addr: &Multiaddr,
|
||||
) -> Self::Future {
|
||||
self.upgrade
|
||||
.upgrade(socket, id, ty, remote_addr)
|
||||
.map(self.map)
|
||||
}
|
||||
}
|
@ -18,14 +18,18 @@
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
pub mod denied;
|
||||
pub mod traits;
|
||||
pub mod apply;
|
||||
pub mod choice;
|
||||
pub mod denied;
|
||||
pub mod map;
|
||||
pub mod plaintext;
|
||||
pub mod simple;
|
||||
pub mod traits;
|
||||
|
||||
pub use self::choice::OrUpgrade;
|
||||
pub use self::apply::apply;
|
||||
pub use self::choice::{or, OrUpgrade};
|
||||
pub use self::denied::DeniedConnectionUpgrade;
|
||||
pub use self::map::map;
|
||||
pub use self::plaintext::PlainTextConfig;
|
||||
pub use self::simple::SimpleProtocol;
|
||||
pub use self::traits::{ConnectionUpgrade, Endpoint, UpgradeExt};
|
||||
pub use self::traits::{ConnectionUpgrade, Endpoint};
|
||||
|
@ -23,7 +23,6 @@ use futures::future::Future;
|
||||
use multiaddr::Multiaddr;
|
||||
use std::io::Error as IoError;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use upgrade::choice::OrUpgrade;
|
||||
|
||||
/// Type of connection for the upgrade.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
@ -74,19 +73,3 @@ pub trait ConnectionUpgrade<C: AsyncRead + AsyncWrite> {
|
||||
remote_addr: &Multiaddr,
|
||||
) -> Self::Future;
|
||||
}
|
||||
|
||||
/// Extension trait for `ConnectionUpgrade`. Automatically implemented on everything.
|
||||
pub trait UpgradeExt {
|
||||
/// Builds a struct that will choose an upgrade between `self` and `other`, depending on what
|
||||
/// the remote supports.
|
||||
fn or_upgrade<T>(self, other: T) -> OrUpgrade<Self, T>
|
||||
where
|
||||
Self: Sized;
|
||||
}
|
||||
|
||||
impl<T> UpgradeExt for T {
|
||||
#[inline]
|
||||
fn or_upgrade<U>(self, other: U) -> OrUpgrade<Self, U> {
|
||||
OrUpgrade::new(self, other)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user