Add loops around stream polling (#251)

This commit is contained in:
Pierre Krieger 2018-06-22 11:02:47 +02:00 committed by GitHub
parent 02bf7604d0
commit ab96f7efe0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 108 additions and 88 deletions

View File

@ -278,39 +278,42 @@ where
// Check for any incoming connection on the listening socket. // Check for any incoming connection on the listening socket.
// Note that since `self.listener` is a `Fuse`, it's not a problem to continue polling even // Note that since `self.listener` is a `Fuse`, it's not a problem to continue polling even
// after it is finished or after it error'ed. // after it is finished or after it error'ed.
match self.listener.poll() { loop {
Ok(Async::Ready(Some(upgrade))) => { match self.listener.poll() {
self.current_upgrades.push(upgrade); Ok(Async::Ready(Some(upgrade))) => {
} self.current_upgrades.push(upgrade);
Ok(Async::NotReady) => {} }
Ok(Async::Ready(None)) => { Ok(Async::NotReady) => break,
debug!("listener has been closed"); Ok(Async::Ready(None)) => {
if self.connections.is_empty() && self.current_upgrades.is_empty() { debug!("listener has been closed");
return Ok(Async::Ready(None)); if self.connections.is_empty() && self.current_upgrades.is_empty() {
return Ok(Async::Ready(None));
}
break
}
Err(err) => {
debug!("error while polling listener: {:?}", err);
if self.connections.is_empty() && self.current_upgrades.is_empty() {
return Err(err);
}
break
} }
} }
Err(err) => { }
debug!("error while polling listener: {:?}", err);
if self.connections.is_empty() && self.current_upgrades.is_empty() {
return Err(err);
}
}
};
// We extract everything at the start, then insert back the elements that we still want at loop {
// the next iteration. match self.current_upgrades.poll() {
match self.current_upgrades.poll() { Ok(Async::Ready(Some((muxer, client_addr)))) => {
Ok(Async::Ready(Some((muxer, client_addr)))) => { let next_incoming = muxer.clone().inbound();
let next_incoming = muxer.clone().inbound(); self.connections
self.connections .push((muxer.clone(), next_incoming, client_addr.clone()));
.push((muxer.clone(), next_incoming, client_addr.clone())); }
Err(err) => {
debug!("error while upgrading listener connection: {:?}", err);
return Ok(Async::Ready(Some(future::err(err))));
}
_ => break,
} }
Err(err) => {
// Insert the rest of the pending upgrades, but not the current one.
debug!("error while upgrading listener connection: {:?}", err);
return Ok(Async::Ready(Some(future::err(err))));
}
_ => {}
} }
// Check whether any incoming substream is ready. // Check whether any incoming substream is ready.

View File

@ -205,46 +205,55 @@ where
debug!("Error in multiplexed incoming connection: {:?}", err); debug!("Error in multiplexed incoming connection: {:?}", err);
self.next_incoming = self.transport.clone().next_incoming(); self.next_incoming = self.transport.clone().next_incoming();
} }
}; }
match self.new_listeners.poll() { loop {
Ok(Async::Ready(Some(new_listener))) => { match self.new_listeners.poll() {
let new_listener = Box::new( Ok(Async::Ready(Some(new_listener))) => {
new_listener.map(|f| { let new_listener = Box::new(
let f = f.map(|(out, maf)| { new_listener.map(|f| {
(out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError>>) let f = f.map(|(out, maf)| {
}); (out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError>>)
});
Box::new(f) as Box<Future<Item = _, Error = _>> Box::new(f) as Box<Future<Item = _, Error = _>>
}), }),
) as Box<Stream<Item = _, Error = _>>; ) as Box<Stream<Item = _, Error = _>>;
self.listeners.push(new_listener.into_future()); self.listeners.push(new_listener.into_future());
}
Ok(Async::Ready(None)) | Err(_) => {
// New listener sender has been closed.
break;
}
Ok(Async::NotReady) => break,
} }
Ok(Async::Ready(None)) | Err(_) => { }
// New listener sender has been closed.
}
Ok(Async::NotReady) => {}
};
match self.new_dialers.poll() { loop {
Ok(Async::Ready(Some(new_dialer))) => { match self.new_dialers.poll() {
self.dialers.push(new_dialer); Ok(Async::Ready(Some(new_dialer))) => {
self.dialers.push(new_dialer);
}
Ok(Async::Ready(None)) | Err(_) => {
// New dialers sender has been closed.
break
}
Ok(Async::NotReady) => break,
} }
Ok(Async::Ready(None)) | Err(_) => { }
// New dialers sender has been closed.
}
Ok(Async::NotReady) => {}
};
match self.new_toprocess.poll() { loop {
Ok(Async::Ready(Some(new_toprocess))) => { match self.new_toprocess.poll() {
self.to_process.push(future::Either::B(new_toprocess)); Ok(Async::Ready(Some(new_toprocess))) => {
self.to_process.push(future::Either::B(new_toprocess));
}
Ok(Async::Ready(None)) | Err(_) => {
// New to-process sender has been closed.
break
}
Ok(Async::NotReady) => break,
} }
Ok(Async::Ready(None)) | Err(_) => { }
// New to-process sender has been closed.
}
Ok(Async::NotReady) => {}
};
loop { loop {
match self.listeners.poll() { match self.listeners.poll() {
@ -261,39 +270,47 @@ where
} }
} }
match self.listeners_upgrade.poll() { loop {
Ok(Async::Ready(Some((output, client_addr)))) => { match self.listeners_upgrade.poll() {
debug!("Successfully upgraded incoming connection"); Ok(Async::Ready(Some((output, client_addr)))) => {
self.to_process.push(future::Either::A( debug!("Successfully upgraded incoming connection");
handler(output, client_addr).into_future(), self.to_process.push(future::Either::A(
)); handler(output, client_addr).into_future(),
));
}
Err(err) => {
debug!("Error in listener upgrade: {:?}", err);
break;
}
_ => break
} }
Err(err) => {
debug!("Error in listener upgrade: {:?}", err);
}
_ => {}
} }
match self.dialers.poll() { loop {
Ok(Async::Ready(Some((output, addr)))) => { match self.dialers.poll() {
trace!("Successfully upgraded dialed connection"); Ok(Async::Ready(Some((output, addr)))) => {
self.to_process trace!("Successfully upgraded dialed connection");
.push(future::Either::A(handler(output, addr).into_future())); self.to_process
.push(future::Either::A(handler(output, addr).into_future()));
}
Err(err) => {
debug!("Error in dialer upgrade: {:?}", err);
break;
}
_ => break
} }
Err(err) => {
debug!("Error in dialer upgrade: {:?}", err);
}
_ => {}
} }
match self.to_process.poll() { loop {
Ok(Async::Ready(Some(()))) => { match self.to_process.poll() {
trace!("Future returned by swarm handler driven to completion"); Ok(Async::Ready(Some(()))) => {
trace!("Future returned by swarm handler driven to completion");
}
Err(err) => {
debug!("Error in processing: {:?}", err);
}
_ => break,
} }
Err(err) => {
debug!("Error in processing: {:?}", err);
}
_ => {}
} }
// TODO: we never return `Ok(Ready)` because there's no way to know whether // TODO: we never return `Ok(Ready)` because there's no way to know whether