Some Kademlia improvements (#994)
* Move QueryTarget to the behaviour
* Rework query system
* Add a few tests
* Add some Kademlia tests
* More tests
* Don't return self entry
* Fix tests
Parent: f4e7fed742
Commit: fc535f532b
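At a glance, the rework collapses the per-query bookkeeping into one generic state machine. A minimal sketch with stand-in types (the real definitions appear in the diff below):

use std::collections::HashMap;

// Stand-ins for the real types; see the diff below for the actual definitions.
struct QueryState<TTarget, TPeerId> { target: TTarget, closest_peers: Vec<TPeerId> }
struct QueryInfo; // what we are querying and why, plus untrusted addresses
type PeerId = Vec<u8>;

struct Behaviour {
    // Before: HashMap<QueryId, (QueryState, QueryPurpose, Vec<PeerId>)>,
    // plus `queries_to_starts` and `remote_requests` side lists.
    // After: one map is enough; the target itself carries its purpose.
    active_queries: HashMap<usize, QueryState<QueryInfo, PeerId>>,
}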
@@ -33,5 +33,8 @@ unsigned-varint = { version = "0.2.1", features = ["codec"] }
 void = "1.0"
 
 [dev-dependencies]
+libp2p-mplex = { version = "0.5.0", path = "../../muxers/mplex" }
+libp2p-secio = { version = "0.5.0", path = "../secio" }
 libp2p-tcp = { version = "0.5.0", path = "../../transports/tcp" }
+libp2p-yamux = { version = "0.5.0", path = "../../muxers/yamux" }
 tokio = "0.1"
@@ -20,9 +20,9 @@
 
 use crate::addresses::Addresses;
 use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn, KademliaRequestId};
-use crate::kbucket::{KBucketsTable, Update};
+use crate::kbucket::{KBucketsTable, KBucketsPeerId, Update};
 use crate::protocol::{KadConnectionType, KadPeer};
-use crate::query::{QueryConfig, QueryState, QueryStatePollOut, QueryTarget};
+use crate::query::{QueryConfig, QueryState, QueryStatePollOut};
 use fnv::{FnvHashMap, FnvHashSet};
 use futures::{prelude::*, stream};
 use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
@@ -34,6 +34,8 @@ use std::{cmp::Ordering, error, marker::PhantomData, time::Duration, time::Insta
 use tokio_io::{AsyncRead, AsyncWrite};
 use tokio_timer::Interval;
 
+mod test;
+
 /// Network behaviour that handles Kademlia.
 pub struct Kademlia<TSubstream> {
     /// Storage for the nodes. Contains the known multiaddresses for this node.
@@ -41,10 +43,7 @@ pub struct Kademlia<TSubstream> {
 
     /// All the iterative queries we are currently performing, with their ID. The last parameter
     /// is the list of accumulated providers for `GET_PROVIDERS` queries.
-    active_queries: FnvHashMap<QueryId, (QueryState, QueryPurpose, Vec<PeerId>)>,
+    active_queries: FnvHashMap<QueryId, QueryState<QueryInfo, PeerId>>,
 
-    /// List of queries to start once we are inside `poll()`.
-    queries_to_starts: SmallVec<[(QueryId, QueryTarget, QueryPurpose); 8]>,
-
     /// List of peers the swarm is connected to.
     connected_peers: FnvHashSet<PeerId>,
@@ -56,9 +55,6 @@ pub struct Kademlia<TSubstream> {
     /// Identifier for the next query that we start.
     next_query_id: QueryId,
 
-    /// Requests received by a remote that we should fulfill as soon as possible.
-    remote_requests: SmallVec<[(PeerId, KademliaRequestId, QueryTarget); 4]>,
-
     /// List of values and peers that are providing them.
     ///
     /// Our local peer ID can be in this container.
@@ -96,15 +92,94 @@ pub struct Kademlia<TSubstream> {
 #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
 pub struct QueryId(usize);
 
-/// Reason why we have this query in the list of queries.
+/// Information about a query.
 #[derive(Debug, Clone, PartialEq, Eq)]
-enum QueryPurpose {
+struct QueryInfo {
+    /// What we are querying and why.
+    inner: QueryInfoInner,
+
+    /// Temporary addresses used when trying to reach nodes.
+    untrusted_addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
+}
+
+/// Additional information about the query.
+#[derive(Debug, Clone, PartialEq, Eq)]
+enum QueryInfoInner {
     /// The query was created for the Kademlia initialization process.
-    Initialization,
-    /// The user requested this query to be performed. It should be reported when finished.
-    UserRequest,
-    /// We should add an `ADD_PROVIDER` message to the peers of the outcome.
-    AddProvider(Multihash),
+    Initialization {
+        /// Hash we're targeting to insert ourselves in the k-buckets.
+        target: PeerId,
+    },
+
+    /// The user requested a `FIND_PEER` query to be performed. It should be reported when finished.
+    FindPeer(PeerId),
+
+    /// The user requested a `GET_PROVIDERS` query to be performed. It should be reported when
+    /// finished.
+    GetProviders {
+        /// Target we are searching the providers of.
+        target: Multihash,
+        /// Results to return. Filled over time.
+        pending_results: Vec<PeerId>,
+    },
+
+    /// We are traversing towards `target` and should add an `ADD_PROVIDER` message to the peers
+    /// of the outcome with our own identity.
+    AddProvider {
+        /// Which hash we're targeting.
+        target: Multihash,
+    },
+}
+
+impl KBucketsPeerId<PeerId> for QueryInfo {
+    fn distance_with(&self, other: &PeerId) -> u32 {
+        let other: &Multihash = other.as_ref();
+        self.as_ref().distance_with(other)
+    }
+
+    fn max_distance() -> usize {
+        <PeerId as KBucketsPeerId>::max_distance()
+    }
+}
+
+impl AsRef<Multihash> for QueryInfo {
+    fn as_ref(&self) -> &Multihash {
+        match &self.inner {
+            QueryInfoInner::Initialization { target } => target.as_ref(),
+            QueryInfoInner::FindPeer(peer) => peer.as_ref(),
+            QueryInfoInner::GetProviders { target, .. } => target,
+            QueryInfoInner::AddProvider { target } => target,
+        }
+    }
+}
+
+impl PartialEq<PeerId> for QueryInfo {
+    fn eq(&self, other: &PeerId) -> bool {
+        self.as_ref().eq(other)
+    }
+}
+
+impl QueryInfo {
+    /// Creates the corresponding RPC request to send to remote.
+    fn to_rpc_request<TUserData>(&self, user_data: TUserData) -> KademliaHandlerIn<TUserData> {
+        match &self.inner {
+            QueryInfoInner::Initialization { target } => KademliaHandlerIn::FindNodeReq {
+                key: target.clone(),
+                user_data,
+            },
+            QueryInfoInner::FindPeer(key) => KademliaHandlerIn::FindNodeReq {
+                key: key.clone(),
+                user_data,
+            },
+            QueryInfoInner::GetProviders { target, .. } => KademliaHandlerIn::GetProvidersReq {
+                key: target.clone().into(),
+                user_data,
+            },
+            QueryInfoInner::AddProvider { target, .. } => KademliaHandlerIn::FindNodeReq {
+                key: unimplemented!(), // TODO: target.clone(),
+                user_data,
+            },
+        }
+    }
 }
 
 impl<TSubstream> Kademlia<TSubstream> {
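The `KBucketsPeerId<PeerId>` impl above is what lets the generic query state order candidate peers by distance to whatever the query is about. A self-contained sketch of the same idea, with a simplified trait and `u32` ids standing in for multihashes (the real trait lives in `crate::kbucket`):

// Simplified stand-in for `crate::kbucket::KBucketsPeerId`; illustration only.
trait KBucketsPeerId<TOther> {
    fn distance_with(&self, other: &TOther) -> u32;
}

struct Target(u32);
struct Peer(u32);

// XOR metric, as in Kademlia.
impl KBucketsPeerId<Peer> for Target {
    fn distance_with(&self, other: &Peer) -> u32 {
        self.0 ^ other.0
    }
}

fn main() {
    let target = Target(0b1010);
    let mut peers = vec![Peer(0b1000), Peer(0b0010), Peer(0b1011)];
    // Mirrors `closest_peers.sort_by_key(|e| target.distance_with(&e.0))`
    // in the query.rs changes further down.
    peers.sort_by_key(|p| target.distance_with(p));
    assert_eq!(peers[0].0, 0b1011); // XOR distance 0b0001 is the smallest
}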
@@ -151,12 +226,10 @@ impl<TSubstream> Kademlia<TSubstream> {
         let mut behaviour = Kademlia {
             kbuckets: KBucketsTable::new(local_peer_id, Duration::from_secs(60)),   // TODO: constant
             queued_events: SmallVec::new(),
-            queries_to_starts: SmallVec::new(),
             active_queries: Default::default(),
             connected_peers: Default::default(),
             pending_rpcs: SmallVec::with_capacity(parallelism),
             next_query_id: QueryId(0),
-            remote_requests: SmallVec::new(),
             values_providers: FnvHashMap::default(),
             providing_keys: FnvHashSet::default(),
             refresh_add_providers: Interval::new_interval(Duration::from_secs(60)).fuse(),   // TODO: constant
@@ -171,57 +244,17 @@ impl<TSubstream> Kademlia<TSubstream> {
         // As part of the initialization process, we start one `FIND_NODE` for each bit of the
         // possible range of peer IDs.
         for n in 0..256 {
-            let peer_id = match gen_random_id(behaviour.kbuckets.my_id(), n) {
+            let target = match gen_random_id(behaviour.kbuckets.my_id(), n) {
                 Ok(p) => p,
                 Err(()) => continue,
            };
 
-            behaviour.start_query(QueryTarget::FindPeer(peer_id), QueryPurpose::Initialization);
+            behaviour.start_query(QueryInfoInner::Initialization { target });
         }
     }
 
     behaviour
 }
 
-    /// Builds the answer to a request.
-    fn build_result<TUserData>(&mut self, query: QueryTarget, request_id: KademliaRequestId, parameters: &mut PollParameters<'_>)
-        -> KademliaHandlerIn<TUserData>
-    {
-        match query {
-            QueryTarget::FindPeer(key) => {
-                let closer_peers = self.kbuckets
-                    .find_closest_with_self(&key)
-                    .take(self.num_results)
-                    .map(|peer_id| build_kad_peer(peer_id, parameters, &self.kbuckets))
-                    .collect();
-
-                KademliaHandlerIn::FindNodeRes {
-                    closer_peers,
-                    request_id,
-                }
-            },
-            QueryTarget::GetProviders(key) => {
-                let closer_peers = self.kbuckets
-                    .find_closest_with_self(&key)
-                    .take(self.num_results)
-                    .map(|peer_id| build_kad_peer(peer_id, parameters, &self.kbuckets))
-                    .collect();
-
-                let provider_peers = self.values_providers
-                    .get(&key)
-                    .into_iter()
-                    .flat_map(|peers| peers)
-                    .map(|peer_id| build_kad_peer(peer_id.clone(), parameters, &self.kbuckets))
-                    .collect();
-
-                KademliaHandlerIn::GetProvidersRes {
-                    closer_peers,
-                    provider_peers,
-                    request_id,
-                }
-            },
-        }
-    }
 }
 
 impl<TSubstream> Kademlia<TSubstream> {
@@ -231,13 +264,13 @@ impl<TSubstream> Kademlia<TSubstream> {
     /// requested `PeerId`.
     #[inline]
     pub fn find_node(&mut self, peer_id: PeerId) {
-        self.start_query(QueryTarget::FindPeer(peer_id), QueryPurpose::UserRequest);
+        self.start_query(QueryInfoInner::FindPeer(peer_id));
     }
 
     /// Starts an iterative `GET_PROVIDERS` request.
     #[inline]
-    pub fn get_providers(&mut self, key: Multihash) {
-        self.start_query(QueryTarget::GetProviders(key), QueryPurpose::UserRequest);
+    pub fn get_providers(&mut self, target: Multihash) {
+        self.start_query(QueryInfoInner::GetProviders { target, pending_results: Vec::new() });
     }
 
     /// Register the local node as the provider for the given key.
@@ -279,10 +312,29 @@ impl<TSubstream> Kademlia<TSubstream> {
     }
 
     /// Internal function that starts a query.
-    fn start_query(&mut self, target: QueryTarget, purpose: QueryPurpose) {
+    fn start_query(&mut self, target: QueryInfoInner) {
         let query_id = self.next_query_id;
         self.next_query_id.0 += 1;
-        self.queries_to_starts.push((query_id, target, purpose));
+
+        let target = QueryInfo {
+            inner: target,
+            untrusted_addresses: Default::default(),
+        };
+
+        let known_closest_peers = self.kbuckets
+            .find_closest::<Multihash>(target.as_ref())
+            .take(self.num_results);
+
+        self.active_queries.insert(
+            query_id,
+            QueryState::new(QueryConfig {
+                target,
+                parallelism: self.parallelism,
+                num_results: self.num_results,
+                rpc_timeout: self.rpc_timeout,
+                known_closest_peers,
+            })
+        );
     }
 }
 
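With `queries_to_starts` gone, `start_query` makes the query live before it returns: allocate an id, compute the closest known peers, insert the state. A stand-in sketch of that pattern (`String` standing in for `QueryState<QueryInfo, PeerId>`):

use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct QueryId(usize);

struct Behaviour {
    next_query_id: QueryId,
    // `String` stands in for the real query state.
    active_queries: HashMap<QueryId, String>,
}

impl Behaviour {
    // Mirrors the new `start_query`: allocate an id, then insert the query
    // immediately instead of queueing it for the next `poll()`.
    fn start_query(&mut self, info: String) -> QueryId {
        let query_id = self.next_query_id;
        self.next_query_id.0 += 1;
        self.active_queries.insert(query_id, info);
        query_id
    }
}

fn main() {
    let mut behaviour = Behaviour {
        next_query_id: QueryId(0),
        active_queries: HashMap::new(),
    };
    let id = behaviour.start_query("FindPeer(...)".to_string());
    assert!(behaviour.active_queries.contains_key(&id));
}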
@@ -298,10 +350,23 @@ where
     }
 
     fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
-        self.kbuckets
+        // We should order addresses from decreasing likelihood of connectivity, so start with
+        // the addresses of that peer in the k-buckets.
+        let mut out_list = self.kbuckets
             .get(peer_id)
             .map(|l| l.iter().cloned().collect::<Vec<_>>())
-            .unwrap_or_else(Vec::new)
+            .unwrap_or_else(Vec::new);
+
+        // We add to that a temporary list of addresses from the ongoing queries.
+        for query in self.active_queries.values() {
+            if let Some(addrs) = query.target().untrusted_addresses.get(peer_id) {
+                for addr in addrs {
+                    out_list.push(addr.clone());
+                }
+            }
+        }
+
+        out_list
     }
 
     fn inject_connected(&mut self, id: PeerId, endpoint: ConnectedPoint) {
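The new `addresses_of_peer` merges two sources: trusted k-bucket addresses first, then per-query untrusted ones. A self-contained sketch with plain maps and strings standing in for `KBucketsTable`, `PeerId` and `Multiaddr`:

use std::collections::HashMap;

// Stand-ins for Multiaddr / PeerId; illustration only.
type Addr = &'static str;
type PeerId = u32;

// Mirrors the new `addresses_of_peer`: trusted k-bucket addresses first,
// then any temporary addresses learned from in-flight queries.
fn addresses_of_peer(
    kbuckets: &HashMap<PeerId, Vec<Addr>>,
    query_untrusted: &[HashMap<PeerId, Vec<Addr>>],
    peer: PeerId,
) -> Vec<Addr> {
    let mut out = kbuckets.get(&peer).cloned().unwrap_or_else(Vec::new);
    for untrusted in query_untrusted {
        if let Some(addrs) = untrusted.get(&peer) {
            out.extend(addrs.iter().cloned());
        }
    }
    out
}

fn main() {
    let mut kb = HashMap::new();
    kb.insert(1, vec!["/ip4/127.0.0.1/tcp/4001"]);
    let mut q = HashMap::new();
    q.insert(1, vec!["/ip4/10.0.0.2/tcp/4001"]);
    assert_eq!(addresses_of_peer(&kb, &[q], 1).len(), 2);
}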
@@ -342,7 +407,7 @@ where
         let was_in = self.connected_peers.remove(id);
         debug_assert!(was_in);
 
-        for (query, _, _) in self.active_queries.values_mut() {
+        for query in self.active_queries.values_mut() {
             query.inject_rpc_error(id);
         }
 
@@ -358,7 +423,7 @@ where
 
     fn inject_replaced(&mut self, peer_id: PeerId, old_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) {
         // We need to re-send the active queries.
-        for (query_id, (query, _, _)) in self.active_queries.iter() {
+        for (query_id, query) in self.active_queries.iter() {
             if query.is_waiting(&peer_id) {
                 self.queued_events.push(NetworkBehaviourAction::SendEvent {
                     peer_id: peer_id.clone(),
@@ -383,8 +448,19 @@ where
     fn inject_node_event(&mut self, source: PeerId, event: KademliaHandlerEvent<QueryId>) {
         match event {
             KademliaHandlerEvent::FindNodeReq { key, request_id } => {
-                self.remote_requests.push((source, request_id, QueryTarget::FindPeer(key)));
-                return;
+                let closer_peers = self.kbuckets
+                    .find_closest(&key)
+                    .take(self.num_results)
+                    .map(|peer_id| build_kad_peer(peer_id, &self.kbuckets))
+                    .collect();
+
+                self.queued_events.push(NetworkBehaviourAction::SendEvent {
+                    peer_id: source,
+                    event: KademliaHandlerIn::FindNodeRes {
+                        closer_peers,
+                        request_id,
+                    },
+                });
             }
             KademliaHandlerEvent::FindNodeRes {
                 closer_peers,
@@ -399,13 +475,36 @@ where
                         ty: peer.connection_ty,
                     }));
                 }
-                if let Some((query, _, _)) = self.active_queries.get_mut(&user_data) {
+                if let Some(query) = self.active_queries.get_mut(&user_data) {
+                    for peer in closer_peers.iter() {
+                        query.target_mut().untrusted_addresses
+                            .insert(peer.node_id.clone(), peer.multiaddrs.iter().cloned().collect());
+                    }
                     query.inject_rpc_result(&source, closer_peers.into_iter().map(|kp| kp.node_id))
                 }
             }
             KademliaHandlerEvent::GetProvidersReq { key, request_id } => {
-                self.remote_requests.push((source, request_id, QueryTarget::GetProviders(key)));
-                return;
+                let closer_peers = self.kbuckets
+                    .find_closest(&key)
+                    .take(self.num_results)
+                    .map(|peer_id| build_kad_peer(peer_id, &self.kbuckets))
+                    .collect();
+
+                let provider_peers = self.values_providers
+                    .get(&key)
+                    .into_iter()
+                    .flat_map(|peers| peers)
+                    .map(|peer_id| build_kad_peer(peer_id.clone(), &self.kbuckets))
+                    .collect();
+
+                self.queued_events.push(NetworkBehaviourAction::SendEvent {
+                    peer_id: source,
+                    event: KademliaHandlerIn::GetProvidersRes {
+                        closer_peers,
+                        provider_peers,
+                        request_id,
+                    },
+                });
             }
             KademliaHandlerEvent::GetProvidersRes {
                 closer_peers,
@@ -422,9 +521,15 @@ where
 
                 // It is possible that we obtain a response for a query that has finished, which is
                 // why we may not find an entry in `self.active_queries`.
-                if let Some((query, _, providers)) = self.active_queries.get_mut(&user_data) {
-                    for peer in provider_peers {
-                        providers.push(peer.node_id);
+                if let Some(query) = self.active_queries.get_mut(&user_data) {
+                    if let QueryInfoInner::GetProviders { pending_results, .. } = &mut query.target_mut().inner {
+                        for peer in provider_peers {
+                            pending_results.push(peer.node_id);
+                        }
+                    }
+                    for peer in closer_peers.iter() {
+                        query.target_mut().untrusted_addresses
+                            .insert(peer.node_id.clone(), peer.multiaddrs.iter().cloned().collect());
                     }
                     query.inject_rpc_result(&source, closer_peers.into_iter().map(|kp| kp.node_id))
                 }
@@ -432,7 +537,7 @@ where
             KademliaHandlerEvent::QueryError { user_data, .. } => {
                 // It is possible that we obtain a response for a query that has finished, which is
                 // why we may not find an entry in `self.active_queries`.
-                if let Some((query, _, _)) = self.active_queries.get_mut(&user_data) {
+                if let Some(query) = self.active_queries.get_mut(&user_data) {
                     query.inject_rpc_error(&source)
                 }
             }
@@ -474,50 +579,14 @@ where
         match self.refresh_add_providers.poll() {
             Ok(Async::NotReady) => {},
             Ok(Async::Ready(Some(_))) => {
-                for provided in self.providing_keys.clone().into_iter() {
-                    let purpose = QueryPurpose::AddProvider(provided.clone());
-                    // TODO: messy because of the PeerId/Multihash division
-                    if let Ok(key_as_peer) = PeerId::from_multihash(provided) {
-                        self.start_query(QueryTarget::FindPeer(key_as_peer), purpose);
-                    }
+                for target in self.providing_keys.clone().into_iter() {
+                    self.start_query(QueryInfoInner::AddProvider { target });
                 }
             },
             // Ignore errors.
             Ok(Async::Ready(None)) | Err(_) => {},
         }
 
-        // Start queries that are waiting to start.
-        for (query_id, query_target, query_purpose) in self.queries_to_starts.drain() {
-            let known_closest_peers = self.kbuckets
-                .find_closest(query_target.as_hash())
-                .take(self.num_results);
-            self.active_queries.insert(
-                query_id,
-                (
-                    QueryState::new(QueryConfig {
-                        target: query_target,
-                        parallelism: self.parallelism,
-                        num_results: self.num_results,
-                        rpc_timeout: self.rpc_timeout,
-                        known_closest_peers,
-                    }),
-                    query_purpose,
-                    Vec::new()    // TODO: insert ourselves if we provide the data?
-                )
-            );
-        }
-        self.queries_to_starts.shrink_to_fit();
-
-        // Handle remote queries.
-        if !self.remote_requests.is_empty() {
-            let (peer_id, request_id, query) = self.remote_requests.remove(0);
-            let result = self.build_result(query, request_id, parameters);
-            return Async::Ready(NetworkBehaviourAction::SendEvent {
-                peer_id,
-                event: result,
-            });
-        }
-
         loop {
             // Handle events queued by other parts of this struct
             if !self.queued_events.is_empty() {
@@ -528,7 +597,7 @@ where
             // If iterating finds a query that is finished, stores it here and stops looping.
             let mut finished_query = None;
 
-            'queries_iter: for (&query_id, (query, _, _)) in self.active_queries.iter_mut() {
+            'queries_iter: for (&query_id, query) in self.active_queries.iter_mut() {
                 loop {
                     match query.poll() {
                         Async::Ready(QueryStatePollOut::Finished) => {
@@ -562,39 +631,37 @@ where
             }
 
             if let Some(finished_query) = finished_query {
-                let (query, purpose, provider_peers) = self
+                let (query_info, closer_peers) = self
                     .active_queries
                     .remove(&finished_query)
-                    .expect("finished_query was gathered when iterating active_queries; QED.");
-                match purpose {
-                    QueryPurpose::Initialization => {},
-                    QueryPurpose::UserRequest => {
-                        let event = match query.target().clone() {
-                            QueryTarget::FindPeer(key) => {
-                                debug_assert!(provider_peers.is_empty());
-                                KademliaOut::FindNodeResult {
-                                    key,
-                                    closer_peers: query.into_closest_peers().collect(),
-                                }
-                            },
-                            QueryTarget::GetProviders(key) => {
-                                KademliaOut::GetProvidersResult {
-                                    key,
-                                    closer_peers: query.into_closest_peers().collect(),
-                                    provider_peers,
-                                }
-                            },
-                        };
-
-                        break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
-                    },
-                    QueryPurpose::AddProvider(key) => {
-                        for closest in query.into_closest_peers() {
+                    .expect("finished_query was gathered when iterating active_queries; QED.")
+                    .into_target_and_closest_peers();
+
+                match query_info.inner {
+                    QueryInfoInner::Initialization { .. } => {},
+                    QueryInfoInner::FindPeer(target) => {
+                        let event = KademliaOut::FindNodeResult {
+                            key: target,
+                            closer_peers: closer_peers.collect(),
+                        };
+                        break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
+                    },
+                    QueryInfoInner::GetProviders { target, pending_results } => {
+                        let event = KademliaOut::GetProvidersResult {
+                            key: target,
+                            closer_peers: closer_peers.collect(),
+                            provider_peers: pending_results,
+                        };
 
+                        break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
+                    },
+                    QueryInfoInner::AddProvider { target } => {
+                        for closest in closer_peers {
                             let event = NetworkBehaviourAction::SendEvent {
                                 peer_id: closest,
                                 event: KademliaHandlerIn::AddProvider {
-                                    key: key.clone(),
-                                    provider_peer: build_kad_peer(parameters.local_peer_id().clone(), parameters, &self.kbuckets),
+                                    key: target.clone(),
+                                    provider_peer: build_kad_peer(parameters.local_peer_id().clone(), &self.kbuckets),
                                 },
                             };
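On the consumer side a finished query now surfaces as a single `KademliaOut` event. A sketch of what a driver loop would do with it — the variant and field names match the hunk above, the types are simplified stand-ins:

// Stand-in enum mirroring the `KademliaOut` variants produced above
// (field types simplified); `handle` shows what a swarm driver would do.
enum KademliaOut {
    FindNodeResult { key: String, closer_peers: Vec<String> },
    GetProvidersResult { key: String, closer_peers: Vec<String>, provider_peers: Vec<String> },
}

fn handle(event: KademliaOut) {
    match event {
        KademliaOut::FindNodeResult { key, closer_peers } => {
            println!("FIND_NODE {} done: {} closer peers", key, closer_peers.len());
        }
        KademliaOut::GetProvidersResult { key, closer_peers, provider_peers } => {
            println!("GET_PROVIDERS {} done: {} providers, {} closer peers",
                     key, provider_peers.len(), closer_peers.len());
        }
    }
}

fn main() {
    handle(KademliaOut::FindNodeResult { key: "target".into(), closer_peers: vec![] });
}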
@@ -675,25 +742,16 @@ fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result<PeerId, ()> {
 }
 
 /// Builds a `KadPeer` struct corresponding to the given `PeerId`.
-/// The `PeerId` can be the same as the local one.
+/// The `PeerId` cannot be the same as the local one.
 ///
 /// > **Note**: This is just a convenience function that doesn't do anything note-worthy.
 fn build_kad_peer(
     peer_id: PeerId,
-    parameters: &mut PollParameters<'_>,
     kbuckets: &KBucketsTable<PeerId, Addresses>
 ) -> KadPeer {
-    let is_self = peer_id == *parameters.local_peer_id();
+    debug_assert_ne!(*kbuckets.my_id(), peer_id);
 
-    let (multiaddrs, connection_ty) = if is_self {
-        let mut addrs = parameters
-            .listened_addresses()
-            .cloned()
-            .collect::<Vec<_>>();
-        addrs.extend(parameters.external_addresses());
-        (addrs, KadConnectionType::Connected)
-
-    } else if let Some(addresses) = kbuckets.get(&peer_id) {
+    let (multiaddrs, connection_ty) = if let Some(addresses) = kbuckets.get(&peer_id) {
         let connected = if addresses.is_connected() {
             KadConnectionType::Connected
         } else {
protocols/kad/src/behaviour/test.rs (new file, 303 lines)
@@ -0,0 +1,303 @@
+// Copyright 2019 Parity Technologies (UK) Ltd.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+#![cfg(test)]
+
+use crate::{Kademlia, KademliaOut};
+use futures::prelude::*;
+use libp2p_core::{upgrade, upgrade::InboundUpgradeExt, upgrade::OutboundUpgradeExt, PeerId, Swarm, Transport};
+use libp2p_core::{nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox};
+use std::io;
+
+/// Builds swarms, each listening on a port. Does *not* connect the nodes together.
+/// This is to be used only for testing, and a panic will happen if something goes wrong.
+fn build_nodes(num: usize)
+    -> Vec<Swarm<Boxed<(PeerId, StreamMuxerBox), io::Error>, Kademlia<Substream<StreamMuxerBox>>>>
+{
+    let mut result: Vec<Swarm<_, _>> = Vec::with_capacity(num);
+
+    for _ in 0..num {
+        // TODO: make creating the transport more elegant; literally half of the code of the test
+        // is about creating the transport
+        let local_key = libp2p_core::identity::Keypair::generate_ed25519();
+        let local_public_key = local_key.public();
+        let transport = libp2p_tcp::TcpConfig::new()
+            .with_upgrade(libp2p_secio::SecioConfig::new(local_key))
+            .and_then(move |out, endpoint| {
+                let peer_id = out.remote_key.into_peer_id();
+                let peer_id2 = peer_id.clone();
+                let upgrade = libp2p_yamux::Config::default()
+                    .map_inbound(move |muxer| (peer_id, muxer))
+                    .map_outbound(move |muxer| (peer_id2, muxer));
+                upgrade::apply(out.stream, upgrade, endpoint)
+                    .map(|(id, muxer)| (id, StreamMuxerBox::new(muxer)))
+            })
+            .map_err(|_| panic!())
+            .boxed();
+
+        let kad = Kademlia::without_init(local_public_key.clone().into_peer_id());
+        result.push(Swarm::new(transport, kad, local_public_key.into_peer_id()));
+    }
+
+    for s in result.iter_mut() {
+        Swarm::listen_on(s, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
+    }
+
+    result
+}
+
+#[test]
+fn basic_find_node() {
+    // Build two nodes. Node #2 only knows about node #1. Node #2 is asked for a random peer ID.
+    // Node #2 must return the identity of node #1.
+
+    let mut swarms = build_nodes(2);
+    let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone();
+
+    // Connect second to first.
+    {
+        let listen_addr = Swarm::listeners(&swarms[0]).next().unwrap().clone();
+        swarms[1].add_not_connected_address(&first_peer_id, listen_addr);
+    }
+
+    let search_target = PeerId::random();
+    swarms[1].find_node(search_target.clone());
+
+    tokio::runtime::Runtime::new()
+        .unwrap()
+        .block_on(futures::future::poll_fn(move || -> Result<_, io::Error> {
+            for swarm in &mut swarms {
+                loop {
+                    match swarm.poll().unwrap() {
+                        Async::Ready(Some(KademliaOut::FindNodeResult { key, closer_peers })) => {
+                            assert_eq!(key, search_target);
+                            assert_eq!(closer_peers.len(), 1);
+                            assert_eq!(closer_peers[0], first_peer_id);
+                            return Ok(Async::Ready(()));
+                        }
+                        Async::Ready(_) => (),
+                        Async::NotReady => break,
+                    }
+                }
+            }
+
+            Ok(Async::NotReady)
+        }))
+        .unwrap();
+}
+
+#[test]
+fn direct_query() {
+    // Build three nodes. Node #2 knows about node #1. Node #3 knows about node #2. Node #3 is
+    // asked about a random peer and should return nodes #1 and #2.
+
+    let mut swarms = build_nodes(3);
+
+    let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone();
+    let second_peer_id = Swarm::local_peer_id(&swarms[1]).clone();
+
+    // Connect second to first.
+    {
+        let listen_addr = Swarm::listeners(&swarms[0]).next().unwrap().clone();
+        swarms[1].add_not_connected_address(&first_peer_id, listen_addr);
+    }
+
+    // Connect third to second.
+    {
+        let listen_addr = Swarm::listeners(&swarms[1]).next().unwrap().clone();
+        swarms[2].add_not_connected_address(&second_peer_id, listen_addr);
+    }
+
+    // Ask third to search a random value.
+    let search_target = PeerId::random();
+    swarms[2].find_node(search_target.clone());
+
+    tokio::runtime::Runtime::new()
+        .unwrap()
+        .block_on(futures::future::poll_fn(move || -> Result<_, io::Error> {
+            for swarm in &mut swarms {
+                loop {
+                    match swarm.poll().unwrap() {
+                        Async::Ready(Some(KademliaOut::FindNodeResult { key, closer_peers })) => {
+                            assert_eq!(key, search_target);
+                            assert_eq!(closer_peers.len(), 2);
+                            assert!((closer_peers[0] == first_peer_id) != (closer_peers[1] == first_peer_id));
+                            assert!((closer_peers[0] == second_peer_id) != (closer_peers[1] == second_peer_id));
+                            return Ok(Async::Ready(()));
+                        }
+                        Async::Ready(_) => (),
+                        Async::NotReady => break,
+                    }
+                }
+            }
+
+            Ok(Async::NotReady)
+        }))
+        .unwrap();
+}
+
+#[test]
+fn indirect_query() {
+    // Build four nodes. Node #2 knows about node #1. Node #3 knows about node #2. Node #4 knows
+    // about node #3. Node #4 is asked about a random peer and should return nodes #1, #2 and #3.
+
+    let mut swarms = build_nodes(4);
+
+    let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone();
+    let second_peer_id = Swarm::local_peer_id(&swarms[1]).clone();
+    let third_peer_id = Swarm::local_peer_id(&swarms[2]).clone();
+
+    // Connect second to first.
+    {
+        let listen_addr = Swarm::listeners(&swarms[0]).next().unwrap().clone();
+        swarms[1].add_not_connected_address(&first_peer_id, listen_addr);
+    }
+
+    // Connect third to second.
+    {
+        let listen_addr = Swarm::listeners(&swarms[1]).next().unwrap().clone();
+        swarms[2].add_not_connected_address(&second_peer_id, listen_addr);
+    }
+
+    // Connect fourth to third.
+    {
+        let listen_addr = Swarm::listeners(&swarms[2]).next().unwrap().clone();
+        swarms[3].add_not_connected_address(&third_peer_id, listen_addr);
+    }
+
+    // Ask fourth to search a random value.
+    let search_target = PeerId::random();
+    swarms[3].find_node(search_target.clone());
+
+    tokio::runtime::Runtime::new()
+        .unwrap()
+        .block_on(futures::future::poll_fn(move || -> Result<_, io::Error> {
+            for swarm in &mut swarms {
+                loop {
+                    match swarm.poll().unwrap() {
+                        Async::Ready(Some(KademliaOut::FindNodeResult { key, closer_peers })) => {
+                            assert_eq!(key, search_target);
+                            assert_eq!(closer_peers.len(), 3);
+                            assert_eq!(closer_peers.iter().filter(|p| **p == first_peer_id).count(), 1);
+                            assert_eq!(closer_peers.iter().filter(|p| **p == second_peer_id).count(), 1);
+                            assert_eq!(closer_peers.iter().filter(|p| **p == third_peer_id).count(), 1);
+                            return Ok(Async::Ready(()));
+                        }
+                        Async::Ready(_) => (),
+                        Async::NotReady => break,
+                    }
+                }
+            }
+
+            Ok(Async::NotReady)
+        }))
+        .unwrap();
+}
+
+#[test]
+fn unresponsive_not_returned_direct() {
+    // Build one node. It contains fake addresses to non-existing nodes. We ask it to find a
+    // random peer. We make sure that no fake address is returned.
+
+    let mut swarms = build_nodes(1);
+
+    // Add fake addresses.
+    for _ in 0 .. 10 {
+        swarms[0].add_not_connected_address(
+            &PeerId::random(),
+            libp2p_core::multiaddr::multiaddr![Udp(10u16)]
+        );
+    }
+
+    // Ask first to search a random value.
+    let search_target = PeerId::random();
+    swarms[0].find_node(search_target.clone());
+
+    tokio::runtime::Runtime::new()
+        .unwrap()
+        .block_on(futures::future::poll_fn(move || -> Result<_, io::Error> {
+            for swarm in &mut swarms {
+                loop {
+                    match swarm.poll().unwrap() {
+                        Async::Ready(Some(KademliaOut::FindNodeResult { key, closer_peers })) => {
+                            assert_eq!(key, search_target);
+                            assert_eq!(closer_peers.len(), 0);
+                            return Ok(Async::Ready(()));
+                        }
+                        Async::Ready(_) => (),
+                        Async::NotReady => break,
+                    }
+                }
+            }
+
+            Ok(Async::NotReady)
+        }))
+        .unwrap();
+}
+
+#[test]
+fn unresponsive_not_returned_indirect() {
+    // Build two nodes. Node #2 knows about node #1. Node #1 contains fake addresses to
+    // non-existing nodes. We ask node #1 to find a random peer. We make sure that no fake address
+    // is returned.
+
+    let mut swarms = build_nodes(2);
+
+    // Add fake addresses to first.
+    let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone();
+    for _ in 0 .. 10 {
+        swarms[0].add_not_connected_address(
+            &PeerId::random(),
+            libp2p_core::multiaddr::multiaddr![Udp(10u16)]
+        );
+    }
+
+    // Connect second to first.
+    {
+        let listen_addr = Swarm::listeners(&swarms[0]).next().unwrap().clone();
+        swarms[1].add_not_connected_address(&first_peer_id, listen_addr);
+    }
+
+    // Ask second to search a random value.
+    let search_target = PeerId::random();
+    swarms[1].find_node(search_target.clone());
+
+    tokio::runtime::Runtime::new()
+        .unwrap()
+        .block_on(futures::future::poll_fn(move || -> Result<_, io::Error> {
+            for swarm in &mut swarms {
+                loop {
+                    match swarm.poll().unwrap() {
+                        Async::Ready(Some(KademliaOut::FindNodeResult { key, closer_peers })) => {
+                            assert_eq!(key, search_target);
+                            assert_eq!(closer_peers.len(), 1);
+                            assert_eq!(closer_peers[0], first_peer_id);
+                            return Ok(Async::Ready(()));
+                        }
+                        Async::Ready(_) => (),
+                        Async::NotReady => break,
+                    }
+                }
+            }
+
+            Ok(Async::NotReady)
+        }))
+        .unwrap();
+}
|
@ -23,13 +23,10 @@
|
|||||||
//! This allows one to create queries that iterate on the DHT on nodes that become closer and
|
//! This allows one to create queries that iterate on the DHT on nodes that become closer and
|
||||||
//! closer to the target.
|
//! closer to the target.
|
||||||
|
|
||||||
use crate::handler::KademliaHandlerIn;
|
|
||||||
use crate::kbucket::KBucketsPeerId;
|
use crate::kbucket::KBucketsPeerId;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use libp2p_core::PeerId;
|
|
||||||
use multihash::Multihash;
|
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
use std::time::{Duration, Instant};
|
use std::{cmp::PartialEq, time::Duration, time::Instant};
|
||||||
use tokio_timer::Delay;
|
use tokio_timer::Delay;
|
||||||
|
|
||||||
/// State of a query iterative process.
|
/// State of a query iterative process.
|
||||||
@@ -43,9 +40,9 @@ use tokio_timer::Delay;
 /// `GET_PROVIDERS`, you need to extract yourself the value or list of providers from RPC requests
 /// received by remotes as this is not handled by the `QueryState`.
 #[derive(Debug)]
-pub struct QueryState {
+pub struct QueryState<TTarget, TPeerId> {
     /// Target we're looking for.
-    target: QueryTarget,
+    target: TTarget,
 
     /// Stage of the query. See the documentation of `QueryStage`.
     stage: QueryStage,
@@ -53,7 +50,7 @@ pub struct QueryState {
     /// Ordered list of the peers closest to the result we're looking for.
     /// Entries that are `InProgress` shouldn't be removed from the list before they complete.
     /// Must never contain two entries with the same peer IDs.
-    closest_peers: SmallVec<[(PeerId, QueryPeerState); 32]>,
+    closest_peers: SmallVec<[(TPeerId, QueryPeerState); 32]>,
 
     /// Allowed level of parallelism.
     parallelism: usize,
@@ -67,9 +64,9 @@ pub struct QueryState {
 
 /// Configuration for a query.
 #[derive(Debug, Clone)]
-pub struct QueryConfig<TIter> {
+pub struct QueryConfig<TIter, TTarget> {
     /// Target of the query.
-    pub target: QueryTarget,
+    pub target: TTarget,
 
     /// Iterator to a list of `num_results` nodes that we know of whose distance is close to the
     /// target.
@@ -101,11 +98,15 @@ enum QueryStage {
     Frozen,
 }
 
-impl QueryState {
+impl<TTarget, TPeerId> QueryState<TTarget, TPeerId>
+where
+    TPeerId: Eq,
+    TTarget: KBucketsPeerId<TPeerId>
+{
     /// Creates a new query.
     ///
     /// You should call `poll()` after this function returns in order to know what to do.
-    pub fn new(config: QueryConfig<impl IntoIterator<Item = PeerId>>) -> QueryState {
+    pub fn new(config: QueryConfig<impl IntoIterator<Item = TPeerId>, TTarget>) -> Self {
         let mut closest_peers: SmallVec<[_; 32]> = config
             .known_closest_peers
             .into_iter()
@@ -113,7 +114,7 @@ impl QueryState {
             .take(config.num_results)
             .collect();
         let target = config.target;
-        closest_peers.sort_by_key(|e| target.as_hash().distance_with(e.0.as_ref()));
+        closest_peers.sort_by_key(|e| target.distance_with(&e.0));
         closest_peers.dedup_by(|a, b| a.0 == b.0);
 
         QueryState {
@@ -130,10 +131,19 @@ impl QueryState {
 
     /// Returns the target of the query. Always the same as what was passed to `new()`.
     #[inline]
-    pub fn target(&self) -> &QueryTarget {
+    pub fn target(&self) -> &TTarget {
         &self.target
     }
 
+    /// Returns the target of the query. Always the same as what was passed to `new()`.
+    ///
+    /// You shouldn't modify the target in such a way that modifies the target of the query,
+    /// otherwise logic errors will likely happen.
+    #[inline]
+    pub fn target_mut(&mut self) -> &mut TTarget {
+        &mut self.target
+    }
+
     /// After `poll()` returned `SendRpc`, this method should be called when the node sends back
     /// the result of the query.
     ///
@@ -143,12 +153,12 @@ impl QueryState {
     /// After this function returns, you should call `poll()` again.
     pub fn inject_rpc_result(
         &mut self,
-        result_source: &PeerId,
-        closer_peers: impl IntoIterator<Item = PeerId>,
+        result_source: &impl PartialEq<TPeerId>,
+        closer_peers: impl IntoIterator<Item = TPeerId>,
     ) {
         // Mark the peer as succeeded.
         for (peer_id, state) in self.closest_peers.iter_mut() {
-            if peer_id == result_source {
+            if result_source == peer_id {
                 if let state @ QueryPeerState::InProgress(_) = state {
                     *state = QueryPeerState::Succeeded;
                 }
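Note the flipped comparison above: `result_source == peer_id` rather than `peer_id == result_source`. `PartialEq` across two different types is not symmetric, and the `&impl PartialEq<TPeerId>` bound only provides the source-on-the-left direction. A self-contained illustration:

// Asymmetric PartialEq, as used by the `result_source: &impl PartialEq<TPeerId>`
// bound above (and by `impl PartialEq<PeerId> for QueryInfo` in behaviour.rs):
// Target can be compared against Peer, but not the other way around.
struct Target(u64);
struct Peer(u64);

impl PartialEq<Peer> for Target {
    fn eq(&self, other: &Peer) -> bool { self.0 == other.0 }
}

fn main() {
    let t = Target(7);
    let p = Peer(7);
    assert!(t == p);       // Target: PartialEq<Peer> — compiles
    // assert!(p == t);    // would not compile: Peer has no PartialEq<Target>
}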
@@ -165,9 +175,9 @@ impl QueryState {
 
         for elem_to_add in closer_peers {
             let target = &self.target;
-            let elem_to_add_distance = target.as_hash().distance_with(elem_to_add.as_ref());
+            let elem_to_add_distance = target.distance_with(&elem_to_add);
             let insert_pos_start = self.closest_peers.iter().position(|(id, _)| {
-                target.as_hash().distance_with(id.as_ref()) >= elem_to_add_distance
+                target.distance_with(&id) >= elem_to_add_distance
             });
 
             if let Some(insert_pos_start) = insert_pos_start {
@@ -176,7 +186,7 @@ impl QueryState {
                 let insert_pos_size = self.closest_peers.iter()
                     .skip(insert_pos_start)
                     .position(|(id, _)| {
-                        target.as_hash().distance_with(id.as_ref()) > elem_to_add_distance
+                        target.distance_with(&id) > elem_to_add_distance
                     });
 
                 // Make sure we don't insert duplicates.
@@ -216,31 +226,26 @@ impl QueryState {
         }
     }
 
+    /// Returns the list of peers for which we are waiting for an answer.
+    pub fn waiting(&self) -> impl Iterator<Item = &TPeerId> {
+        self.closest_peers
+            .iter()
+            .filter(|(_, state)| {
+                match state {
+                    QueryPeerState::InProgress(_) => true,
+                    QueryPeerState::NotContacted => false,
+                    QueryPeerState::Succeeded => false,
+                    QueryPeerState::Failed => false,
+                }
+            })
+            .map(|(id, _)| id)
+    }
+
     /// Returns true if we are waiting for a query answer from that peer.
     ///
     /// After `poll()` returned `SendRpc`, this function will return `true`.
-    pub fn is_waiting(&self, id: &PeerId) -> bool {
-        let state = self
-            .closest_peers
-            .iter()
-            .filter_map(
-                |(peer_id, state)| {
-                    if peer_id == id {
-                        Some(state)
-                    } else {
-                        None
-                    }
-                },
-            )
-            .next();
-
-        match state {
-            Some(&QueryPeerState::InProgress(_)) => true,
-            Some(&QueryPeerState::NotContacted) => false,
-            Some(&QueryPeerState::Succeeded) => false,
-            Some(&QueryPeerState::Failed) => false,
-            None => false,
-        }
+    pub fn is_waiting(&self, id: &impl PartialEq<TPeerId>) -> bool {
+        self.waiting().any(|peer_id| id == peer_id)
     }
 
     /// After `poll()` returned `SendRpc`, this function should be called if we were unable to
@@ -250,7 +255,7 @@ impl QueryState {
     /// function whenever an error happens on the network.
     ///
     /// After this function returns, you should call `poll()` again.
-    pub fn inject_rpc_error(&mut self, id: &PeerId) {
+    pub fn inject_rpc_error(&mut self, id: &TPeerId) {
         let state = self
             .closest_peers
             .iter_mut()
@@ -275,7 +280,7 @@ impl QueryState {
     }
 
     /// Polls this individual query.
-    pub fn poll(&mut self) -> Async<QueryStatePollOut<'_>> {
+    pub fn poll(&mut self) -> Async<QueryStatePollOut<'_, TTarget, TPeerId>> {
         // While iterating over peers, count the number of queries currently being processed.
         // This is used to not go over the limit of parallel requests.
        // If this is still 0 at the end of the function, that means the query is finished.
@@ -331,10 +336,7 @@ impl QueryState {
                 let need_connect = match state {
                     QueryPeerState::NotContacted => match self.stage {
                         QueryStage::Iterating { .. } => active_counter < self.parallelism,
-                        QueryStage::Frozen => match self.target {
-                            QueryTarget::FindPeer(_) => true,
-                            QueryTarget::GetProviders(_) => false,
-                        },
+                        QueryStage::Frozen => true,    // TODO: as an optimization, could be false if we're not trying to find peers
                     },
                     _ => false,
                 };
@@ -358,12 +360,12 @@ impl QueryState {
         }
     }
 
-    /// Consumes the query and returns the known closest peers.
+    /// Consumes the query and returns the target and known closest peers.
     ///
     /// > **Note**: This can be called at any time, but you normally only do that once the query
     /// > is finished.
-    pub fn into_closest_peers(self) -> impl Iterator<Item = PeerId> {
-        self.closest_peers
+    pub fn into_target_and_closest_peers(self) -> (TTarget, impl Iterator<Item = TPeerId>) {
+        let closest = self.closest_peers
             .into_iter()
             .filter_map(|(peer_id, state)| {
                 if let QueryPeerState::Succeeded = state {
@@ -372,13 +374,22 @@ impl QueryState {
                     None
                 }
             })
-            .take(self.num_results)
+            .take(self.num_results);
+        (self.target, closest)
+    }
+
+    /// Consumes the query and returns the known closest peers.
+    ///
+    /// > **Note**: This can be called at any time, but you normally only do that once the query
+    /// > is finished.
+    pub fn into_closest_peers(self) -> impl Iterator<Item = TPeerId> {
+        self.into_target_and_closest_peers().1
     }
 }
 
 /// Outcome of polling a query.
 #[derive(Debug, Clone)]
-pub enum QueryStatePollOut<'a> {
+pub enum QueryStatePollOut<'a, TTarget, TPeerId> {
     /// The query is finished.
     ///
     /// If this is a `FindValue` query, the user is supposed to extract the record themselves from
@@ -399,9 +410,9 @@ pub enum QueryStatePollOut<'a> {
     /// `inject_rpc_error` at a later point in time.
     SendRpc {
         /// The peer to send the RPC query to.
-        peer_id: &'a PeerId,
+        peer_id: &'a TPeerId,
         /// A reminder of the query target. Same as what you obtain by calling `target()`.
-        query_target: &'a QueryTarget,
+        query_target: &'a TTarget,
     },
 
     /// We no longer need to send a query to this specific node.
@@ -409,49 +420,10 @@ pub enum QueryStatePollOut<'a> {
     /// It is guaranteed that an earlier polling returned `SendRpc` with this peer id.
     CancelRpc {
         /// The target.
-        peer_id: &'a PeerId,
+        peer_id: &'a TPeerId,
     },
 }
 
-/// What we're aiming for with our query.
-#[derive(Debug, Clone)]
-pub enum QueryTarget {
-    /// Finding a peer.
-    FindPeer(PeerId),
-    /// Find the peers that provide a certain value.
-    GetProviders(Multihash),
-}
-
-impl QueryTarget {
-    /// Creates the corresponding RPC request to send to remote.
-    #[inline]
-    pub fn to_rpc_request<TUserData>(&self, user_data: TUserData) -> KademliaHandlerIn<TUserData> {
-        self.clone().into_rpc_request(user_data)
-    }
-
-    /// Creates the corresponding RPC request to send to remote.
-    pub fn into_rpc_request<TUserData>(self, user_data: TUserData) -> KademliaHandlerIn<TUserData> {
-        match self {
-            QueryTarget::FindPeer(key) => KademliaHandlerIn::FindNodeReq {
-                key,
-                user_data,
-            },
-            QueryTarget::GetProviders(key) => KademliaHandlerIn::GetProvidersReq {
-                key,
-                user_data,
-            },
-        }
-    }
-
-    /// Returns the hash of the thing we're looking for.
-    pub fn as_hash(&self) -> &Multihash {
-        match self {
-            QueryTarget::FindPeer(peer) => peer.as_ref(),
-            QueryTarget::GetProviders(key) => key,
-        }
-    }
-}
-
 /// State of peer in the context of a query.
 #[derive(Debug)]
 enum QueryPeerState {
@@ -467,7 +439,7 @@ enum QueryPeerState {
 
 #[cfg(test)]
 mod tests {
-    use super::{QueryConfig, QueryState, QueryStatePollOut, QueryTarget};
+    use super::{QueryConfig, QueryState, QueryStatePollOut};
     use futures::{self, try_ready, prelude::*};
     use libp2p_core::PeerId;
     use std::{iter, time::Duration, sync::Arc, sync::Mutex, thread};
@@ -476,7 +448,7 @@ mod tests {
     #[test]
     fn start_by_sending_rpc_to_known_peers() {
         let random_id = PeerId::random();
-        let target = QueryTarget::FindPeer(PeerId::random());
+        let target = PeerId::random();
 
         let mut query = QueryState::new(QueryConfig {
             target,
@@ -500,7 +472,7 @@ mod tests {
     fn continue_second_result() {
         let random_id = PeerId::random();
         let random_id2 = PeerId::random();
-        let target = QueryTarget::FindPeer(PeerId::random());
+        let target = PeerId::random();
 
         let query = Arc::new(Mutex::new(QueryState::new(QueryConfig {
             target,
@@ -546,7 +518,7 @@ mod tests {
         let random_id = PeerId::random();
 
         let query = Arc::new(Mutex::new(QueryState::new(QueryConfig {
-            target: QueryTarget::FindPeer(PeerId::random()),
+            target: PeerId::random(),
             known_closest_peers: iter::once(random_id.clone()),
             parallelism: 3,
             num_results: 100,