Automatically close useless connections (#816)

* Automatically close useless connections

* Add a timeout before dropping the connection

* Rework the timeout

* Use OR to combine the outcome
This commit is contained in:
Pierre Krieger
2019-01-04 12:02:39 +01:00
committed by GitHub
parent ea0f61366c
commit 7da1a860be
12 changed files with 123 additions and 8 deletions

View File

@ -88,6 +88,9 @@ where
#[inline] #[inline]
fn inject_inbound_closed(&mut self) {} fn inject_inbound_closed(&mut self) {}
#[inline]
fn connection_keep_alive(&self) -> bool { false }
#[inline] #[inline]
fn shutdown(&mut self) { fn shutdown(&mut self) {
self.shutting_down = true; self.shutting_down = true;

View File

@ -99,6 +99,11 @@ where
self.inner.inject_inbound_closed() self.inner.inject_inbound_closed()
} }
#[inline]
fn connection_keep_alive(&self) -> bool {
self.inner.connection_keep_alive()
}
#[inline] #[inline]
fn shutdown(&mut self) { fn shutdown(&mut self) {
self.inner.shutdown() self.inner.shutdown()

View File

@ -94,6 +94,11 @@ where
self.inner.inject_inbound_closed() self.inner.inject_inbound_closed()
} }
#[inline]
fn connection_keep_alive(&self) -> bool {
self.inner.connection_keep_alive()
}
#[inline] #[inline]
fn shutdown(&mut self) { fn shutdown(&mut self) {
self.inner.shutdown() self.inner.shutdown()

View File

@ -134,8 +134,25 @@ pub trait ProtocolsHandler {
/// therefore no more inbound substreams will be produced. /// therefore no more inbound substreams will be produced.
fn inject_inbound_closed(&mut self); fn inject_inbound_closed(&mut self);
/// Returns whether the connection should be kept alive.
///
/// If returns `false`, that indicates that this connection is not important and the user may
/// invoke `shutdown()` if they think that they will no longer need the connection in the
/// future.
///
/// On the other hand, returning `true` is only an indication and doesn't mean that the user
/// will not call `shutdown()`.
///
/// When multiple `ProtocolsHandler` are combined together, they should use *OR* to merge the
/// result of this method.
///
/// The result of this method should be checked every time `poll()` is invoked.
///
/// After `shutdown()` is called, the result of this method doesn't matter anymore.
fn connection_keep_alive(&self) -> bool;
/// Indicates to the node that it should shut down. After that, it is expected that `poll()` /// Indicates to the node that it should shut down. After that, it is expected that `poll()`
/// returns `Ready(None)` as soon as possible. /// returns `Ready(ProtocolsHandlerEvent::Shutdown)` as soon as possible.
/// ///
/// This method allows an implementation to perform a graceful shutdown of the substreams, and /// This method allows an implementation to perform a graceful shutdown of the substreams, and
/// send back various events. /// send back various events.
@ -145,8 +162,9 @@ pub trait ProtocolsHandler {
/// node should be closed. /// node should be closed.
/// ///
/// > **Note**: If this handler is combined with other handlers, as soon as `poll()` returns /// > **Note**: If this handler is combined with other handlers, as soon as `poll()` returns
/// > `Ok(Async::Ready(None))`, all the other handlers will receive a call to /// > `Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown))`, all the other handlers
/// > `shutdown()` and will eventually be closed and destroyed. /// > will receive a call to `shutdown()` and will eventually be closed and
/// > destroyed.
fn poll(&mut self) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>; fn poll(&mut self) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>;
/// Adds a closure that turns the input event into something else. /// Adds a closure that turns the input event into something else.
@ -186,7 +204,7 @@ pub trait ProtocolsHandler {
where where
Self: Sized, Self: Sized,
{ {
NodeHandlerWrapperBuilder::new(self, Duration::from_secs(10), Duration::from_secs(10)) NodeHandlerWrapperBuilder::new(self, Duration::from_secs(10), Duration::from_secs(10), Duration::from_secs(5))
} }
/// Builds an implementation of `NodeHandler` that handles this protocol exclusively. /// Builds an implementation of `NodeHandler` that handles this protocol exclusively.

View File

@ -29,8 +29,8 @@ use crate::{
} }
}; };
use futures::prelude::*; use futures::prelude::*;
use std::time::Duration; use std::time::{Duration, Instant};
use tokio_timer::Timeout; use tokio_timer::{Delay, Timeout};
/// Prototype for a `NodeHandlerWrapper`. /// Prototype for a `NodeHandlerWrapper`.
pub struct NodeHandlerWrapperBuilder<TProtoHandler> pub struct NodeHandlerWrapperBuilder<TProtoHandler>
@ -43,6 +43,8 @@ where
in_timeout: Duration, in_timeout: Duration,
/// Timeout for outgoing substreams negotiation. /// Timeout for outgoing substreams negotiation.
out_timeout: Duration, out_timeout: Duration,
/// Time after which a useless connection will be closed.
useless_timeout: Duration,
} }
impl<TProtoHandler> NodeHandlerWrapperBuilder<TProtoHandler> impl<TProtoHandler> NodeHandlerWrapperBuilder<TProtoHandler>
@ -51,11 +53,12 @@ where
{ {
/// Builds a `NodeHandlerWrapperBuilder`. /// Builds a `NodeHandlerWrapperBuilder`.
#[inline] #[inline]
pub(crate) fn new(handler: TProtoHandler, in_timeout: Duration, out_timeout: Duration) -> Self { pub(crate) fn new(handler: TProtoHandler, in_timeout: Duration, out_timeout: Duration, useless_timeout: Duration) -> Self {
NodeHandlerWrapperBuilder { NodeHandlerWrapperBuilder {
handler, handler,
in_timeout, in_timeout,
out_timeout, out_timeout,
useless_timeout,
} }
} }
@ -73,6 +76,14 @@ where
self self
} }
/// Sets the timeout between the moment `connection_keep_alive()` returns `false` on the
/// `ProtocolsHandler`, and the moment the connection is closed.
#[inline]
pub fn with_useless_timeout(mut self, timeout: Duration) -> Self {
self.useless_timeout = timeout;
self
}
/// Builds the `NodeHandlerWrapper`. /// Builds the `NodeHandlerWrapper`.
#[inline] #[inline]
pub fn build(self) -> NodeHandlerWrapper<TProtoHandler> { pub fn build(self) -> NodeHandlerWrapper<TProtoHandler> {
@ -84,6 +95,8 @@ where
out_timeout: self.out_timeout, out_timeout: self.out_timeout,
queued_dial_upgrades: Vec::new(), queued_dial_upgrades: Vec::new(),
unique_dial_upgrade_id: 0, unique_dial_upgrade_id: 0,
connection_shutdown: None,
useless_timeout: self.useless_timeout,
} }
} }
} }
@ -114,6 +127,12 @@ where
queued_dial_upgrades: Vec<(u64, TProtoHandler::OutboundProtocol)>, queued_dial_upgrades: Vec<(u64, TProtoHandler::OutboundProtocol)>,
/// Unique identifier assigned to each queued dial upgrade. /// Unique identifier assigned to each queued dial upgrade.
unique_dial_upgrade_id: u64, unique_dial_upgrade_id: u64,
/// When a connection has been deemed useless, will contain `Some` with a `Delay` to when it
/// should be shut down.
connection_shutdown: Option<Delay>,
/// Timeout after which a useless connection is closed. When the `connection_shutdown` is set
/// to `Some`, this is the value that is being used.
useless_timeout: Duration,
} }
impl<TProtoHandler> NodeHandler for NodeHandlerWrapper<TProtoHandler> impl<TProtoHandler> NodeHandler for NodeHandlerWrapper<TProtoHandler>
@ -243,7 +262,15 @@ where
// Poll the handler at the end so that we see the consequences of the method calls on // Poll the handler at the end so that we see the consequences of the method calls on
// `self.handler`. // `self.handler`.
match self.handler.poll()? { let poll_result = self.handler.poll()?;
if self.handler.connection_keep_alive() {
self.connection_shutdown = None;
} else if self.connection_shutdown.is_none() {
self.connection_shutdown = Some(Delay::new(Instant::now() + self.useless_timeout));
}
match poll_result {
Async::Ready(ProtocolsHandlerEvent::Custom(event)) => { Async::Ready(ProtocolsHandlerEvent::Custom(event)) => {
return Ok(Async::Ready(NodeHandlerEvent::Custom(event))); return Ok(Async::Ready(NodeHandlerEvent::Custom(event)));
} }
@ -264,6 +291,23 @@ where
Async::NotReady => (), Async::NotReady => (),
}; };
// Check the `connection_shutdown`.
if let Some(mut connection_shutdown) = self.connection_shutdown.take() {
// If we're negotiating substreams, let's delay the closing.
if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() {
match connection_shutdown.poll() {
Ok(Async::Ready(_)) | Err(_) => {
return Ok(Async::Ready(NodeHandlerEvent::Shutdown))
},
Ok(Async::NotReady) => {
self.connection_shutdown = Some(connection_shutdown);
}
}
} else {
self.connection_shutdown = Some(connection_shutdown);
}
}
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }

