replace Vec with FuturesUnordered (#163)

* replace Vec with FuturesUnordered

* add err log
This commit is contained in:
quake wang
2018-04-16 18:55:16 +09:00
committed by Pierre Krieger
parent 2445d9e9ee
commit 96747441fc
2 changed files with 77 additions and 99 deletions

View File

@ -42,6 +42,7 @@
use fnv::FnvHashMap; use fnv::FnvHashMap;
use futures::future::{self, FutureResult, IntoFuture}; use futures::future::{self, FutureResult, IntoFuture};
use futures::{Async, Future, Poll, Stream}; use futures::{Async, Future, Poll, Stream};
use futures::stream::FuturesUnordered;
use futures::stream::Fuse as StreamFuse; use futures::stream::Fuse as StreamFuse;
use futures::sync::mpsc; use futures::sync::mpsc;
use multiaddr::Multiaddr; use multiaddr::Multiaddr;
@ -141,7 +142,7 @@ where
let listener = ConnectionReuseListener { let listener = ConnectionReuseListener {
shared: self.shared.clone(), shared: self.shared.clone(),
listener: listener.fuse(), listener: listener.fuse(),
current_upgrades: Vec::new(), current_upgrades: FuturesUnordered::new(),
connections: Vec::new(), connections: Vec::new(),
}; };
@ -233,7 +234,7 @@ where
{ {
// The main listener. `S` is from the underlying transport. // The main listener. `S` is from the underlying transport.
listener: StreamFuse<S>, listener: StreamFuse<S>,
current_upgrades: Vec<F>, current_upgrades: FuturesUnordered<F>,
connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>, connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>,
// Shared between the whole connection reuse mechanism. // Shared between the whole connection reuse mechanism.
@ -272,13 +273,10 @@ where
} }
}; };
// Check whether any upgrade (to a muxer) on an incoming connection is ready.
// We extract everything at the start, then insert back the elements that we still want at // We extract everything at the start, then insert back the elements that we still want at
// the next iteration. // the next iteration.
for n in (0..self.current_upgrades.len()).rev() { match self.current_upgrades.poll() {
let mut current_upgrade = self.current_upgrades.swap_remove(n); Ok(Async::Ready(Some((muxer, client_addr)))) => {
match current_upgrade.poll() {
Ok(Async::Ready((muxer, client_addr))) => {
let next_incoming = muxer.clone().inbound(); let next_incoming = muxer.clone().inbound();
self.connections self.connections
.push((muxer.clone(), next_incoming, client_addr.clone())); .push((muxer.clone(), next_incoming, client_addr.clone()));
@ -289,16 +287,13 @@ where
.active_connections .active_connections
.insert(client_addr, muxer); .insert(client_addr, muxer);
} }
Ok(Async::NotReady) => {
self.current_upgrades.push(current_upgrade);
}
Err(err) => { Err(err) => {
// Insert the rest of the pending upgrades, but not the current one. // Insert the rest of the pending upgrades, but not the current one.
debug!(target: "libp2p-swarm", "error while upgrading listener connection: \ debug!(target: "libp2p-swarm", "error while upgrading listener connection: \
{:?}", err); {:?}", err);
return Ok(Async::Ready(Some(future::err(err)))); return Ok(Async::Ready(Some(future::err(err))));
} }
} _ => {}
} }
// Check whether any incoming substream is ready. // Check whether any incoming substream is ready.

View File

