Allow configuration of outbound substream in OneShotHandler. (#1521)

Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
This commit is contained in:
Piotr Gołąb 2020-04-01 13:07:10 +02:00 committed by GitHub
parent be970466b3
commit 6b4bdc1fe9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -54,8 +54,8 @@ where
max_dial_negotiated: u32, max_dial_negotiated: u32,
/// Value to return from `connection_keep_alive`. /// Value to return from `connection_keep_alive`.
keep_alive: KeepAlive, keep_alive: KeepAlive,
/// After the given duration has elapsed, an inactive connection will shutdown. /// The configuration container for the handler
inactive_timeout: Duration, config: OneShotHandlerConfig,
} }
impl<TInProto, TOutProto, TOutEvent> impl<TInProto, TOutProto, TOutEvent>
@ -67,7 +67,7 @@ where
#[inline] #[inline]
pub fn new( pub fn new(
listen_protocol: SubstreamProtocol<TInProto>, listen_protocol: SubstreamProtocol<TInProto>,
inactive_timeout: Duration config: OneShotHandlerConfig,
) -> Self { ) -> Self {
OneShotHandler { OneShotHandler {
listen_protocol, listen_protocol,
@ -77,7 +77,7 @@ where
dial_negotiated: 0, dial_negotiated: 0,
max_dial_negotiated: 8, max_dial_negotiated: 8,
keep_alive: KeepAlive::Yes, keep_alive: KeepAlive::Yes,
inactive_timeout, config
} }
} }
@ -121,7 +121,10 @@ where
{ {
#[inline] #[inline]
fn default() -> Self { fn default() -> Self {
OneShotHandler::new(SubstreamProtocol::new(Default::default()), Duration::from_secs(10)) OneShotHandler::new(
SubstreamProtocol::new(Default::default()),
OneShotHandlerConfig::default()
)
} }
} }
@ -157,7 +160,7 @@ where
) { ) {
// If we're shutting down the connection for inactivity, reset the timeout. // If we're shutting down the connection for inactivity, reset the timeout.
if !self.keep_alive.is_yes() { if !self.keep_alive.is_yes() {
self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout); self.keep_alive = KeepAlive::Until(Instant::now() + self.config.inactive_timeout);
} }
self.events_out.push(out.into()); self.events_out.push(out.into());
@ -172,7 +175,7 @@ where
self.dial_negotiated -= 1; self.dial_negotiated -= 1;
if self.dial_negotiated == 0 && self.dial_queue.is_empty() { if self.dial_negotiated == 0 && self.dial_queue.is_empty() {
self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout); self.keep_alive = KeepAlive::Until(Instant::now() + self.config.inactive_timeout);
} }
self.events_out.push(out.into()); self.events_out.push(out.into());
@ -224,7 +227,8 @@ where
self.dial_negotiated += 1; self.dial_negotiated += 1;
return Poll::Ready( return Poll::Ready(
ProtocolsHandlerEvent::OutboundSubstreamRequest { ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(self.dial_queue.remove(0)), protocol: SubstreamProtocol::new(self.dial_queue.remove(0))
.with_timeout(self.config.substream_timeout),
info: (), info: (),
}, },
); );
@ -236,3 +240,21 @@ where
Poll::Pending Poll::Pending
} }
} }
/// Configuration parameters for the `OneShotHandler`
#[derive(Debug)]
pub struct OneShotHandlerConfig {
/// After the given duration has elapsed, an inactive connection will shutdown.
inactive_timeout: Duration,
/// Timeout duration for each newly opened outbound substream.
substream_timeout: Duration,
}
impl Default for OneShotHandlerConfig {
fn default() -> Self {
let inactive_timeout = Duration::from_secs(10);
let substream_timeout = Duration::from_secs(10);
OneShotHandlerConfig { inactive_timeout, substream_timeout }
}
}