diff --git a/core/src/protocols_handler/dummy.rs b/core/src/protocols_handler/dummy.rs index 50e66b6e..d543b6d4 100644 --- a/core/src/protocols_handler/dummy.rs +++ b/core/src/protocols_handler/dummy.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, + protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, upgrade::{ InboundUpgrade, OutboundUpgrade, @@ -89,7 +89,7 @@ where fn inject_inbound_closed(&mut self) {} #[inline] - fn connection_keep_alive(&self) -> bool { false } + fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::Now } #[inline] fn shutdown(&mut self) { diff --git a/core/src/protocols_handler/fuse.rs b/core/src/protocols_handler/fuse.rs index 08250af5..3213cc8f 100644 --- a/core/src/protocols_handler/fuse.rs +++ b/core/src/protocols_handler/fuse.rs @@ -20,7 +20,7 @@ use crate::{ either::EitherOutput, - protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, + protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, upgrade::{ DeniedUpgrade, EitherUpgrade, @@ -124,11 +124,11 @@ where } #[inline] - fn connection_keep_alive(&self) -> bool { + fn connection_keep_alive(&self) -> KeepAlive { if let Some(inner) = self.inner.as_ref() { inner.connection_keep_alive() } else { - false + KeepAlive::Now } } diff --git a/core/src/protocols_handler/map_in.rs b/core/src/protocols_handler/map_in.rs index 9f80d248..42e8ca1f 100644 --- a/core/src/protocols_handler/map_in.rs +++ b/core/src/protocols_handler/map_in.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, + protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, upgrade::{ InboundUpgrade, OutboundUpgrade, @@ -100,7 +100,7 @@ where } #[inline] - fn connection_keep_alive(&self) -> bool { + fn connection_keep_alive(&self) -> KeepAlive { self.inner.connection_keep_alive() } diff --git a/core/src/protocols_handler/map_out.rs b/core/src/protocols_handler/map_out.rs index 6df74424..f8f122ca 100644 --- a/core/src/protocols_handler/map_out.rs +++ b/core/src/protocols_handler/map_out.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, + protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, upgrade::{ InboundUpgrade, OutboundUpgrade, @@ -95,7 +95,7 @@ where } #[inline] - fn connection_keep_alive(&self) -> bool { + fn connection_keep_alive(&self) -> KeepAlive { self.inner.connection_keep_alive() } diff --git a/core/src/protocols_handler/mod.rs b/core/src/protocols_handler/mod.rs index 56a2015c..954cb78e 100644 --- a/core/src/protocols_handler/mod.rs +++ b/core/src/protocols_handler/mod.rs @@ -40,7 +40,7 @@ use crate::upgrade::{ UpgradeError, }; use futures::prelude::*; -use std::{error, fmt, time::Duration}; +use std::{error, fmt, time::Duration, time::Instant}; use tokio_io::{AsyncRead, AsyncWrite}; pub use self::dummy::DummyProtocolsHandler; @@ -142,22 +142,23 @@ pub trait ProtocolsHandler { /// therefore no more inbound substreams will be produced. fn inject_inbound_closed(&mut self); - /// Returns whether the connection should be kept alive. + /// Returns until when the connection should be kept alive. /// - /// If returns `false`, that indicates that this connection is not important and the user may - /// invoke `shutdown()` if they think that they will no longer need the connection in the - /// future. + /// If returns `Until`, that indicates that this connection may invoke `shutdown()` after the + /// returned `Instant` has elapsed if they think that they will no longer need the connection + /// in the future. Returning `Forever` is equivalent to "infinite". Returning `Now` is + /// equivalent to `Until(Instant::now())`. /// - /// On the other hand, returning `true` is only an indication and doesn't mean that the user + /// On the other hand, the return value is only an indication and doesn't mean that the user /// will not call `shutdown()`. /// - /// When multiple `ProtocolsHandler` are combined together, they should use *OR* to merge the - /// result of this method. + /// When multiple `ProtocolsHandler` are combined together, they should use return the largest + /// value of the two, or `Forever` if either returns `Forever`. /// /// The result of this method should be checked every time `poll()` is invoked. /// /// After `shutdown()` is called, the result of this method doesn't matter anymore. - fn connection_keep_alive(&self) -> bool; + fn connection_keep_alive(&self) -> KeepAlive; /// Indicates to the node that it should shut down. After that, it is expected that `poll()` /// returns `Ready(ProtocolsHandlerEvent::Shutdown)` as soon as possible. @@ -398,7 +399,7 @@ pub trait IntoProtocolsHandler { where Self: Sized, { - NodeHandlerWrapperBuilder::new(self, Duration::from_secs(10), Duration::from_secs(10), Duration::from_secs(5)) + NodeHandlerWrapperBuilder::new(self, Duration::from_secs(10), Duration::from_secs(10)) } } @@ -412,3 +413,24 @@ where T: ProtocolsHandler self } } + +/// How long the connection should be kept alive. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum KeepAlive { + /// If nothing new happens, the connection should be closed at the given `Instant`. + Until(Instant), + /// Keep the connection alive. + Forever, + /// Close the connection as soon as possible. + Now, +} + +impl KeepAlive { + /// Returns true for `Forever`, false otherwise. + pub fn is_forever(&self) -> bool { + match *self { + KeepAlive::Forever => true, + _ => false, + } + } +} diff --git a/core/src/protocols_handler/node_handler.rs b/core/src/protocols_handler/node_handler.rs index 3c5083a1..9d1c0d26 100644 --- a/core/src/protocols_handler/node_handler.rs +++ b/core/src/protocols_handler/node_handler.rs @@ -22,7 +22,7 @@ use crate::{ PeerId, nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent}, nodes::handled_node_tasks::IntoNodeHandler, - protocols_handler::{ProtocolsHandler, IntoProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, + protocols_handler::{KeepAlive, ProtocolsHandler, IntoProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, upgrade::{ self, OutboundUpgrade, @@ -42,8 +42,6 @@ pub struct NodeHandlerWrapperBuilder { in_timeout: Duration, /// Timeout for outgoing substreams negotiation. out_timeout: Duration, - /// Time after which a useless connection will be closed. - useless_timeout: Duration, } impl NodeHandlerWrapperBuilder @@ -52,12 +50,11 @@ where { /// Builds a `NodeHandlerWrapperBuilder`. #[inline] - pub(crate) fn new(handler: TIntoProtoHandler, in_timeout: Duration, out_timeout: Duration, useless_timeout: Duration) -> Self { + pub(crate) fn new(handler: TIntoProtoHandler, in_timeout: Duration, out_timeout: Duration) -> Self { NodeHandlerWrapperBuilder { handler, in_timeout, out_timeout, - useless_timeout, } } @@ -75,14 +72,6 @@ where self } - /// Sets the timeout between the moment `connection_keep_alive()` returns `false` on the - /// `ProtocolsHandler`, and the moment the connection is closed. - #[inline] - pub fn with_useless_timeout(mut self, timeout: Duration) -> Self { - self.useless_timeout = timeout; - self - } - /// Builds the `NodeHandlerWrapper`. #[deprecated(note = "Pass the NodeHandlerWrapperBuilder directly")] #[inline] @@ -98,7 +87,6 @@ where queued_dial_upgrades: Vec::new(), unique_dial_upgrade_id: 0, connection_shutdown: None, - useless_timeout: self.useless_timeout, } } } @@ -122,7 +110,6 @@ where queued_dial_upgrades: Vec::new(), unique_dial_upgrade_id: 0, connection_shutdown: None, - useless_timeout: self.useless_timeout, } } } @@ -156,9 +143,6 @@ where /// When a connection has been deemed useless, will contain `Some` with a `Delay` to when it /// should be shut down. connection_shutdown: Option, - /// Timeout after which a useless connection is closed. When the `connection_shutdown` is set - /// to `Some`, this is the value that is being used. - useless_timeout: Duration, } impl NodeHandler for NodeHandlerWrapper @@ -292,11 +276,11 @@ where loop { let poll_result = self.handler.poll()?; - if self.handler.connection_keep_alive() { - self.connection_shutdown = None; - } else if self.connection_shutdown.is_none() { - self.connection_shutdown = Some(Delay::new(Instant::now() + self.useless_timeout)); - } + self.connection_shutdown = match self.handler.connection_keep_alive() { + KeepAlive::Until(expiration) => Some(Delay::new(expiration)), + KeepAlive::Now => Some(Delay::new(Instant::now())), + KeepAlive::Forever => None, + }; match poll_result { Async::Ready(ProtocolsHandlerEvent::Custom(event)) => { diff --git a/core/src/protocols_handler/one_shot.rs b/core/src/protocols_handler/one_shot.rs index f161a7b5..9d2a6251 100644 --- a/core/src/protocols_handler/one_shot.rs +++ b/core/src/protocols_handler/one_shot.rs @@ -18,11 +18,11 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}; +use crate::protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}; use crate::upgrade::{InboundUpgrade, OutboundUpgrade}; use futures::prelude::*; use smallvec::SmallVec; -use std::{error, marker::PhantomData}; +use std::{error, marker::PhantomData, time::Duration, time::Instant}; use tokio_io::{AsyncRead, AsyncWrite}; /// Implementation of `ProtocolsHandler` that opens a new substream for each individual message. @@ -46,6 +46,10 @@ where TOutProto: OutboundUpgrade dial_negotiated: u32, /// Maximum number of concurrent outbound substreams being opened. Value is never modified. max_dial_negotiated: u32, + /// Value to return from `connection_keep_alive`. + keep_alive: KeepAlive, + /// After the given duration has elapsed, an inactive connection will shutdown. + inactive_timeout: Duration, /// Pin the `TSubstream` generic. marker: PhantomData, } @@ -65,6 +69,8 @@ where TOutProto: OutboundUpgrade dial_queue: SmallVec::new(), dial_negotiated: 0, max_dial_negotiated: 8, + keep_alive: KeepAlive::Forever, + inactive_timeout: Duration::from_secs(10), // TODO: allow configuring marker: PhantomData, } } @@ -96,6 +102,7 @@ where TOutProto: OutboundUpgrade /// Opens an outbound substream with `upgrade`. #[inline] pub fn send_request(&mut self, upgrade: TOutProto) { + self.keep_alive = KeepAlive::Forever; self.dial_queue.push(upgrade); } } @@ -143,6 +150,11 @@ where return; } + // If we're shutting down the connection for inactivity, reset the timeout. + if !self.keep_alive.is_forever() { + self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout); + } + self.events_out.push(out.into()); } @@ -154,6 +166,10 @@ where ) { self.dial_negotiated -= 1; + if self.dial_negotiated == 0 && self.dial_queue.is_empty() { + self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout); + } + if self.shutting_down { return; } @@ -177,8 +193,8 @@ where fn inject_inbound_closed(&mut self) {} #[inline] - fn connection_keep_alive(&self) -> bool { - self.dial_negotiated != 0 || !self.dial_queue.is_empty() + fn connection_keep_alive(&self) -> KeepAlive { + self.keep_alive } #[inline] diff --git a/core/src/protocols_handler/select.rs b/core/src/protocols_handler/select.rs index 3776da67..392e143f 100644 --- a/core/src/protocols_handler/select.rs +++ b/core/src/protocols_handler/select.rs @@ -23,6 +23,7 @@ use crate::{ either::EitherError, either::EitherOutput, protocols_handler::{ + KeepAlive, Fuse, IntoProtocolsHandler, ProtocolsHandler, @@ -38,6 +39,7 @@ use crate::{ } }; use futures::prelude::*; +use std::cmp; use tokio_io::{AsyncRead, AsyncWrite}; /// Implementation of `IntoProtocolsHandler` that combines two protocols into one. @@ -207,8 +209,12 @@ where } #[inline] - fn connection_keep_alive(&self) -> bool { - self.proto1.connection_keep_alive() || self.proto2.connection_keep_alive() + fn connection_keep_alive(&self) -> KeepAlive { + match (self.proto1.connection_keep_alive(), self.proto2.connection_keep_alive()) { + (KeepAlive::Forever, _) | (_, KeepAlive::Forever) => KeepAlive::Forever, + (a, KeepAlive::Now) | (KeepAlive::Now, a) => a, + (KeepAlive::Until(a), KeepAlive::Until(b)) => KeepAlive::Until(cmp::max(a, b)), + } } #[inline] diff --git a/protocols/identify/src/listen_handler.rs b/protocols/identify/src/listen_handler.rs index e2d0e4fb..e30ba0bd 100644 --- a/protocols/identify/src/listen_handler.rs +++ b/protocols/identify/src/listen_handler.rs @@ -21,7 +21,7 @@ use crate::protocol::{IdentifySender, IdentifyProtocolConfig}; use futures::prelude::*; use libp2p_core::{ - protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, + protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade} }; use smallvec::SmallVec; @@ -90,8 +90,8 @@ where fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<>::Error>) {} #[inline] - fn connection_keep_alive(&self) -> bool { - false + fn connection_keep_alive(&self) -> KeepAlive { + KeepAlive::Now } #[inline] diff --git a/protocols/identify/src/periodic_id_handler.rs b/protocols/identify/src/periodic_id_handler.rs index 6625e424..4f175600 100644 --- a/protocols/identify/src/periodic_id_handler.rs +++ b/protocols/identify/src/periodic_id_handler.rs @@ -21,7 +21,7 @@ use crate::protocol::{RemoteInfo, IdentifyProtocolConfig}; use futures::prelude::*; use libp2p_core::{ - protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, + protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, upgrade::{DeniedUpgrade, OutboundUpgrade} }; use std::{io, marker::PhantomData, time::{Duration, Instant}}; @@ -123,8 +123,12 @@ where } #[inline] - fn connection_keep_alive(&self) -> bool { - !self.first_id_happened + fn connection_keep_alive(&self) -> KeepAlive { + if self.first_id_happened { + KeepAlive::Now + } else { + KeepAlive::Forever + } } #[inline] diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 2c9a5cc7..84399ec2 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -23,10 +23,10 @@ use crate::protocol::{ KademliaProtocolConfig, }; use futures::prelude::*; -use libp2p_core::protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}; +use libp2p_core::protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}; use libp2p_core::{upgrade, either::EitherOutput, InboundUpgrade, OutboundUpgrade, PeerId}; use multihash::Multihash; -use std::{error, fmt, io}; +use std::{error, fmt, io, time::Duration, time::Instant}; use tokio_io::{AsyncRead, AsyncWrite}; /// Protocol handler that handles Kademlia communications with the remote. @@ -54,6 +54,9 @@ where /// List of active substreams with the state they are in. substreams: Vec>, + + /// Until when to keep the connection alive. + keep_alive: KeepAlive, } /// State of an active substream, opened either by us or by the remote. @@ -322,6 +325,7 @@ where allow_listening, next_connec_unique_id: UniqueConnecId(0), substreams: Vec::new(), + keep_alive: KeepAlive::Forever, } } } @@ -490,8 +494,8 @@ where } #[inline] - fn connection_keep_alive(&self) -> bool { - !self.substreams.is_empty() + fn connection_keep_alive(&self) -> KeepAlive { + self.keep_alive } #[inline] @@ -549,6 +553,12 @@ where } } + if self.substreams.is_empty() { + self.keep_alive = KeepAlive::Until(Instant::now() + Duration::from_secs(10)); + } else { + self.keep_alive = KeepAlive::Forever; + } + Ok(Async::NotReady) } }