Cleaner shutdown process (#992)

* Cleaner shutdown process

* Finish

* Fix Yamux panic

* Remove irrelevant tests

* Update core/src/nodes/handled_node_tasks.rs

Co-Authored-By: tomaka <pierre.krieger1708@gmail.com>

* Fix yamux error handling

* Update yamux
This commit is contained in:
Pierre Krieger
2019-03-11 17:19:50 +01:00
committed by GitHub
parent cb93c822f1
commit 8059a693a3
32 changed files with 434 additions and 1555 deletions

View File

@ -31,7 +31,7 @@ use crate::{
}
};
use futures::prelude::*;
use std::time::{Duration, Instant};
use std::{error, fmt, time::Duration, time::Instant};
use tokio_timer::{Delay, Timeout};
/// Prototype for a `NodeHandlerWrapper`.
@ -145,6 +145,46 @@ where
connection_shutdown: Option<Delay>,
}
/// Error generated by the `NodeHandlerWrapper`.
#[derive(Debug)]
pub enum NodeHandlerWrapperError<TErr> {
/// Error generated by the handler.
Handler(TErr),
/// The connection has been deemed useless and has been closed.
UselessTimeout,
}
impl<TErr> From<TErr> for NodeHandlerWrapperError<TErr> {
fn from(err: TErr) -> NodeHandlerWrapperError<TErr> {
NodeHandlerWrapperError::Handler(err)
}
}
impl<TErr> fmt::Display for NodeHandlerWrapperError<TErr>
where
TErr: fmt::Display
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
NodeHandlerWrapperError::Handler(err) => write!(f, "{}", err),
NodeHandlerWrapperError::UselessTimeout =>
write!(f, "Node has been closed due to inactivity"),
}
}
}
impl<TErr> error::Error for NodeHandlerWrapperError<TErr>
where
TErr: error::Error + 'static
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
NodeHandlerWrapperError::Handler(err) => Some(err),
NodeHandlerWrapperError::UselessTimeout => None,
}
}
}
impl<TProtoHandler> NodeHandler for NodeHandlerWrapper<TProtoHandler>
where
TProtoHandler: ProtocolsHandler,
@ -153,7 +193,7 @@ where
{
type InEvent = TProtoHandler::InEvent;
type OutEvent = TProtoHandler::OutEvent;
type Error = TProtoHandler::Error;
type Error = NodeHandlerWrapperError<TProtoHandler::Error>;
type Substream = TProtoHandler::Substream;
// The first element of the tuple is the unique upgrade identifier
// (see `unique_dial_upgrade_id`).
@ -192,42 +232,11 @@ where
}
}
#[inline]
fn inject_inbound_closed(&mut self) {
self.handler.inject_inbound_closed();
}
fn inject_outbound_closed(&mut self, user_data: Self::OutboundOpenInfo) {
let pos = match self
.queued_dial_upgrades
.iter()
.position(|(id, _)| id == &user_data.0)
{
Some(p) => p,
None => {
debug_assert!(
false,
"Received an outbound closed error with an invalid upgrade ID"
);
return;
}
};
self.queued_dial_upgrades.remove(pos);
self.handler
.inject_dial_upgrade_error(user_data.1, ProtocolsHandlerUpgrErr::MuxerDeniedSubstream);
}
#[inline]
fn inject_event(&mut self, event: Self::InEvent) {
self.handler.inject_event(event);
}
#[inline]
fn shutdown(&mut self) {
self.handler.shutdown();
}
fn poll(&mut self) -> Poll<NodeHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>, Self::Error> {
// Continue negotiation of newly-opened substreams on the listening side.
// We remove each element from `negotiating_in` one by one and add them back if not ready.
@ -273,55 +282,47 @@ where
// Poll the handler at the end so that we see the consequences of the method calls on
// `self.handler`.
loop {
let poll_result = self.handler.poll()?;
let poll_result = self.handler.poll()?;
self.connection_shutdown = match self.handler.connection_keep_alive() {
KeepAlive::Until(expiration) => Some(Delay::new(expiration)),
KeepAlive::Now => Some(Delay::new(Instant::now())),
KeepAlive::Forever => None,
};
self.connection_shutdown = match self.handler.connection_keep_alive() {
KeepAlive::Until(expiration) => Some(Delay::new(expiration)),
KeepAlive::Now => Some(Delay::new(Instant::now())),
KeepAlive::Forever => None,
};
match poll_result {
Async::Ready(ProtocolsHandlerEvent::Custom(event)) => {
return Ok(Async::Ready(NodeHandlerEvent::Custom(event)));
}
Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade,
info,
}) => {
let id = self.unique_dial_upgrade_id;
self.unique_dial_upgrade_id += 1;
self.queued_dial_upgrades.push((id, upgrade));
return Ok(Async::Ready(
NodeHandlerEvent::OutboundSubstreamRequest((id, info)),
));
}
Async::Ready(ProtocolsHandlerEvent::Shutdown) => {
return Ok(Async::Ready(NodeHandlerEvent::Shutdown))
},
Async::NotReady => (),
};
// Check the `connection_shutdown`.
if let Some(mut connection_shutdown) = self.connection_shutdown.take() {
// If we're negotiating substreams, let's delay the closing.
if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() {
match connection_shutdown.poll() {
Ok(Async::Ready(_)) | Err(_) => {
self.shutdown();
continue; // We need to poll the handler again.
},
Ok(Async::NotReady) => {
self.connection_shutdown = Some(connection_shutdown);
}
}
} else {
self.connection_shutdown = Some(connection_shutdown);
}
match poll_result {
Async::Ready(ProtocolsHandlerEvent::Custom(event)) => {
return Ok(Async::Ready(NodeHandlerEvent::Custom(event)));
}
Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade,
info,
}) => {
let id = self.unique_dial_upgrade_id;
self.unique_dial_upgrade_id += 1;
self.queued_dial_upgrades.push((id, upgrade));
return Ok(Async::Ready(
NodeHandlerEvent::OutboundSubstreamRequest((id, info)),
));
}
Async::NotReady => (),
};
break;
// Check the `connection_shutdown`.
if let Some(mut connection_shutdown) = self.connection_shutdown.take() {
// If we're negotiating substreams, let's delay the closing.
if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() {
match connection_shutdown.poll() {
Ok(Async::Ready(_)) | Err(_) => {
return Err(NodeHandlerWrapperError::UselessTimeout);
},
Ok(Async::NotReady) => {
self.connection_shutdown = Some(connection_shutdown);
}
}
} else {
self.connection_shutdown = Some(connection_shutdown);
}
}
Ok(Async::NotReady)