View File

@ -155,6 +155,11 @@ where
} }
} }
#[inline]
fn connection_keep_alive(&self) -> bool {
self.proto1.connection_keep_alive() || self.proto2.connection_keep_alive()
}
#[inline] #[inline]
fn shutdown(&mut self) { fn shutdown(&mut self) {
self.proto1.shutdown(); self.proto1.shutdown();

View File

@ -148,6 +148,11 @@ where
#[inline] #[inline]
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {} fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {}
#[inline]
fn connection_keep_alive(&self) -> bool {
!self.substreams.is_empty()
}
#[inline] #[inline]
fn shutdown(&mut self) { fn shutdown(&mut self) {
self.shutting_down = true; self.shutting_down = true;

View File

@ -89,6 +89,11 @@ where
#[inline] #[inline]
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {} fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {}
#[inline]
fn connection_keep_alive(&self) -> bool {
false
}
#[inline] #[inline]
fn shutdown(&mut self) { fn shutdown(&mut self) {
self.shutdown = true; self.shutdown = true;

View File

@ -49,6 +49,9 @@ pub struct PeriodicIdHandler<TSubstream> {
/// shut down. /// shut down.
next_id: Option<Delay>, next_id: Option<Delay>,
/// If `true`, we have started an identification of the remote at least once in the past.
first_id_happened: bool,
/// Marker for strong typing. /// Marker for strong typing.
marker: PhantomData<TSubstream>, marker: PhantomData<TSubstream>,
} }
@ -70,6 +73,7 @@ impl<TSubstream> PeriodicIdHandler<TSubstream> {
config: IdentifyProtocolConfig, config: IdentifyProtocolConfig,
pending_result: None, pending_result: None,
next_id: Some(Delay::new(Instant::now() + DELAY_TO_FIRST_ID)), next_id: Some(Delay::new(Instant::now() + DELAY_TO_FIRST_ID)),
first_id_happened: false,
marker: PhantomData, marker: PhantomData,
} }
} }
@ -118,6 +122,11 @@ where
} }
} }
#[inline]
fn connection_keep_alive(&self) -> bool {
!self.first_id_happened
}
#[inline] #[inline]
fn shutdown(&mut self) { fn shutdown(&mut self) {
self.next_id = None; self.next_id = None;
@ -151,6 +160,7 @@ where
next_id.reset(Instant::now() + DELAY_TO_NEXT_ID); next_id.reset(Instant::now() + DELAY_TO_NEXT_ID);
let upgrade = self.config.clone(); let upgrade = self.config.clone();
let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info: () }; let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info: () };
self.first_id_happened = true;
Ok(Async::Ready(ev)) Ok(Async::Ready(ev))
} }
} }

View File

@ -489,6 +489,11 @@ where
} }
} }
#[inline]
fn connection_keep_alive(&self) -> bool {
!self.substreams.is_empty()
}
#[inline] #[inline]
fn shutdown(&mut self) { fn shutdown(&mut self) {
self.shutting_down = true; self.shutting_down = true;

View File

@ -193,6 +193,11 @@ where
} }
} }
#[inline]
fn connection_keep_alive(&self) -> bool {
false
}
fn shutdown(&mut self) { fn shutdown(&mut self) {
// Put `Shutdown` in `self.out_state` if we don't have any substream open. // Put `Shutdown` in `self.out_state` if we don't have any substream open.
// Otherwise, keep the state as it is but call `shutdown()` on the substream. This // Otherwise, keep the state as it is but call `shutdown()` on the substream. This

View File

@ -105,6 +105,11 @@ where
#[inline] #[inline]
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {} fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {}
#[inline]
fn connection_keep_alive(&self) -> bool {
false
}
#[inline] #[inline]
fn shutdown(&mut self) { fn shutdown(&mut self) {
for ping in self.ping_in_substreams.iter_mut() { for ping in self.ping_in_substreams.iter_mut() {