Proper shutdown in ProtocolsHandlerSelect (#870)

* Proper shutdown in ProtocolsHandlerSelect

* Fix infinite loop
This commit is contained in:
Pierre Krieger
2019-01-29 19:52:22 +01:00
committed by GitHub
parent b8dfa724fd
commit 35ec7f053c
3 changed files with 228 additions and 39 deletions

View File

@ -0,0 +1,161 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::{
either::EitherOutput,
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr},
upgrade::{
DeniedUpgrade,
EitherUpgrade,
InboundUpgrade,
OutboundUpgrade,
}
};
use futures::prelude::*;
/// Wrapper around a protocol handler and ignores all further method calls once it has shut down.
#[derive(Debug, Copy, Clone)]
pub struct Fuse<TProtoHandler> {
inner: Option<TProtoHandler>,
}
impl<TProtoHandler> Fuse<TProtoHandler> {
/// Creates a `Fuse`.
#[inline]
pub(crate) fn new(inner: TProtoHandler) -> Self {
Fuse {
inner: Some(inner),
}
}
/// Returns true if polling has returned `Shutdown` in the past.
#[inline]
pub fn is_shutdown(&self) -> bool {
self.inner.is_none()
}
}
impl<TProtoHandler> ProtocolsHandler for Fuse<TProtoHandler>
where
TProtoHandler: ProtocolsHandler,
{
type InEvent = TProtoHandler::InEvent;
type OutEvent = TProtoHandler::OutEvent;
type Error = TProtoHandler::Error;
type Substream = TProtoHandler::Substream;
type InboundProtocol = EitherUpgrade<TProtoHandler::InboundProtocol, DeniedUpgrade>;
type OutboundProtocol = TProtoHandler::OutboundProtocol;
type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo;
#[inline]
fn listen_protocol(&self) -> Self::InboundProtocol {
if let Some(inner) = self.inner.as_ref() {
EitherUpgrade::A(inner.listen_protocol())
} else {
EitherUpgrade::B(DeniedUpgrade)
}
}
#[inline]
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgrade<Self::Substream>>::Output
) {
match (protocol, self.inner.as_mut()) {
(EitherOutput::First(proto), Some(inner)) => {
inner.inject_fully_negotiated_inbound(proto)
},
(EitherOutput::Second(_), None) => {}
(EitherOutput::First(_), None) => {} // Can happen if we shut down during an upgrade.
(EitherOutput::Second(_), Some(_)) => {
panic!("Wrong API usage; an upgrade was passed to a different object that the \
one that asked for the upgrade")
},
}
}
#[inline]
fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Output,
info: Self::OutboundOpenInfo
) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_fully_negotiated_outbound(protocol, info)
}
}
#[inline]
fn inject_event(&mut self, event: Self::InEvent) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_event(event)
}
}
#[inline]
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_dial_upgrade_error(info, error)
}
}
#[inline]
fn inject_inbound_closed(&mut self) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_inbound_closed()
}
}
#[inline]
fn connection_keep_alive(&self) -> bool {
if let Some(inner) = self.inner.as_ref() {
inner.connection_keep_alive()
} else {
false
}
}
#[inline]
fn shutdown(&mut self) {
if let Some(inner) = self.inner.as_mut() {
inner.shutdown()
}
}
#[inline]
fn poll(
&mut self,
) -> Poll<
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
Self::Error,
> {
if let Some(mut inner) = self.inner.take() {
let poll = inner.poll();
if let Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)) = poll {
poll
} else {
self.inner = Some(inner);
poll
}
} else {
Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown))
}
}
}

View File

@ -44,6 +44,7 @@ use std::{error, fmt, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite};
pub use self::dummy::DummyProtocolsHandler;
pub use self::fuse::Fuse;
pub use self::map_in::MapInEvent;
pub use self::map_out::MapOutEvent;
pub use self::node_handler::{NodeHandlerWrapper, NodeHandlerWrapperBuilder};
@ -51,6 +52,7 @@ pub use self::one_shot::OneShotHandler;
pub use self::select::{IntoProtocolsHandlerSelect, ProtocolsHandlerSelect};
mod dummy;
mod fuse;
mod map_in;
mod map_out;
mod node_handler;
@ -193,6 +195,16 @@ pub trait ProtocolsHandler {
MapOutEvent::new(self, map)
}
/// Wraps around `self`. When `poll()` returns `Shutdown`, any further call to any method will
/// be ignored.
#[inline]
fn fuse(self) -> Fuse<Self>
where
Self: Sized,
{
Fuse::new(self)
}
/// Builds an implementation of `ProtocolsHandler` that handles both this protocol and the
/// other one together.
#[inline]

View File

