connection_keep_alive() now returns KeepAlive (#899)

* connection_keep_alive() now returns Option<Instant>

* Use KeepAlive instead of Option<Instant>
This commit is contained in:
Pierre Krieger
2019-01-30 16:37:34 +01:00
committed by GitHub
parent bbf56c6371
commit 663ec7e8da
11 changed files with 100 additions and 58 deletions

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::{ use crate::{
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr},
upgrade::{ upgrade::{
InboundUpgrade, InboundUpgrade,
OutboundUpgrade, OutboundUpgrade,
@ -89,7 +89,7 @@ where
fn inject_inbound_closed(&mut self) {} fn inject_inbound_closed(&mut self) {}
#[inline] #[inline]
fn connection_keep_alive(&self) -> bool { false } fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::Now }
#[inline] #[inline]
fn shutdown(&mut self) { fn shutdown(&mut self) {

View File

@ -20,7 +20,7 @@
use crate::{ use crate::{
either::EitherOutput, either::EitherOutput,
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr},
upgrade::{ upgrade::{
DeniedUpgrade, DeniedUpgrade,
EitherUpgrade, EitherUpgrade,
@ -124,11 +124,11 @@ where
} }
#[inline] #[inline]
fn connection_keep_alive(&self) -> bool { fn connection_keep_alive(&self) -> KeepAlive {
if let Some(inner) = self.inner.as_ref() { if let Some(inner) = self.inner.as_ref() {
inner.connection_keep_alive() inner.connection_keep_alive()
} else { } else {
false KeepAlive::Now
} }
} }

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::{ use crate::{
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr},
upgrade::{ upgrade::{
InboundUpgrade, InboundUpgrade,
OutboundUpgrade, OutboundUpgrade,
@ -100,7 +100,7 @@ where
} }
#[inline] #[inline]
fn connection_keep_alive(&self) -> bool { fn connection_keep_alive(&self) -> KeepAlive {
self.inner.connection_keep_alive() self.inner.connection_keep_alive()
} }

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::{ use crate::{
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr},
upgrade::{ upgrade::{
InboundUpgrade, InboundUpgrade,
OutboundUpgrade, OutboundUpgrade,
@ -95,7 +95,7 @@ where
} }
#[inline] #[inline]
fn connection_keep_alive(&self) -> bool { fn connection_keep_alive(&self) -> KeepAlive {
self.inner.connection_keep_alive() self.inner.connection_keep_alive()
} }

View File

@ -40,7 +40,7 @@ use crate::upgrade::{
UpgradeError, UpgradeError,
}; };
use futures::prelude::*; use futures::prelude::*;
use std::{error, fmt, time::Duration}; use std::{error, fmt, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
pub use self::dummy::DummyProtocolsHandler; pub use self::dummy::DummyProtocolsHandler;
@ -142,22 +142,23 @@ pub trait ProtocolsHandler {
/// therefore no more inbound substreams will be produced. /// therefore no more inbound substreams will be produced.
fn inject_inbound_closed(&mut self); fn inject_inbound_closed(&mut self);
/// Returns whether the connection should be kept alive. /// Returns until when the connection should be kept alive.
/// ///
/// If returns `false`, that indicates that this connection is not important and the user may /// If returns `Until`, that indicates that this connection may invoke `shutdown()` after the
/// invoke `shutdown()` if they think that they will no longer need the connection in the /// returned `Instant` has elapsed if they think that they will no longer need the connection
/// future. /// in the future. Returning `Forever` is equivalent to "infinite". Returning `Now` is
/// equivalent to `Until(Instant::now())`.
/// ///
/// On the other hand, returning `true` is only an indication and doesn't mean that the user /// On the other hand, the return value is only an indication and doesn't mean that the user
/// will not call `shutdown()`. /// will not call `shutdown()`.
/// ///
/// When multiple `ProtocolsHandler` are combined together, they should use *OR* to merge the /// When multiple `ProtocolsHandler` are combined together, they should use return the largest
/// result of this method. /// value of the two, or `Forever` if either returns `Forever`.
/// ///
/// The result of this method should be checked every time `poll()` is invoked. /// The result of this method should be checked every time `poll()` is invoked.
/// ///
/// After `shutdown()` is called, the result of this method doesn't matter anymore. /// After `shutdown()` is called, the result of this method doesn't matter anymore.
fn connection_keep_alive(&self) -> bool; fn connection_keep_alive(&self) -> KeepAlive;
/// Indicates to the node that it should shut down. After that, it is expected that `poll()` /// Indicates to the node that it should shut down. After that, it is expected that `poll()`
/// returns `Ready(ProtocolsHandlerEvent::Shutdown)` as soon as possible. /// returns `Ready(ProtocolsHandlerEvent::Shutdown)` as soon as possible.
@ -398,7 +399,7 @@ pub trait IntoProtocolsHandler {
where where
Self: Sized, Self: Sized,
{ {
NodeHandlerWrapperBuilder::new(self, Duration::from_secs(10), Duration::from_secs(10), Duration::from_secs(5)) NodeHandlerWrapperBuilder::new(self, Duration::from_secs(10), Duration::from_secs(10))
} }
} }
@ -412,3 +413,24 @@ where T: ProtocolsHandler
self self
} }
} }
/// How long the connection should be kept alive.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum KeepAlive {
/// If nothing new happens, the connection should be closed at the given `Instant`.
Until(Instant),
/// Keep the connection alive.
Forever,
/// Close the connection as soon as possible.
Now,
}
impl KeepAlive {
/// Returns true for `Forever`, false otherwise.
pub fn is_forever(&self) -> bool {
match *self {
KeepAlive::Forever => true,
_ => false,
}
}
}

View File

@ -22,7 +22,7 @@ use crate::{
PeerId, PeerId,
nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent}, nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent},
nodes::handled_node_tasks::IntoNodeHandler, nodes::handled_node_tasks::IntoNodeHandler,
protocols_handler::{ProtocolsHandler, IntoProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, protocols_handler::{KeepAlive, ProtocolsHandler, IntoProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr},
upgrade::{ upgrade::{
self, self,
OutboundUpgrade, OutboundUpgrade,
@ -42,8 +42,6 @@ pub struct NodeHandlerWrapperBuilder<TIntoProtoHandler> {
in_timeout: Duration, in_timeout: Duration,
/// Timeout for outgoing substreams negotiation. /// Timeout for outgoing substreams negotiation.
out_timeout: Duration, out_timeout: Duration,
/// Time after which a useless connection will be closed.
useless_timeout: Duration,
} }
impl<TIntoProtoHandler> NodeHandlerWrapperBuilder<TIntoProtoHandler> impl<TIntoProtoHandler> NodeHandlerWrapperBuilder<TIntoProtoHandler>
@ -52,12 +50,11 @@ where
{ {
/// Builds a `NodeHandlerWrapperBuilder`. /// Builds a `NodeHandlerWrapperBuilder`.
#[inline] #[inline]
pub(crate) fn new(handler: TIntoProtoHandler, in_timeout: Duration, out_timeout: Duration, useless_timeout: Duration) -> Self { pub(crate) fn new(handler: TIntoProtoHandler, in_timeout: Duration, out_timeout: Duration) -> Self {
NodeHandlerWrapperBuilder { NodeHandlerWrapperBuilder {
handler, handler,
in_timeout, in_timeout,
out_timeout, out_timeout,
useless_timeout,
} }
} }
@ -75,14 +72,6 @@ where
self self
} }
/// Sets the timeout between the moment `connection_keep_alive()` returns `false` on the
/// `ProtocolsHandler`, and the moment the connection is closed.
#[inline]
pub fn with_useless_timeout(mut self, timeout: Duration) -> Self {
self.useless_timeout = timeout;
self
}
/// Builds the `NodeHandlerWrapper`. /// Builds the `NodeHandlerWrapper`.
#[deprecated(note = "Pass the NodeHandlerWrapperBuilder directly")] #[deprecated(note = "Pass the NodeHandlerWrapperBuilder directly")]
#[inline] #[inline]
@ -98,7 +87,6 @@ where
queued_dial_upgrades: Vec::new(), queued_dial_upgrades: Vec::new(),
unique_dial_upgrade_id: 0, unique_dial_upgrade_id: 0,
connection_shutdown: None, connection_shutdown: None,
useless_timeout: self.useless_timeout,
} }
} }
} }
@ -122,7 +110,6 @@ where
queued_dial_upgrades: Vec::new(), queued_dial_upgrades: Vec::new(),
unique_dial_upgrade_id: 0, unique_dial_upgrade_id: 0,
connection_shutdown: None, connection_shutdown: None,
useless_timeout: self.useless_timeout,
} }
} }
} }
@ -156,9 +143,6 @@ where
/// When a connection has been deemed useless, will contain `Some` with a `Delay` to when it /// When a connection has been deemed useless, will contain `Some` with a `Delay` to when it
/// should be shut down. /// should be shut down.
connection_shutdown: Option<Delay>, connection_shutdown: Option<Delay>,
/// Timeout after which a useless connection is closed. When the `connection_shutdown` is set
/// to `Some`, this is the value that is being used.
useless_timeout: Duration,
} }
impl<TProtoHandler> NodeHandler for NodeHandlerWrapper<TProtoHandler> impl<TProtoHandler> NodeHandler for NodeHandlerWrapper<TProtoHandler>
@ -292,11 +276,11 @@ where
loop { loop {
let poll_result = self.handler.poll()?; let poll_result = self.handler.poll()?;
if self.handler.connection_keep_alive() { self.connection_shutdown = match self.handler.connection_keep_alive() {
self.connection_shutdown = None; KeepAlive::Until(expiration) => Some(Delay::new(expiration)),
} else if self.connection_shutdown.is_none() { KeepAlive::Now => Some(Delay::new(Instant::now())),
self.connection_shutdown = Some(Delay::new(Instant::now() + self.useless_timeout)); KeepAlive::Forever => None,
} };
match poll_result { match poll_result {
Async::Ready(ProtocolsHandlerEvent::Custom(event)) => { Async::Ready(ProtocolsHandlerEvent::Custom(event)) => {

View File

@ -18,11 +18,11 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}; use crate::protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr};
use crate::upgrade::{InboundUpgrade, OutboundUpgrade}; use crate::upgrade::{InboundUpgrade, OutboundUpgrade};
use futures::prelude::*; use futures::prelude::*;
use smallvec::SmallVec; use smallvec::SmallVec;
use std::{error, marker::PhantomData}; use std::{error, marker::PhantomData, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
/// Implementation of `ProtocolsHandler` that opens a new substream for each individual message. /// Implementation of `ProtocolsHandler` that opens a new substream for each individual message.
@ -46,6 +46,10 @@ where TOutProto: OutboundUpgrade<TSubstream>
dial_negotiated: u32, dial_negotiated: u32,
/// Maximum number of concurrent outbound substreams being opened. Value is never modified. /// Maximum number of concurrent outbound substreams being opened. Value is never modified.
max_dial_negotiated: u32, max_dial_negotiated: u32,
/// Value to return from `connection_keep_alive`.
keep_alive: KeepAlive,
/// After the given duration has elapsed, an inactive connection will shutdown.
inactive_timeout: Duration,
/// Pin the `TSubstream` generic. /// Pin the `TSubstream` generic.
marker: PhantomData<TSubstream>, marker: PhantomData<TSubstream>,
} }
@ -65,6 +69,8 @@ where TOutProto: OutboundUpgrade<TSubstream>
dial_queue: SmallVec::new(), dial_queue: SmallVec::new(),
dial_negotiated: 0, dial_negotiated: 0,
max_dial_negotiated: 8, max_dial_negotiated: 8,
keep_alive: KeepAlive::Forever,
inactive_timeout: Duration::from_secs(10), // TODO: allow configuring
marker: PhantomData, marker: PhantomData,
} }
} }
@ -96,6 +102,7 @@ where TOutProto: OutboundUpgrade<TSubstream>
/// Opens an outbound substream with `upgrade`. /// Opens an outbound substream with `upgrade`.
#[inline] #[inline]
pub fn send_request(&mut self, upgrade: TOutProto) { pub fn send_request(&mut self, upgrade: TOutProto) {
self.keep_alive = KeepAlive::Forever;
self.dial_queue.push(upgrade); self.dial_queue.push(upgrade);
} }
} }
@ -143,6 +150,11 @@ where
return; return;
} }
// If we're shutting down the connection for inactivity, reset the timeout.
if !self.keep_alive.is_forever() {
self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout);
}
self.events_out.push(out.into()); self.events_out.push(out.into());
} }
@ -154,6 +166,10 @@ where
) { ) {
self.dial_negotiated -= 1; self.dial_negotiated -= 1;
if self.dial_negotiated == 0 && self.dial_queue.is_empty() {
self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout);
}
if self.shutting_down { if self.shutting_down {
return; return;
} }
@ -177,8 +193,8 @@ where
fn inject_inbound_closed(&mut self) {} fn inject_inbound_closed(&mut self) {}
#[inline] #[inline]
fn connection_keep_alive(&self) -> bool { fn connection_keep_alive(&self) -> KeepAlive {
self.dial_negotiated != 0 || !self.dial_queue.is_empty() self.keep_alive
} }
#[inline] #[inline]

