diff --git a/core/src/connection_reuse.rs b/core/src/connection_reuse.rs index 7a009231..67bf9469 100644 --- a/core/src/connection_reuse.rs +++ b/core/src/connection_reuse.rs @@ -278,39 +278,42 @@ where // Check for any incoming connection on the listening socket. // Note that since `self.listener` is a `Fuse`, it's not a problem to continue polling even // after it is finished or after it error'ed. - match self.listener.poll() { - Ok(Async::Ready(Some(upgrade))) => { - self.current_upgrades.push(upgrade); - } - Ok(Async::NotReady) => {} - Ok(Async::Ready(None)) => { - debug!("listener has been closed"); - if self.connections.is_empty() && self.current_upgrades.is_empty() { - return Ok(Async::Ready(None)); + loop { + match self.listener.poll() { + Ok(Async::Ready(Some(upgrade))) => { + self.current_upgrades.push(upgrade); + } + Ok(Async::NotReady) => break, + Ok(Async::Ready(None)) => { + debug!("listener has been closed"); + if self.connections.is_empty() && self.current_upgrades.is_empty() { + return Ok(Async::Ready(None)); + } + break + } + Err(err) => { + debug!("error while polling listener: {:?}", err); + if self.connections.is_empty() && self.current_upgrades.is_empty() { + return Err(err); + } + break } } - Err(err) => { - debug!("error while polling listener: {:?}", err); - if self.connections.is_empty() && self.current_upgrades.is_empty() { - return Err(err); - } - } - }; + } - // We extract everything at the start, then insert back the elements that we still want at - // the next iteration. - match self.current_upgrades.poll() { - Ok(Async::Ready(Some((muxer, client_addr)))) => { - let next_incoming = muxer.clone().inbound(); - self.connections - .push((muxer.clone(), next_incoming, client_addr.clone())); + loop { + match self.current_upgrades.poll() { + Ok(Async::Ready(Some((muxer, client_addr)))) => { + let next_incoming = muxer.clone().inbound(); + self.connections + .push((muxer.clone(), next_incoming, client_addr.clone())); + } + Err(err) => { + debug!("error while upgrading listener connection: {:?}", err); + return Ok(Async::Ready(Some(future::err(err)))); + } + _ => break, } - Err(err) => { - // Insert the rest of the pending upgrades, but not the current one. - debug!("error while upgrading listener connection: {:?}", err); - return Ok(Async::Ready(Some(future::err(err)))); - } - _ => {} } // Check whether any incoming substream is ready. diff --git a/core/src/swarm.rs b/core/src/swarm.rs index 25cff9bc..3929ee01 100644 --- a/core/src/swarm.rs +++ b/core/src/swarm.rs @@ -205,46 +205,55 @@ where debug!("Error in multiplexed incoming connection: {:?}", err); self.next_incoming = self.transport.clone().next_incoming(); } - }; + } - match self.new_listeners.poll() { - Ok(Async::Ready(Some(new_listener))) => { - let new_listener = Box::new( - new_listener.map(|f| { - let f = f.map(|(out, maf)| { - (out, Box::new(maf) as Box>) - }); + loop { + match self.new_listeners.poll() { + Ok(Async::Ready(Some(new_listener))) => { + let new_listener = Box::new( + new_listener.map(|f| { + let f = f.map(|(out, maf)| { + (out, Box::new(maf) as Box>) + }); - Box::new(f) as Box> - }), - ) as Box>; - self.listeners.push(new_listener.into_future()); + Box::new(f) as Box> + }), + ) as Box>; + self.listeners.push(new_listener.into_future()); + } + Ok(Async::Ready(None)) | Err(_) => { + // New listener sender has been closed. + break; + } + Ok(Async::NotReady) => break, } - Ok(Async::Ready(None)) | Err(_) => { - // New listener sender has been closed. - } - Ok(Async::NotReady) => {} - }; + } - match self.new_dialers.poll() { - Ok(Async::Ready(Some(new_dialer))) => { - self.dialers.push(new_dialer); + loop { + match self.new_dialers.poll() { + Ok(Async::Ready(Some(new_dialer))) => { + self.dialers.push(new_dialer); + } + Ok(Async::Ready(None)) | Err(_) => { + // New dialers sender has been closed. + break + } + Ok(Async::NotReady) => break, } - Ok(Async::Ready(None)) | Err(_) => { - // New dialers sender has been closed. - } - Ok(Async::NotReady) => {} - }; + } - match self.new_toprocess.poll() { - Ok(Async::Ready(Some(new_toprocess))) => { - self.to_process.push(future::Either::B(new_toprocess)); + loop { + match self.new_toprocess.poll() { + Ok(Async::Ready(Some(new_toprocess))) => { + self.to_process.push(future::Either::B(new_toprocess)); + } + Ok(Async::Ready(None)) | Err(_) => { + // New to-process sender has been closed. + break + } + Ok(Async::NotReady) => break, } - Ok(Async::Ready(None)) | Err(_) => { - // New to-process sender has been closed. - } - Ok(Async::NotReady) => {} - }; + } loop { match self.listeners.poll() { @@ -261,39 +270,47 @@ where } } - match self.listeners_upgrade.poll() { - Ok(Async::Ready(Some((output, client_addr)))) => { - debug!("Successfully upgraded incoming connection"); - self.to_process.push(future::Either::A( - handler(output, client_addr).into_future(), - )); + loop { + match self.listeners_upgrade.poll() { + Ok(Async::Ready(Some((output, client_addr)))) => { + debug!("Successfully upgraded incoming connection"); + self.to_process.push(future::Either::A( + handler(output, client_addr).into_future(), + )); + } + Err(err) => { + debug!("Error in listener upgrade: {:?}", err); + break; + } + _ => break } - Err(err) => { - debug!("Error in listener upgrade: {:?}", err); - } - _ => {} } - match self.dialers.poll() { - Ok(Async::Ready(Some((output, addr)))) => { - trace!("Successfully upgraded dialed connection"); - self.to_process - .push(future::Either::A(handler(output, addr).into_future())); + loop { + match self.dialers.poll() { + Ok(Async::Ready(Some((output, addr)))) => { + trace!("Successfully upgraded dialed connection"); + self.to_process + .push(future::Either::A(handler(output, addr).into_future())); + } + Err(err) => { + debug!("Error in dialer upgrade: {:?}", err); + break; + } + _ => break } - Err(err) => { - debug!("Error in dialer upgrade: {:?}", err); - } - _ => {} } - match self.to_process.poll() { - Ok(Async::Ready(Some(()))) => { - trace!("Future returned by swarm handler driven to completion"); + loop { + match self.to_process.poll() { + Ok(Async::Ready(Some(()))) => { + trace!("Future returned by swarm handler driven to completion"); + } + Err(err) => { + debug!("Error in processing: {:?}", err); + } + _ => break, } - Err(err) => { - debug!("Error in processing: {:?}", err); - } - _ => {} } // TODO: we never return `Ok(Ready)` because there's no way to know whether