From 22a7159c414f28b9d5a4ef05e5e7a93013ce1498 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 7 Mar 2018 11:51:52 +0100 Subject: [PATCH] Merge of #116 and #111 (#117) * Implement ConnectionReuse correctly * Add some tests and fixes * Remove useless boolean in active_connections * Correctly run tests * Optimize the processing * Next incoming is now in two steps * Remove log * Fix dialing a node even if we already have a connection * Add a proper PeerId to Peerstore * Turn identify into a transport layer * Expose the dialed multiaddress * Add identified nodes to the peerstore * Allow configuring the TTL of the addresses * Split identify in two modules * Some comments and tweaks * Run rustfmt * Add test and bugfix * Fix wrong address reported when dialing * Fix websocket browser code * Ignore errors in the swarm * Fix multiplex test * Fix some style concerns * Fix concerns --- libp2p-identify/src/transport.rs | 56 +++--- libp2p-swarm/Cargo.toml | 2 + libp2p-swarm/src/connection_reuse.rs | 244 ++++++++++++----------- libp2p-swarm/src/lib.rs | 1 + libp2p-swarm/src/swarm.rs | 18 +- libp2p-swarm/src/transport.rs | 65 +++++-- libp2p-swarm/tests/multiplex.rs | 277 +++++++++++++++++++++++++++ 7 files changed, 496 insertions(+), 167 deletions(-) create mode 100644 libp2p-swarm/tests/multiplex.rs diff --git a/libp2p-identify/src/transport.rs b/libp2p-identify/src/transport.rs index f485f5cc..cce1d08a 100644 --- a/libp2p-identify/src/transport.rs +++ b/libp2p-identify/src/transport.rs @@ -223,7 +223,8 @@ where PStoreRef: Deref + Clone + 'static, for<'r> &'r PStore: Peerstore, { - type Incoming = Box>; + type Incoming = Box>; + type IncomingUpgrade = Box>; #[inline] fn next_incoming(self) -> Self::Incoming { @@ -233,32 +234,37 @@ where let future = self.transport .next_incoming() - .and_then(move |(connec, client_addr)| { - // On an incoming connection, dial back the node and upgrade to the identify - // protocol. - identify_upgrade - .clone() - .dial(client_addr.clone()) - .map_err(|_| { - IoError::new(IoErrorKind::Other, "couldn't dial back incoming node") + .map(move |incoming| { + let future = incoming + .and_then(move |(connec, client_addr)| { + // On an incoming connection, dial back the node and upgrade to the identify + // protocol. + identify_upgrade + .clone() + .dial(client_addr.clone()) + .map_err(|_| { + IoError::new(IoErrorKind::Other, "couldn't dial back incoming node") + }) + .map(move |id| (id, connec)) }) - .map(move |id| (id, connec)) - }) - .and_then(move |(dial, connec)| dial.map(move |dial| (dial, connec))) - .and_then(move |(identify, connec)| { - // Add the info to the peerstore and compute the "real" address of the node (in - // the form `/p2p/...`). - let real_addr = match identify { - (IdentifyOutput::RemoteInfo { info, .. }, old_addr) => { - process_identify_info(&info, &*peerstore, old_addr, addr_ttl)? - } - _ => unreachable!( - "the identify protocol guarantees that we receive remote \ - information when we dial a node" - ), - }; + .and_then(move |(dial, connec)| dial.map(move |dial| (dial, connec))) + .and_then(move |(identify, connec)| { + // Add the info to the peerstore and compute the "real" address of the node (in + // the form `/p2p/...`). + let real_addr = match identify { + (IdentifyOutput::RemoteInfo { info, .. }, old_addr) => { + process_identify_info(&info, &*peerstore, old_addr, addr_ttl)? + } + _ => unreachable!( + "the identify protocol guarantees that we receive remote \ + information when we dial a node" + ), + }; - Ok((connec, real_addr)) + Ok((connec, real_addr)) + }); + + Box::new(future) as Box> }); Box::new(future) as Box<_> diff --git a/libp2p-swarm/Cargo.toml b/libp2p-swarm/Cargo.toml index 1439f839..3b45ef66 100644 --- a/libp2p-swarm/Cargo.toml +++ b/libp2p-swarm/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Parity Technologies "] [dependencies] bytes = "0.4" +fnv = "1.0" multiaddr = "0.2.0" multistream-select = { path = "../multistream-select" } futures = { version = "0.1", features = ["use_std"] } @@ -15,4 +16,5 @@ tokio-io = "0.1" [dev-dependencies] libp2p-ping = { path = "../libp2p-ping" } libp2p-tcp-transport = { path = "../libp2p-tcp-transport" } +multiplex = { path = "../multiplex-rs" } tokio-core = "0.1" diff --git a/libp2p-swarm/src/connection_reuse.rs b/libp2p-swarm/src/connection_reuse.rs index 28569aa7..f0b0a977 100644 --- a/libp2p-swarm/src/connection_reuse.rs +++ b/libp2p-swarm/src/connection_reuse.rs @@ -32,26 +32,21 @@ //! When called on a `ConnectionReuse`, the `listen_on` method will listen on the given //! multiaddress (by using the underlying `Transport`), then will apply a `flat_map` on the //! incoming connections so that we actually listen to the incoming substreams of each connection. -//! TODO: design issue ; we need a way to handle the substreams that are opened by remotes on -//! connections opened by us //! //! When called on a `ConnectionReuse`, the `dial` method will try to use a connection that has //! already been opened earlier, and open an outgoing substream on it. If none is available, it //! will dial the given multiaddress. Dialed node can also spontaneously open new substreams with //! us. In order to handle these new substreams you should use the `next_incoming` method of the //! `MuxedTransport` trait. -//! TODO: this raises several questions ^ -//! -//! TODO: this whole code is a dummy and should be rewritten after the design has been properly -//! figured out. +use fnv::FnvHashMap; use futures::future::{self, IntoFuture, FutureResult}; -use futures::{stream, Async, Future, Poll, Stream, task}; +use futures::{Async, Future, Poll, Stream}; use futures::stream::Fuse as StreamFuse; +use futures::sync::mpsc; use multiaddr::Multiaddr; use muxing::StreamMuxer; use parking_lot::Mutex; -use smallvec::SmallVec; use std::io::Error as IoError; use std::sync::Arc; use transport::{ConnectionUpgrade, MuxedTransport, Transport, UpgradedNode}; @@ -66,31 +61,48 @@ pub struct ConnectionReuse where T: Transport, C: ConnectionUpgrade, + C::Output: StreamMuxer, { // Underlying transport and connection upgrade for when we need to dial or listen. inner: UpgradedNode, + + // Struct shared between most of the `ConnectionReuse` infrastructure. shared: Arc>>, } -struct Shared { - // List of futures to dialed connections. - incoming: Vec>>>>>, - // Tasks to signal when an element is added to `incoming`. Only used when `incoming` is empty. - to_signal: Vec, +struct Shared where M: StreamMuxer { + // List of active muxers. + active_connections: FnvHashMap, + + // List of pending inbound substreams from dialed nodes. + // Only add to this list elements received through `add_to_next_rx`. + next_incoming: Vec<(M, M::InboundSubstream, Multiaddr)>, + + // New elements are not directly added to `next_incoming`. Instead they are sent to this + // channel. This is done so that we can wake up tasks whenever a new element is added. + add_to_next_rx: mpsc::UnboundedReceiver<(M, M::InboundSubstream, Multiaddr)>, + + // Other side of `add_to_next_rx`. + add_to_next_tx: mpsc::UnboundedSender<(M, M::InboundSubstream, Multiaddr)>, } impl From> for ConnectionReuse where T: Transport, C: ConnectionUpgrade, + C::Output: StreamMuxer, { #[inline] fn from(node: UpgradedNode) -> ConnectionReuse { + let (tx, rx) = mpsc::unbounded(); + ConnectionReuse { inner: node, shared: Arc::new(Mutex::new(Shared { - incoming: Vec::new(), - to_signal: Vec::new(), + active_connections: Default::default(), + next_incoming: Vec::new(), + add_to_next_rx: rx, + add_to_next_tx: tx, })), } } @@ -118,6 +130,7 @@ where }; let listener = ConnectionReuseListener { + shared: self.shared.clone(), listener: listener.fuse(), current_upgrades: Vec::new(), connections: Vec::new(), @@ -127,34 +140,36 @@ where } fn dial(self, addr: Multiaddr) -> Result { - let dial = match self.inner.dial(addr.clone()) { + // If we already have an active connection, use it! + if let Some(connec) = self.shared.lock().active_connections.get(&addr).map(|c| c.clone()) { + let future = connec.outbound().map(|s| (s, addr)); + return Ok(Box::new(future) as Box<_>); + } + + // TODO: handle if we're already in the middle in dialing that same node? + // TODO: try dialing again if the existing connection has dropped + + let dial = match self.inner.dial(addr) { Ok(l) => l, Err((inner, addr)) => { return Err((ConnectionReuse { inner: inner, shared: self.shared }, addr)); } }; + let shared = self.shared.clone(); let dial = dial - .map_err:: Mutex>, _>(|err| Mutex::new(Some(err))) - .shared(); - - let ingoing = dial.clone() - .map(|muxer| stream::repeat(muxer)) - .flatten_stream() - .map(move |muxer| (&*muxer).clone()); - - let mut lock = self.shared.lock(); - lock.incoming.push(Box::new(ingoing) as Box<_>); - for task in lock.to_signal.drain(..) { task.notify(); } - drop(lock); - - let future = dial - .map_err(|err| err.lock().take().expect("error can only be extracted once")) - .and_then(|dial| { - let (dial, client_addr) = (&*dial).clone(); - dial.outbound().map(|s| (s, client_addr)) + .into_future() + .and_then(move |(connec, addr)| { + // Always replace the active connection because we are the most recent. + let mut lock = shared.lock(); + lock.active_connections.insert(addr.clone(), connec.clone()); + // TODO: doesn't need locking ; the sender could be extracted + let _ = lock.add_to_next_tx + .unbounded_send((connec.clone(), connec.clone().inbound(), addr.clone())); + connec.outbound().map(|s| (s, addr)) }); - Ok(Box::new(future) as Box<_>) + + Ok(Box::new(dial) as Box<_>) } #[inline] @@ -171,29 +186,29 @@ where C::Output: StreamMuxer + Clone, C::NamesIter: Clone, // TODO: not elegant { - type Incoming = Box::Substream, Multiaddr), Error = IoError>>; + type Incoming = ConnectionReuseIncoming; + type IncomingUpgrade = future::FutureResult<(::Substream, Multiaddr), IoError>; #[inline] fn next_incoming(self) -> Self::Incoming { - let future = ConnectionReuseIncoming { shared: self.shared.clone() } - .and_then(|(out, addr)| { - out.inbound().map(|o| (o, addr)) - }); - Box::new(future) as Box<_> + ConnectionReuseIncoming { shared: self.shared.clone() } } } -/// Implementation of `Stream where S: Stream, F: Future, M: StreamMuxer, { + // The main listener. `S` is from the underlying transport. listener: StreamFuse, current_upgrades: Vec, connections: Vec<(M, ::InboundSubstream, Multiaddr)>, + + // Shared between the whole connection reuse mechanism. + shared: Arc>>, } impl Stream for ConnectionReuseListener @@ -205,125 +220,128 @@ where S: Stream, type Error = IoError; fn poll(&mut self) -> Poll, Self::Error> { + // Check for any incoming connection on the listening socket. + // Note that since `self.listener` is a `Fuse`, it's not a problem to continue polling even + // after it is finished or after it error'ed. match self.listener.poll() { Ok(Async::Ready(Some(upgrade))) => { self.current_upgrades.push(upgrade); } - Ok(Async::NotReady) => (), + Ok(Async::NotReady) => {}, Ok(Async::Ready(None)) => { - if self.connections.is_empty() { + if self.connections.is_empty() && self.current_upgrades.is_empty() { return Ok(Async::Ready(None)); } } Err(err) => { - if self.connections.is_empty() { + if self.connections.is_empty() && self.current_upgrades.is_empty() { return Err(err); } } }; - - // Most of the time, this array will contain 0 or 1 elements, but sometimes it may contain - // more and we don't want to panic if that happens. With 8 elements, we can be pretty - // confident that this is never going to spill into a `Vec`. - let mut upgrades_to_drop: SmallVec<[_; 8]> = SmallVec::new(); - let mut early_ret = None; - for (index, current_upgrade) in - self.current_upgrades.iter_mut().enumerate() - { + // Check whether any upgrade (to a muxer) on an incoming connection is ready. + // We extract everything at the start, then insert back the elements that we still want at + // the next iteration. + for n in (0 .. self.current_upgrades.len()).rev() { + let mut current_upgrade = self.current_upgrades.swap_remove(n); match current_upgrade.poll() { Ok(Async::Ready((muxer, client_addr))) => { let next_incoming = muxer.clone().inbound(); - self.connections.push((muxer, next_incoming, client_addr.clone())); - upgrades_to_drop.push(index); + self.connections.push((muxer.clone(), next_incoming, client_addr.clone())); + // We overwrite any current active connection to that multiaddr because we + // are the freshest possible connection. + self.shared.lock().active_connections.insert(client_addr, muxer); + }, + Ok(Async::NotReady) => { + self.current_upgrades.push(current_upgrade); }, - Ok(Async::NotReady) => {}, Err(err) => { - upgrades_to_drop.push(index); - early_ret = Some(Async::Ready(Some(Err(err).into_future()))); + // Insert the rest of the pending upgrades, but not the current one. + return Ok(Async::Ready(Some(future::err(err)))); }, } } - for &index in upgrades_to_drop.iter().rev() { - self.current_upgrades.swap_remove(index); - } - - if let Some(early_ret) = early_ret { - return Ok(early_ret); - } - - // We reuse `upgrades_to_drop`. - upgrades_to_drop.clear(); - let mut connections_to_drop = upgrades_to_drop; - - for (index, &mut (ref mut muxer, ref mut next_incoming, ref client_addr)) in - self.connections.iter_mut().enumerate() - { + // Check whether any incoming substream is ready. + for n in (0 .. self.connections.len()).rev() { + let (muxer, mut next_incoming, client_addr) = self.connections.swap_remove(n); match next_incoming.poll() { Ok(Async::Ready(incoming)) => { + // A new substream is ready. let mut new_next = muxer.clone().inbound(); - *next_incoming = new_next; - return Ok(Async::Ready(Some(Ok((incoming, client_addr.clone())).into_future()))); + self.connections.push((muxer, new_next, client_addr.clone())); + return Ok(Async::Ready(Some(Ok((incoming, client_addr)).into_future()))); } - Ok(Async::NotReady) => {} - Err(_) => { - connections_to_drop.push(index); + Ok(Async::NotReady) => { + self.connections.push((muxer, next_incoming, client_addr)); + } + Err(err) => { + // Insert the rest of the pending connections, but not the current one. + return Ok(Async::Ready(Some(future::err(err)))); } } } - for &index in connections_to_drop.iter().rev() { - self.connections.swap_remove(index); - } - + // Nothing is ready, return `NotReady`. Ok(Async::NotReady) } } -/// Implementation of `Future { - shared: Arc>>, +/// Implementation of `Future` that yields the next incoming substream from a dialed connection. +pub struct ConnectionReuseIncoming + where M: StreamMuxer +{ + // Shared between the whole connection reuse system. + shared: Arc>>, } -impl Future for ConnectionReuseIncoming - where O: Clone +impl Future for ConnectionReuseIncoming + where M: Clone + StreamMuxer, { - type Item = (O, Multiaddr); + type Item = future::FutureResult<(M::Substream, Multiaddr), IoError>; type Error = IoError; fn poll(&mut self) -> Poll { let mut lock = self.shared.lock(); - let mut to_remove = SmallVec::<[_; 8]>::new(); - let mut ret_value = None; - - for (offset, future) in lock.incoming.iter_mut().enumerate() { - match future.poll() { - Ok(Async::Ready(Some((value, addr)))) => { - ret_value = Some((value.clone(), addr)); - break; + // Try to get any new muxer from `add_to_next_rx`. + // We push the new muxers to a channel instead of adding them to `next_incoming`, so that + // tasks are notified when something is pushed. + loop { + match lock.add_to_next_rx.poll() { + Ok(Async::Ready(Some(elem))) => { + lock.next_incoming.push(elem); }, - Ok(Async::Ready(None)) => { - to_remove.push(offset); - }, - Ok(Async::NotReady) => {}, - Err(_) => { - to_remove.push(offset); + Ok(Async::NotReady) => break, + Ok(Async::Ready(None)) | Err(_) => { + unreachable!("the sender and receiver are both in the same struct, therefore \ + the link can never break") }, } } - for offset in to_remove.into_iter().rev() { - lock.incoming.swap_remove(offset); + // Check whether any incoming substream is ready. + for n in (0 .. lock.next_incoming.len()).rev() { + let (muxer, mut future, addr) = lock.next_incoming.swap_remove(n); + match future.poll() { + Ok(Async::Ready(value)) => { + // A substream is ready ; push back the muxer for the next time this function + // is called, then return. + let next = muxer.clone().inbound(); + lock.next_incoming.push((muxer, next, addr.clone())); + return Ok(Async::Ready(future::ok((value, addr)))); + }, + Ok(Async::NotReady) => { + lock.next_incoming.push((muxer, future, addr)); + }, + Err(_) => { + // In case of error, we just not push back the element, which drops it. + }, + } } - if let Some(ret_value) = ret_value { - Ok(Async::Ready(ret_value)) - } else { - lock.to_signal.push(task::current()); - Ok(Async::NotReady) - } + // Nothing is ready. + Ok(Async::NotReady) } } diff --git a/libp2p-swarm/src/lib.rs b/libp2p-swarm/src/lib.rs index 5cf46de8..1ede217a 100644 --- a/libp2p-swarm/src/lib.rs +++ b/libp2p-swarm/src/lib.rs @@ -203,6 +203,7 @@ //! ``` extern crate bytes; +extern crate fnv; #[macro_use] extern crate futures; extern crate multistream_select; diff --git a/libp2p-swarm/src/swarm.rs b/libp2p-swarm/src/swarm.rs index 2b3b792d..b654cba4 100644 --- a/libp2p-swarm/src/swarm.rs +++ b/libp2p-swarm/src/swarm.rs @@ -159,7 +159,7 @@ pub struct SwarmFuture upgraded: UpgradedNode, handler: H, new_listeners: mpsc::UnboundedReceiver>, Error = IoError>>>, - next_incoming: Box>, + next_incoming: Box>, Error = IoError>>, listeners: Vec>, Error = IoError>>>, listeners_upgrade: Vec>>, dialers: Vec>>, @@ -183,13 +183,15 @@ impl Future for SwarmFuture let handler = &mut self.handler; match self.next_incoming.poll() { - Ok(Async::Ready((connec, client_addr))) => { + Ok(Async::Ready(connec)) => { self.next_incoming = self.upgraded.clone().next_incoming(); - self.to_process.push(future::Either::A(handler(connec, client_addr).into_future())); + self.listeners_upgrade.push(connec); }, Ok(Async::NotReady) => {}, // TODO: may not be the best idea because we're killing the whole server - Err(err) => return Err(err), + Err(_err) => { + self.next_incoming = self.upgraded.clone().next_incoming(); + }, }; match self.new_listeners.poll() { @@ -233,7 +235,7 @@ impl Future for SwarmFuture self.listeners.push(listener); }, Ok(Async::Ready(None)) => {}, - Err(err) => return Err(err), + Err(_err) => {}, // Ignoring errors }; } @@ -246,7 +248,7 @@ impl Future for SwarmFuture Ok(Async::NotReady) => { self.listeners_upgrade.push(upgrade); }, - Err(err) => return Err(err), + Err(_err) => {}, // Ignoring errors } } @@ -259,7 +261,7 @@ impl Future for SwarmFuture Ok(Async::NotReady) => { self.dialers.push(dialer); }, - Err(err) => return Err(err), + Err(_err) => {}, // Ignoring errors } } @@ -268,7 +270,7 @@ impl Future for SwarmFuture match to_process.poll() { Ok(Async::Ready(())) => {}, Ok(Async::NotReady) => self.to_process.push(to_process), - Err(err) => return Err(err), + Err(_err) => {}, // Ignoring errors } } diff --git a/libp2p-swarm/src/transport.rs b/libp2p-swarm/src/transport.rs index 0cdf7c03..6dac3a12 100644 --- a/libp2p-swarm/src/transport.rs +++ b/libp2p-swarm/src/transport.rs @@ -147,8 +147,10 @@ pub trait Transport { /// Extension trait for `Transport`. Implemented on structs that provide a `Transport` on which /// the dialed node can dial you back. pub trait MuxedTransport: Transport { + /// Future resolving to a future that will resolve to an incoming connection. + type Incoming: Future; /// Future resolving to an incoming connection. - type Incoming: Future; + type IncomingUpgrade: Future; /// Returns the next incoming substream opened by a node that we dialed ourselves. /// @@ -195,7 +197,8 @@ impl Transport for DeniedTransport { } impl MuxedTransport for DeniedTransport { - type Incoming = future::Empty<(Self::RawConn, Multiaddr), IoError>; + type Incoming = future::Empty; + type IncomingUpgrade = future::Empty<(Self::RawConn, Multiaddr), IoError>; #[inline] fn next_incoming(self) -> Self::Incoming { @@ -291,13 +294,24 @@ where B: MuxedTransport, A::Incoming: 'static, // TODO: meh :-/ B::Incoming: 'static, // TODO: meh :-/ + A::IncomingUpgrade: 'static, // TODO: meh :-/ + B::IncomingUpgrade: 'static, // TODO: meh :-/ + A::RawConn: 'static, // TODO: meh :-/ + B::RawConn: 'static, // TODO: meh :-/ { - type Incoming = Box, Multiaddr), Error = IoError>>; + type Incoming = Box>; + type IncomingUpgrade = Box, Multiaddr), Error = IoError>>; #[inline] fn next_incoming(self) -> Self::Incoming { - let first = self.0.next_incoming().map(|(out, addr)| (EitherSocket::First(out), addr)); - let second = self.1.next_incoming().map(|(out, addr)| (EitherSocket::Second(out), addr)); + let first = self.0.next_incoming().map(|out| { + let fut = out.map(move |(v, addr)| (EitherSocket::First(v), addr)); + Box::new(fut) as Box> + }); + let second = self.1.next_incoming().map(|out| { + let fut = out.map(move |(v, addr)| (EitherSocket::Second(v), addr)); + Box::new(fut) as Box> + }); let future = first.select(second) .map(|(i, _)| i) .map_err(|(e, _)| e); @@ -790,7 +804,8 @@ pub struct DummyMuxing { impl MuxedTransport for DummyMuxing where T: Transport { - type Incoming = future::Empty<(T::RawConn, Multiaddr), IoError>; + type Incoming = future::Empty; + type IncomingUpgrade = future::Empty<(T::RawConn, Multiaddr), IoError>; fn next_incoming(self) -> Self::Incoming where Self: Sized @@ -851,7 +866,9 @@ where /// Turns this upgraded node into a `ConnectionReuse`. If the `Output` implements the /// `StreamMuxer` trait, the returned object will implement `Transport` and `MuxedTransport`. #[inline] - pub fn into_connection_reuse(self) -> ConnectionReuse { + pub fn into_connection_reuse(self) -> ConnectionReuse + where C::Output: StreamMuxer + { From::from(self) } @@ -907,7 +924,7 @@ where /// /// This function returns the next incoming substream. You are strongly encouraged to call it /// if you have a muxed transport. - pub fn next_incoming(self) -> Box + 'a> + pub fn next_incoming(self) -> Box + 'a>, Error = IoError> + 'a> where T: MuxedTransport, C::NamesIter: Clone, // TODO: not elegant C: Clone, @@ -915,18 +932,23 @@ where let upgrade = self.upgrade; let future = self.transports.next_incoming() - // Try to negotiate the protocol. - .and_then(move |(connection, addr)| { - let iter = upgrade.protocol_names() - .map::<_, fn(_) -> _>(|(name, id)| (name, ::eq, id)); - let negotiated = multistream_select::listener_select_proto(connection, iter) - .map_err(|err| IoError::new(IoErrorKind::Other, err)); - negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr)) - }) - .and_then(|(upgrade_id, connection, upgrade, addr)| { - upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &addr) - .map(|u| (u, addr)) - }); + .map(|future| { + // Try to negotiate the protocol. + let future = future + .and_then(move |(connection, addr)| { + let iter = upgrade.protocol_names() + .map::<_, fn(_) -> _>(|(name, id)| (name, ::eq, id)); + let negotiated = multistream_select::listener_select_proto(connection, iter) + .map_err(|err| IoError::new(IoErrorKind::Other, err)); + negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr)) + }) + .and_then(move |(upgrade_id, connection, upgrade, addr)| { + let upg = upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &addr); + upg.map(|u| (u, addr)) + }); + + Box::new(future) as Box> + }); Box::new(future) as Box<_> } @@ -1028,7 +1050,8 @@ where C::NamesIter: Clone, // TODO: not elegant C: Clone, { - type Incoming = Box>; + type Incoming = Box>; + type IncomingUpgrade = Box>; #[inline] fn next_incoming(self) -> Self::Incoming { diff --git a/libp2p-swarm/tests/multiplex.rs b/libp2p-swarm/tests/multiplex.rs new file mode 100644 index 00000000..224d02ad --- /dev/null +++ b/libp2p-swarm/tests/multiplex.rs @@ -0,0 +1,277 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +extern crate bytes; +extern crate futures; +extern crate libp2p_swarm; +extern crate libp2p_tcp_transport; +extern crate multiplex; +extern crate tokio_core; +extern crate tokio_io; + +use bytes::BytesMut; +use futures::future::Future; +use futures::{Stream, Sink}; +use libp2p_swarm::{Multiaddr, Transport, StreamMuxer, MuxedTransport}; +use libp2p_tcp_transport::TcpConfig; +use tokio_core::reactor::Core; +use tokio_io::codec::length_delimited::Framed; +use std::sync::{atomic, mpsc}; +use std::thread; + +// Ensures that a transport is only ever used once for dialing. +#[derive(Debug)] +struct OnlyOnce(T, atomic::AtomicBool); +impl From for OnlyOnce { + fn from(tp: T) -> OnlyOnce { + OnlyOnce(tp, atomic::AtomicBool::new(false)) + } +} +impl Clone for OnlyOnce { + fn clone(&self) -> Self { + OnlyOnce(self.0.clone(), atomic::AtomicBool::new(self.1.load(atomic::Ordering::SeqCst))) + } +} +impl Transport for OnlyOnce { + type RawConn = T::RawConn; + type Listener = T::Listener; + type ListenerUpgrade = T::ListenerUpgrade; + type Dial = T::Dial; + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { + Ok(self.0.listen_on(addr).unwrap_or_else(|_| panic!())) + } + fn dial(self, addr: Multiaddr) -> Result { + assert!(!self.1.swap(true, atomic::Ordering::SeqCst)); + Ok(self.0.dial(addr).unwrap_or_else(|_| panic!())) + } + fn nat_traversal(&self, a: &Multiaddr, b: &Multiaddr) -> Option { + self.0.nat_traversal(a, b) + } +} + +#[test] +fn client_to_server_outbound() { + // A client opens a connection to a server, then an outgoing substream, then sends a message + // on that substream. + + let (tx, rx) = mpsc::channel(); + + let bg_thread = thread::spawn(move || { + let mut core = Core::new().unwrap(); + let transport = TcpConfig::new(core.handle()) + .with_upgrade(multiplex::MultiplexConfig) + .into_connection_reuse(); + + let (listener, addr) = transport + .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap_or_else(|_| panic!()); + tx.send(addr).unwrap(); + + let future = listener + .into_future() + .map_err(|(err, _)| err) + .and_then(|(client, _)| client.unwrap()) + .map(|client| client.0) + .map(|client| Framed::<_, BytesMut>::new(client)) + .and_then(|client| { + client.into_future() + .map_err(|(err, _)| err) + .map(|(msg, _)| msg) + }) + .and_then(|msg| { + let msg = msg.unwrap(); + assert_eq!(msg, "hello world"); + Ok(()) + }); + + core.run(future).unwrap(); + }); + + let mut core = Core::new().unwrap(); + let transport = TcpConfig::new(core.handle()) + .with_upgrade(multiplex::MultiplexConfig); + + let future = transport.dial(rx.recv().unwrap()).unwrap() + .and_then(|client| client.0.outbound()) + .map(|server| Framed::<_, BytesMut>::new(server)) + .and_then(|server| server.send("hello world".into())) + .map(|_| ()); + + core.run(future).unwrap(); + bg_thread.join().unwrap(); +} + +#[test] +fn connection_reused_for_dialing() { + // A client dials the same multiaddress twice in a row. We check that it uses two substreams + // instead of opening two different connections. + + let (tx, rx) = mpsc::channel(); + + let bg_thread = thread::spawn(move || { + let mut core = Core::new().unwrap(); + let transport = OnlyOnce::from(TcpConfig::new(core.handle())) + .with_upgrade(multiplex::MultiplexConfig) + .into_connection_reuse(); + + let (listener, addr) = transport + .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap_or_else(|_| panic!()); + tx.send(addr).unwrap(); + + let future = listener + .into_future() + .map_err(|(err, _)| err) + .and_then(|(client, rest)| client.unwrap().map(move |c| (c.0, rest))) + .map(|(client, rest)| (Framed::<_, BytesMut>::new(client), rest)) + .and_then(|(client, rest)| { + client.into_future() + .map(|v| (v, rest)) + .map_err(|(err, _)| err) + }) + .and_then(|((msg, _), rest)| { + let msg = msg.unwrap(); + assert_eq!(msg, "hello world"); + Ok(rest) + }) + .flatten_stream() + .into_future() + .map_err(|(err, _)| err) + .and_then(|(client, _)| client.unwrap()) + .map(|client| client.0) + .map(|client| Framed::<_, BytesMut>::new(client)) + .and_then(|client| { + client.into_future() + .map_err(|(err, _)| err) + }) + .and_then(|(msg, _)| { + let msg = msg.unwrap(); + assert_eq!(msg, "second message"); + Ok(()) + }); + + core.run(future).unwrap(); + }); + + let mut core = Core::new().unwrap(); + let transport = OnlyOnce::from(TcpConfig::new(core.handle())) + .with_upgrade(multiplex::MultiplexConfig) + .into_connection_reuse(); + + let listen_addr = rx.recv().unwrap(); + + let future = transport.clone().dial(listen_addr.clone()).unwrap_or_else(|_| panic!()) + .map(|server| Framed::<_, BytesMut>::new(server.0)) + .and_then(|server| { + server.send("hello world".into()) + }) + .and_then(|first_connec| { + transport.clone().dial(listen_addr.clone()).unwrap_or_else(|_| panic!()) + .map(|server| Framed::<_, BytesMut>::new(server.0)) + .map(|server| (first_connec, server)) + }) + .and_then(|(_first, second)| { + second.send("second message".into()) + }) + .map(|_| ()); + + core.run(future).unwrap(); + bg_thread.join().unwrap(); +} + +#[test] +fn use_opened_listen_to_dial() { + // A server waits for an incoming substream and a message on it, then opens an outgoing + // substream on that same connection, that the client has to accept. The client then sends a + // message on that new substream. + + let (tx, rx) = mpsc::channel(); + + let bg_thread = thread::spawn(move || { + let mut core = Core::new().unwrap(); + let transport = OnlyOnce::from(TcpConfig::new(core.handle())) + .with_upgrade(multiplex::MultiplexConfig); + + let (listener, addr) = transport.clone() + .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap_or_else(|_| panic!()); + tx.send(addr).unwrap(); + + let future = listener + .into_future() + .map_err(|(err, _)| err) + .and_then(|(client, _)| client.unwrap()) + .map(|client| client.0) + .and_then(|c| { + let c2 = c.clone(); + c.clone().inbound().map(move |i| (c2, i)) + }) + .map(|(muxer, client)| { + (muxer, Framed::<_, BytesMut>::new(client)) + }) + .and_then(|(muxer, client)| { + client.into_future() + .map(move |msg| (muxer, msg)) + .map_err(|(err, _)| err) + }) + .and_then(|(muxer, (msg, _))| { + let msg = msg.unwrap(); + assert_eq!(msg, "hello world"); + muxer.outbound() + }) + .map(|client| { + Framed::<_, BytesMut>::new(client) + }) + .and_then(|client| { + client.into_future() + .map_err(|(err, _)| err) + }) + .and_then(|(msg, _)| { + let msg = msg.unwrap(); + assert_eq!(msg, "second message"); + Ok(()) + }); + + core.run(future).unwrap(); + }); + + let mut core = Core::new().unwrap(); + let transport = OnlyOnce::from(TcpConfig::new(core.handle())) + .with_upgrade(multiplex::MultiplexConfig) + .into_connection_reuse(); + + let listen_addr = rx.recv().unwrap(); + + let future = transport.clone().dial(listen_addr.clone()).unwrap_or_else(|_| panic!()) + .map(|server| Framed::<_, BytesMut>::new(server.0)) + .and_then(|server| { + server.send("hello world".into()) + }) + .and_then(|first_connec| { + transport.clone().next_incoming() + .and_then(|server| server) + .map(|server| Framed::<_, BytesMut>::new(server.0)) + .map(|server| (first_connec, server)) + }) + .and_then(|(_first, second)| { + second.send("second message".into()) + }) + .map(|_| ()); + + core.run(future).unwrap(); + bg_thread.join().unwrap(); +}