View File

@ -23,6 +23,7 @@ use crate::{
either::EitherError, either::EitherError,
either::EitherOutput, either::EitherOutput,
protocols_handler::{ protocols_handler::{
KeepAlive,
Fuse, Fuse,
IntoProtocolsHandler, IntoProtocolsHandler,
ProtocolsHandler, ProtocolsHandler,
@ -38,6 +39,7 @@ use crate::{
} }
}; };
use futures::prelude::*; use futures::prelude::*;
use std::cmp;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
/// Implementation of `IntoProtocolsHandler` that combines two protocols into one. /// Implementation of `IntoProtocolsHandler` that combines two protocols into one.
@ -207,8 +209,12 @@ where
} }
#[inline] #[inline]
fn connection_keep_alive(&self) -> bool { fn connection_keep_alive(&self) -> KeepAlive {
self.proto1.connection_keep_alive() || self.proto2.connection_keep_alive() match (self.proto1.connection_keep_alive(), self.proto2.connection_keep_alive()) {
(KeepAlive::Forever, _) | (_, KeepAlive::Forever) => KeepAlive::Forever,
(a, KeepAlive::Now) | (KeepAlive::Now, a) => a,
(KeepAlive::Until(a), KeepAlive::Until(b)) => KeepAlive::Until(cmp::max(a, b)),
}
} }
#[inline] #[inline]

