diff --git a/swarm/src/connection_reuse.rs b/swarm/src/connection_reuse.rs index 5bafc09a..b3e822ee 100644 --- a/swarm/src/connection_reuse.rs +++ b/swarm/src/connection_reuse.rs @@ -42,6 +42,7 @@ use fnv::FnvHashMap; use futures::future::{self, FutureResult, IntoFuture}; use futures::{Async, Future, Poll, Stream}; +use futures::stream::FuturesUnordered; use futures::stream::Fuse as StreamFuse; use futures::sync::mpsc; use multiaddr::Multiaddr; @@ -141,7 +142,7 @@ where let listener = ConnectionReuseListener { shared: self.shared.clone(), listener: listener.fuse(), - current_upgrades: Vec::new(), + current_upgrades: FuturesUnordered::new(), connections: Vec::new(), }; @@ -233,7 +234,7 @@ where { // The main listener. `S` is from the underlying transport. listener: StreamFuse, - current_upgrades: Vec, + current_upgrades: FuturesUnordered, connections: Vec<(M, ::InboundSubstream, Multiaddr)>, // Shared between the whole connection reuse mechanism. @@ -272,33 +273,27 @@ where } }; - // Check whether any upgrade (to a muxer) on an incoming connection is ready. // We extract everything at the start, then insert back the elements that we still want at // the next iteration. - for n in (0..self.current_upgrades.len()).rev() { - let mut current_upgrade = self.current_upgrades.swap_remove(n); - match current_upgrade.poll() { - Ok(Async::Ready((muxer, client_addr))) => { - let next_incoming = muxer.clone().inbound(); - self.connections - .push((muxer.clone(), next_incoming, client_addr.clone())); - // We overwrite any current active connection to that multiaddr because we - // are the freshest possible connection. - self.shared - .lock() - .active_connections - .insert(client_addr, muxer); - } - Ok(Async::NotReady) => { - self.current_upgrades.push(current_upgrade); - } - Err(err) => { - // Insert the rest of the pending upgrades, but not the current one. - debug!(target: "libp2p-swarm", "error while upgrading listener connection: \ - {:?}", err); - return Ok(Async::Ready(Some(future::err(err)))); - } + match self.current_upgrades.poll() { + Ok(Async::Ready(Some((muxer, client_addr)))) => { + let next_incoming = muxer.clone().inbound(); + self.connections + .push((muxer.clone(), next_incoming, client_addr.clone())); + // We overwrite any current active connection to that multiaddr because we + // are the freshest possible connection. + self.shared + .lock() + .active_connections + .insert(client_addr, muxer); } + Err(err) => { + // Insert the rest of the pending upgrades, but not the current one. + debug!(target: "libp2p-swarm", "error while upgrading listener connection: \ + {:?}", err); + return Ok(Async::Ready(Some(future::err(err)))); + } + _ => {} } // Check whether any incoming substream is ready. diff --git a/swarm/src/swarm.rs b/swarm/src/swarm.rs index a8eb2b34..34962790 100644 --- a/swarm/src/swarm.rs +++ b/swarm/src/swarm.rs @@ -21,6 +21,7 @@ use std::fmt; use std::io::Error as IoError; use futures::{future, Async, Future, IntoFuture, Poll, Stream}; +use futures::stream::{FuturesUnordered, StreamFuture}; use futures::sync::mpsc; use {ConnectionUpgrade, Multiaddr, MuxedTransport, UpgradedNode}; @@ -55,11 +56,11 @@ where handler: handler, new_listeners: new_listeners_rx, next_incoming: upgraded.clone().next_incoming(), - listeners: Vec::new(), - listeners_upgrade: Vec::new(), - dialers: Vec::new(), + listeners: FuturesUnordered::new(), + listeners_upgrade: FuturesUnordered::new(), + dialers: FuturesUnordered::new(), new_dialers: new_dialers_rx, - to_process: Vec::new(), + to_process: FuturesUnordered::new(), new_toprocess: new_toprocess_rx, }; @@ -222,19 +223,22 @@ where next_incoming: Box< Future>, Error = IoError>, >, - listeners: Vec< - Box< - Stream< - Item = Box>, - Error = IoError, + listeners: FuturesUnordered< + StreamFuture< + Box< + Stream< + Item = Box>, + Error = IoError, + >, >, >, >, - listeners_upgrade: Vec>>, - dialers: Vec>>, + listeners_upgrade: + FuturesUnordered>>, + dialers: FuturesUnordered>>, new_dialers: mpsc::UnboundedReceiver>>, - to_process: Vec>>>, + to_process: FuturesUnordered>>>, new_toprocess: mpsc::UnboundedReceiver>>, } @@ -270,7 +274,7 @@ where match self.new_listeners.poll() { Ok(Async::Ready(Some(new_listener))) => { - self.listeners.push(new_listener); + self.listeners.push(new_listener.into_future()); } Ok(Async::Ready(None)) | Err(_) => { // New listener sender has been closed. @@ -298,75 +302,54 @@ where Ok(Async::NotReady) => {} }; - for n in (0..self.listeners.len()).rev() { - let mut listener = self.listeners.swap_remove(n); - match listener.poll() { - Ok(Async::Ready(Some(upgrade))) => { - trace!(target: "libp2p-swarm", "Swarm received new connection on \ - listener socket"); - self.listeners.push(listener); - self.listeners_upgrade.push(upgrade); - } - Ok(Async::NotReady) => { - self.listeners.push(listener); - } - Ok(Async::Ready(None)) => {} - Err(err) => { - warn!(target: "libp2p-swarm", "Error in listener: {:?}", err); - } - }; + match self.listeners.poll() { + Ok(Async::Ready(Some((Some(upgrade), remaining)))) => { + trace!(target: "libp2p-swarm", "Swarm received new connection on listener socket"); + self.listeners_upgrade.push(upgrade); + self.listeners.push(remaining.into_future()); + } + Err((err, _)) => { + warn!(target: "libp2p-swarm", "Error in listener: {:?}", err); + } + _ => {} } - for n in (0..self.listeners_upgrade.len()).rev() { - let mut upgrade = self.listeners_upgrade.swap_remove(n); - match upgrade.poll() { - Ok(Async::Ready((output, client_addr))) => { - debug!( - "Successfully upgraded incoming connection with {}", - client_addr - ); - self.to_process.push(future::Either::A( - handler(output, client_addr).into_future(), - )); - } - Ok(Async::NotReady) => { - self.listeners_upgrade.push(upgrade); - } - Err(err) => { - debug!(target: "libp2p-swarm", "Error in listener upgrade: {:?}", err); - } + match self.listeners_upgrade.poll() { + Ok(Async::Ready(Some((output, client_addr)))) => { + debug!( + "Successfully upgraded incoming connection with {}", + client_addr + ); + self.to_process.push(future::Either::A( + handler(output, client_addr).into_future(), + )); } + Err(err) => { + warn!(target: "libp2p-swarm", "Error in listener upgrade: {:?}", err); + } + _ => {} } - for n in (0..self.dialers.len()).rev() { - let mut dialer = self.dialers.swap_remove(n); - match dialer.poll() { - Ok(Async::Ready((output, addr))) => { - trace!("Successfully upgraded dialed connection with {}", addr); - self.to_process - .push(future::Either::A(handler(output, addr).into_future())); - } - Ok(Async::NotReady) => { - self.dialers.push(dialer); - } - Err(err) => { - debug!(target: "libp2p-swarm", "Error in dialer upgrade: {:?}", err); - } + match self.dialers.poll() { + Ok(Async::Ready(Some((output, addr)))) => { + trace!("Successfully upgraded dialed connection with {}", addr); + self.to_process + .push(future::Either::A(handler(output, addr).into_future())); } + Err(err) => { + warn!(target: "libp2p-swarm", "Error in dialer upgrade: {:?}", err); + } + _ => {} } - for n in (0..self.to_process.len()).rev() { - let mut to_process = self.to_process.swap_remove(n); - match to_process.poll() { - Ok(Async::Ready(())) => { - trace!(target: "libp2p-swarm", "Future returned by swarm handler driven to \ - completion"); - } - Ok(Async::NotReady) => self.to_process.push(to_process), - Err(err) => { - debug!(target: "libp2p-swarm", "Error in processing: {:?}", err); - } + match self.to_process.poll() { + Ok(Async::Ready(Some(()))) => { + trace!(target: "libp2p-swarm", "Future returned by swarm handler driven to completion"); } + Err(err) => { + warn!(target: "libp2p-swarm", "Error in processing: {:?}", err); + } + _ => {} } // TODO: we never return `Ok(Ready)` because there's no way to know whether