diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs
index f6fd35c6..7cea6f11 100644
--- a/protocols/kad/src/behaviour.rs
+++ b/protocols/kad/src/behaviour.rs
@@ -22,7 +22,7 @@
 use crate::addresses::Addresses;
 use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn};
 use crate::kbucket::{self, KBucketsTable, NodeStatus};
 use crate::protocol::{KadConnectionType, KadPeer};
-use crate::query::{QueryConfig, QueryState, QueryStatePollOut};
+use crate::query::{QueryConfig, Query, QueryState};
 use crate::write::WriteState;
 use crate::record::{MemoryRecordStorage, RecordStore, Record, RecordStorageError};
 use fnv::{FnvHashMap, FnvHashSet};
@@ -47,7 +47,7 @@ pub struct Kademlia<TSubstream, TRecordStorage = MemoryRecordStorage>
-    active_queries: FnvHashMap<QueryId, QueryState<QueryInfo, PeerId>>,
+    active_queries: FnvHashMap<QueryId, Query<QueryInfo, PeerId>>,
 
     /// All the `PUT_VALUE` actions we are currently performing
     active_writes: FnvHashMap<QueryId, WriteState<PeerId, Multihash>>,
@@ -65,7 +65,6 @@ pub struct Kademlia<TSubstream, TRecordStorage = MemoryRecordStorage>
     /// List of values that we are providing ourselves. Must be kept in sync with
@@ -75,16 +74,8 @@ pub struct Kademlia<TSubstream, TRecordStorage = MemoryRecordStorage>
-    /// `α` in the Kademlia reference papers. Designates the maximum number of queries that we
-    /// perform in parallel.
-    parallelism: usize,
-
-    /// The number of results to return from a query. Defaults to the maximum number
-    /// of entries in a single k-bucket, i.e. the `k` parameter.
-    num_results: usize,
-
-    /// Timeout for a single RPC.
-    rpc_timeout: Duration,
+    /// The configuration for iterative queries.
+    query_config: QueryConfig,
 
     /// Queued events to return when the behaviour is being polled.
     queued_events: SmallVec<[NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaOut>; 32]>,
@@ -215,8 +206,7 @@ impl<TSubstream, TRecordStorage> Kademlia<TSubstream, TRecordStorage>
 where
     TRecordStorage: RecordStore
 {
-    /// Creates a `Kademlia`.
-    #[inline]
+    /// Creates a new `Kademlia` network behaviour with the given local `PeerId`.
     pub fn new(local_peer_id: PeerId) -> Self
     where
         TRecordStorage: Default
@@ -249,7 +239,6 @@ where
     ///
    /// Contrary to `new`, doesn't perform the initialization queries that store our local ID into
     /// the DHT and fill our buckets.
-    #[inline]
     #[deprecated(note="this function is now equivalent to new() and will be removed in the future")]
     pub fn without_init(local_peer_id: PeerId) -> Self
     where
         TRecordStorage: Default
@@ -299,7 +288,7 @@ where
     /// Inner implementation of the constructors.
     fn new_inner(local_peer_id: PeerId, records: TRecordStorage) -> Self {
-        let parallelism = 3;
+        let query_config = QueryConfig::default();
 
         Kademlia {
             kbuckets: KBucketsTable::new(kbucket::Key::new(local_peer_id), Duration::from_secs(60)),   // TODO: constant
@@ -308,14 +297,12 @@ where
             active_queries: Default::default(),
             active_writes: Default::default(),
             connected_peers: Default::default(),
-            pending_rpcs: SmallVec::with_capacity(parallelism),
+            pending_rpcs: SmallVec::with_capacity(query_config.parallelism),
             next_query_id: QueryId(0),
             values_providers: FnvHashMap::default(),
             providing_keys: FnvHashSet::default(),
             refresh_add_providers: Interval::new_interval(Duration::from_secs(60)).fuse(), // TODO: constant
-            parallelism,
-            num_results: kbucket::MAX_NODES_PER_BUCKET,
-            rpc_timeout: Duration::from_secs(8),
+            query_config,
             add_provider: SmallVec::new(),
             marker: PhantomData,
             records,
@@ -429,24 +416,13 @@ where
         };
 
         let target_key = kbucket::Key::new(target.clone());
+        let known_closest_peers = self.kbuckets.closest_keys(&target_key);
+        let query = Query::with_config(self.query_config.clone(), target, known_closest_peers);
 
-        let known_closest_peers = self.kbuckets
-            .closest_keys(&target_key)
-            .take(self.num_results);
-
-        self.active_queries.insert(
-            query_id,
-            QueryState::new(QueryConfig {
-                target,
-                parallelism: self.parallelism,
-                num_results: self.num_results,
-                rpc_timeout: self.rpc_timeout,
-                known_closest_peers,
-            })
-        );
+        self.active_queries.insert(query_id, query);
     }
 
-    /// Processes discovered peers from a query.
+    /// Processes discovered peers from an iterative `Query`.
     fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
     where
         I: Iterator<Item = &'a KadPeer> + Clone
@@ -469,7 +445,7 @@ where
                 query.target_mut().untrusted_addresses
                     .insert(peer.node_id.clone(), peer.multiaddrs.iter().cloned().collect());
             }
-            query.inject_rpc_result(source, others_iter.cloned().map(|kp| kp.node_id))
+            query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
         }
     }
 
@@ -480,7 +456,7 @@ where
         self.kbuckets
            .closest(target)
            .filter(|e| e.node.key.preimage() != source)
-            .take(self.num_results)
+            .take(self.query_config.num_results)
            .map(KadPeer::from)
            .collect()
    }
@@ -628,7 +604,7 @@ where
     fn inject_dial_failure(&mut self, peer_id: &PeerId) {
         for query in self.active_queries.values_mut() {
-            query.inject_rpc_error(peer_id);
+            query.on_failure(peer_id);
         }
         for write in self.active_writes.values_mut() {
             write.inject_write_error(peer_id);
@@ -637,7 +613,7 @@ where
     fn inject_disconnected(&mut self, id: &PeerId, _old_endpoint: ConnectedPoint) {
         for query in self.active_queries.values_mut() {
-            query.inject_rpc_error(id);
+            query.on_failure(id);
         }
         for write in self.active_writes.values_mut() {
             write.inject_write_error(id);
@@ -659,6 +635,7 @@ where
         if let Some(addrs) = self.kbuckets.entry(&kbucket::Key::new(peer_id)).value() {
             if let ConnectedPoint::Dialer { address } = new_endpoint {
+                // TODO: Remove the old address, i.e. from `_old`?
                 addrs.insert(address);
             }
         }
@@ -715,7 +692,7 @@ where
                 // It is possible that we obtain a response for a query that has finished, which is
                 // why we may not find an entry in `self.active_queries`.
                 if let Some(query) = self.active_queries.get_mut(&user_data) {
-                    query.inject_rpc_error(&source)
+                    query.on_failure(&source)
                 }
 
                 if let Some(write) = self.active_writes.get_mut(&user_data) {
@@ -774,13 +751,12 @@ where
                 }
 
                 if let Some(finished_query) = finished_query {
-                    let (query_info, _) = self
-                        .active_queries
+                    let result = self.active_queries
                         .remove(&finished_query)
                         .expect("finished_query was gathered when peeking into active_queries; QED.")
-                        .into_target_and_closest_peers();
+                        .into_result();
 
-                    match query_info.inner {
+                    match result.target.inner {
                         QueryInfoInner::GetValue { key: _, results, .. } => {
                             let result = GetValueResult::Found { results };
                             let event = KademliaOut::GetValueResult(result);
@@ -831,6 +807,8 @@ where
             Self::OutEvent,
         >,
     > {
+        let now = Instant::now();
+
         // Flush the changes to the topology that we want to make.
         for (key, provider) in self.add_provider.drain() {
             // Don't add ourselves to the providers.
@@ -877,33 +855,29 @@ where
         'queries_iter: for (&query_id, query) in self.active_queries.iter_mut() {
             loop {
-                match query.poll() {
-                    Async::Ready(QueryStatePollOut::Finished) => {
+                match query.next(now) {
+                    QueryState::Finished => {
                         finished_query = Some(query_id);
                         break 'queries_iter;
                     }
-                    Async::Ready(QueryStatePollOut::SendRpc {
-                        peer_id,
-                        query_target,
-                    }) => {
-                        let rpc = query_target.to_rpc_request(query_id);
+                    QueryState::Waiting(Some(peer_id)) => {
                         if self.connected_peers.contains(peer_id) {
+                            let peer_id = peer_id.clone();
+                            let event = query.target().to_rpc_request(query_id);
                             return Async::Ready(NetworkBehaviourAction::SendEvent {
-                                peer_id: peer_id.clone(),
-                                event: rpc,
+                                peer_id,
+                                event
                             });
                         } else if peer_id != self.kbuckets.local_key().preimage() {
-                            self.pending_rpcs.push((peer_id.clone(), rpc));
+                            let peer_id = peer_id.clone();
+                            let event = query.target().to_rpc_request(query_id);
+                            self.pending_rpcs.push((peer_id.clone(), event));
                             return Async::Ready(NetworkBehaviourAction::DialPeer {
-                                peer_id: peer_id.clone(),
+                                peer_id,
                             });
                         }
                     }
-                    Async::Ready(QueryStatePollOut::CancelRpc { peer_id }) => {
-                        // We don't cancel if the RPC has already been sent out.
-                        self.pending_rpcs.retain(|(id, _)| id != peer_id);
-                    }
-                    Async::NotReady => break,
+                    QueryState::Waiting(None) | QueryState::WaitingAtCapacity => break,
                 }
             }
         }
@@ -932,32 +906,31 @@ where
         }
 
         if let Some(finished_query) = finished_query {
-            let (query_info, closer_peers) = self
-                .active_queries
+            let result = self.active_queries
                 .remove(&finished_query)
                 .expect("finished_query was gathered when iterating active_queries; QED.")
-                .into_target_and_closest_peers();
+                .into_result();
 
-            match query_info.inner {
+            match result.target.inner {
                 QueryInfoInner::Initialization { .. } => {},
                 QueryInfoInner::FindPeer(target) => {
                     let event = KademliaOut::FindNodeResult {
                         key: target,
-                        closer_peers: closer_peers.collect(),
+                        closer_peers: result.closest_peers.collect(),
                    };
                    break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
                },
                QueryInfoInner::GetProviders { target, pending_results } => {
                    let event = KademliaOut::GetProvidersResult {
                        key: target,
-                        closer_peers: closer_peers.collect(),
+                        closer_peers: result.closest_peers.collect(),
                        provider_peers: pending_results,
                    };
 
                    break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
                },
                QueryInfoInner::AddProvider { target } => {
-                    for closest in closer_peers {
+                    for closest in result.closest_peers {
                        let event = NetworkBehaviourAction::SendEvent {
                            peer_id: closest,
                            event: KademliaHandlerIn::AddProvider {
@@ -974,11 +947,11 @@ where
                },
                QueryInfoInner::GetValue { key, results, .. } => {
                    let result = match results.len() {
-                        0 => GetValueResult::NotFound{
+                        0 => GetValueResult::NotFound {
                            key,
-                            closest_peers: closer_peers.collect()
+                            closest_peers: result.closest_peers.collect()
                        },
-                        _ => GetValueResult::Found{ results },
+                        _ => GetValueResult::Found { results },
                    };
 
                    let event = KademliaOut::GetValueResult(result);
 
                    break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
                },
                QueryInfoInner::PutValue { key, value } => {
-                    let closer_peers = Vec::from_iter(closer_peers);
-                    for peer in &closer_peers {
+                    let closest_peers = Vec::from_iter(result.closest_peers);
+                    for peer in &closest_peers {
                        let event = KademliaHandlerIn::PutValue {
-                                key: key.clone(),
-                                value: value.clone(),
-                                user_data: finished_query,
-                            };
+                            key: key.clone(),
+                            value: value.clone(),
+                            user_data: finished_query,
+                        };
 
                        if self.connected_peers.contains(peer) {
                            let event = NetworkBehaviourAction::SendEvent {
@@ -1008,7 +981,7 @@ where
                        }
                    }
 
-                    self.active_writes.insert(finished_query, WriteState::new(key, closer_peers));
+                    self.active_writes.insert(finished_query, WriteState::new(key, closest_peers));
                },
            }
        } else {
diff --git a/protocols/kad/src/query.rs b/protocols/kad/src/query.rs
index 1a097808..66075520 100644
--- a/protocols/kad/src/query.rs
+++ b/protocols/kad/src/query.rs
@@ -18,555 +18,754 @@
 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 // DEALINGS IN THE SOFTWARE.
 
-//! Contains the iterative querying process of Kademlia.
+//! A state machine for an iterative Kademlia query.
 //!
-//! This allows one to create queries that iterate on the DHT on nodes that become closer and
-//! closer to the target.
+//! Using a [`Query`] typically involves performing the following steps
+//! repeatedly and in an alternating fashion:
+//!
+//!   1. Calling [`next`] to observe the next state of the query and determine
+//!      what to do, which is to either issue new requests to peers or continue
+//!      waiting for responses.
+//!
+//!   2. When responses are received or requests fail, providing input to the
+//!      query via the [`on_success`] and [`on_failure`] callbacks,
+//!      respectively, followed by repeating step (1).
+//!
+//! When a call to [`next`] returns [`Finished`], the query is finished and the
+//! resulting closest peers can be obtained from [`into_result`].
+//!
+//! A query can be finished prematurely at any time through [`finish`]
+//! (e.g. if a response to a `FIND_VALUE` request returns the value being
+//! searched for).
+//!
+//! [`Query`]: query::Query
+//! [`next`]: query::Query::next
+//! [`finish`]: query::Query::finish
+//! [`on_success`]: query::Query::on_success
+//! [`on_failure`]: query::Query::on_failure
+//! [`into_result`]: query::Query::into_result
+//! [`Finished`]: query::QueryState::Finished
 
-use crate::kbucket;
-use futures::prelude::*;
-use smallvec::SmallVec;
-use std::{cmp::PartialEq, time::Duration};
-use wasm_timer::{Delay, Instant};
+use crate::kbucket::{Key, Distance, MAX_NODES_PER_BUCKET};
+use std::{time::Duration, iter::FromIterator};
+use std::collections::btree_map::{BTreeMap, Entry};
+use wasm_timer::Instant;
 
-/// State of a query iterative process.
-///
-/// The API of this state machine is similar to the one of `Future`, `Stream` or `Swarm`. You need
-/// to call `poll()` to query the state for actions to perform. If `NotReady` is returned, the
-/// current task will be woken up automatically when `poll()` needs to be called again.
-///
-/// Note that this struct only handles iterating over nodes that are close to the target. For
-/// `FIND_NODE` queries you don't need more than that. However for `FIND_VALUE` and
-/// `GET_PROVIDERS`, you need to extract yourself the value or list of providers from RPC requests
-/// received by remotes as this is not handled by the `QueryState`.
-#[derive(Debug)]
-pub struct QueryState<TTarget, TPeerId> {
-    /// Target we're looking for.
+/// A `Query` is a state machine for an iterative Kademlia query.
+#[derive(Debug, Clone)]
+pub struct Query<TTarget, TPeerId> {
+    /// The configuration of the query.
+    config: QueryConfig,
+
+    /// The target of the query.
     target: TTarget,
 
-    /// The `kbucket::Key` representation of the `target`.
-    target_key: kbucket::Key<TTarget>,
+    /// The target of the query as a `Key`.
+    target_key: Key<TTarget>,
 
-    /// Stage of the query. See the documentation of `QueryStage`.
-    stage: QueryStage,
+    /// The current state of progress of the query.
+    progress: QueryProgress,
 
-    /// Ordered list of the peers closest to the result we're looking for.
-    /// Entries that are `InProgress` shouldn't be removed from the list before they complete.
-    /// Must never contain two entries with the same peer IDs.
-    closest_peers: SmallVec<[(kbucket::Key<TPeerId>, QueryPeerState); 32]>,
+    /// The closest peers to the target, ordered by increasing distance.
+    closest_peers: BTreeMap<Distance, QueryPeer<TPeerId>>,
 
-    /// Allowed level of parallelism.
-    parallelism: usize,
-
-    /// Number of results to produce.
-    num_results: usize,
-
-    /// Timeout for each individual RPC query.
-    rpc_timeout: Duration,
+    /// The number of peers for which the query is currently waiting for results.
+    num_waiting: usize,
 }
 
-/// Configuration for a query.
+/// Configuration for a `Query`.
 #[derive(Debug, Clone)]
-pub struct QueryConfig<TIter, TTarget> {
-    /// Target of the query.
-    pub target: TTarget,
-
-    /// Iterator to a list of `num_results` nodes that we know of whose distance is close to the
-    /// target.
-    pub known_closest_peers: TIter,
-
+pub struct QueryConfig {
     /// Allowed level of parallelism.
+    ///
+    /// The `α` parameter in the Kademlia paper. The maximum number of peers that a query
+    /// is allowed to wait for in parallel while iterating towards the closest
+    /// nodes to a target. Defaults to `3`.
     pub parallelism: usize,
 
     /// Number of results to produce.
+    ///
+    /// The number of closest peers that a query must obtain successful results
+    /// for before it terminates. Defaults to the maximum number of entries in a
+    /// single k-bucket, i.e. the `k` parameter in the Kademlia paper.
     pub num_results: usize,
 
-    /// Timeout for each individual RPC query.
-    pub rpc_timeout: Duration,
-}
-
-/// Stage of the query.
-#[derive(Debug)]
-enum QueryStage {
-    /// We are trying to find a closest node.
-    Iterating {
-        /// Number of successful query results in a row that didn't find any closer node.
-        // TODO: this is not great, because we don't necessarily receive responses in the order
-        //       we made the queries. It is possible that we query multiple far-away nodes in a
-        //       row, and obtain results before the result of the closest nodes.
-        no_closer_in_a_row: usize,
-    },
-
-    // We have found the closest node, and we are now pinging the nodes we know about.
-    Frozen,
-}
-
-impl<TTarget, TPeerId> QueryState<TTarget, TPeerId>
-where
-    TTarget: Into<kbucket::Key<TTarget>> + Clone,
-    TPeerId: Into<kbucket::Key<TPeerId>> + Eq
-{
-    /// Creates a new query.
+    /// The timeout for a single peer.
     ///
-    /// You should call `poll()` this function returns in order to know what to do.
-    pub fn new(config: QueryConfig<impl IntoIterator<Item = kbucket::Key<TPeerId>>, TTarget>) -> Self {
-        let mut closest_peers: SmallVec<[_; 32]> = config
-            .known_closest_peers
-            .into_iter()
-            .map(|key| (key, QueryPeerState::NotContacted))
-            .take(config.num_results)
-            .collect();
+    /// If a successful result is not reported for a peer within this timeout
+    /// window, a query considers the peer unresponsive and will not consider
+    /// the peer when evaluating the termination conditions of a query until
+    /// and unless it responds. Defaults to `10` seconds.
+    pub peer_timeout: Duration,
+}
 
-        let target_key = config.target.clone().into();
-        closest_peers.sort_by_key(|e| target_key.distance(&e.0));
-        closest_peers.dedup_by(|a, b| a.0 == b.0);
+impl Default for QueryConfig {
+    fn default() -> Self {
+        QueryConfig {
+            parallelism: 3,
+            peer_timeout: Duration::from_secs(10),
+            num_results: MAX_NODES_PER_BUCKET
+        }
+    }
+}
 
-        QueryState {
-            target: config.target,
+/// The result of a `Query`.
+pub struct QueryResult<TTarget, TClosest> {
+    /// The target of the query.
+    pub target: TTarget,
+    /// The closest peers to the target found by the query.
+    pub closest_peers: TClosest
+}
+
+/// The state of the query reported by [`Query::next`].
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum QueryState<'a, TPeerId> {
+    /// The query is waiting for results.
+    ///
+    /// `Some(peer)` indicates that the query is now waiting for a result
+    /// from `peer`, in addition to any other peers for which the query is already
+    /// waiting for results.
+    ///
+    /// `None` indicates that the query is waiting for results and there is no
+    /// new peer to contact, despite the query not being at capacity w.r.t.
+    /// the permitted parallelism.
+    Waiting(Option<&'a TPeerId>),
+
+    /// The query is waiting for results and is at capacity w.r.t. the
+    /// permitted parallelism.
+    WaitingAtCapacity,
+
+    /// The query finished.
+    Finished
+}
+
+impl<TTarget, TPeerId> Query<TTarget, TPeerId>
+where
+    TTarget: Into<Key<TTarget>> + Clone,
+    TPeerId: Into<Key<TPeerId>> + Eq + Clone
+{
+    /// Creates a new query with a default configuration.
+    pub fn new<I>(target: TTarget, known_closest_peers: I) -> Self
+    where
+        I: IntoIterator<Item = Key<TPeerId>>
+    {
+        Self::with_config(QueryConfig::default(), target, known_closest_peers)
+    }
+
+    /// Creates a new query with the given configuration.
+    pub fn with_config<I>(config: QueryConfig, target: TTarget, known_closest_peers: I) -> Self
+    where
+        I: IntoIterator<Item = Key<TPeerId>>
+    {
+        let target_key = target.clone().into();
+
+        // Initialise the closest peers to begin the query with.
+        let closest_peers = BTreeMap::from_iter(
+            known_closest_peers
+                .into_iter()
+                .map(|key| {
+                    let distance = key.distance(&target_key);
+                    let state = QueryPeerState::NotContacted;
+                    (distance, QueryPeer { key, state })
+                })
+                .take(config.num_results));
+
+        // The query initially makes progress by iterating towards the target.
+        let progress = QueryProgress::Iterating { no_progress: 0 };
+
+        Query {
+            config,
+            target,
             target_key,
-            stage: QueryStage::Iterating {
-                no_closer_in_a_row: 0,
-            },
+            progress,
             closest_peers,
-            parallelism: config.parallelism,
-            num_results: config.num_results,
-            rpc_timeout: config.rpc_timeout,
+            num_waiting: 0
         }
     }
 
-    /// Returns the target of the query. Always the same as what was passed to `new()`.
-    #[inline]
+    /// Borrows the underlying target of the query.
     pub fn target(&self) -> &TTarget {
         &self.target
     }
 
-    /// Returns the target of the query. Always the same as what was passed to `new()`.
-    ///
-    /// You shouldn't modify the target in such a way that modifies the target of the query,
-    /// otherwise logic errors will likely happen.
-    #[inline]
+    /// Mutably borrows the underlying target of the query.
     pub fn target_mut(&mut self) -> &mut TTarget {
         &mut self.target
     }
 
-    /// After `poll()` returned `SendRpc`, this method should be called when the node sends back
-    /// the result of the query.
+    /// Callback for delivering the result of a successful request to a peer
+    /// that the query is waiting on.
     ///
-    /// Note that if this query is a `FindValue` query and a node returns a record, feel free to
-    /// immediately drop the query altogether and use the record.
+    /// Delivering results of requests back to the query allows the query to make
+    /// progress. The query is said to make progress either when the given
+    /// `closer_peers` contain a peer closer to the target than any peer seen so far,
+    /// or when the query did not yet accumulate `num_results` closest peers and
+    /// `closer_peers` contains a new peer, regardless of its distance to the target.
     ///
-    /// After this function returns, you should call `poll()` again.
-    pub fn inject_rpc_result(
-        &mut self,
-        result_source: &impl PartialEq<TPeerId>,
-        closer_peers: impl IntoIterator<Item = TPeerId>,
-    ) {
+    /// After calling this function, `next` should eventually be called again
+    /// to advance the state of the query.
+    ///
+    /// If the query is finished, the query is not currently waiting for a
+    /// result from `peer`, or a result for `peer` has already been reported,
+    /// calling this function has no effect.
+    pub fn on_success<I>(&mut self, peer: &TPeerId, closer_peers: I)
+    where
+        I: IntoIterator<Item = TPeerId>
+    {
+        if let QueryProgress::Finished = self.progress {
+            return
+        }
+
+        let key = peer.clone().into();
+        let distance = key.distance(&self.target_key);
+
         // Mark the peer as succeeded.
-        for (peer_id, state) in self.closest_peers.iter_mut() {
-            if result_source == peer_id.preimage() {
-                if let state @ QueryPeerState::InProgress(_) = state {
-                    *state = QueryPeerState::Succeeded;
+        match self.closest_peers.entry(distance) {
+            Entry::Vacant(..) => return,
+            Entry::Occupied(mut e) => match e.get().state {
+                QueryPeerState::Waiting(..) => {
+                    debug_assert!(self.num_waiting > 0);
+                    self.num_waiting -= 1;
+                    e.get_mut().state = QueryPeerState::Succeeded;
                 }
+                QueryPeerState::Unresponsive => {
+                    e.get_mut().state = QueryPeerState::Succeeded;
                 }
+                QueryPeerState::NotContacted
+                    | QueryPeerState::Failed
+                    | QueryPeerState::Succeeded => return
             }
         }
 
         let num_closest = self.closest_peers.len();
+        let mut progress = false;
 
-        // Add the entries in `closest_peers`.
-        if let QueryStage::Iterating {
-            ref mut no_closer_in_a_row,
-        } = self.stage
-        {
-            let target = &self.target_key;
-
-            // We increment now, and reset to 0 if we find a closer node.
-            *no_closer_in_a_row += 1;
-
-            for peer in closer_peers {
-                let peer_key = peer.into();
-                let peer_distance = target.distance(&peer_key);
-                let insert_pos_start = self.closest_peers.iter().position(|(key, _)| {
-                    target.distance(&key) >= peer_distance
-                });
-
-                if let Some(insert_pos_start) = insert_pos_start {
-                    // We need to insert the element between `insert_pos_start` and
-                    // `insert_pos_start + insert_pos_size`.
-                    let insert_pos_size = self.closest_peers.iter()
-                        .skip(insert_pos_start)
-                        .position(|(key, _)| {
-                            target.distance(&key) > peer_distance
-                        });
-
-                    // Make sure we don't insert duplicates.
-                    let mut iter_start = self.closest_peers.iter().skip(insert_pos_start);
-                    let duplicate = if let Some(insert_pos_size) = insert_pos_size {
-                        iter_start.take(insert_pos_size).any(|e| e.0 == peer_key)
-                    } else {
-                        iter_start.any(|e| e.0 == peer_key)
-                    };
-
-                    if !duplicate {
-                        if insert_pos_start == 0 {
-                            *no_closer_in_a_row = 0;
-                        }
-                        debug_assert!(self.closest_peers.iter().all(|e| e.0 != peer_key));
-                        self.closest_peers
-                            .insert(insert_pos_start, (peer_key, QueryPeerState::NotContacted));
-                    }
-                } else if num_closest < self.num_results {
-                    debug_assert!(self.closest_peers.iter().all(|e| e.0 != peer_key));
-                    self.closest_peers.push((peer_key, QueryPeerState::NotContacted));
-                }
-            }
+        // Incorporate the reported closer peers into the query.
+        for peer in closer_peers {
+            let key = peer.into();
+            let distance = self.target_key.distance(&key);
+            let peer = QueryPeer { key, state: QueryPeerState::NotContacted };
+            self.closest_peers.entry(distance).or_insert(peer);
+            // The query makes progress if the new peer is either closer to the target
+            // than any peer seen so far (i.e. is the first entry), or the query did
+            // not yet accumulate enough closest peers.
+            progress = self.closest_peers.keys().next() == Some(&distance)
+                || num_closest < self.config.num_results;
         }
 
-        // Check for duplicates in `closest_peers`.
-        debug_assert!(self.closest_peers.windows(2).all(|w| w[0].0 != w[1].0));
+        // Update the query progress.
+        self.progress = match self.progress {
+            QueryProgress::Iterating { no_progress } => {
+                let no_progress = if progress { 0 } else { no_progress + 1 };
+                if no_progress >= self.config.parallelism {
+                    QueryProgress::Stalled
+                } else {
+                    QueryProgress::Iterating { no_progress }
+                }
+            }
+            QueryProgress::Stalled =>
+                if progress {
+                    QueryProgress::Iterating { no_progress: 0 }
+                } else {
+                    QueryProgress::Stalled
+                }
+            QueryProgress::Finished => QueryProgress::Finished
+        }
+    }
 
-        let num_closest_new = self.closest_peers.len();
+    /// Callback for informing the query about a failed request to a peer
+    /// that the query is waiting on.
+    ///
+    /// After calling this function, `next` should eventually be called again
+    /// to advance the state of the query.
+    ///
+    /// If the query is finished, the query is not currently waiting for a
+    /// result from `peer`, or a result for `peer` has already been reported,
+    /// calling this function has no effect.
+    pub fn on_failure(&mut self, peer: &TPeerId) {
+        if let QueryProgress::Finished = self.progress {
+            return
+        }
 
-        // Termination condition: If at least `self.parallelism` consecutive
-        // responses yield no peer closer to the target and either no new peers
-        // were discovered or the number of discovered peers reached the desired
-        // number of results, then the query is considered complete.
-        if let QueryStage::Iterating { no_closer_in_a_row } = self.stage {
-            if no_closer_in_a_row >= self.parallelism &&
-                (num_closest == num_closest_new ||
-                 num_closest_new >= self.num_results)
-            {
-                self.stage = QueryStage::Frozen;
+        let key = peer.clone().into();
+        let distance = key.distance(&self.target_key);
+
+        match self.closest_peers.entry(distance) {
+            Entry::Vacant(_) => return,
+            Entry::Occupied(mut e) => match e.get().state {
+                QueryPeerState::Waiting(_) => {
+                    debug_assert!(self.num_waiting > 0);
+                    self.num_waiting -= 1;
+                    e.get_mut().state = QueryPeerState::Failed
+                }
+                QueryPeerState::Unresponsive => {
+                    e.get_mut().state = QueryPeerState::Failed
+                }
+                _ => {}
             }
         }
     }
 
-    /// Returns the list of peers for which we are waiting for an answer.
+    /// Returns the list of peers for which the query is currently waiting
+    /// for results.
     pub fn waiting(&self) -> impl Iterator<Item = &TPeerId> {
-        self.closest_peers
-            .iter()
-            .filter(|(_, state)| {
-                match state {
-                    QueryPeerState::InProgress(_) => true,
-                    QueryPeerState::NotContacted => false,
-                    QueryPeerState::Succeeded => false,
-                    QueryPeerState::Failed => false,
-                }
+        self.closest_peers.values().filter_map(|peer|
+            match peer.state {
+                QueryPeerState::Waiting(..) => Some(peer.key.preimage()),
+                _ => None
             })
-            .map(|(key, _)| key.preimage())
     }
 
-    /// Returns true if we are waiting for a query answer from that peer.
-    ///
-    /// After `poll()` returned `SendRpc`, this function will return `true`.
-    pub fn is_waiting(&self, id: &impl PartialEq<TPeerId>) -> bool {
-        self.waiting().any(|peer_id| id == peer_id)
+    /// Returns the number of peers for which the query is currently
+    /// waiting for results.
+    pub fn num_waiting(&self) -> usize {
+        self.num_waiting
     }
 
-    /// After `poll()` returned `SendRpc`, this function should be called if we were unable to
-    /// reach the peer, or if an error of some sort happened.
-    ///
-    /// Has no effect if the peer ID is not relevant to the query, so feel free to call this
-    /// function whenever an error happens on the network.
-    ///
-    /// After this function returns, you should call `poll()` again.
-    pub fn inject_rpc_error(&mut self, id: &TPeerId) {
-        let state = self
-            .closest_peers
-            .iter_mut()
-            .find_map(|(peer_id, state)|
-                if peer_id.preimage() == id {
-                    Some(state)
-                } else {
-                    None
-                });
+    /// Returns true if the query is waiting for a response from the given peer.
+    pub fn is_waiting(&self, peer: &TPeerId) -> bool {
+        self.waiting().any(|p| peer == p)
+    }
 
-        match state {
-            Some(state @ &mut QueryPeerState::InProgress(_)) => *state = QueryPeerState::Failed,
-            Some(&mut QueryPeerState::NotContacted) => (),
-            Some(&mut QueryPeerState::Succeeded) => (),
-            Some(&mut QueryPeerState::Failed) => (),
-            None => (),
+    /// Advances the state of the query, potentially getting a new peer to contact.
+    ///
+    /// See [`QueryState`].
+    pub fn next(&mut self, now: Instant) -> QueryState<TPeerId> {
+        if let QueryProgress::Finished = self.progress {
+            return QueryState::Finished
         }
-    }
 
-    /// Polls this individual query.
-    pub fn poll(&mut self) -> Async<QueryStatePollOut<TTarget, TPeerId>> {
-        // While iterating over peers, count the number of queries currently being processed.
-        // This is used to not go over the limit of parallel requests.
-        // If this is still 0 at the end of the function, that means the query is finished.
-        let mut active_counter = 0;
+        // Count the number of peers that returned a result. If there is a
+        // request in progress to one of the `num_results` closest peers, the
+        // counter is set to `None` as the query can only finish once
+        // `num_results` closest peers have responded (or there are no more
+        // peers to contact, see `active_counter`).
+        let mut result_counter = Some(0);
 
-        // While iterating over peers, count the number of queries in a row (from closer to further
-        // away from target) that are in the succeeded state.
-        let mut succeeded_counter = Some(0);
+        // Check if the query is at capacity w.r.t. the allowed parallelism.
+        let at_capacity = self.at_capacity();
 
-        // Extract `self.num_results` to avoid borrowing errors with closures.
-        let num_results = self.num_results;
-
-        for &mut (ref peer_id, ref mut state) in self.closest_peers.iter_mut() {
-            // Start by "killing" the query if it timed out.
-            if let QueryPeerState::InProgress(timeout) = state {
-                match timeout.poll() {
-                    Ok(Async::Ready(_)) | Err(_) => {
-                        *state = QueryPeerState::Failed;
-                        return Async::Ready(QueryStatePollOut::CancelRpc { peer_id: peer_id.preimage() });
+        for peer in self.closest_peers.values_mut() {
+            match peer.state {
+                QueryPeerState::Waiting(timeout) => {
+                    if now >= timeout {
+                        // Unresponsive peers no longer count towards the limit for the
+                        // bounded parallelism, though they might still be ongoing and
+                        // their results can still be delivered to the query.
+                        debug_assert!(self.num_waiting > 0);
+                        self.num_waiting -= 1;
+                        peer.state = QueryPeerState::Unresponsive
                     }
-                    Ok(Async::NotReady) => {
-                        succeeded_counter = None;
-                        active_counter += 1
+                    else if at_capacity {
+                        // The query is still waiting for a result from a peer and is
+                        // at capacity w.r.t. the maximum number of peers being waited on.
+                        return QueryState::WaitingAtCapacity
+                    }
+                    else {
+                        // The query is still waiting for a result from a peer and the
+                        // `result_counter` did not yet reach `num_results`. Therefore
+                        // the query is not yet done, regardless of already successful
+                        // queries to peers farther from the target.
+                        result_counter = None;
                    }
                }
-            }
 
-            if let QueryPeerState::Succeeded = state {
-                if let Some(ref mut cnt) = succeeded_counter {
-                    *cnt += 1;
-                    // If we have enough results; the query is done.
-                    if *cnt >= num_results {
-                        return Async::Ready(QueryStatePollOut::Finished)
+                QueryPeerState::Succeeded =>
+                    if let Some(ref mut cnt) = result_counter {
+                        *cnt += 1;
+                        // If `num_results` successful results have been delivered for the
+                        // closest peers, the query is done.
+                        if *cnt >= self.config.num_results {
+                            self.progress = QueryProgress::Finished;
+                            return QueryState::Finished
+                        }
                    }
-                }
-            }
 
-            if let QueryPeerState::NotContacted = state {
-                let connect = match self.stage {
-                    QueryStage::Frozen => true,
-                    QueryStage::Iterating {..} => active_counter < self.parallelism,
-                };
-                if connect {
-                    let delay = Delay::new(Instant::now() + self.rpc_timeout);
-                    *state = QueryPeerState::InProgress(delay);
-                    return Async::Ready(QueryStatePollOut::SendRpc {
-                        peer_id: peer_id.preimage(),
-                        query_target: &self.target,
-                    });
-                } else {
-                    // The peer is among the `num_results` closest and still
-                    // needs to be contacted, but the query is currently at
-                    // capacity w.r.t. the allowed parallelism.
-                    return Async::NotReady
+                QueryPeerState::NotContacted =>
+                    if !at_capacity {
+                        let timeout = now + self.config.peer_timeout;
+                        peer.state = QueryPeerState::Waiting(timeout);
+                        self.num_waiting += 1;
+                        return QueryState::Waiting(Some(peer.key.preimage()))
+                    } else {
+                        return QueryState::WaitingAtCapacity
                    }
+
+                QueryPeerState::Unresponsive | QueryPeerState::Failed => {
+                    // Skip over unresponsive or failed peers.
                }
            }
        }
 
-        // If we don't have any query in progress, return `Finished` as we don't have
-        // anything more we can do.
-        if active_counter > 0 {
-            Async::NotReady
+        if self.num_waiting > 0 {
+            // The query is still waiting for results and not at capacity w.r.t.
+            // the allowed parallelism, but there are no new peers to contact
+            // at the moment.
+            QueryState::Waiting(None)
        } else {
-            Async::Ready(QueryStatePollOut::Finished)
+            // The query is finished because all available peers have been contacted
+            // and the query is not waiting for any more results.
+            self.progress = QueryProgress::Finished;
+            QueryState::Finished
        }
    }
 
-    /// Consumes the query and returns the target and known closest peers.
-    ///
-    /// > **Note**: This can be called at any time, but you normally only do that once the query
-    /// >           is finished.
-    pub fn into_target_and_closest_peers(self) -> (TTarget, impl Iterator<Item = TPeerId>) {
-        let closest = self.closest_peers
+    /// Immediately transitions the query to [`QueryState::Finished`].
+    pub fn finish(&mut self) {
+        self.progress = QueryProgress::Finished
+    }
+
+    /// Checks whether the query has finished.
+    pub fn finished(&self) -> bool {
+        self.progress == QueryProgress::Finished
+    }
+
+    /// Consumes the query, returning the target and the closest peers.
+    pub fn into_result(self) -> QueryResult<TTarget, impl Iterator<Item = TPeerId>> {
+        let closest_peers = self.closest_peers
            .into_iter()
-            .filter_map(|(peer_id, state)| {
-                if let QueryPeerState::Succeeded = state {
-                    Some(peer_id.into_preimage())
+            .filter_map(|(_, peer)| {
+                if let QueryPeerState::Succeeded = peer.state {
+                    Some(peer.key.into_preimage())
                } else {
                    None
                }
            })
-            .take(self.num_results);
-        (self.target, closest)
+            .take(self.config.num_results);
+
+        QueryResult { target: self.target, closest_peers }
    }
 
-    /// Consumes the query and returns the known closest peers.
+    /// Checks if the query is at capacity w.r.t. the permitted parallelism.
     ///
-    /// > **Note**: This can be called at any time, but you normally only do that once the query
-    /// >           is finished.
-    pub fn into_closest_peers(self) -> impl Iterator<Item = TPeerId> {
-        self.into_target_and_closest_peers().1
+    /// While the query is stalled, up to `num_results` parallel requests
+    /// are allowed. This is a slightly more permissive variant of the
+    /// requirement that the initiator "resends the FIND_NODE to all of the
+    /// k closest nodes it has not already queried".
+    fn at_capacity(&self) -> bool {
+        match self.progress {
+            QueryProgress::Stalled => self.num_waiting >= self.config.num_results,
+            QueryProgress::Iterating { .. } => self.num_waiting >= self.config.parallelism,
+            QueryProgress::Finished => true
+        }
    }
 }
 
-/// Outcome of polling a query.
-#[derive(Debug, Clone)]
-pub enum QueryStatePollOut<'a, TTarget, TPeerId> {
+////////////////////////////////////////////////////////////////////////////////
+// Private state
+
+/// Stage of the query.
+#[derive(Debug, PartialEq, Eq, Copy, Clone)]
+enum QueryProgress {
+    /// The query is making progress by iterating towards `num_results` closest
+    /// peers to the target with a maximum of `parallelism` peers for which the
+    /// query is waiting for results at a time.
+    ///
+    /// > **Note**: When the query switches back to `Iterating` after being
+    /// > `Stalled`, it may temporarily be waiting for more than `parallelism`
+    /// > results from peers, with new peers only being considered once
+    /// > the number of pending results drops below `parallelism`.
+    Iterating {
+        /// The number of consecutive results that did not yield a peer closer
+        /// to the target. When this number reaches `parallelism` and no new
+        /// peer was discovered or at least `num_results` peers are known to
+        /// the query, it is considered `Stalled`.
+        no_progress: usize,
+    },
+
+    /// A query is stalled when it did not make progress after `parallelism`
+    /// consecutive successful results (see `on_success`).
+    ///
+    /// While the query is stalled, the maximum allowed parallelism for pending
+    /// results is increased to `num_results` in an attempt to finish the query.
+    /// If the query can make progress again upon receiving the remaining
+    /// results, it switches back to `Iterating`. Otherwise it will be finished.
+    Stalled,
+
    /// The query is finished.
    ///
-    /// If this is a `FindValue` query, the user is supposed to extract the record themselves from
-    /// any RPC result sent by a remote. If the query finished without that happening, this means
-    /// that we didn't find any record.
-    /// Similarly, if this is a `GetProviders` query, the user is supposed to extract the providers
-    /// from any RPC result sent by a remote.
-    ///
-    /// If this is a `FindNode` query, you can call `into_closest_peers` in order to obtain the
-    /// result.
-    Finished,
-
-    /// We need to send an RPC query to the given peer.
-    ///
-    /// The RPC query to send can be derived from the target of the query.
-    ///
-    /// After this has been returned, you should call either `inject_rpc_result` or
-    /// `inject_rpc_error` at a later point in time.
-    SendRpc {
-        /// The peer to send the RPC query to.
-        peer_id: &'a TPeerId,
-        /// A reminder of the query target. Same as what you obtain by calling `target()`.
-        query_target: &'a TTarget,
-    },
-
-    /// We no longer need to send a query to this specific node.
-    ///
-    /// It is guaranteed that an earlier polling returned `SendRpc` with this peer id.
-    CancelRpc {
-        /// The target.
-        peer_id: &'a TPeerId,
-    },
+    /// A query finishes either when it has collected `num_results` results
+    /// from the closest peers (not counting those that failed or are unresponsive)
+    /// or because the query ran out of peers that have not yet delivered
+    /// results (or failed).
+    Finished
 }
 
-/// State of peer in the context of a query.
-#[derive(Debug)]
+/// Representation of a peer in the context of a query.
+#[derive(Debug, Clone)]
+struct QueryPeer<TPeerId> {
+    key: Key<TPeerId>,
+    state: QueryPeerState
+}
+
+/// The state of `QueryPeer` in the context of a query.
+#[derive(Debug, Copy, Clone)]
 enum QueryPeerState {
-    /// We haven't tried contacting the node.
+    /// The peer has not yet been contacted.
+    ///
+    /// This is the starting state for every peer known to, or discovered by, a query.
    NotContacted,
-    /// Waiting for an answer from the node to our RPC query. Includes a timeout.
-    InProgress(Delay),
-    /// We successfully reached the node.
-    Succeeded,
-    /// We tried to reach the node but failed.
+
+    /// The query is waiting for a result from the peer.
+    Waiting(Instant),
+
+    /// A result was not delivered for the peer within the configured timeout.
+    ///
+    /// The peer is not taken into account for the termination conditions
+    /// of the query until and unless it responds.
+    Unresponsive,
+
+    /// Obtaining a result from the peer has failed.
+    ///
+    /// This is a final state, reached as a result of a call to `on_failure`.
     Failed,
+
+    /// A successful result from the peer has been delivered.
+    ///
+    /// This is a final state, reached as a result of a call to `on_success`.
+    Succeeded,
 }
 
 #[cfg(test)]
 mod tests {
-    use super::{kbucket, QueryConfig, QueryState, QueryStatePollOut};
-    use futures::{self, try_ready, prelude::*};
+    use super::*;
     use libp2p_core::PeerId;
-    use std::{iter, time::Duration, sync::Arc, sync::Mutex, thread};
-    use tokio;
+    use quickcheck::*;
+    use rand::{Rng, thread_rng};
+    use std::{iter, time::Duration};
 
-    #[test]
-    fn start_by_sending_rpc_to_known_peers() {
-        let random_id = PeerId::random();
-        let random_key = kbucket::Key::new(random_id.clone());
+    type TestQuery = Query<PeerId, PeerId>;
+
+    fn random_peers(n: usize) -> impl Iterator<Item = PeerId> + Clone {
+        (0 .. n).map(|_| PeerId::random())
+    }
+
+    fn random_query<G: Rng>(g: &mut G) -> TestQuery {
+        let known_closest_peers = random_peers(g.gen_range(1, 60)).map(Key::from);
         let target = PeerId::random();
+        let config = QueryConfig {
+            parallelism: g.gen_range(1, 10),
+            num_results: g.gen_range(1, 25),
+            peer_timeout: Duration::from_secs(g.gen_range(10, 30)),
+        };
+        Query::with_config(config, target, known_closest_peers)
+    }
 
-        let mut query = QueryState::new(QueryConfig {
-            target,
-            known_closest_peers: iter::once(random_key),
-            parallelism: 3,
-            num_results: 100,
-            rpc_timeout: Duration::from_secs(10),
-        });
+    fn sorted(target: &Key<PeerId>, peers: &Vec<Key<PeerId>>) -> bool {
+        peers.windows(2).all(|w| w[0].distance(&target) < w[1].distance(&target))
+    }
 
-        tokio::run(futures::future::poll_fn(move || {
-            match try_ready!(Ok(query.poll())) {
-                QueryStatePollOut::SendRpc { peer_id, .. } if peer_id == &random_id => {
-                    Ok(Async::Ready(()))
-                }
-                _ => panic!(),
-            }
-        }));
+    impl Arbitrary for TestQuery {
+        fn arbitrary<G: Gen>(g: &mut G) -> TestQuery {
+            random_query(g)
+        }
     }
 
     #[test]
-    fn continue_second_result() {
-        let random_id = PeerId::random();
-        let random_key = kbucket::Key::from(random_id.clone());
-        let random_id2 = PeerId::random();
-        let target = PeerId::random();
+    fn new_query() {
+        let query = random_query(&mut thread_rng());
+        let target = Key::from(query.target().clone());
 
-        let query = Arc::new(Mutex::new(QueryState::new(QueryConfig {
-            target,
-            known_closest_peers: iter::once(random_key),
-            parallelism: 3,
-            num_results: 100,
-            rpc_timeout: Duration::from_secs(10),
-        })));
+        let (keys, states): (Vec<_>, Vec<_>) = query.closest_peers
+            .values()
+            .map(|e| (e.key.clone(), &e.state))
+            .unzip();
 
-        // Let's do a first polling round to obtain the `SendRpc` request.
-        tokio::run(futures::future::poll_fn({
-            let random_id = random_id.clone();
-            let query = query.clone();
-            move || {
-                match try_ready!(Ok(query.lock().unwrap().poll())) {
-                    QueryStatePollOut::SendRpc { peer_id, .. } if peer_id == &random_id => {
-                        Ok(Async::Ready(()))
-                    }
-                    _ => panic!(),
-                }
-            }
-        }));
+        let none_contacted = states
+            .iter()
+            .all(|s| match s {
+                QueryPeerState::NotContacted => true,
+                _ => false
+            });
 
-        // Send the reply.
-        query.lock().unwrap().inject_rpc_result(&random_id, iter::once(random_id2.clone()));
-
-        // Second polling round to check the second `SendRpc` request.
-        tokio::run(futures::future::poll_fn({
-            let query = query.clone();
-            move || {
-                match try_ready!(Ok(query.lock().unwrap().poll())) {
-                    QueryStatePollOut::SendRpc { peer_id, .. } if peer_id == &random_id2 => {
-                        Ok(Async::Ready(()))
-                    }
-                    _ => panic!(),
-                }
-            }
-        }));
+        assert!(none_contacted,
+            "Unexpected peer state in new query.");
+        assert!(sorted(&target, &keys),
+            "Closest peers in new query not sorted by distance to target.");
+        assert_eq!(query.num_waiting(), 0,
+            "Unexpected peers in progress in new query.");
+        assert_eq!(query.into_result().closest_peers.count(), 0,
+            "Unexpected closest peers in new query");
     }
 
     #[test]
-    fn timeout_works() {
-        let random_id = PeerId::random();
-        let random_key = kbucket::Key::from(random_id.clone());
+    fn termination_and_parallelism() {
+        fn prop(mut query: TestQuery) {
+            let now = Instant::now();
+            let mut rng = thread_rng();
 
-        let query = Arc::new(Mutex::new(QueryState::new(QueryConfig {
-            target: PeerId::random(),
-            known_closest_peers: iter::once(random_key),
-            parallelism: 3,
-            num_results: 100,
-            rpc_timeout: Duration::from_millis(100),
-        })));
+            let mut expected = query.closest_peers
+                .values()
+                .map(|e| e.key.clone())
+                .collect::<Vec<_>>();
+            let num_known = expected.len();
+            let max_parallelism = usize::min(query.config.parallelism, num_known);
 
-        // Let's do a first polling round to obtain the `SendRpc` request.
-        tokio::run(futures::future::poll_fn({
-            let random_id = random_id.clone();
-            let query = query.clone();
-            move || {
-                match try_ready!(Ok(query.lock().unwrap().poll())) {
-                    QueryStatePollOut::SendRpc { peer_id, .. } if peer_id == &random_id => {
-                        Ok(Async::Ready(()))
-                    }
-                    _ => panic!(),
-                }
-            }
-        }));
+            let target = Key::from(query.target().clone());
+            let mut remaining;
+            let mut num_failures = 0;
 
-        // Wait for a bit.
-        thread::sleep(Duration::from_millis(200));
-
-        // Second polling round to check the timeout.
-        tokio::run(futures::future::poll_fn({
-            let query = query.clone();
-            move || {
-                match try_ready!(Ok(query.lock().unwrap().poll())) {
-                    QueryStatePollOut::CancelRpc { peer_id, .. } if peer_id == &random_id => {
-                        Ok(Async::Ready(()))
-                    }
-                    _ => panic!(),
-                }
-            }
-        }));
-
-        // Third polling round for finished.
-        tokio::run(futures::future::poll_fn({
-            let query = query.clone();
-            move || {
-                match try_ready!(Ok(query.lock().unwrap().poll())) {
-                    QueryStatePollOut::Finished => {
-                        Ok(Async::Ready(()))
-                    }
-                    _ => panic!(),
+            'finished: loop {
+                if expected.len() == 0 {
+                    break;
                }
+                // Split off the next up to `parallelism` expected peers.
+                else if expected.len() < max_parallelism {
+                    remaining = Vec::new();
                }
+                else {
+                    remaining = expected.split_off(max_parallelism);
                }
-        }));
+
+                // Advance the query for maximum parallelism.
+                for k in expected.iter() {
+                    match query.next(now) {
+                        QueryState::Finished => break 'finished,
+                        QueryState::Waiting(Some(p)) => assert_eq!(p, k.preimage()),
+                        QueryState::Waiting(None) => panic!("Expected another peer."),
+                        QueryState::WaitingAtCapacity => panic!("Unexpectedly reached capacity.")
+                    }
+                }
+                let num_waiting = query.num_waiting();
+                assert_eq!(num_waiting, expected.len());
+
+                // Check the bounded parallelism.
+                if query.at_capacity() {
+                    assert_eq!(query.next(now), QueryState::WaitingAtCapacity)
+                }
+
+                // Report results back to the query with a random number of "closer"
+                // peers or an error, thus finishing the "in-flight requests".
+                for (i, k) in expected.iter().enumerate() {
+                    if rng.gen_bool(0.75) {
+                        let num_closer = rng.gen_range(0, query.config.num_results + 1);
+                        let closer_peers = random_peers(num_closer).collect::<Vec<_>>();
+                        remaining.extend(closer_peers.iter().cloned().map(Key::from));
+                        query.on_success(k.preimage(), closer_peers);
+                    } else {
+                        num_failures += 1;
+                        query.on_failure(k.preimage());
+                    }
+                    assert_eq!(query.num_waiting(), num_waiting - (i + 1));
+                }
+
+                // Re-sort the remaining expected peers for the next "round".
+                remaining.sort_by_key(|k| target.distance(&k));
+
+                expected = remaining
+            }
+
+            // The query must be finished.
+            assert_eq!(query.next(now), QueryState::Finished);
+            assert_eq!(query.progress, QueryProgress::Finished);
+
+            // Determine if all peers have been contacted by the query. This _must_ be
+            // the case if the query finished with fewer than the requested number
+            // of results.
+            let all_contacted = query.closest_peers.values().all(|e| match e.state {
+                QueryPeerState::NotContacted | QueryPeerState::Waiting { .. } => false,
+                _ => true
+            });
+
+            let target = query.target().clone();
+            let num_results = query.config.num_results;
+            let result = query.into_result();
+            let closest = result.closest_peers.map(Key::from).collect::<Vec<_>>();
+
+            assert_eq!(result.target, target);
+            assert!(sorted(&Key::from(target), &closest));
+
+            if closest.len() < num_results {
+                // The query returned fewer results than requested. Therefore
+                // either the initial number of known peers must have been
+                // less than the desired number of results, or there must
+                // have been failures.
+                assert!(num_known < num_results || num_failures > 0);
+                // All peers must have been contacted.
+                assert!(all_contacted, "Not all peers have been contacted.");
+            } else {
+                assert_eq!(num_results, closest.len(), "Too many results.");
+            }
+        }
+
+        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
+    }
+
+    #[test]
+    fn no_duplicates() {
+        fn prop(mut query: TestQuery) -> bool {
+            let now = Instant::now();
+            let closer = random_peers(1).collect::<Vec<_>>();
+
+            // A first peer reports a "closer" peer.
+            let peer1 = match query.next(now) {
+                QueryState::Waiting(Some(p)) => p.clone(),
+                _ => panic!("No peer.")
+            };
+            query.on_success(&peer1, closer.clone());
+            // Duplicate result from the same peer.
+            query.on_success(&peer1, closer.clone());
+
+            // If there is a second peer, let it also report the same "closer" peer.
+            match query.next(now) {
+                QueryState::Waiting(Some(p)) => {
+                    let peer2 = p.clone();
+                    query.on_success(&peer2, closer.clone())
+                }
+                QueryState::Finished => {}
+                _ => panic!("Unexpected query state."),
+            };
+
+            // The "closer" peer must only be in the query once.
+            let n = query.closest_peers.values().filter(|e| e.key.preimage() == &closer[0]).count();
+            assert_eq!(n, 1);
+
+            true
+        }
+
+        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
+    }
+
+    #[test]
+    fn timeout() {
+        fn prop(mut query: TestQuery) -> bool {
+            let mut now = Instant::now();
+            let peer = query.closest_peers.values().next().unwrap().key.clone().into_preimage();
+
+            // Poll the query for the first peer to be in progress.
+            match query.next(now) {
+                QueryState::Waiting(Some(id)) => assert_eq!(id, &peer),
+                _ => panic!()
+            }
+
+            // Artificially advance the clock.
+            now = now + query.config.peer_timeout;
+
+            // Advancing the query again should mark the first peer as unresponsive.
+            let _ = query.next(now);
+            match &query.closest_peers.values().next().unwrap() {
+                QueryPeer { key, state: QueryPeerState::Unresponsive } => {
+                    assert_eq!(key.preimage(), &peer);
+                },
+                QueryPeer { state, .. } => panic!("Unexpected peer state: {:?}", state)
+            }
+
+            let finished = query.finished();
+            query.on_success(&peer, iter::empty());
+            let closest = query.into_result().closest_peers.collect::<Vec<_>>();
+
+            if finished {
+                // Delivering results when the query already finished must have
+                // no effect.
+                assert_eq!(Vec::<PeerId>::new(), closest)
+            } else {
+                // Unresponsive peers can still deliver results while the query
+                // is not finished.
+                assert_eq!(vec![peer], closest)
+            }
+            true
+        }
+
+        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
+    }
+}
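
Reviewer note (not part of the patch): the module docs above describe the intended call pattern, which is to alternate `next` with `on_success`/`on_failure` until `Finished` is returned. Below is a minimal sketch of a caller driving a `Query` to completion against a synchronous lookup function. `lookup_rpc` is a hypothetical stand-in for sending a FIND_NODE-style request; everything else is the API introduced by this diff:

    use libp2p_core::PeerId;
    use wasm_timer::Instant;

    // `lookup_rpc(peer)` is assumed to send the request and block for the
    // reply, returning the closer peers reported by `peer` on success.
    fn run_query<F>(mut query: Query<PeerId, PeerId>, mut lookup_rpc: F) -> Vec<PeerId>
    where
        F: FnMut(&PeerId) -> Result<Vec<PeerId>, ()>,
    {
        loop {
            // Ask the state machine what to do next.
            let next = match query.next(Instant::now()) {
                QueryState::Waiting(Some(peer)) => Some(peer.clone()),
                // With a synchronous transport every request resolves before
                // `next` is called again, so the query is never left waiting;
                // these arms only become reachable with asynchronous I/O.
                QueryState::Waiting(None) | QueryState::WaitingAtCapacity => None,
                QueryState::Finished => None,
            };
            match next {
                Some(peer) => match lookup_rpc(&peer) {
                    // Feed the reported closer peers back into the query.
                    Ok(closer_peers) => query.on_success(&peer, closer_peers),
                    // Failures mark the peer as failed; it is then skipped.
                    Err(()) => query.on_failure(&peer),
                },
                None => break,
            }
        }
        // Up to `num_results` closest peers that responded successfully.
        query.into_result().closest_peers.collect()
    }

With an asynchronous transport (as in `behaviour.rs` above) the same pattern is split across `poll`: `Waiting(Some(_))` triggers a `SendEvent` or `DialPeer` action, and responses arriving later invoke `on_success`/`on_failure` before the next poll.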