View File

@ -21,7 +21,7 @@
use crate::protocol::{IdentifySender, IdentifyProtocolConfig}; use crate::protocol::{IdentifySender, IdentifyProtocolConfig};
use futures::prelude::*; use futures::prelude::*;
use libp2p_core::{ use libp2p_core::{
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr},
upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade} upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade}
}; };
use smallvec::SmallVec; use smallvec::SmallVec;
@ -90,8 +90,8 @@ where
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {} fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {}
#[inline] #[inline]
fn connection_keep_alive(&self) -> bool { fn connection_keep_alive(&self) -> KeepAlive {
false KeepAlive::Now
} }
#[inline] #[inline]

View File

@ -21,7 +21,7 @@
use crate::protocol::{RemoteInfo, IdentifyProtocolConfig}; use crate::protocol::{RemoteInfo, IdentifyProtocolConfig};
use futures::prelude::*; use futures::prelude::*;
use libp2p_core::{ use libp2p_core::{
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr},
upgrade::{DeniedUpgrade, OutboundUpgrade} upgrade::{DeniedUpgrade, OutboundUpgrade}
}; };
use std::{io, marker::PhantomData, time::{Duration, Instant}}; use std::{io, marker::PhantomData, time::{Duration, Instant}};
@ -123,8 +123,12 @@ where
} }
#[inline] #[inline]
fn connection_keep_alive(&self) -> bool { fn connection_keep_alive(&self) -> KeepAlive {
!self.first_id_happened if self.first_id_happened {
KeepAlive::Now
} else {
KeepAlive::Forever
}
} }
#[inline] #[inline]