@ -1,4 +1,4 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
@ -23,6 +23,7 @@ use crate::{
either::EitherError,
either::EitherOutput,
protocols_handler::{
Fuse,
IntoProtocolsHandler,
ProtocolsHandler,
ProtocolsHandlerEvent,
@ -42,7 +43,9 @@ use tokio_io::{AsyncRead, AsyncWrite};
/// Implementation of `IntoProtocolsHandler` that combines two protocols into one.
#[derive(Debug, Clone)]
pub struct IntoProtocolsHandlerSelect<TProto1, TProto2> {
/// The first protocol.
proto1: TProto1,
/// The second protocol.
proto2: TProto2,
}
@ -73,8 +76,8 @@ where
fn into_handler(self, remote_peer_id: &PeerId) -> Self::Handler {
ProtocolsHandlerSelect {
proto1: self.proto1.into_handler(remote_peer_id),
proto2: self.proto2.into_handler(remote_peer_id),
proto1: self.proto1.into_handler(remote_peer_id).fuse(),
proto2: self.proto2.into_handler(remote_peer_id).fuse(),
}
}
}
@ -82,8 +85,10 @@ where
/// Implementation of `ProtocolsHandler` that combines two protocols into one.
#[derive(Debug, Clone)]
pub struct ProtocolsHandlerSelect<TProto1, TProto2> {
proto1: TProto1,
proto2: TProto2,
/// The first protocol.
proto1: Fuse<TProto1>,
/// The second protocol.
proto2: Fuse<TProto2>,
}
impl<TProto1, TProto2> ProtocolsHandlerSelect<TProto1, TProto2> {
@ -91,8 +96,8 @@ impl<TProto1, TProto2> ProtocolsHandlerSelect<TProto1, TProto2> {
#[inline]
pub(crate) fn new(proto1: TProto1, proto2: TProto2) -> Self {
ProtocolsHandlerSelect {
proto1,
proto2,
proto1: Fuse::new(proto1),
proto2: Fuse::new(proto2),
}
}
}
@ -112,7 +117,7 @@ where
type OutEvent = EitherOutput<TProto1::OutEvent, TProto2::OutEvent>;
type Error = EitherError<TProto1::Error, TProto2::Error>;
type Substream = TSubstream;
type InboundProtocol = SelectUpgrade<TProto1::InboundProtocol, TProto2::InboundProtocol>;
type InboundProtocol = SelectUpgrade<<Fuse<TProto1> as ProtocolsHandler>::InboundProtocol, <Fuse<TProto2> as ProtocolsHandler>::InboundProtocol>;
type OutboundProtocol = EitherUpgrade<TProto1::OutboundProtocol, TProto2::OutboundProtocol>;
type OutboundOpenInfo = EitherOutput<TProto1::OutboundOpenInfo, TProto2::OutboundOpenInfo>;
@ -213,38 +218,49 @@ where
}
fn poll(&mut self) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>, Self::Error> {
match self.proto1.poll().map_err(EitherError::A)? {
Async::Ready(ProtocolsHandlerEvent::Custom(event)) => {
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(EitherOutput::First(event))));
},
Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info}) => {
return Ok(Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade: EitherUpgrade::A(upgrade),
info: EitherOutput::First(info),
}));
},
Async::Ready(ProtocolsHandlerEvent::Shutdown) => {
return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown))
},
Async::NotReady => ()
};
loop {
match self.proto1.poll().map_err(EitherError::A)? {
Async::Ready(ProtocolsHandlerEvent::Custom(event)) => {
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(EitherOutput::First(event))));
},
Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info}) => {
return Ok(Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade: EitherUpgrade::A(upgrade),
info: EitherOutput::First(info),
}));
},
Async::Ready(ProtocolsHandlerEvent::Shutdown) => {
self.proto2.shutdown();
},
Async::NotReady => ()
};
match self.proto2.poll().map_err(EitherError::B)? {
Async::Ready(ProtocolsHandlerEvent::Custom(event)) => {
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(EitherOutput::Second(event))));
},
Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info }) => {
return Ok(Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade: EitherUpgrade::B(upgrade),
info: EitherOutput::Second(info),
}));
},
Async::Ready(ProtocolsHandlerEvent::Shutdown) => {
return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown))
},
Async::NotReady => ()
};
match self.proto2.poll().map_err(EitherError::B)? {
Async::Ready(ProtocolsHandlerEvent::Custom(event)) => {
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(EitherOutput::Second(event))));
},
Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info }) => {
return Ok(Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade: EitherUpgrade::B(upgrade),
info: EitherOutput::Second(info),
}));
},
Async::Ready(ProtocolsHandlerEvent::Shutdown) => {
if !self.proto1.is_shutdown() {
self.proto1.shutdown();
continue;
}
},
Async::NotReady => ()
};
Ok(Async::NotReady)
break;
}
if self.proto1.is_shutdown() && self.proto2.is_shutdown() {
Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown))
} else {
Ok(Async::NotReady)
}
}
}