From 30c082dfe55f374cc5f0964a2e3953b54aadfa34 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 25 Jan 2019 15:56:32 +0100 Subject: [PATCH] Properly shut down protocols handler on useless timeout (#892) --- core/src/protocols_handler/node_handler.rs | 83 ++++++++++++---------- 1 file changed, 44 insertions(+), 39 deletions(-) diff --git a/core/src/protocols_handler/node_handler.rs b/core/src/protocols_handler/node_handler.rs index fbf84133..3c5083a1 100644 --- a/core/src/protocols_handler/node_handler.rs +++ b/core/src/protocols_handler/node_handler.rs @@ -289,50 +289,55 @@ where // Poll the handler at the end so that we see the consequences of the method calls on // `self.handler`. - let poll_result = self.handler.poll()?; + loop { + let poll_result = self.handler.poll()?; - if self.handler.connection_keep_alive() { - self.connection_shutdown = None; - } else if self.connection_shutdown.is_none() { - self.connection_shutdown = Some(Delay::new(Instant::now() + self.useless_timeout)); - } - - match poll_result { - Async::Ready(ProtocolsHandlerEvent::Custom(event)) => { - return Ok(Async::Ready(NodeHandlerEvent::Custom(event))); + if self.handler.connection_keep_alive() { + self.connection_shutdown = None; + } else if self.connection_shutdown.is_none() { + self.connection_shutdown = Some(Delay::new(Instant::now() + self.useless_timeout)); } - Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - upgrade, - info, - }) => { - let id = self.unique_dial_upgrade_id; - self.unique_dial_upgrade_id += 1; - self.queued_dial_upgrades.push((id, upgrade)); - return Ok(Async::Ready( - NodeHandlerEvent::OutboundSubstreamRequest((id, info)), - )); - } - Async::Ready(ProtocolsHandlerEvent::Shutdown) => { - return Ok(Async::Ready(NodeHandlerEvent::Shutdown)) - }, - Async::NotReady => (), - }; - // Check the `connection_shutdown`. - if let Some(mut connection_shutdown) = self.connection_shutdown.take() { - // If we're negotiating substreams, let's delay the closing. - if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() { - match connection_shutdown.poll() { - Ok(Async::Ready(_)) | Err(_) => { - return Ok(Async::Ready(NodeHandlerEvent::Shutdown)) - }, - Ok(Async::NotReady) => { - self.connection_shutdown = Some(connection_shutdown); - } + match poll_result { + Async::Ready(ProtocolsHandlerEvent::Custom(event)) => { + return Ok(Async::Ready(NodeHandlerEvent::Custom(event))); + } + Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + upgrade, + info, + }) => { + let id = self.unique_dial_upgrade_id; + self.unique_dial_upgrade_id += 1; + self.queued_dial_upgrades.push((id, upgrade)); + return Ok(Async::Ready( + NodeHandlerEvent::OutboundSubstreamRequest((id, info)), + )); + } + Async::Ready(ProtocolsHandlerEvent::Shutdown) => { + return Ok(Async::Ready(NodeHandlerEvent::Shutdown)) + }, + Async::NotReady => (), + }; + + // Check the `connection_shutdown`. + if let Some(mut connection_shutdown) = self.connection_shutdown.take() { + // If we're negotiating substreams, let's delay the closing. + if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() { + match connection_shutdown.poll() { + Ok(Async::Ready(_)) | Err(_) => { + self.shutdown(); + continue; // We need to poll the handler again. + }, + Ok(Async::NotReady) => { + self.connection_shutdown = Some(connection_shutdown); + } + } + } else { + self.connection_shutdown = Some(connection_shutdown); } - } else { - self.connection_shutdown = Some(connection_shutdown); } + + break; } Ok(Async::NotReady)