Refactor iterative queries. (#1154)

Refactoring of iterative queries (`query.rs`) to improve both
correctness and performance (for larger DHTs):

Correctness:

  1. Queries no longer terminate prematurely due to counting results
     from peers farther from the target while results from closer
     peers are still pending. (#1105).

  2. Queries no longer ignore reported closer peers that are not duplicates
     just because they are currently not among the `num_results` closest.
     The currently `max_results` closest may contain peers marked as failed
     or pending / waiting. Hence all reported closer peers that are not
     duplicates must be considered candidates that may still end up
     among the `num_results` closest that successfully responded.

  3. Bounded parallelism based on the `active_counter` was not working
     correctly, as new (not yet contacted) peers closer to the target
     may be discovered at any time and thus appear in `closer_peers`
     before the already active / pending peers.

  4. The `Frozen` query mechanism allowed all remaining not-yet contacted
     peers to be contacted, but their results were discarded, because
     `inject_rpc_result` would only incorporate results while the
     query is `Iterating`. The `Frozen` state has been reworked into
     a `Stalled` state that implements a slightly more permissive
     variant of the following from the paper / specs: "If a round of
     FIND_NODEs fails to return a node any closer than the closest
     already seen, the initiator resends the FIND_NODE to all of the
     k closest nodes it has not already queried.". Importantly, though
     not explicitly mentioned, the query can move back to `Iterating`
     if it makes further progress again as a result of these requests.
     The `Stalled` state thus allows (temporarily) higher parallelism
     in an effort to make progress and bring the query to an end.

Performance:

  1. Repeated distance calculations between the same peers and the
     target is avoided.

  2. Enabled by #1108, use of a more appropriate data structure (`BTreeMap`) for
     the incrementally updated list of closer peers. The data structure needs
     efficient lookups (to avoid duplicates) and insertions at any position,
     both of which large(r) vectors are not that good at. Unscientific benchmarks
     showed a ~40-60% improvement in somewhat pathological scenarios with at least
     20 healthy nodes, each possibly returning a distinct list of closer 20 peers
     to the requestor. A previous assumption may have been that the vector always
     stays very small, but that is not the case in larger clusters: Even if the
     lists of closer peers reported by the 20 contacted peers are heavily overlapping,
     typically a lot more than 20 peers have to be (at least temporarily) considered
     as closest peers until the query completes. See also issue (2) above.

New tests are added for:

  * Query termination conditions.
  * Bounded parallelism.
  * Absence of duplicates.
This commit is contained in:
Roman Borschel 2019-06-20 13:26:09 +02:00 committed by GitHub
parent acebab07f3
commit 69bd0dfffb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 683 additions and 511 deletions

View File

@ -22,7 +22,7 @@ use crate::addresses::Addresses;
use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn};
use crate::kbucket::{self, KBucketsTable, NodeStatus};
use crate::protocol::{KadConnectionType, KadPeer};
use crate::query::{QueryConfig, QueryState, QueryStatePollOut};
use crate::query::{QueryConfig, Query, QueryState};
use crate::write::WriteState;
use crate::record::{MemoryRecordStorage, RecordStore, Record, RecordStorageError};
use fnv::{FnvHashMap, FnvHashSet};
@ -47,7 +47,7 @@ pub struct Kademlia<TSubstream, TRecordStorage: RecordStore = MemoryRecordStorag
/// All the iterative queries we are currently performing, with their ID. The last parameter
/// is the list of accumulated providers for `GET_PROVIDERS` queries.
active_queries: FnvHashMap<QueryId, QueryState<QueryInfo, PeerId>>,
active_queries: FnvHashMap<QueryId, Query<QueryInfo, PeerId>>,
/// All the `PUT_VALUE` actions we are currently performing
active_writes: FnvHashMap<QueryId, WriteState<PeerId, Multihash>>,
@ -65,7 +65,6 @@ pub struct Kademlia<TSubstream, TRecordStorage: RecordStore = MemoryRecordStorag
/// List of values and peers that are providing them.
///
/// Our local peer ID can be in this container.
// TODO: Note that in reality the value is a SHA-256 of the actual value (https://github.com/libp2p/rust-libp2p/issues/694)
values_providers: FnvHashMap<Multihash, SmallVec<[PeerId; 20]>>,
/// List of values that we are providing ourselves. Must be kept in sync with
@ -75,16 +74,8 @@ pub struct Kademlia<TSubstream, TRecordStorage: RecordStore = MemoryRecordStorag
/// Interval to send `ADD_PROVIDER` messages to everyone.
refresh_add_providers: stream::Fuse<Interval>,
/// `α` in the Kademlia reference papers. Designates the maximum number of queries that we
/// perform in parallel.
parallelism: usize,
/// The number of results to return from a query. Defaults to the maximum number
/// of entries in a single k-bucket, i.e. the `k` parameter.
num_results: usize,
/// Timeout for a single RPC.
rpc_timeout: Duration,
/// The configuration for iterative queries.
query_config: QueryConfig,
/// Queued events to return when the behaviour is being polled.
queued_events: SmallVec<[NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaOut>; 32]>,
@ -215,8 +206,7 @@ impl<TSubstream, TRecordStorage> Kademlia<TSubstream, TRecordStorage>
where
TRecordStorage: RecordStore
{
/// Creates a `Kademlia`.
#[inline]
/// Creates a new `Kademlia` network behaviour with the given local `PeerId`.
pub fn new(local_peer_id: PeerId) -> Self
where
TRecordStorage: Default
@ -249,7 +239,6 @@ where
///
/// Contrary to `new`, doesn't perform the initialization queries that store our local ID into
/// the DHT and fill our buckets.
#[inline]
#[deprecated(note="this function is now equivalent to new() and will be removed in the future")]
pub fn without_init(local_peer_id: PeerId) -> Self
where TRecordStorage: Default
@ -299,7 +288,7 @@ where
/// Inner implementation of the constructors.
fn new_inner(local_peer_id: PeerId, records: TRecordStorage) -> Self {
let parallelism = 3;
let query_config = QueryConfig::default();
Kademlia {
kbuckets: KBucketsTable::new(kbucket::Key::new(local_peer_id), Duration::from_secs(60)), // TODO: constant
@ -308,14 +297,12 @@ where
active_queries: Default::default(),
active_writes: Default::default(),
connected_peers: Default::default(),
pending_rpcs: SmallVec::with_capacity(parallelism),
pending_rpcs: SmallVec::with_capacity(query_config.parallelism),
next_query_id: QueryId(0),
values_providers: FnvHashMap::default(),
providing_keys: FnvHashSet::default(),
refresh_add_providers: Interval::new_interval(Duration::from_secs(60)).fuse(), // TODO: constant
parallelism,
num_results: kbucket::MAX_NODES_PER_BUCKET,
rpc_timeout: Duration::from_secs(8),
query_config,
add_provider: SmallVec::new(),
marker: PhantomData,
records,
@ -429,24 +416,13 @@ where
};
let target_key = kbucket::Key::new(target.clone());
let known_closest_peers = self.kbuckets.closest_keys(&target_key);
let query = Query::with_config(self.query_config.clone(), target, known_closest_peers);
let known_closest_peers = self.kbuckets
.closest_keys(&target_key)
.take(self.num_results);
self.active_queries.insert(
query_id,
QueryState::new(QueryConfig {
target,
parallelism: self.parallelism,
num_results: self.num_results,
rpc_timeout: self.rpc_timeout,
known_closest_peers,
})
);
self.active_queries.insert(query_id, query);
}
/// Processes discovered peers from a query.
/// Processes discovered peers from an iterative `Query`.
fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
where
I: Iterator<Item=&'a KadPeer> + Clone
@ -469,7 +445,7 @@ where
query.target_mut().untrusted_addresses
.insert(peer.node_id.clone(), peer.multiaddrs.iter().cloned().collect());
}
query.inject_rpc_result(source, others_iter.cloned().map(|kp| kp.node_id))
query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
}
}
@ -480,7 +456,7 @@ where
self.kbuckets
.closest(target)
.filter(|e| e.node.key.preimage() != source)
.take(self.num_results)
.take(self.query_config.num_results)
.map(KadPeer::from)
.collect()
}
@ -628,7 +604,7 @@ where
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
for query in self.active_queries.values_mut() {
query.inject_rpc_error(peer_id);
query.on_failure(peer_id);
}
for write in self.active_writes.values_mut() {
write.inject_write_error(peer_id);
@ -637,7 +613,7 @@ where
fn inject_disconnected(&mut self, id: &PeerId, _old_endpoint: ConnectedPoint) {
for query in self.active_queries.values_mut() {
query.inject_rpc_error(id);
query.on_failure(id);
}
for write in self.active_writes.values_mut() {
write.inject_write_error(id);
@ -659,6 +635,7 @@ where
if let Some(addrs) = self.kbuckets.entry(&kbucket::Key::new(peer_id)).value() {
if let ConnectedPoint::Dialer { address } = new_endpoint {
// TODO: Remove the old address, i.e. from `_old`?
addrs.insert(address);
}
}
@ -715,7 +692,7 @@ where
// It is possible that we obtain a response for a query that has finished, which is
// why we may not find an entry in `self.active_queries`.
if let Some(query) = self.active_queries.get_mut(&user_data) {
query.inject_rpc_error(&source)
query.on_failure(&source)
}
if let Some(write) = self.active_writes.get_mut(&user_data) {
@ -774,13 +751,12 @@ where
}
if let Some(finished_query) = finished_query {
let (query_info, _) = self
.active_queries
let result = self.active_queries
.remove(&finished_query)
.expect("finished_query was gathered when peeking into active_queries; QED.")
.into_target_and_closest_peers();
.into_result();
match query_info.inner {
match result.target.inner {
QueryInfoInner::GetValue { key: _, results, .. } => {
let result = GetValueResult::Found { results };
let event = KademliaOut::GetValueResult(result);
@ -831,6 +807,8 @@ where
Self::OutEvent,
>,
> {
let now = Instant::now();
// Flush the changes to the topology that we want to make.
for (key, provider) in self.add_provider.drain() {
// Don't add ourselves to the providers.
@ -877,33 +855,29 @@ where
'queries_iter: for (&query_id, query) in self.active_queries.iter_mut() {
loop {
match query.poll() {
Async::Ready(QueryStatePollOut::Finished) => {
match query.next(now) {
QueryState::Finished => {
finished_query = Some(query_id);
break 'queries_iter;
}
Async::Ready(QueryStatePollOut::SendRpc {
peer_id,
query_target,
}) => {
let rpc = query_target.to_rpc_request(query_id);
QueryState::Waiting(Some(peer_id)) => {
if self.connected_peers.contains(peer_id) {
let peer_id = peer_id.clone();
let event = query.target().to_rpc_request(query_id);
return Async::Ready(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: rpc,
peer_id,
event
});
} else if peer_id != self.kbuckets.local_key().preimage() {
self.pending_rpcs.push((peer_id.clone(), rpc));
let peer_id = peer_id.clone();
let event = query.target().to_rpc_request(query_id);
self.pending_rpcs.push((peer_id.clone(), event));
return Async::Ready(NetworkBehaviourAction::DialPeer {
peer_id: peer_id.clone(),
peer_id,
});
}
}
Async::Ready(QueryStatePollOut::CancelRpc { peer_id }) => {
// We don't cancel if the RPC has already been sent out.
self.pending_rpcs.retain(|(id, _)| id != peer_id);
}
Async::NotReady => break,
QueryState::Waiting(None) | QueryState::WaitingAtCapacity => break,
}
}
}
@ -932,32 +906,31 @@ where
}
if let Some(finished_query) = finished_query {
let (query_info, closer_peers) = self
.active_queries
let result = self.active_queries
.remove(&finished_query)
.expect("finished_query was gathered when iterating active_queries; QED.")
.into_target_and_closest_peers();
.into_result();
match query_info.inner {
match result.target.inner {
QueryInfoInner::Initialization { .. } => {},
QueryInfoInner::FindPeer(target) => {
let event = KademliaOut::FindNodeResult {
key: target,
closer_peers: closer_peers.collect(),
closer_peers: result.closest_peers.collect(),
};
break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
},
QueryInfoInner::GetProviders { target, pending_results } => {
let event = KademliaOut::GetProvidersResult {
key: target,
closer_peers: closer_peers.collect(),
closer_peers: result.closest_peers.collect(),
provider_peers: pending_results,
};
break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
},
QueryInfoInner::AddProvider { target } => {
for closest in closer_peers {
for closest in result.closest_peers {
let event = NetworkBehaviourAction::SendEvent {
peer_id: closest,
event: KademliaHandlerIn::AddProvider {
@ -974,11 +947,11 @@ where
},
QueryInfoInner::GetValue { key, results, .. } => {
let result = match results.len() {
0 => GetValueResult::NotFound{
0 => GetValueResult::NotFound {
key,
closest_peers: closer_peers.collect()
closest_peers: result.closest_peers.collect()
},
_ => GetValueResult::Found{ results },
_ => GetValueResult::Found { results },
};
let event = KademliaOut::GetValueResult(result);
@ -986,8 +959,8 @@ where
break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
},
QueryInfoInner::PutValue { key, value } => {
let closer_peers = Vec::from_iter(closer_peers);
for peer in &closer_peers {
let closest_peers = Vec::from_iter(result.closest_peers);
for peer in &closest_peers {
let event = KademliaHandlerIn::PutValue {
key: key.clone(),
value: value.clone(),
@ -1008,7 +981,7 @@ where
}
}
self.active_writes.insert(finished_query, WriteState::new(key, closer_peers));
self.active_writes.insert(finished_query, WriteState::new(key, closest_peers));
},
}
} else {

File diff suppressed because it is too large Load Diff