diff --git a/core/src/protocols_handler/dummy.rs b/core/src/protocols_handler/dummy.rs index ee8dafb4..50e66b6e 100644 --- a/core/src/protocols_handler/dummy.rs +++ b/core/src/protocols_handler/dummy.rs @@ -88,6 +88,9 @@ where #[inline] fn inject_inbound_closed(&mut self) {} + #[inline] + fn connection_keep_alive(&self) -> bool { false } + #[inline] fn shutdown(&mut self) { self.shutting_down = true; diff --git a/core/src/protocols_handler/map_in.rs b/core/src/protocols_handler/map_in.rs index ae62d9fe..9f80d248 100644 --- a/core/src/protocols_handler/map_in.rs +++ b/core/src/protocols_handler/map_in.rs @@ -99,6 +99,11 @@ where self.inner.inject_inbound_closed() } + #[inline] + fn connection_keep_alive(&self) -> bool { + self.inner.connection_keep_alive() + } + #[inline] fn shutdown(&mut self) { self.inner.shutdown() diff --git a/core/src/protocols_handler/map_out.rs b/core/src/protocols_handler/map_out.rs index defa03a0..6df74424 100644 --- a/core/src/protocols_handler/map_out.rs +++ b/core/src/protocols_handler/map_out.rs @@ -94,6 +94,11 @@ where self.inner.inject_inbound_closed() } + #[inline] + fn connection_keep_alive(&self) -> bool { + self.inner.connection_keep_alive() + } + #[inline] fn shutdown(&mut self) { self.inner.shutdown() diff --git a/core/src/protocols_handler/mod.rs b/core/src/protocols_handler/mod.rs index 6127b68a..1fd5795d 100644 --- a/core/src/protocols_handler/mod.rs +++ b/core/src/protocols_handler/mod.rs @@ -134,8 +134,25 @@ pub trait ProtocolsHandler { /// therefore no more inbound substreams will be produced. fn inject_inbound_closed(&mut self); + /// Returns whether the connection should be kept alive. + /// + /// If returns `false`, that indicates that this connection is not important and the user may + /// invoke `shutdown()` if they think that they will no longer need the connection in the + /// future. + /// + /// On the other hand, returning `true` is only an indication and doesn't mean that the user + /// will not call `shutdown()`. + /// + /// When multiple `ProtocolsHandler` are combined together, they should use *OR* to merge the + /// result of this method. + /// + /// The result of this method should be checked every time `poll()` is invoked. + /// + /// After `shutdown()` is called, the result of this method doesn't matter anymore. + fn connection_keep_alive(&self) -> bool; + /// Indicates to the node that it should shut down. After that, it is expected that `poll()` - /// returns `Ready(None)` as soon as possible. + /// returns `Ready(ProtocolsHandlerEvent::Shutdown)` as soon as possible. /// /// This method allows an implementation to perform a graceful shutdown of the substreams, and /// send back various events. @@ -145,8 +162,9 @@ pub trait ProtocolsHandler { /// node should be closed. /// /// > **Note**: If this handler is combined with other handlers, as soon as `poll()` returns - /// > `Ok(Async::Ready(None))`, all the other handlers will receive a call to - /// > `shutdown()` and will eventually be closed and destroyed. + /// > `Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown))`, all the other handlers + /// > will receive a call to `shutdown()` and will eventually be closed and + /// > destroyed. fn poll(&mut self) -> Poll, Self::Error>; /// Adds a closure that turns the input event into something else. @@ -186,7 +204,7 @@ pub trait ProtocolsHandler { where Self: Sized, { - NodeHandlerWrapperBuilder::new(self, Duration::from_secs(10), Duration::from_secs(10)) + NodeHandlerWrapperBuilder::new(self, Duration::from_secs(10), Duration::from_secs(10), Duration::from_secs(5)) } /// Builds an implementation of `NodeHandler` that handles this protocol exclusively. diff --git a/core/src/protocols_handler/node_handler.rs b/core/src/protocols_handler/node_handler.rs index ad321b81..6a3d2833 100644 --- a/core/src/protocols_handler/node_handler.rs +++ b/core/src/protocols_handler/node_handler.rs @@ -29,8 +29,8 @@ use crate::{ } }; use futures::prelude::*; -use std::time::Duration; -use tokio_timer::Timeout; +use std::time::{Duration, Instant}; +use tokio_timer::{Delay, Timeout}; /// Prototype for a `NodeHandlerWrapper`. pub struct NodeHandlerWrapperBuilder @@ -43,6 +43,8 @@ where in_timeout: Duration, /// Timeout for outgoing substreams negotiation. out_timeout: Duration, + /// Time after which a useless connection will be closed. + useless_timeout: Duration, } impl NodeHandlerWrapperBuilder @@ -51,11 +53,12 @@ where { /// Builds a `NodeHandlerWrapperBuilder`. #[inline] - pub(crate) fn new(handler: TProtoHandler, in_timeout: Duration, out_timeout: Duration) -> Self { + pub(crate) fn new(handler: TProtoHandler, in_timeout: Duration, out_timeout: Duration, useless_timeout: Duration) -> Self { NodeHandlerWrapperBuilder { handler, in_timeout, out_timeout, + useless_timeout, } } @@ -73,6 +76,14 @@ where self } + /// Sets the timeout between the moment `connection_keep_alive()` returns `false` on the + /// `ProtocolsHandler`, and the moment the connection is closed. + #[inline] + pub fn with_useless_timeout(mut self, timeout: Duration) -> Self { + self.useless_timeout = timeout; + self + } + /// Builds the `NodeHandlerWrapper`. #[inline] pub fn build(self) -> NodeHandlerWrapper { @@ -84,6 +95,8 @@ where out_timeout: self.out_timeout, queued_dial_upgrades: Vec::new(), unique_dial_upgrade_id: 0, + connection_shutdown: None, + useless_timeout: self.useless_timeout, } } } @@ -114,6 +127,12 @@ where queued_dial_upgrades: Vec<(u64, TProtoHandler::OutboundProtocol)>, /// Unique identifier assigned to each queued dial upgrade. unique_dial_upgrade_id: u64, + /// When a connection has been deemed useless, will contain `Some` with a `Delay` to when it + /// should be shut down. + connection_shutdown: Option, + /// Timeout after which a useless connection is closed. When the `connection_shutdown` is set + /// to `Some`, this is the value that is being used. + useless_timeout: Duration, } impl NodeHandler for NodeHandlerWrapper @@ -243,7 +262,15 @@ where // Poll the handler at the end so that we see the consequences of the method calls on // `self.handler`. - match self.handler.poll()? { + let poll_result = self.handler.poll()?; + + if self.handler.connection_keep_alive() { + self.connection_shutdown = None; + } else if self.connection_shutdown.is_none() { + self.connection_shutdown = Some(Delay::new(Instant::now() + self.useless_timeout)); + } + + match poll_result { Async::Ready(ProtocolsHandlerEvent::Custom(event)) => { return Ok(Async::Ready(NodeHandlerEvent::Custom(event))); } @@ -264,6 +291,23 @@ where Async::NotReady => (), }; + // Check the `connection_shutdown`. + if let Some(mut connection_shutdown) = self.connection_shutdown.take() { + // If we're negotiating substreams, let's delay the closing. + if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() { + match connection_shutdown.poll() { + Ok(Async::Ready(_)) | Err(_) => { + return Ok(Async::Ready(NodeHandlerEvent::Shutdown)) + }, + Ok(Async::NotReady) => { + self.connection_shutdown = Some(connection_shutdown); + } + } + } else { + self.connection_shutdown = Some(connection_shutdown); + } + } + Ok(Async::NotReady) } } diff --git a/core/src/protocols_handler/select.rs b/core/src/protocols_handler/select.rs index d010771b..41031f10 100644 --- a/core/src/protocols_handler/select.rs +++ b/core/src/protocols_handler/select.rs @@ -155,6 +155,11 @@ where } } + #[inline] + fn connection_keep_alive(&self) -> bool { + self.proto1.connection_keep_alive() || self.proto2.connection_keep_alive() + } + #[inline] fn shutdown(&mut self) { self.proto1.shutdown(); diff --git a/protocols/floodsub/src/handler.rs b/protocols/floodsub/src/handler.rs index f8a1fac4..d96d26fc 100644 --- a/protocols/floodsub/src/handler.rs +++ b/protocols/floodsub/src/handler.rs @@ -148,6 +148,11 @@ where #[inline] fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<>::Error>) {} + #[inline] + fn connection_keep_alive(&self) -> bool { + !self.substreams.is_empty() + } + #[inline] fn shutdown(&mut self) { self.shutting_down = true; diff --git a/protocols/identify/src/listen_handler.rs b/protocols/identify/src/listen_handler.rs index c05a3926..e2d0e4fb 100644 --- a/protocols/identify/src/listen_handler.rs +++ b/protocols/identify/src/listen_handler.rs @@ -89,6 +89,11 @@ where #[inline] fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<>::Error>) {} + #[inline] + fn connection_keep_alive(&self) -> bool { + false + } + #[inline] fn shutdown(&mut self) { self.shutdown = true; diff --git a/protocols/identify/src/periodic_id_handler.rs b/protocols/identify/src/periodic_id_handler.rs index 2cb1bbf1..6625e424 100644 --- a/protocols/identify/src/periodic_id_handler.rs +++ b/protocols/identify/src/periodic_id_handler.rs @@ -49,6 +49,9 @@ pub struct PeriodicIdHandler { /// shut down. next_id: Option, + /// If `true`, we have started an identification of the remote at least once in the past. + first_id_happened: bool, + /// Marker for strong typing. marker: PhantomData, } @@ -70,6 +73,7 @@ impl PeriodicIdHandler { config: IdentifyProtocolConfig, pending_result: None, next_id: Some(Delay::new(Instant::now() + DELAY_TO_FIRST_ID)), + first_id_happened: false, marker: PhantomData, } } @@ -118,6 +122,11 @@ where } } + #[inline] + fn connection_keep_alive(&self) -> bool { + !self.first_id_happened + } + #[inline] fn shutdown(&mut self) { self.next_id = None; @@ -151,6 +160,7 @@ where next_id.reset(Instant::now() + DELAY_TO_NEXT_ID); let upgrade = self.config.clone(); let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info: () }; + self.first_id_happened = true; Ok(Async::Ready(ev)) } } diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 7ef85081..a20b6a1d 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -489,6 +489,11 @@ where } } + #[inline] + fn connection_keep_alive(&self) -> bool { + !self.substreams.is_empty() + } + #[inline] fn shutdown(&mut self) { self.shutting_down = true; diff --git a/protocols/ping/src/dial_handler.rs b/protocols/ping/src/dial_handler.rs index fff2e8ce..53eedc75 100644 --- a/protocols/ping/src/dial_handler.rs +++ b/protocols/ping/src/dial_handler.rs @@ -193,6 +193,11 @@ where } } + #[inline] + fn connection_keep_alive(&self) -> bool { + false + } + fn shutdown(&mut self) { // Put `Shutdown` in `self.out_state` if we don't have any substream open. // Otherwise, keep the state as it is but call `shutdown()` on the substream. This diff --git a/protocols/ping/src/listen_handler.rs b/protocols/ping/src/listen_handler.rs index 47fb26d1..e9d6064c 100644 --- a/protocols/ping/src/listen_handler.rs +++ b/protocols/ping/src/listen_handler.rs @@ -105,6 +105,11 @@ where #[inline] fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<>::Error>) {} + #[inline] + fn connection_keep_alive(&self) -> bool { + false + } + #[inline] fn shutdown(&mut self) { for ping in self.ping_in_substreams.iter_mut() {