Allow oneshot handler's inactivity_timeout to be configurable (#1039)

This commit is contained in:
Age Manning
2019-04-05 06:28:18 +11:00
committed by Pierre Krieger
parent fac1ba12ec
commit 1dcb3e5b1c

View File

@@ -18,7 +18,9 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr};
use crate::protocols_handler::{
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr,
};
use crate::upgrade::{InboundUpgrade, OutboundUpgrade};
use futures::prelude::*;
use smallvec::SmallVec;
@@ -30,12 +32,14 @@ use tokio_io::{AsyncRead, AsyncWrite};
/// This struct is meant to be a helper for other implementations to use.
// TODO: Debug
pub struct OneShotHandler<TSubstream, TInProto, TOutProto, TOutEvent>
where TOutProto: OutboundUpgrade<TSubstream>
where
TOutProto: OutboundUpgrade<TSubstream>,
{
/// The upgrade for inbound substreams.
listen_protocol: TInProto,
/// If `Some`, something bad happened and we should shut down the handler with an error.
pending_error: Option<ProtocolsHandlerUpgrErr<<TOutProto as OutboundUpgrade<TSubstream>>::Error>>,
pending_error:
Option<ProtocolsHandlerUpgrErr<<TOutProto as OutboundUpgrade<TSubstream>>::Error>>,
/// Queue of events to produce in `poll()`.
events_out: SmallVec<[TOutEvent; 4]>,
/// Queue of outbound substreams to open.
@@ -54,11 +58,12 @@ where TOutProto: OutboundUpgrade<TSubstream>
impl<TSubstream, TInProto, TOutProto, TOutEvent>
OneShotHandler<TSubstream, TInProto, TOutProto, TOutEvent>
where TOutProto: OutboundUpgrade<TSubstream>
where
TOutProto: OutboundUpgrade<TSubstream>,
{
/// Creates a `OneShotHandler`.
#[inline]
pub fn new(listen_protocol: TInProto) -> Self {
pub fn new(listen_protocol: TInProto, inactive_timeout: Duration) -> Self {
OneShotHandler {
listen_protocol,
pending_error: None,
@@ -67,7 +72,7 @@ where TOutProto: OutboundUpgrade<TSubstream>
dial_negotiated: 0,
max_dial_negotiated: 8,
keep_alive: KeepAlive::Forever,
inactive_timeout: Duration::from_secs(10), // TODO: allow configuring
inactive_timeout,
marker: PhantomData,
}
}
@@ -104,19 +109,20 @@ where TOutProto: OutboundUpgrade<TSubstream>
}
}
impl<TSubstream, TInProto, TOutProto, TOutEvent> Default for
OneShotHandler<TSubstream, TInProto, TOutProto, TOutEvent>
where TOutProto: OutboundUpgrade<TSubstream>,
TInProto: Default
impl<TSubstream, TInProto, TOutProto, TOutEvent> Default
for OneShotHandler<TSubstream, TInProto, TOutProto, TOutEvent>
where
TOutProto: OutboundUpgrade<TSubstream>,
TInProto: Default,
{
#[inline]
fn default() -> Self {
OneShotHandler::new(Default::default())
OneShotHandler::new(Default::default(), Duration::from_secs(10))
}
}
impl<TSubstream, TInProto, TOutProto, TOutEvent> ProtocolsHandler for
OneShotHandler<TSubstream, TInProto, TOutProto, TOutEvent>
impl<TSubstream, TInProto, TOutProto, TOutEvent> ProtocolsHandler
for OneShotHandler<TSubstream, TInProto, TOutProto, TOutEvent>
where
TSubstream: AsyncRead + AsyncWrite,
TInProto: InboundUpgrade<TSubstream> + Clone,
@@ -127,7 +133,9 @@ where
{
type InEvent = TOutProto;
type OutEvent = TOutEvent;
type Error = ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>;
type Error = ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error,
>;
type Substream = TSubstream;
type InboundProtocol = TInProto;
type OutboundProtocol = TOutProto;
@@ -141,7 +149,7 @@ where
#[inline]
fn inject_fully_negotiated_inbound(
&mut self,
out: <Self::InboundProtocol as InboundUpgrade<Self::Substream>>::Output
out: <Self::InboundProtocol as InboundUpgrade<Self::Substream>>::Output,
) {
// If we're shutting down the connection for inactivity, reset the timeout.
if !self.keep_alive.is_forever() {
@@ -155,7 +163,7 @@ where
fn inject_fully_negotiated_outbound(
&mut self,
out: <Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Output,
_: Self::OutboundOpenInfo
_: Self::OutboundOpenInfo,
) {
self.dial_negotiated -= 1;
@@ -172,7 +180,13 @@ where
}
#[inline]
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {
fn inject_dial_upgrade_error(
&mut self,
_: Self::OutboundOpenInfo,
error: ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error,
>,
) {
if self.pending_error.is_none() {
self.pending_error = Some(error);
}
@@ -183,13 +197,20 @@ where
self.keep_alive
}
fn poll(&mut self) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>, Self::Error> {
fn poll(
&mut self,
) -> Poll<
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
Self::Error,
> {
if let Some(err) = self.pending_error.take() {
return Err(err);
}
if !self.events_out.is_empty() {
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(self.events_out.remove(0))));
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
self.events_out.remove(0),
)));
} else {
self.events_out.shrink_to_fit();
}
@@ -197,10 +218,12 @@ where
if !self.dial_queue.is_empty() {
if self.dial_negotiated < self.max_dial_negotiated {
self.dial_negotiated += 1;
return Ok(Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade: self.dial_queue.remove(0),
info: (),
}));
return Ok(Async::Ready(
ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade: self.dial_queue.remove(0),
info: (),
},
));
}
} else {
self.dial_queue.shrink_to_fit();