@ -21,6 +21,7 @@
use std::fmt; use std::fmt;
use std::io::Error as IoError; use std::io::Error as IoError;
use futures::{future, Async, Future, IntoFuture, Poll, Stream}; use futures::{future, Async, Future, IntoFuture, Poll, Stream};
use futures::stream::{FuturesUnordered, StreamFuture};
use futures::sync::mpsc; use futures::sync::mpsc;
use {ConnectionUpgrade, Multiaddr, MuxedTransport, UpgradedNode}; use {ConnectionUpgrade, Multiaddr, MuxedTransport, UpgradedNode};
@ -55,11 +56,11 @@ where
handler: handler, handler: handler,
new_listeners: new_listeners_rx, new_listeners: new_listeners_rx,
next_incoming: upgraded.clone().next_incoming(), next_incoming: upgraded.clone().next_incoming(),
listeners: Vec::new(), listeners: FuturesUnordered::new(),
listeners_upgrade: Vec::new(), listeners_upgrade: FuturesUnordered::new(),
dialers: Vec::new(), dialers: FuturesUnordered::new(),
new_dialers: new_dialers_rx, new_dialers: new_dialers_rx,
to_process: Vec::new(), to_process: FuturesUnordered::new(),
new_toprocess: new_toprocess_rx, new_toprocess: new_toprocess_rx,
}; };
@ -222,7 +223,8 @@ where
next_incoming: Box< next_incoming: Box<
Future<Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>, Error = IoError>, Future<Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>, Error = IoError>,
>, >,
listeners: Vec< listeners: FuturesUnordered<
StreamFuture<
Box< Box<
Stream< Stream<
Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>, Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>,
@ -230,11 +232,13 @@ where
>, >,
>, >,
>, >,
listeners_upgrade: Vec<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>, >,
dialers: Vec<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>, listeners_upgrade:
FuturesUnordered<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>,
dialers: FuturesUnordered<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>,
new_dialers: new_dialers:
mpsc::UnboundedReceiver<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>, mpsc::UnboundedReceiver<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>,
to_process: Vec<future::Either<F, Box<Future<Item = (), Error = IoError>>>>, to_process: FuturesUnordered<future::Either<F, Box<Future<Item = (), Error = IoError>>>>,
new_toprocess: mpsc::UnboundedReceiver<Box<Future<Item = (), Error = IoError>>>, new_toprocess: mpsc::UnboundedReceiver<Box<Future<Item = (), Error = IoError>>>,
} }
@ -270,7 +274,7 @@ where
match self.new_listeners.poll() { match self.new_listeners.poll() {
Ok(Async::Ready(Some(new_listener))) => { Ok(Async::Ready(Some(new_listener))) => {
self.listeners.push(new_listener); self.listeners.push(new_listener.into_future());
} }
Ok(Async::Ready(None)) | Err(_) => { Ok(Async::Ready(None)) | Err(_) => {
// New listener sender has been closed. // New listener sender has been closed.
@ -298,29 +302,20 @@ where
Ok(Async::NotReady) => {} Ok(Async::NotReady) => {}
}; };
for n in (0..self.listeners.len()).rev() { match self.listeners.poll() {
let mut listener = self.listeners.swap_remove(n); Ok(Async::Ready(Some((Some(upgrade), remaining)))) => {
match listener.poll() { trace!(target: "libp2p-swarm", "Swarm received new connection on listener socket");
Ok(Async::Ready(Some(upgrade))) => {
trace!(target: "libp2p-swarm", "Swarm received new connection on \
listener socket");
self.listeners.push(listener);
self.listeners_upgrade.push(upgrade); self.listeners_upgrade.push(upgrade);
self.listeners.push(remaining.into_future());
} }
Ok(Async::NotReady) => { Err((err, _)) => {
self.listeners.push(listener);
}
Ok(Async::Ready(None)) => {}
Err(err) => {
warn!(target: "libp2p-swarm", "Error in listener: {:?}", err); warn!(target: "libp2p-swarm", "Error in listener: {:?}", err);
} }
}; _ => {}
} }
for n in (0..self.listeners_upgrade.len()).rev() { match self.listeners_upgrade.poll() {
let mut upgrade = self.listeners_upgrade.swap_remove(n); Ok(Async::Ready(Some((output, client_addr)))) => {
match upgrade.poll() {
Ok(Async::Ready((output, client_addr))) => {
debug!( debug!(
"Successfully upgraded incoming connection with {}", "Successfully upgraded incoming connection with {}",
client_addr client_addr
@ -329,44 +324,32 @@ where
handler(output, client_addr).into_future(), handler(output, client_addr).into_future(),
)); ));
} }
Ok(Async::NotReady) => {
self.listeners_upgrade.push(upgrade);
}
Err(err) => { Err(err) => {
debug!(target: "libp2p-swarm", "Error in listener upgrade: {:?}", err); warn!(target: "libp2p-swarm", "Error in listener upgrade: {:?}", err);
}
} }
_ => {}
} }
for n in (0..self.dialers.len()).rev() { match self.dialers.poll() {
let mut dialer = self.dialers.swap_remove(n); Ok(Async::Ready(Some((output, addr)))) => {
match dialer.poll() {
Ok(Async::Ready((output, addr))) => {
trace!("Successfully upgraded dialed connection with {}", addr); trace!("Successfully upgraded dialed connection with {}", addr);
self.to_process self.to_process
.push(future::Either::A(handler(output, addr).into_future())); .push(future::Either::A(handler(output, addr).into_future()));
} }
Ok(Async::NotReady) => {
self.dialers.push(dialer);
}
Err(err) => { Err(err) => {
debug!(target: "libp2p-swarm", "Error in dialer upgrade: {:?}", err); warn!(target: "libp2p-swarm", "Error in dialer upgrade: {:?}", err);
}
} }
_ => {}
} }
for n in (0..self.to_process.len()).rev() { match self.to_process.poll() {
let mut to_process = self.to_process.swap_remove(n); Ok(Async::Ready(Some(()))) => {
match to_process.poll() { trace!(target: "libp2p-swarm", "Future returned by swarm handler driven to completion");
Ok(Async::Ready(())) => {
trace!(target: "libp2p-swarm", "Future returned by swarm handler driven to \
completion");
} }
Ok(Async::NotReady) => self.to_process.push(to_process),
Err(err) => { Err(err) => {
debug!(target: "libp2p-swarm", "Error in processing: {:?}", err); warn!(target: "libp2p-swarm", "Error in processing: {:?}", err);
}
} }
_ => {}
} }
// TODO: we never return `Ok(Ready)` because there's no way to know whether // TODO: we never return `Ok(Ready)` because there's no way to know whether