View File

@ -23,10 +23,10 @@ use crate::protocol::{
KademliaProtocolConfig, KademliaProtocolConfig,
}; };
use futures::prelude::*; use futures::prelude::*;
use libp2p_core::protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}; use libp2p_core::protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr};
use libp2p_core::{upgrade, either::EitherOutput, InboundUpgrade, OutboundUpgrade, PeerId}; use libp2p_core::{upgrade, either::EitherOutput, InboundUpgrade, OutboundUpgrade, PeerId};
use multihash::Multihash; use multihash::Multihash;
use std::{error, fmt, io}; use std::{error, fmt, io, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
/// Protocol handler that handles Kademlia communications with the remote. /// Protocol handler that handles Kademlia communications with the remote.
@ -54,6 +54,9 @@ where
/// List of active substreams with the state they are in. /// List of active substreams with the state they are in.
substreams: Vec<SubstreamState<TSubstream, TUserData>>, substreams: Vec<SubstreamState<TSubstream, TUserData>>,
/// Until when to keep the connection alive.
keep_alive: KeepAlive,
} }
/// State of an active substream, opened either by us or by the remote. /// State of an active substream, opened either by us or by the remote.
@ -322,6 +325,7 @@ where
allow_listening, allow_listening,
next_connec_unique_id: UniqueConnecId(0), next_connec_unique_id: UniqueConnecId(0),
substreams: Vec::new(), substreams: Vec::new(),
keep_alive: KeepAlive::Forever,
} }
} }
} }
@ -490,8 +494,8 @@ where
} }
#[inline] #[inline]
fn connection_keep_alive(&self) -> bool { fn connection_keep_alive(&self) -> KeepAlive {
!self.substreams.is_empty() self.keep_alive
} }
#[inline] #[inline]
@ -549,6 +553,12 @@ where
} }
} }
if self.substreams.is_empty() {
self.keep_alive = KeepAlive::Until(Instant::now() + Duration::from_secs(10));
} else {
self.keep_alive = KeepAlive::Forever;
}
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }