From 439dc8246ec83b37eae900e60b1b72b7c1a49b6d Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 19 Mar 2020 17:01:34 +0100 Subject: [PATCH] Allow customizing the delay before closing a Kademlia connection (#1477) * Reduce the delay before closing a Kademlia connection * Rework pull request * Update protocols/kad/src/behaviour.rs Co-Authored-By: Roman Borschel * Update protocols/kad/src/behaviour.rs Co-Authored-By: Roman Borschel * Rework the pull request Co-authored-by: Roman Borschel --- protocols/kad/src/behaviour.rs | 29 +++++++++++--- protocols/kad/src/handler.rs | 70 +++++++++++++++++----------------- 2 files changed, 58 insertions(+), 41 deletions(-) diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 83666458..2da11ee1 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -24,10 +24,10 @@ mod test; use crate::K_VALUE; use crate::addresses::Addresses; -use crate::handler::{KademliaHandler, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn}; +use crate::handler::{KademliaHandler, KademliaHandlerConfig, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn}; use crate::jobs::*; use crate::kbucket::{self, KBucketsTable, NodeStatus}; -use crate::protocol::{KadConnectionType, KadPeer}; +use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer}; use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState}; use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord}; use fnv::{FnvHashMap, FnvHashSet}; @@ -77,6 +77,9 @@ pub struct Kademlia { /// The TTL of provider records. provider_record_ttl: Option, + /// How long to keep connections alive when they're idle. + connection_idle_timeout: Duration, + /// Queued events to return when the behaviour is being polled. queued_events: VecDeque, KademliaEvent>>, @@ -97,6 +100,7 @@ pub struct KademliaConfig { record_publication_interval: Option, provider_record_ttl: Option, provider_publication_interval: Option, + connection_idle_timeout: Duration, } impl Default for KademliaConfig { @@ -110,6 +114,7 @@ impl Default for KademliaConfig { record_publication_interval: Some(Duration::from_secs(24 * 60 * 60)), provider_publication_interval: Some(Duration::from_secs(12 * 60 * 60)), provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)), + connection_idle_timeout: Duration::from_secs(10), } } } @@ -217,6 +222,12 @@ impl KademliaConfig { self.provider_publication_interval = interval; self } + + /// Sets the amount of time to keep connections alive when they're idle. + pub fn set_connection_idle_timeout(&mut self, duration: Duration) -> &mut Self { + self.connection_idle_timeout = duration; + self + } } impl Kademlia @@ -264,6 +275,7 @@ where put_record_job, record_ttl: config.record_ttl, provider_record_ttl: config.provider_record_ttl, + connection_idle_timeout: config.connection_idle_timeout, } } @@ -1052,11 +1064,16 @@ where type OutEvent = KademliaEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { - let mut handler = KademliaHandler::dial_and_listen(); + let mut protocol_config = KademliaProtocolConfig::default(); if let Some(name) = self.protocol_name_override.as_ref() { - handler = handler.with_protocol_name(name.clone()); + protocol_config = protocol_config.with_protocol_name(name.clone()); } - handler + + KademliaHandler::new(KademliaHandlerConfig { + protocol_config, + allow_listening: true, + idle_timeout: self.connection_idle_timeout, + }) } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { @@ -1335,7 +1352,7 @@ where fn poll(&mut self, cx: &mut Context, parameters: &mut impl PollParameters) -> Poll< NetworkBehaviourAction< - ::InEvent, + as ProtocolsHandler>::InEvent, Self::OutEvent, >, > { diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index ac2430f0..57f3c55b 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -37,7 +37,7 @@ use libp2p_core::{ upgrade::{self, InboundUpgrade, OutboundUpgrade} }; use log::trace; -use std::{borrow::Cow, error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration}; +use std::{error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration}; use wasm_timer::Instant; /// Protocol handler that handles Kademlia communications with the remote. @@ -48,10 +48,7 @@ use wasm_timer::Instant; /// It also handles requests made by the remote. pub struct KademliaHandler { /// Configuration for the Kademlia protocol. - config: KademliaProtocolConfig, - - /// If false, we always refuse incoming Kademlia substreams. - allow_listening: bool, + config: KademliaHandlerConfig, /// Next unique ID of a connection. next_connec_unique_id: UniqueConnecId, @@ -63,6 +60,19 @@ pub struct KademliaHandler { keep_alive: KeepAlive, } +/// Configuration of a [`KademliaHandler`]. +#[derive(Debug, Clone)] +pub struct KademliaHandlerConfig { + /// Configuration of the wire protocol. + pub protocol_config: KademliaProtocolConfig, + + /// If false, we deny incoming requests. + pub allow_listening: bool, + + /// Time after which we close an idle connection. + pub idle_timeout: Duration, +} + /// State of an active substream, opened either by us or by the remote. enum SubstreamState { /// We haven't started opening the outgoing substream yet. @@ -369,42 +379,22 @@ pub struct KademliaRequestId { struct UniqueConnecId(u64); impl KademliaHandler { - /// Create a `KademliaHandler` that only allows sending messages to the remote but denying - /// incoming connections. - pub fn dial_only() -> Self { - KademliaHandler::with_allow_listening(false) - } + /// Create a [`KademliaHandler`] using the given configuration. + pub fn new(config: KademliaHandlerConfig) -> Self { + let keep_alive = KeepAlive::Until(Instant::now() + config.idle_timeout); - /// Create a `KademliaHandler` that only allows sending messages but also receive incoming - /// requests. - /// - /// The `Default` trait implementation wraps around this function. - pub fn dial_and_listen() -> Self { - KademliaHandler::with_allow_listening(true) - } - - fn with_allow_listening(allow_listening: bool) -> Self { KademliaHandler { - config: Default::default(), - allow_listening, + config, next_connec_unique_id: UniqueConnecId(0), substreams: Vec::new(), - keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(10)), + keep_alive, } } - - /// Modifies the protocol name used on the wire. Can be used to create incompatibilities - /// between networks on purpose. - pub fn with_protocol_name(mut self, name: impl Into>) -> Self { - self.config = self.config.with_protocol_name(name); - self - } } impl Default for KademliaHandler { - #[inline] fn default() -> Self { - KademliaHandler::dial_and_listen() + KademliaHandler::new(Default::default()) } } @@ -422,8 +412,8 @@ where #[inline] fn listen_protocol(&self) -> SubstreamProtocol { - if self.allow_listening { - SubstreamProtocol::new(self.config.clone()).map_upgrade(upgrade::EitherUpgrade::A) + if self.config.allow_listening { + SubstreamProtocol::new(self.config.protocol_config.clone()).map_upgrade(upgrade::EitherUpgrade::A) } else { SubstreamProtocol::new(upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade)) } @@ -449,7 +439,7 @@ where EitherOutput::Second(p) => void::unreachable(p), }; - debug_assert!(self.allow_listening); + debug_assert!(self.config.allow_listening); let connec_unique_id = self.next_connec_unique_id; self.next_connec_unique_id.0 += 1; self.substreams @@ -635,7 +625,7 @@ where let mut substream = self.substreams.swap_remove(n); loop { - match advance_substream(substream, self.config.clone(), cx) { + match advance_substream(substream, self.config.protocol_config.clone(), cx) { (Some(new_state), Some(event), _) => { self.substreams.push(new_state); return Poll::Ready(event); @@ -672,6 +662,16 @@ where } } +impl Default for KademliaHandlerConfig { + fn default() -> Self { + KademliaHandlerConfig { + protocol_config: Default::default(), + allow_listening: true, + idle_timeout: Duration::from_secs(10), + } + } +} + /// Advances one substream. /// /// Returns the new state for that substream, an event to generate, and whether the substream