feat(kad): allow to explicitly set Mode::{Client,Server}

The current implementation in Kademlia relies on an external address to determine if it should be in client or server mode. However there are instances where a node, which may know an external address, wishes to remain in client mode and switch to server mode at a later point.

This PR introduces `Kademlia::set_mode`, which accepts an `Option<Mode>` that would allow one to set `Mode::Client` for client mode, `Mode::Server` for server mode, or `None` to determine if we should operate as a client or server based on our external addresses.

Resolves #4074.

Pull-Request: #4132.
This commit is contained in:
Darius Clark 2023-07-04 02:07:56 -04:00 committed by GitHub
parent 54640928f6
commit cbb9c7c0f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 153 additions and 60 deletions

2
Cargo.lock generated
View File

@ -2767,7 +2767,7 @@ dependencies = [
[[package]]
name = "libp2p-kad"
version = "0.44.1"
version = "0.44.2"
dependencies = [
"arrayvec",
"async-std",

View File

@ -73,7 +73,7 @@ libp2p-floodsub = { version = "0.43.0", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.45.0", path = "protocols/gossipsub" }
libp2p-identify = { version = "0.43.0", path = "protocols/identify" }
libp2p-identity = { version = "0.2.1" }
libp2p-kad = { version = "0.44.1", path = "protocols/kad" }
libp2p-kad = { version = "0.44.2", path = "protocols/kad" }
libp2p-mdns = { version = "0.44.0", path = "protocols/mdns" }
libp2p-metrics = { version = "0.13.0", path = "misc/metrics" }
libp2p-mplex = { version = "0.40.0", path = "muxers/mplex" }

View File

@ -1,3 +1,10 @@
## 0.44.2 - unreleased
- Allow to explicitly set `Mode::{Client,Server}`.
See [PR 4132]
[PR 4132]: https://github.com/libp2p/rust-libp2p/pull/4132
## 0.44.1
- Expose `KBucketDistance`.

View File

@ -3,7 +3,7 @@ name = "libp2p-kad"
edition = "2021"
rust-version = "1.65.0"
description = "Kademlia protocol for libp2p"
version = "0.44.1"
version = "0.44.2"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"

View File

@ -52,7 +52,7 @@ use smallvec::SmallVec;
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::fmt;
use std::num::NonZeroUsize;
use std::task::{Context, Poll};
use std::task::{Context, Poll, Waker};
use std::time::Duration;
use std::vec;
use thiserror::Error;
@ -114,6 +114,8 @@ pub struct Kademlia<TStore> {
local_peer_id: PeerId,
mode: Mode,
auto_mode: bool,
no_events_waker: Option<Waker>,
/// The record storage.
store: TStore,
@ -456,6 +458,8 @@ where
local_peer_id: id,
connections: Default::default(),
mode: Mode::Client,
auto_mode: true,
no_events_waker: None,
}
}
@ -990,6 +994,94 @@ where
id
}
/// Set the [`Mode`] in which we should operate.
///
/// By default, we are in [`Mode::Client`] and will swap into [`Mode::Server`] as soon as we have a confirmed, external address via [`FromSwarm::ExternalAddrConfirmed`].
///
/// Setting a mode via this function disables this automatic behaviour and unconditionally operates in the specified mode.
/// To reactivate the automatic configuration, pass [`None`] instead.
pub fn set_mode(&mut self, mode: Option<Mode>) {
match mode {
Some(mode) => {
self.mode = mode;
self.auto_mode = false;
self.reconfigure_mode();
}
None => {
self.auto_mode = true;
self.determine_mode_from_external_addresses();
}
}
if let Some(waker) = self.no_events_waker.take() {
waker.wake();
}
}
fn reconfigure_mode(&mut self) {
if self.connections.is_empty() {
return;
}
let num_connections = self.connections.len();
log::debug!(
"Re-configuring {} established connection{}",
num_connections,
if num_connections > 1 { "s" } else { "" }
);
self.queued_events
.extend(
self.connections
.iter()
.map(|(conn_id, peer_id)| ToSwarm::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::One(*conn_id),
event: KademliaHandlerIn::ReconfigureMode {
new_mode: self.mode,
},
}),
);
}
fn determine_mode_from_external_addresses(&mut self) {
self.mode = match (self.external_addresses.as_slice(), self.mode) {
([], Mode::Server) => {
log::debug!("Switching to client-mode because we no longer have any confirmed external addresses");
Mode::Client
}
([], Mode::Client) => {
// Previously client-mode, now also client-mode because no external addresses.
Mode::Client
}
(confirmed_external_addresses, Mode::Client) => {
if log::log_enabled!(log::Level::Debug) {
let confirmed_external_addresses =
to_comma_separated_list(confirmed_external_addresses);
log::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable");
}
Mode::Server
}
(confirmed_external_addresses, Mode::Server) => {
debug_assert!(
!confirmed_external_addresses.is_empty(),
"Previous match arm handled empty list"
);
// Previously, server-mode, now also server-mode because > 1 external address. Don't log anything to avoid spam.
Mode::Server
}
};
self.reconfigure_mode();
}
/// Processes discovered peers from a successful request in an iterative `Query`.
fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
where
@ -2428,6 +2520,8 @@ where
// If no new events have been queued either, signal `NotReady` to
// be polled again later.
if self.queued_events.is_empty() {
self.no_events_waker = Some(cx.waker().clone());
return Poll::Pending;
}
}
@ -2437,60 +2531,8 @@ where
self.listen_addresses.on_swarm_event(&event);
let external_addresses_changed = self.external_addresses.on_swarm_event(&event);
self.mode = match (self.external_addresses.as_slice(), self.mode) {
([], Mode::Server) => {
log::debug!("Switching to client-mode because we no longer have any confirmed external addresses");
Mode::Client
}
([], Mode::Client) => {
// Previously client-mode, now also client-mode because no external addresses.
Mode::Client
}
(confirmed_external_addresses, Mode::Client) => {
if log::log_enabled!(log::Level::Debug) {
let confirmed_external_addresses =
to_comma_separated_list(confirmed_external_addresses);
log::debug!("Switching to server-mode assuming that one of [{confirmed_external_addresses}] is externally reachable");
}
Mode::Server
}
(confirmed_external_addresses, Mode::Server) => {
debug_assert!(
!confirmed_external_addresses.is_empty(),
"Previous match arm handled empty list"
);
// Previously, server-mode, now also server-mode because > 1 external address. Don't log anything to avoid spam.
Mode::Server
}
};
if external_addresses_changed && !self.connections.is_empty() {
let num_connections = self.connections.len();
log::debug!(
"External addresses changed, re-configuring {} established connection{}",
num_connections,
if num_connections > 1 { "s" } else { "" }
);
self.queued_events
.extend(
self.connections
.iter()
.map(|(conn_id, peer_id)| ToSwarm::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::One(*conn_id),
event: KademliaHandlerIn::ReconfigureMode {
new_mode: self.mode,
},
}),
);
if self.auto_mode && external_addresses_changed {
self.determine_mode_from_external_addresses();
}
match event {

View File

@ -68,7 +68,7 @@ pub use behaviour::{
AddProviderContext, AddProviderError, AddProviderOk, AddProviderPhase, AddProviderResult,
BootstrapError, BootstrapOk, BootstrapResult, GetClosestPeersError, GetClosestPeersOk,
GetClosestPeersResult, GetProvidersError, GetProvidersOk, GetProvidersResult, GetRecordError,
GetRecordOk, GetRecordResult, InboundRequest, NoKnownPeers, PeerRecord, PutRecordContext,
GetRecordOk, GetRecordResult, InboundRequest, Mode, NoKnownPeers, PeerRecord, PutRecordContext,
PutRecordError, PutRecordOk, PutRecordPhase, PutRecordResult, QueryInfo, QueryMut, QueryRef,
QueryResult, QueryStats, RoutingUpdate,
};

View File

@ -1,7 +1,7 @@
use libp2p_identify as identify;
use libp2p_identity as identity;
use libp2p_kad::store::MemoryStore;
use libp2p_kad::{Kademlia, KademliaConfig, KademliaEvent};
use libp2p_kad::{Kademlia, KademliaConfig, KademliaEvent, Mode};
use libp2p_swarm::Swarm;
use libp2p_swarm_test::SwarmExt;
@ -111,6 +111,50 @@ async fn adding_an_external_addresses_activates_server_mode_on_existing_connecti
}
}
#[async_std::test]
async fn set_client_to_server_mode() {
let _ = env_logger::try_init();
let mut client = Swarm::new_ephemeral(MyBehaviour::new);
client.behaviour_mut().kad.set_mode(Some(Mode::Client));
let mut server = Swarm::new_ephemeral(MyBehaviour::new);
server.listen().await;
client.connect(&mut server).await;
let server_peer_id = *server.local_peer_id();
match libp2p_swarm_test::drive(&mut client, &mut server).await {
(
[MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(_), MyBehaviourEvent::Kad(KademliaEvent::RoutingUpdated { peer, .. })],
[MyBehaviourEvent::Identify(_), MyBehaviourEvent::Identify(identify::Event::Received { info, .. })],
) => {
assert_eq!(peer, server_peer_id);
assert!(info
.protocols
.iter()
.all(|proto| libp2p_kad::PROTOCOL_NAME.ne(proto)))
}
other => panic!("Unexpected events: {other:?}"),
}
client.behaviour_mut().kad.set_mode(Some(Mode::Server));
match libp2p_swarm_test::drive(&mut client, &mut server).await {
(
[MyBehaviourEvent::Identify(_)],
[MyBehaviourEvent::Identify(identify::Event::Received { info, .. }), MyBehaviourEvent::Kad(_)],
) => {
assert!(info
.protocols
.iter()
.any(|proto| libp2p_kad::PROTOCOL_NAME.eq(proto)))
}
other => panic!("Unexpected events: {other:?}"),
}
}
#[derive(libp2p_swarm::NetworkBehaviour)]
#[behaviour(prelude = "libp2p_swarm::derive_prelude")]
struct MyBehaviour {