Merge of #116 and #111 (#117)

* Implement ConnectionReuse correctly

* Add some tests and fixes

* Remove useless boolean in active_connections

* Correctly run tests

* Optimize the processing

* Next incoming is now in two steps

* Remove log

* Fix dialing a node even if we already have a connection

* Add a proper PeerId to Peerstore

* Turn identify into a transport layer

* Expose the dialed multiaddress

* Add identified nodes to the peerstore

* Allow configuring the TTL of the addresses

* Split identify in two modules

* Some comments and tweaks

* Run rustfmt

* Add test and bugfix

* Fix wrong address reported when dialing

* Fix websocket browser code

* Ignore errors in the swarm

* Fix multiplex test

* Fix some style concerns

* Fix concerns
Pierre Krieger
2018-03-07 11:51:52 +01:00
committed by GitHub
parent 39dde305b4
commit 22a7159c41
7 changed files with 496 additions and 167 deletions

View File

@@ -223,7 +223,8 @@ where
PStoreRef: Deref<Target = PStore> + Clone + 'static,
for<'r> &'r PStore: Peerstore,
{
type Incoming = Box<Future<Item = (Trans::RawConn, Multiaddr), Error = IoError>>;
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (Trans::RawConn, Multiaddr), Error = IoError>>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
@@ -233,6 +234,8 @@ where
let future = self.transport
.next_incoming()
.map(move |incoming| {
let future = incoming
.and_then(move |(connec, client_addr)| {
// On an incoming connection, dial back the node and upgrade to the identify
// protocol.
@@ -261,6 +264,9 @@ where
Ok((connec, real_addr))
});
Box::new(future) as Box<Future<Item = _, Error = _>>
});
Box::new(future) as Box<_>
}
}
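The hunk above is the core of the "next incoming is now in two steps" change: `next_incoming` now yields a future that itself resolves to a future, so the caller learns that a connection is arriving before the upgrade has finished. Below is a minimal, self-contained sketch of that shape using only plain futures 0.1 types; `Vec<u8>` and `String` are hypothetical stand-ins for `Trans::RawConn` and `Multiaddr`, not the crate's types.

extern crate futures;

use futures::future;
use futures::Future;
use std::io::Error as IoError;

// Outer future: "an incoming connection was detected".
// Inner future: "the upgrade/negotiation on that connection finished".
fn next_incoming()
    -> Box<Future<Item = Box<Future<Item = (Vec<u8>, String), Error = IoError>>, Error = IoError>>
{
    // Inner future: resolves once the connection has been upgraded.
    let upgrade = future::ok::<_, IoError>((
        b"raw connection".to_vec(),
        "/ip4/127.0.0.1/tcp/1234".to_string(),
    ));
    let upgrade = Box::new(upgrade) as Box<Future<Item = _, Error = _>>;
    // Outer future: resolves as soon as an incoming connection is detected.
    Box::new(future::ok(upgrade))
}

fn main() {
    // A caller that only cares about the final connection can flatten the two steps.
    let (_conn, addr) = next_incoming().flatten().wait().unwrap();
    println!("upgraded incoming connection from {}", addr);
}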

View File

@@ -5,6 +5,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
bytes = "0.4"
fnv = "1.0"
multiaddr = "0.2.0"
multistream-select = { path = "../multistream-select" }
futures = { version = "0.1", features = ["use_std"] }
@@ -15,4 +16,5 @@ tokio-io = "0.1"
[dev-dependencies]
libp2p-ping = { path = "../libp2p-ping" }
libp2p-tcp-transport = { path = "../libp2p-tcp-transport" }
multiplex = { path = "../multiplex-rs" }
tokio-core = "0.1"

View File

@@ -32,26 +32,21 @@
//! When called on a `ConnectionReuse`, the `listen_on` method will listen on the given
//! multiaddress (by using the underlying `Transport`), then will apply a `flat_map` on the
//! incoming connections so that we actually listen to the incoming substreams of each connection.
//! TODO: design issue ; we need a way to handle the substreams that are opened by remotes on
//! connections opened by us
//!
//! When called on a `ConnectionReuse`, the `dial` method will try to use a connection that has
//! already been opened earlier, and open an outgoing substream on it. If none is available, it
//! will dial the given multiaddress. Dialed nodes can also spontaneously open new substreams with
//! us. In order to handle these new substreams you should use the `next_incoming` method of the
//! `MuxedTransport` trait.
//! TODO: this raises several questions ^
//!
//! TODO: this whole code is a dummy and should be rewritten after the design has been properly
//! figured out.
use fnv::FnvHashMap;
use futures::future::{self, IntoFuture, FutureResult};
use futures::{stream, Async, Future, Poll, Stream, task};
use futures::{Async, Future, Poll, Stream};
use futures::stream::Fuse as StreamFuse;
use futures::sync::mpsc;
use multiaddr::Multiaddr;
use muxing::StreamMuxer;
use parking_lot::Mutex;
use smallvec::SmallVec;
use std::io::Error as IoError;
use std::sync::Arc;
use transport::{ConnectionUpgrade, MuxedTransport, Transport, UpgradedNode};
@@ -66,31 +61,48 @@ pub struct ConnectionReuse<T, C>
where
T: Transport,
C: ConnectionUpgrade<T::RawConn>,
C::Output: StreamMuxer,
{
// Underlying transport and connection upgrade for when we need to dial or listen.
inner: UpgradedNode<T, C>,
// Struct shared between most of the `ConnectionReuse` infrastructure.
shared: Arc<Mutex<Shared<C::Output>>>,
}
struct Shared<O> {
// List of futures to dialed connections.
incoming: Vec<Box<Stream<Item = (O, Multiaddr), Error = future::SharedError<Mutex<Option<IoError>>>>>>,
// Tasks to signal when an element is added to `incoming`. Only used when `incoming` is empty.
to_signal: Vec<task::Task>,
struct Shared<M> where M: StreamMuxer {
// List of active muxers.
active_connections: FnvHashMap<Multiaddr, M>,
// List of pending inbound substreams from dialed nodes.
// Only add to this list elements received through `add_to_next_rx`.
next_incoming: Vec<(M, M::InboundSubstream, Multiaddr)>,
// New elements are not directly added to `next_incoming`. Instead they are sent to this
// channel. This is done so that we can wake up tasks whenever a new element is added.
add_to_next_rx: mpsc::UnboundedReceiver<(M, M::InboundSubstream, Multiaddr)>,
// Other side of `add_to_next_rx`.
add_to_next_tx: mpsc::UnboundedSender<(M, M::InboundSubstream, Multiaddr)>,
}
impl<T, C> From<UpgradedNode<T, C>> for ConnectionReuse<T, C>
where
T: Transport,
C: ConnectionUpgrade<T::RawConn>,
C::Output: StreamMuxer,
{
#[inline]
fn from(node: UpgradedNode<T, C>) -> ConnectionReuse<T, C> {
let (tx, rx) = mpsc::unbounded();
ConnectionReuse {
inner: node,
shared: Arc::new(Mutex::new(Shared {
incoming: Vec::new(),
to_signal: Vec::new(),
active_connections: Default::default(),
next_incoming: Vec::new(),
add_to_next_rx: rx,
add_to_next_tx: tx,
})),
}
}
@@ -118,6 +130,7 @@ where
};
let listener = ConnectionReuseListener {
shared: self.shared.clone(),
listener: listener.fuse(),
current_upgrades: Vec::new(),
connections: Vec::new(),
@@ -127,34 +140,36 @@ where
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
let dial = match self.inner.dial(addr.clone()) {
// If we already have an active connection, use it!
if let Some(connec) = self.shared.lock().active_connections.get(&addr).map(|c| c.clone()) {
let future = connec.outbound().map(|s| (s, addr));
return Ok(Box::new(future) as Box<_>);
}
// TODO: handle if we're already in the middle of dialing that same node?
// TODO: try dialing again if the existing connection has dropped
let dial = match self.inner.dial(addr) {
Ok(l) => l,
Err((inner, addr)) => {
return Err((ConnectionReuse { inner: inner, shared: self.shared }, addr));
}
};
let shared = self.shared.clone();
let dial = dial
.map_err::<fn(IoError) -> Mutex<Option<IoError>>, _>(|err| Mutex::new(Some(err)))
.shared();
let ingoing = dial.clone()
.map(|muxer| stream::repeat(muxer))
.flatten_stream()
.map(move |muxer| (&*muxer).clone());
let mut lock = self.shared.lock();
lock.incoming.push(Box::new(ingoing) as Box<_>);
for task in lock.to_signal.drain(..) { task.notify(); }
drop(lock);
let future = dial
.map_err(|err| err.lock().take().expect("error can only be extracted once"))
.and_then(|dial| {
let (dial, client_addr) = (&*dial).clone();
dial.outbound().map(|s| (s, client_addr))
.into_future()
.and_then(move |(connec, addr)| {
// Always replace the active connection because we are the most recent.
let mut lock = shared.lock();
lock.active_connections.insert(addr.clone(), connec.clone());
// TODO: doesn't need locking ; the sender could be extracted
let _ = lock.add_to_next_tx
.unbounded_send((connec.clone(), connec.clone().inbound(), addr.clone()));
connec.outbound().map(|s| (s, addr))
});
Ok(Box::new(future) as Box<_>)
Ok(Box::new(dial) as Box<_>)
}
#[inline]
@@ -171,29 +186,29 @@ where
C::Output: StreamMuxer + Clone,
C::NamesIter: Clone, // TODO: not elegant
{
type Incoming = Box<Future<Item = (<C::Output as StreamMuxer>::Substream, Multiaddr), Error = IoError>>;
type Incoming = ConnectionReuseIncoming<C::Output>;
type IncomingUpgrade = future::FutureResult<(<C::Output as StreamMuxer>::Substream, Multiaddr), IoError>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
let future = ConnectionReuseIncoming { shared: self.shared.clone() }
.and_then(|(out, addr)| {
out.inbound().map(|o| (o, addr))
});
Box::new(future) as Box<_>
ConnectionReuseIncoming { shared: self.shared.clone() }
}
}
/// Implementation of `Stream<Item = (impl AsyncRead + AsyncWrite, Multiaddr)` for the
/// `ConnectionReuse` struct.
/// Implementation of `Stream` for the connections incoming from listening on a specific address.
pub struct ConnectionReuseListener<S, F, M>
where
S: Stream<Item = F, Error = IoError>,
F: Future<Item = (M, Multiaddr), Error = IoError>,
M: StreamMuxer,
{
// The main listener. `S` is from the underlying transport.
listener: StreamFuse<S>,
current_upgrades: Vec<F>,
connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>,
// Shared between the whole connection reuse mechanism.
shared: Arc<Mutex<Shared<M>>>,
}
impl<S, F, M> Stream for ConnectionReuseListener<S, F, M>
@@ -205,125 +220,128 @@ where S: Stream<Item = F, Error = IoError>,
type Error = IoError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// Check for any incoming connection on the listening socket.
// Note that since `self.listener` is a `Fuse`, it's not a problem to continue polling even
// after it is finished or after it error'ed.
match self.listener.poll() {
Ok(Async::Ready(Some(upgrade))) => {
self.current_upgrades.push(upgrade);
}
Ok(Async::NotReady) => (),
Ok(Async::NotReady) => {},
Ok(Async::Ready(None)) => {
if self.connections.is_empty() {
if self.connections.is_empty() && self.current_upgrades.is_empty() {
return Ok(Async::Ready(None));
}
}
Err(err) => {
if self.connections.is_empty() {
if self.connections.is_empty() && self.current_upgrades.is_empty() {
return Err(err);
}
}
};
// Most of the time, this array will contain 0 or 1 elements, but sometimes it may contain
// more and we don't want to panic if that happens. With 8 elements, we can be pretty
// confident that this is never going to spill into a `Vec`.
let mut upgrades_to_drop: SmallVec<[_; 8]> = SmallVec::new();
let mut early_ret = None;
for (index, current_upgrade) in
self.current_upgrades.iter_mut().enumerate()
{
// Check whether any upgrade (to a muxer) on an incoming connection is ready.
// We extract everything at the start, then insert back the elements that we still want at
// the next iteration.
for n in (0 .. self.current_upgrades.len()).rev() {
let mut current_upgrade = self.current_upgrades.swap_remove(n);
match current_upgrade.poll() {
Ok(Async::Ready((muxer, client_addr))) => {
let next_incoming = muxer.clone().inbound();
self.connections.push((muxer, next_incoming, client_addr.clone()));
upgrades_to_drop.push(index);
self.connections.push((muxer.clone(), next_incoming, client_addr.clone()));
// We overwrite any current active connection to that multiaddr because we
// are the freshest possible connection.
self.shared.lock().active_connections.insert(client_addr, muxer);
},
Ok(Async::NotReady) => {
self.current_upgrades.push(current_upgrade);
},
Ok(Async::NotReady) => {},
Err(err) => {
upgrades_to_drop.push(index);
early_ret = Some(Async::Ready(Some(Err(err).into_future())));
// Insert the rest of the pending upgrades, but not the current one.
return Ok(Async::Ready(Some(future::err(err))));
},
}
}
for &index in upgrades_to_drop.iter().rev() {
self.current_upgrades.swap_remove(index);
}
if let Some(early_ret) = early_ret {
return Ok(early_ret);
}
// We reuse `upgrades_to_drop`.
upgrades_to_drop.clear();
let mut connections_to_drop = upgrades_to_drop;
for (index, &mut (ref mut muxer, ref mut next_incoming, ref client_addr)) in
self.connections.iter_mut().enumerate()
{
// Check whether any incoming substream is ready.
for n in (0 .. self.connections.len()).rev() {
let (muxer, mut next_incoming, client_addr) = self.connections.swap_remove(n);
match next_incoming.poll() {
Ok(Async::Ready(incoming)) => {
// A new substream is ready.
let mut new_next = muxer.clone().inbound();
*next_incoming = new_next;
return Ok(Async::Ready(Some(Ok((incoming, client_addr.clone())).into_future())));
self.connections.push((muxer, new_next, client_addr.clone()));
return Ok(Async::Ready(Some(Ok((incoming, client_addr)).into_future())));
}
Ok(Async::NotReady) => {}
Err(_) => {
connections_to_drop.push(index);
Ok(Async::NotReady) => {
self.connections.push((muxer, next_incoming, client_addr));
}
Err(err) => {
// Insert the rest of the pending connections, but not the current one.
return Ok(Async::Ready(Some(future::err(err))));
}
}
}
for &index in connections_to_drop.iter().rev() {
self.connections.swap_remove(index);
}
// Nothing is ready, return `NotReady`.
Ok(Async::NotReady)
}
}
/// Implementation of `Future<Item = (impl AsyncRead + AsyncWrite, Multiaddr)` for the
/// `ConnectionReuse` struct.
pub struct ConnectionReuseIncoming<O> {
shared: Arc<Mutex<Shared<O>>>,
/// Implementation of `Future` that yields the next incoming substream from a dialed connection.
pub struct ConnectionReuseIncoming<M>
where M: StreamMuxer
{
// Shared between the whole connection reuse system.
shared: Arc<Mutex<Shared<M>>>,
}
impl<O> Future for ConnectionReuseIncoming<O>
where O: Clone
impl<M> Future for ConnectionReuseIncoming<M>
where M: Clone + StreamMuxer,
{
type Item = (O, Multiaddr);
type Item = future::FutureResult<(M::Substream, Multiaddr), IoError>;
type Error = IoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut lock = self.shared.lock();
let mut to_remove = SmallVec::<[_; 8]>::new();
let mut ret_value = None;
// Try to get any new muxer from `add_to_next_rx`.
// We push the new muxers to a channel instead of adding them to `next_incoming`, so that
// tasks are notified when something is pushed.
loop {
match lock.add_to_next_rx.poll() {
Ok(Async::Ready(Some(elem))) => {
lock.next_incoming.push(elem);
},
Ok(Async::NotReady) => break,
Ok(Async::Ready(None)) | Err(_) => {
unreachable!("the sender and receiver are both in the same struct, therefore \
the link can never break")
},
}
}
for (offset, future) in lock.incoming.iter_mut().enumerate() {
// Check whether any incoming substream is ready.
for n in (0 .. lock.next_incoming.len()).rev() {
let (muxer, mut future, addr) = lock.next_incoming.swap_remove(n);
match future.poll() {
Ok(Async::Ready(Some((value, addr)))) => {
ret_value = Some((value.clone(), addr));
break;
Ok(Async::Ready(value)) => {
// A substream is ready ; push back the muxer for the next time this function
// is called, then return.
let next = muxer.clone().inbound();
lock.next_incoming.push((muxer, next, addr.clone()));
return Ok(Async::Ready(future::ok((value, addr))));
},
Ok(Async::Ready(None)) => {
to_remove.push(offset);
Ok(Async::NotReady) => {
lock.next_incoming.push((muxer, future, addr));
},
Ok(Async::NotReady) => {},
Err(_) => {
to_remove.push(offset);
// In case of error, we simply don't push the element back, which drops it.
},
}
}
for offset in to_remove.into_iter().rev() {
lock.incoming.swap_remove(offset);
}
if let Some(ret_value) = ret_value {
Ok(Async::Ready(ret_value))
} else {
lock.to_signal.push(task::current());
// Nothing is ready.
Ok(Async::NotReady)
}
}
}
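The comments on `Shared` and `ConnectionReuseIncoming` above describe the wake-up trick used here: newly dialed muxers are pushed through an unbounded channel rather than straight into `next_incoming`, because polling the receiver registers the current task, so a later `unbounded_send` from the dialing side wakes that task up. The following is a stripped-down sketch of the same pattern with plain futures 0.1 primitives; `NextIncoming`, `buffered` and the `u32` payload are illustrative stand-ins, not the crate's types.

extern crate futures;

use futures::sync::mpsc;
use futures::{Async, Future, Poll, Stream};
use std::thread;
use std::time::Duration;

// Elements are buffered locally, but they always arrive through the unbounded
// channel: polling `rx` registers the current task, so an `unbounded_send`
// from another task (or thread) wakes us up again.
struct NextIncoming {
    buffered: Vec<u32>,
    rx: mpsc::UnboundedReceiver<u32>,
}

impl Future for NextIncoming {
    type Item = u32;
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // Drain the channel first. In the real `Shared`, both ends of the
        // channel live in the same struct, so the stream can never end.
        while let Ok(Async::Ready(Some(elem))) = self.rx.poll() {
            self.buffered.push(elem);
        }
        match self.buffered.pop() {
            Some(elem) => Ok(Async::Ready(elem)),
            None => Ok(Async::NotReady),
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::unbounded();
    let next = NextIncoming { buffered: Vec::new(), rx: rx };

    // Simulate a dial finishing elsewhere: the send notifies the task that is
    // currently blocked in `next.wait()`.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        let _ = tx.unbounded_send(42);
    });

    assert_eq!(next.wait().unwrap(), 42);
}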

View File

@@ -203,6 +203,7 @@
//! ```
extern crate bytes;
extern crate fnv;
#[macro_use]
extern crate futures;
extern crate multistream_select;

View File

@@ -159,7 +159,7 @@ pub struct SwarmFuture<T, C, H, F>
upgraded: UpgradedNode<T, C>,
handler: H,
new_listeners: mpsc::UnboundedReceiver<Box<Stream<Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>, Error = IoError>>>,
next_incoming: Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>,
next_incoming: Box<Future<Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>, Error = IoError>>,
listeners: Vec<Box<Stream<Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>, Error = IoError>>>,
listeners_upgrade: Vec<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>,
dialers: Vec<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>,
@@ -183,13 +183,15 @@ impl<T, C, H, If, F> Future for SwarmFuture<T, C, H, F>
let handler = &mut self.handler;
match self.next_incoming.poll() {
Ok(Async::Ready((connec, client_addr))) => {
Ok(Async::Ready(connec)) => {
self.next_incoming = self.upgraded.clone().next_incoming();
self.to_process.push(future::Either::A(handler(connec, client_addr).into_future()));
self.listeners_upgrade.push(connec);
},
Ok(Async::NotReady) => {},
// TODO: may not be the best idea because we're killing the whole server
Err(err) => return Err(err),
Err(_err) => {
self.next_incoming = self.upgraded.clone().next_incoming();
},
};
match self.new_listeners.poll() {
@@ -233,7 +235,7 @@ impl<T, C, H, If, F> Future for SwarmFuture<T, C, H, F>
self.listeners.push(listener);
},
Ok(Async::Ready(None)) => {},
Err(err) => return Err(err),
Err(_err) => {}, // Ignoring errors
};
}
@@ -246,7 +248,7 @@ impl<T, C, H, If, F> Future for SwarmFuture<T, C, H, F>
Ok(Async::NotReady) => {
self.listeners_upgrade.push(upgrade);
},
Err(err) => return Err(err),
Err(_err) => {}, // Ignoring errors
}
}
@@ -259,7 +261,7 @@ impl<T, C, H, If, F> Future for SwarmFuture<T, C, H, F>
Ok(Async::NotReady) => {
self.dialers.push(dialer);
},
Err(err) => return Err(err),
Err(_err) => {}, // Ignoring errors
}
}
@@ -268,7 +270,7 @@ impl<T, C, H, If, F> Future for SwarmFuture<T, C, H, F>
match to_process.poll() {
Ok(Async::Ready(())) => {},
Ok(Async::NotReady) => self.to_process.push(to_process),
Err(err) => return Err(err),
Err(_err) => {}, // Ignoring errors
}
}
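The swarm changes above replace each `Err(err) => return Err(err)` with `Err(_err) => {}`, so a failing upgrade, dialer or handler future is dropped instead of tearing down the whole `SwarmFuture`. Here is a condensed sketch of that polling style under assumed toy types; `FutureResult<(), &'static str>` stands in for the swarm's boxed futures, and `poll_all` is not a function of the crate.

extern crate futures;

use futures::future::{self, FutureResult};
use futures::{Async, Future};

// Poll every pending future once: completed ones are dropped, pending ones are
// pushed back for the next poll, and an error is swallowed instead of being
// returned to the caller.
fn poll_all(pending: &mut Vec<FutureResult<(), &'static str>>) {
    for n in (0..pending.len()).rev() {
        let mut fut = pending.swap_remove(n);
        match fut.poll() {
            Ok(Async::Ready(())) => {}
            Ok(Async::NotReady) => pending.push(fut),
            Err(_err) => {} // Ignoring errors
        }
    }
}

fn main() {
    let mut pending = vec![future::ok(()), future::err("handler failed")];
    poll_all(&mut pending);
    // Both futures resolved; the error did not abort anything.
    assert!(pending.is_empty());
}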

View File

@@ -147,8 +147,10 @@ pub trait Transport {
/// Extension trait for `Transport`. Implemented on structs that provide a `Transport` on which
/// the dialed node can dial you back.
pub trait MuxedTransport: Transport {
/// Future resolving to a future that will resolve to an incoming connection.
type Incoming: Future<Item = Self::IncomingUpgrade, Error = IoError>;
/// Future resolving to an incoming connection.
type Incoming: Future<Item = (Self::RawConn, Multiaddr), Error = IoError>;
type IncomingUpgrade: Future<Item = (Self::RawConn, Multiaddr), Error = IoError>;
/// Returns the next incoming substream opened by a node that we dialed ourselves.
///
@@ -195,7 +197,8 @@ impl Transport for DeniedTransport {
}
impl MuxedTransport for DeniedTransport {
type Incoming = future::Empty<(Self::RawConn, Multiaddr), IoError>;
type Incoming = future::Empty<Self::IncomingUpgrade, IoError>;
type IncomingUpgrade = future::Empty<(Self::RawConn, Multiaddr), IoError>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
@@ -291,13 +294,24 @@ where
B: MuxedTransport,
A::Incoming: 'static, // TODO: meh :-/
B::Incoming: 'static, // TODO: meh :-/
A::IncomingUpgrade: 'static, // TODO: meh :-/
B::IncomingUpgrade: 'static, // TODO: meh :-/
A::RawConn: 'static, // TODO: meh :-/
B::RawConn: 'static, // TODO: meh :-/
{
type Incoming = Box<Future<Item = (EitherSocket<A::RawConn, B::RawConn>, Multiaddr), Error = IoError>>;
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (EitherSocket<A::RawConn, B::RawConn>, Multiaddr), Error = IoError>>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
let first = self.0.next_incoming().map(|(out, addr)| (EitherSocket::First(out), addr));
let second = self.1.next_incoming().map(|(out, addr)| (EitherSocket::Second(out), addr));
let first = self.0.next_incoming().map(|out| {
let fut = out.map(move |(v, addr)| (EitherSocket::First(v), addr));
Box::new(fut) as Box<Future<Item = _, Error = _>>
});
let second = self.1.next_incoming().map(|out| {
let fut = out.map(move |(v, addr)| (EitherSocket::Second(v), addr));
Box::new(fut) as Box<Future<Item = _, Error = _>>
});
let future = first.select(second)
.map(|(i, _)| i)
.map_err(|(e, _)| e);
@@ -790,7 +804,8 @@ pub struct DummyMuxing<T> {
impl<T> MuxedTransport for DummyMuxing<T>
where T: Transport
{
type Incoming = future::Empty<(T::RawConn, Multiaddr), IoError>;
type Incoming = future::Empty<Self::IncomingUpgrade, IoError>;
type IncomingUpgrade = future::Empty<(T::RawConn, Multiaddr), IoError>;
fn next_incoming(self) -> Self::Incoming
where Self: Sized
@@ -851,7 +866,9 @@ where
/// Turns this upgraded node into a `ConnectionReuse`. If the `Output` implements the
/// `StreamMuxer` trait, the returned object will implement `Transport` and `MuxedTransport`.
#[inline]
pub fn into_connection_reuse(self) -> ConnectionReuse<T, C> {
pub fn into_connection_reuse(self) -> ConnectionReuse<T, C>
where C::Output: StreamMuxer
{
From::from(self)
}
@@ -907,7 +924,7 @@ where
///
/// This function returns the next incoming substream. You are strongly encouraged to call it
/// if you have a muxed transport.
pub fn next_incoming(self) -> Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>
pub fn next_incoming(self) -> Box<Future<Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError> + 'a>, Error = IoError> + 'a>
where T: MuxedTransport,
C::NamesIter: Clone, // TODO: not elegant
C: Clone,
@@ -915,17 +932,22 @@ where
let upgrade = self.upgrade;
let future = self.transports.next_incoming()
.map(|future| {
// Try to negotiate the protocol.
let future = future
.and_then(move |(connection, addr)| {
let iter = upgrade.protocol_names()
.map::<_, fn(_) -> _>(|(name, id)| (name, <Bytes as PartialEq>::eq, id));
let negotiated = multistream_select::listener_select_proto(connection, iter)
.map_err(|err| IoError::new(IoErrorKind::Other, err));
negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr))
negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr))
})
.and_then(|(upgrade_id, connection, upgrade, addr)| {
upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &addr)
.map(|u| (u, addr))
.and_then(move |(upgrade_id, connection, upgrade, addr)| {
let upg = upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &addr);
upg.map(|u| (u, addr))
});
Box::new(future) as Box<Future<Item = _, Error = _>>
});
Box::new(future) as Box<_>
@@ -1028,7 +1050,8 @@ where
C::NamesIter: Clone, // TODO: not elegant
C: Clone,
{
type Incoming = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>;
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
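In the `OrTransport` hunk above, both sides of `next_incoming` are boxed to the same `Box<Future<Item = _, Error = _>>` type before being combined, because `select` in futures 0.1 requires identical `Item` and `Error` types on both futures and resolves to whichever side finishes first. A small sketch of that combinator in isolation, with string payloads standing in for the upgrade futures:

extern crate futures;

use futures::future;
use futures::Future;
use std::io::Error as IoError;

fn main() {
    // Two competing "incoming upgrade" futures with the same Item and Error types.
    let first = future::ok::<&'static str, IoError>("upgrade from transport A");
    let second = future::empty::<&'static str, IoError>(); // never resolves

    // `select` yields whichever side finishes first, together with the loser.
    let winner = first
        .select(second)
        .map(|(item, _loser)| item)
        .map_err(|(err, _loser)| err)
        .wait()
        .unwrap();

    assert_eq!(winner, "upgrade from transport A");
}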

View File

@@ -0,0 +1,277 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
extern crate bytes;
extern crate futures;
extern crate libp2p_swarm;
extern crate libp2p_tcp_transport;
extern crate multiplex;
extern crate tokio_core;
extern crate tokio_io;
use bytes::BytesMut;
use futures::future::Future;
use futures::{Stream, Sink};
use libp2p_swarm::{Multiaddr, Transport, StreamMuxer, MuxedTransport};
use libp2p_tcp_transport::TcpConfig;
use tokio_core::reactor::Core;
use tokio_io::codec::length_delimited::Framed;
use std::sync::{atomic, mpsc};
use std::thread;
// Ensures that a transport is only ever used once for dialing.
#[derive(Debug)]
struct OnlyOnce<T>(T, atomic::AtomicBool);
impl<T> From<T> for OnlyOnce<T> {
fn from(tp: T) -> OnlyOnce<T> {
OnlyOnce(tp, atomic::AtomicBool::new(false))
}
}
impl<T: Clone> Clone for OnlyOnce<T> {
fn clone(&self) -> Self {
OnlyOnce(self.0.clone(), atomic::AtomicBool::new(self.1.load(atomic::Ordering::SeqCst)))
}
}
impl<T: Transport> Transport for OnlyOnce<T> {
type RawConn = T::RawConn;
type Listener = T::Listener;
type ListenerUpgrade = T::ListenerUpgrade;
type Dial = T::Dial;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
Ok(self.0.listen_on(addr).unwrap_or_else(|_| panic!()))
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
assert!(!self.1.swap(true, atomic::Ordering::SeqCst));
Ok(self.0.dial(addr).unwrap_or_else(|_| panic!()))
}
fn nat_traversal(&self, a: &Multiaddr, b: &Multiaddr) -> Option<Multiaddr> {
self.0.nat_traversal(a, b)
}
}
#[test]
fn client_to_server_outbound() {
// A client opens a connection to a server, then an outgoing substream, then sends a message
// on that substream.
let (tx, rx) = mpsc::channel();
let bg_thread = thread::spawn(move || {
let mut core = Core::new().unwrap();
let transport = TcpConfig::new(core.handle())
.with_upgrade(multiplex::MultiplexConfig)
.into_connection_reuse();
let (listener, addr) = transport
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap_or_else(|_| panic!());
tx.send(addr).unwrap();
let future = listener
.into_future()
.map_err(|(err, _)| err)
.and_then(|(client, _)| client.unwrap())
.map(|client| client.0)
.map(|client| Framed::<_, BytesMut>::new(client))
.and_then(|client| {
client.into_future()
.map_err(|(err, _)| err)
.map(|(msg, _)| msg)
})
.and_then(|msg| {
let msg = msg.unwrap();
assert_eq!(msg, "hello world");
Ok(())
});
core.run(future).unwrap();
});
let mut core = Core::new().unwrap();
let transport = TcpConfig::new(core.handle())
.with_upgrade(multiplex::MultiplexConfig);
let future = transport.dial(rx.recv().unwrap()).unwrap()
.and_then(|client| client.0.outbound())
.map(|server| Framed::<_, BytesMut>::new(server))
.and_then(|server| server.send("hello world".into()))
.map(|_| ());
core.run(future).unwrap();
bg_thread.join().unwrap();
}
#[test]
fn connection_reused_for_dialing() {
// A client dials the same multiaddress twice in a row. We check that it uses two substreams
// instead of opening two different connections.
let (tx, rx) = mpsc::channel();
let bg_thread = thread::spawn(move || {
let mut core = Core::new().unwrap();
let transport = OnlyOnce::from(TcpConfig::new(core.handle()))
.with_upgrade(multiplex::MultiplexConfig)
.into_connection_reuse();
let (listener, addr) = transport
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap_or_else(|_| panic!());
tx.send(addr).unwrap();
let future = listener
.into_future()
.map_err(|(err, _)| err)
.and_then(|(client, rest)| client.unwrap().map(move |c| (c.0, rest)))
.map(|(client, rest)| (Framed::<_, BytesMut>::new(client), rest))
.and_then(|(client, rest)| {
client.into_future()
.map(|v| (v, rest))
.map_err(|(err, _)| err)
})
.and_then(|((msg, _), rest)| {
let msg = msg.unwrap();
assert_eq!(msg, "hello world");
Ok(rest)
})
.flatten_stream()
.into_future()
.map_err(|(err, _)| err)
.and_then(|(client, _)| client.unwrap())
.map(|client| client.0)
.map(|client| Framed::<_, BytesMut>::new(client))
.and_then(|client| {
client.into_future()
.map_err(|(err, _)| err)
})
.and_then(|(msg, _)| {
let msg = msg.unwrap();
assert_eq!(msg, "second message");
Ok(())
});
core.run(future).unwrap();
});
let mut core = Core::new().unwrap();
let transport = OnlyOnce::from(TcpConfig::new(core.handle()))
.with_upgrade(multiplex::MultiplexConfig)
.into_connection_reuse();
let listen_addr = rx.recv().unwrap();
let future = transport.clone().dial(listen_addr.clone()).unwrap_or_else(|_| panic!())
.map(|server| Framed::<_, BytesMut>::new(server.0))
.and_then(|server| {
server.send("hello world".into())
})
.and_then(|first_connec| {
transport.clone().dial(listen_addr.clone()).unwrap_or_else(|_| panic!())
.map(|server| Framed::<_, BytesMut>::new(server.0))
.map(|server| (first_connec, server))
})
.and_then(|(_first, second)| {
second.send("second message".into())
})
.map(|_| ());
core.run(future).unwrap();
bg_thread.join().unwrap();
}
#[test]
fn use_opened_listen_to_dial() {
// A server waits for an incoming substream and a message on it, then opens an outgoing
// substream on that same connection, that the client has to accept. The client then sends a
// message on that new substream.
let (tx, rx) = mpsc::channel();
let bg_thread = thread::spawn(move || {
let mut core = Core::new().unwrap();
let transport = OnlyOnce::from(TcpConfig::new(core.handle()))
.with_upgrade(multiplex::MultiplexConfig);
let (listener, addr) = transport.clone()
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap_or_else(|_| panic!());
tx.send(addr).unwrap();
let future = listener
.into_future()
.map_err(|(err, _)| err)
.and_then(|(client, _)| client.unwrap())
.map(|client| client.0)
.and_then(|c| {
let c2 = c.clone();
c.clone().inbound().map(move |i| (c2, i))
})
.map(|(muxer, client)| {
(muxer, Framed::<_, BytesMut>::new(client))
})
.and_then(|(muxer, client)| {
client.into_future()
.map(move |msg| (muxer, msg))
.map_err(|(err, _)| err)
})
.and_then(|(muxer, (msg, _))| {
let msg = msg.unwrap();
assert_eq!(msg, "hello world");
muxer.outbound()
})
.map(|client| {
Framed::<_, BytesMut>::new(client)
})
.and_then(|client| {
client.into_future()
.map_err(|(err, _)| err)
})
.and_then(|(msg, _)| {
let msg = msg.unwrap();
assert_eq!(msg, "second message");
Ok(())
});
core.run(future).unwrap();
});
let mut core = Core::new().unwrap();
let transport = OnlyOnce::from(TcpConfig::new(core.handle()))
.with_upgrade(multiplex::MultiplexConfig)
.into_connection_reuse();
let listen_addr = rx.recv().unwrap();
let future = transport.clone().dial(listen_addr.clone()).unwrap_or_else(|_| panic!())
.map(|server| Framed::<_, BytesMut>::new(server.0))
.and_then(|server| {
server.send("hello world".into())
})
.and_then(|first_connec| {
transport.clone().next_incoming()
.and_then(|server| server)
.map(|server| Framed::<_, BytesMut>::new(server.0))
.map(|server| (first_connec, server))
})
.and_then(|(_first, second)| {
second.send("second message".into())
})
.map(|_| ());
core.run(future).unwrap();
bg_thread.join().unwrap();
}