mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-28 01:01:34 +00:00
feat(swarm): allow configuration to idle connection timeout
Previously, a connection would be shut down immediately as soon as its `ConnectionHandler` reports `KeepAlive::No`. As we have gained experience with libp2p, it turned out that this isn't ideal. For one, tests often need to keep connections alive longer than the configured protocols require. Plus, some usecases require connections to be kept alive in general. Both of these needs are currently served by the `keep_alive::Behaviour`. That one does essentially nothing other than statically returning `KeepAlive::Yes` from its `ConnectionHandler`. It makes much more sense to deprecate `keep_alive::Behaviour` and instead allow users to globally configure an `idle_conncetion_timeout` on the `Swarm`. This timeout comes into effect once a `ConnectionHandler` reports `KeepAlive::No`. To start with, this timeout is 0. Together with https://github.com/libp2p/rust-libp2p/issues/3844, this will allow us to move towards a much more aggressive closing of idle connections, together with a more ergonomic way of opting out of this behaviour. Fixes #4121. Pull-Request: #4161.
This commit is contained in:
@ -52,6 +52,7 @@ use libp2p_core::upgrade;
|
||||
use libp2p_core::upgrade::{NegotiationError, ProtocolError};
|
||||
use libp2p_core::Endpoint;
|
||||
use libp2p_identity::PeerId;
|
||||
use std::cmp::max;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::future::Future;
|
||||
@ -156,6 +157,7 @@ where
|
||||
|
||||
local_supported_protocols: HashSet<StreamProtocol>,
|
||||
remote_supported_protocols: HashSet<StreamProtocol>,
|
||||
idle_timeout: Duration,
|
||||
}
|
||||
|
||||
impl<THandler> fmt::Debug for Connection<THandler>
|
||||
@ -183,9 +185,9 @@ where
|
||||
mut handler: THandler,
|
||||
substream_upgrade_protocol_override: Option<upgrade::Version>,
|
||||
max_negotiating_inbound_streams: usize,
|
||||
idle_timeout: Duration,
|
||||
) -> Self {
|
||||
let initial_protocols = gather_supported_protocols(&handler);
|
||||
|
||||
if !initial_protocols.is_empty() {
|
||||
handler.on_connection_event(ConnectionEvent::LocalProtocolsChange(
|
||||
ProtocolsChange::Added(ProtocolsAdded::from_set(&initial_protocols)),
|
||||
@ -203,6 +205,7 @@ where
|
||||
requested_substreams: Default::default(),
|
||||
local_supported_protocols: initial_protocols,
|
||||
remote_supported_protocols: Default::default(),
|
||||
idle_timeout,
|
||||
}
|
||||
}
|
||||
|
||||
@ -234,6 +237,7 @@ where
|
||||
substream_upgrade_protocol_override,
|
||||
local_supported_protocols: supported_protocols,
|
||||
remote_supported_protocols,
|
||||
idle_timeout,
|
||||
} = self.get_mut();
|
||||
|
||||
loop {
|
||||
@ -348,17 +352,36 @@ where
|
||||
(Shutdown::Later(timer, deadline), KeepAlive::Until(t)) => {
|
||||
if *deadline != t {
|
||||
*deadline = t;
|
||||
if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
|
||||
timer.reset(dur)
|
||||
if let Some(new_duration) = deadline.checked_duration_since(Instant::now())
|
||||
{
|
||||
let effective_keep_alive = max(new_duration, *idle_timeout);
|
||||
|
||||
timer.reset(effective_keep_alive)
|
||||
}
|
||||
}
|
||||
}
|
||||
(_, KeepAlive::Until(t)) => {
|
||||
if let Some(dur) = t.checked_duration_since(Instant::now()) {
|
||||
*shutdown = Shutdown::Later(Delay::new(dur), t)
|
||||
(_, KeepAlive::Until(earliest_shutdown)) => {
|
||||
if let Some(requested_keep_alive) =
|
||||
earliest_shutdown.checked_duration_since(Instant::now())
|
||||
{
|
||||
let effective_keep_alive = max(requested_keep_alive, *idle_timeout);
|
||||
|
||||
// Important: We store the _original_ `Instant` given by the `ConnectionHandler` in the `Later` instance to ensure we can compare it in the above branch.
|
||||
// This is quite subtle but will hopefully become simpler soon once `KeepAlive::Until` is fully deprecated. See <https://github.com/libp2p/rust-libp2p/issues/3844>/
|
||||
*shutdown =
|
||||
Shutdown::Later(Delay::new(effective_keep_alive), earliest_shutdown)
|
||||
}
|
||||
}
|
||||
(_, KeepAlive::No) => *shutdown = Shutdown::Asap,
|
||||
(_, KeepAlive::No) if idle_timeout == &Duration::ZERO => {
|
||||
*shutdown = Shutdown::Asap;
|
||||
}
|
||||
(Shutdown::Later(_, _), KeepAlive::No) => {
|
||||
// Do nothing, i.e. let the shutdown timer continue to tick.
|
||||
}
|
||||
(_, KeepAlive::No) => {
|
||||
let deadline = Instant::now() + *idle_timeout;
|
||||
*shutdown = Shutdown::Later(Delay::new(*idle_timeout), deadline);
|
||||
}
|
||||
(_, KeepAlive::Yes) => *shutdown = Shutdown::None,
|
||||
};
|
||||
|
||||
@ -696,7 +719,7 @@ enum Shutdown {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::keep_alive;
|
||||
use crate::dummy;
|
||||
use futures::future;
|
||||
use futures::AsyncRead;
|
||||
use futures::AsyncWrite;
|
||||
@ -704,6 +727,7 @@ mod tests {
|
||||
use libp2p_core::StreamMuxer;
|
||||
use quickcheck::*;
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::Instant;
|
||||
use void::Void;
|
||||
|
||||
#[test]
|
||||
@ -712,14 +736,14 @@ mod tests {
|
||||
let max_negotiating_inbound_streams: usize = max_negotiating_inbound_streams.into();
|
||||
|
||||
let alive_substream_counter = Arc::new(());
|
||||
|
||||
let mut connection = Connection::new(
|
||||
StreamMuxerBox::new(DummyStreamMuxer {
|
||||
counter: alive_substream_counter.clone(),
|
||||
}),
|
||||
keep_alive::ConnectionHandler,
|
||||
MockConnectionHandler::new(Duration::ZERO),
|
||||
None,
|
||||
max_negotiating_inbound_streams,
|
||||
Duration::ZERO,
|
||||
);
|
||||
|
||||
let result = connection.poll_noop_waker();
|
||||
@ -743,6 +767,7 @@ mod tests {
|
||||
MockConnectionHandler::new(upgrade_timeout),
|
||||
None,
|
||||
2,
|
||||
Duration::ZERO,
|
||||
);
|
||||
|
||||
connection.handler.open_new_outbound();
|
||||
@ -765,6 +790,7 @@ mod tests {
|
||||
ConfigurableProtocolConnectionHandler::default(),
|
||||
None,
|
||||
0,
|
||||
Duration::ZERO,
|
||||
);
|
||||
|
||||
// First, start listening on a single protocol.
|
||||
@ -803,6 +829,7 @@ mod tests {
|
||||
ConfigurableProtocolConnectionHandler::default(),
|
||||
None,
|
||||
0,
|
||||
Duration::ZERO,
|
||||
);
|
||||
|
||||
// First, remote supports a single protocol.
|
||||
@ -846,6 +873,141 @@ mod tests {
|
||||
assert_eq!(connection.handler.remote_removed, vec![vec!["/bar"]]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn idle_timeout_with_keep_alive_no() {
|
||||
let idle_timeout = Duration::from_millis(100);
|
||||
|
||||
let mut connection = Connection::new(
|
||||
StreamMuxerBox::new(PendingStreamMuxer),
|
||||
dummy::ConnectionHandler,
|
||||
None,
|
||||
0,
|
||||
idle_timeout,
|
||||
);
|
||||
|
||||
assert!(connection.poll_noop_waker().is_pending());
|
||||
|
||||
tokio::time::sleep(idle_timeout).await;
|
||||
|
||||
assert!(matches!(
|
||||
connection.poll_noop_waker(),
|
||||
Poll::Ready(Err(ConnectionError::KeepAliveTimeout))
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn idle_timeout_with_keep_alive_until_greater_than_idle_timeout() {
|
||||
let idle_timeout = Duration::from_millis(100);
|
||||
|
||||
let mut connection = Connection::new(
|
||||
StreamMuxerBox::new(PendingStreamMuxer),
|
||||
KeepAliveUntilConnectionHandler {
|
||||
until: Instant::now() + idle_timeout * 2,
|
||||
},
|
||||
None,
|
||||
0,
|
||||
idle_timeout,
|
||||
);
|
||||
|
||||
assert!(connection.poll_noop_waker().is_pending());
|
||||
|
||||
tokio::time::sleep(idle_timeout).await;
|
||||
|
||||
assert!(
|
||||
connection.poll_noop_waker().is_pending(),
|
||||
"`KeepAlive::Until` is greater than idle-timeout, continue sleeping"
|
||||
);
|
||||
|
||||
tokio::time::sleep(idle_timeout).await;
|
||||
|
||||
assert!(matches!(
|
||||
connection.poll_noop_waker(),
|
||||
Poll::Ready(Err(ConnectionError::KeepAliveTimeout))
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn idle_timeout_with_keep_alive_until_less_than_idle_timeout() {
|
||||
let idle_timeout = Duration::from_millis(100);
|
||||
|
||||
let mut connection = Connection::new(
|
||||
StreamMuxerBox::new(PendingStreamMuxer),
|
||||
KeepAliveUntilConnectionHandler {
|
||||
until: Instant::now() + idle_timeout / 2,
|
||||
},
|
||||
None,
|
||||
0,
|
||||
idle_timeout,
|
||||
);
|
||||
|
||||
assert!(connection.poll_noop_waker().is_pending());
|
||||
|
||||
tokio::time::sleep(idle_timeout / 2).await;
|
||||
|
||||
assert!(
|
||||
connection.poll_noop_waker().is_pending(),
|
||||
"`KeepAlive::Until` is less than idle-timeout, honor idle-timeout"
|
||||
);
|
||||
|
||||
tokio::time::sleep(idle_timeout / 2).await;
|
||||
|
||||
assert!(matches!(
|
||||
connection.poll_noop_waker(),
|
||||
Poll::Ready(Err(ConnectionError::KeepAliveTimeout))
|
||||
));
|
||||
}
|
||||
|
||||
struct KeepAliveUntilConnectionHandler {
|
||||
until: Instant,
|
||||
}
|
||||
|
||||
impl ConnectionHandler for KeepAliveUntilConnectionHandler {
|
||||
type FromBehaviour = Void;
|
||||
type ToBehaviour = Void;
|
||||
type Error = Void;
|
||||
type InboundProtocol = DeniedUpgrade;
|
||||
type OutboundProtocol = DeniedUpgrade;
|
||||
type InboundOpenInfo = ();
|
||||
type OutboundOpenInfo = Void;
|
||||
|
||||
fn listen_protocol(
|
||||
&self,
|
||||
) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
||||
SubstreamProtocol::new(DeniedUpgrade, ())
|
||||
}
|
||||
|
||||
fn connection_keep_alive(&self) -> KeepAlive {
|
||||
KeepAlive::Until(self.until)
|
||||
}
|
||||
|
||||
fn poll(
|
||||
&mut self,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<
|
||||
ConnectionHandlerEvent<
|
||||
Self::OutboundProtocol,
|
||||
Self::OutboundOpenInfo,
|
||||
Self::ToBehaviour,
|
||||
Self::Error,
|
||||
>,
|
||||
> {
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
fn on_behaviour_event(&mut self, _: Self::FromBehaviour) {}
|
||||
|
||||
fn on_connection_event(
|
||||
&mut self,
|
||||
_: ConnectionEvent<
|
||||
Self::InboundProtocol,
|
||||
Self::OutboundProtocol,
|
||||
Self::InboundOpenInfo,
|
||||
Self::OutboundOpenInfo,
|
||||
>,
|
||||
) {
|
||||
}
|
||||
}
|
||||
|
||||
struct DummyStreamMuxer {
|
||||
counter: Arc<()>,
|
||||
}
|
||||
|
Reference in New Issue
Block a user