diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 13bbbbd4..b7caa59a 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -33,5 +33,8 @@ unsigned-varint = { version = "0.2.1", features = ["codec"] } void = "1.0" [dev-dependencies] +libp2p-mplex = { version = "0.5.0", path = "../../muxers/mplex" } +libp2p-secio = { version = "0.5.0", path = "../secio" } libp2p-tcp = { version = "0.5.0", path = "../../transports/tcp" } +libp2p-yamux = { version = "0.5.0", path = "../../muxers/yamux" } tokio = "0.1" diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 97991885..89d86125 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -20,9 +20,9 @@ use crate::addresses::Addresses; use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn, KademliaRequestId}; -use crate::kbucket::{KBucketsTable, Update}; +use crate::kbucket::{KBucketsTable, KBucketsPeerId, Update}; use crate::protocol::{KadConnectionType, KadPeer}; -use crate::query::{QueryConfig, QueryState, QueryStatePollOut, QueryTarget}; +use crate::query::{QueryConfig, QueryState, QueryStatePollOut}; use fnv::{FnvHashMap, FnvHashSet}; use futures::{prelude::*, stream}; use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; @@ -34,6 +34,8 @@ use std::{cmp::Ordering, error, marker::PhantomData, time::Duration, time::Insta use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::Interval; +mod test; + /// Network behaviour that handles Kademlia. pub struct Kademlia { /// Storage for the nodes. Contains the known multiaddresses for this node. @@ -41,10 +43,7 @@ pub struct Kademlia { /// All the iterative queries we are currently performing, with their ID. The last parameter /// is the list of accumulated providers for `GET_PROVIDERS` queries. - active_queries: FnvHashMap)>, - - /// List of queries to start once we are inside `poll()`. - queries_to_starts: SmallVec<[(QueryId, QueryTarget, QueryPurpose); 8]>, + active_queries: FnvHashMap>, /// List of peers the swarm is connected to. connected_peers: FnvHashSet, @@ -56,9 +55,6 @@ pub struct Kademlia { /// Identifier for the next query that we start. next_query_id: QueryId, - /// Requests received by a remote that we should fulfill as soon as possible. - remote_requests: SmallVec<[(PeerId, KademliaRequestId, QueryTarget); 4]>, - /// List of values and peers that are providing them. /// /// Our local peer ID can be in this container. @@ -96,15 +92,94 @@ pub struct Kademlia { #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] pub struct QueryId(usize); -/// Reason why we have this query in the list of queries. +/// Information about a query. #[derive(Debug, Clone, PartialEq, Eq)] -enum QueryPurpose { +struct QueryInfo { + /// What we are querying and why. + inner: QueryInfoInner, + /// Temporary addresses used when trying to reach nodes. + untrusted_addresses: FnvHashMap>, +} + +/// Additional information about the query. +#[derive(Debug, Clone, PartialEq, Eq)] +enum QueryInfoInner { /// The query was created for the Kademlia initialization process. - Initialization, - /// The user requested this query to be performed. It should be reported when finished. - UserRequest, - /// We should add an `ADD_PROVIDER` message to the peers of the outcome. - AddProvider(Multihash), + Initialization { + /// Hash we're targetting to insert ourselves in the k-buckets. + target: PeerId, + }, + + /// The user requested a `FIND_PEER` query to be performed. 
It should be reported when finished. + FindPeer(PeerId), + + /// The user requested a `GET_PROVIDERS` query to be performed. It should be reported when + /// finished. + GetProviders { + /// Target we are searching the providers of. + target: Multihash, + /// Results to return. Filled over time. + pending_results: Vec, + }, + + /// We are traversing towards `target` and should add an `ADD_PROVIDER` message to the peers + /// of the outcome with our own identity. + AddProvider { + /// Which hash we're targetting. + target: Multihash, + }, +} + +impl KBucketsPeerId for QueryInfo { + fn distance_with(&self, other: &PeerId) -> u32 { + let other: &Multihash = other.as_ref(); + self.as_ref().distance_with(other) + } + + fn max_distance() -> usize { + ::max_distance() + } +} + +impl AsRef for QueryInfo { + fn as_ref(&self) -> &Multihash { + match &self.inner { + QueryInfoInner::Initialization { target } => target.as_ref(), + QueryInfoInner::FindPeer(peer) => peer.as_ref(), + QueryInfoInner::GetProviders { target, .. } => target, + QueryInfoInner::AddProvider { target } => target, + } + } +} + +impl PartialEq for QueryInfo { + fn eq(&self, other: &PeerId) -> bool { + self.as_ref().eq(other) + } +} + +impl QueryInfo { + /// Creates the corresponding RPC request to send to remote. + fn to_rpc_request(&self, user_data: TUserData) -> KademliaHandlerIn { + match &self.inner { + QueryInfoInner::Initialization { target } => KademliaHandlerIn::FindNodeReq { + key: target.clone(), + user_data, + }, + QueryInfoInner::FindPeer(key) => KademliaHandlerIn::FindNodeReq { + key: key.clone(), + user_data, + }, + QueryInfoInner::GetProviders { target, .. } => KademliaHandlerIn::GetProvidersReq { + key: target.clone().into(), + user_data, + }, + QueryInfoInner::AddProvider { target, .. } => KademliaHandlerIn::FindNodeReq { + key: unimplemented!(), // TODO: target.clone(), + user_data, + }, + } + } } impl Kademlia { @@ -151,12 +226,10 @@ impl Kademlia { let mut behaviour = Kademlia { kbuckets: KBucketsTable::new(local_peer_id, Duration::from_secs(60)), // TODO: constant queued_events: SmallVec::new(), - queries_to_starts: SmallVec::new(), active_queries: Default::default(), connected_peers: Default::default(), pending_rpcs: SmallVec::with_capacity(parallelism), next_query_id: QueryId(0), - remote_requests: SmallVec::new(), values_providers: FnvHashMap::default(), providing_keys: FnvHashSet::default(), refresh_add_providers: Interval::new_interval(Duration::from_secs(60)).fuse(), // TODO: constant @@ -171,57 +244,17 @@ impl Kademlia { // As part of the initialization process, we start one `FIND_NODE` for each bit of the // possible range of peer IDs. for n in 0..256 { - let peer_id = match gen_random_id(behaviour.kbuckets.my_id(), n) { + let target = match gen_random_id(behaviour.kbuckets.my_id(), n) { Ok(p) => p, Err(()) => continue, }; - behaviour.start_query(QueryTarget::FindPeer(peer_id), QueryPurpose::Initialization); + behaviour.start_query(QueryInfoInner::Initialization { target }); } } behaviour } - - /// Builds the answer to a request. 
- fn build_result(&mut self, query: QueryTarget, request_id: KademliaRequestId, parameters: &mut PollParameters<'_>) - -> KademliaHandlerIn - { - match query { - QueryTarget::FindPeer(key) => { - let closer_peers = self.kbuckets - .find_closest_with_self(&key) - .take(self.num_results) - .map(|peer_id| build_kad_peer(peer_id, parameters, &self.kbuckets)) - .collect(); - - KademliaHandlerIn::FindNodeRes { - closer_peers, - request_id, - } - }, - QueryTarget::GetProviders(key) => { - let closer_peers = self.kbuckets - .find_closest_with_self(&key) - .take(self.num_results) - .map(|peer_id| build_kad_peer(peer_id, parameters, &self.kbuckets)) - .collect(); - - let provider_peers = self.values_providers - .get(&key) - .into_iter() - .flat_map(|peers| peers) - .map(|peer_id| build_kad_peer(peer_id.clone(), parameters, &self.kbuckets)) - .collect(); - - KademliaHandlerIn::GetProvidersRes { - closer_peers, - provider_peers, - request_id, - } - }, - } - } } impl Kademlia { @@ -231,13 +264,13 @@ impl Kademlia { /// requested `PeerId`. #[inline] pub fn find_node(&mut self, peer_id: PeerId) { - self.start_query(QueryTarget::FindPeer(peer_id), QueryPurpose::UserRequest); + self.start_query(QueryInfoInner::FindPeer(peer_id)); } /// Starts an iterative `GET_PROVIDERS` request. #[inline] - pub fn get_providers(&mut self, key: Multihash) { - self.start_query(QueryTarget::GetProviders(key), QueryPurpose::UserRequest); + pub fn get_providers(&mut self, target: Multihash) { + self.start_query(QueryInfoInner::GetProviders { target, pending_results: Vec::new() }); } /// Register the local node as the provider for the given key. @@ -279,10 +312,29 @@ impl Kademlia { } /// Internal function that starts a query. - fn start_query(&mut self, target: QueryTarget, purpose: QueryPurpose) { + fn start_query(&mut self, target: QueryInfoInner) { let query_id = self.next_query_id; self.next_query_id.0 += 1; - self.queries_to_starts.push((query_id, target, purpose)); + + let target = QueryInfo { + inner: target, + untrusted_addresses: Default::default(), + }; + + let known_closest_peers = self.kbuckets + .find_closest::(target.as_ref()) + .take(self.num_results); + + self.active_queries.insert( + query_id, + QueryState::new(QueryConfig { + target, + parallelism: self.parallelism, + num_results: self.num_results, + rpc_timeout: self.rpc_timeout, + known_closest_peers, + }) + ); } } @@ -298,10 +350,23 @@ where } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - self.kbuckets + // We should order addresses from decreasing likelyhood of connectivity, so start with + // the addresses of that peer in the k-buckets. + let mut out_list = self.kbuckets .get(peer_id) .map(|l| l.iter().cloned().collect::>()) - .unwrap_or_else(Vec::new) + .unwrap_or_else(Vec::new); + + // We add to that a temporary list of addresses from the ongoing queries. + for query in self.active_queries.values() { + if let Some(addrs) = query.target().untrusted_addresses.get(peer_id) { + for addr in addrs { + out_list.push(addr.clone()); + } + } + } + + out_list } fn inject_connected(&mut self, id: PeerId, endpoint: ConnectedPoint) { @@ -342,7 +407,7 @@ where let was_in = self.connected_peers.remove(id); debug_assert!(was_in); - for (query, _, _) in self.active_queries.values_mut() { + for query in self.active_queries.values_mut() { query.inject_rpc_error(id); } @@ -358,7 +423,7 @@ where fn inject_replaced(&mut self, peer_id: PeerId, old_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { // We need to re-send the active queries. 
- for (query_id, (query, _, _)) in self.active_queries.iter() { + for (query_id, query) in self.active_queries.iter() { if query.is_waiting(&peer_id) { self.queued_events.push(NetworkBehaviourAction::SendEvent { peer_id: peer_id.clone(), @@ -383,8 +448,19 @@ where fn inject_node_event(&mut self, source: PeerId, event: KademliaHandlerEvent) { match event { KademliaHandlerEvent::FindNodeReq { key, request_id } => { - self.remote_requests.push((source, request_id, QueryTarget::FindPeer(key))); - return; + let closer_peers = self.kbuckets + .find_closest(&key) + .take(self.num_results) + .map(|peer_id| build_kad_peer(peer_id, &self.kbuckets)) + .collect(); + + self.queued_events.push(NetworkBehaviourAction::SendEvent { + peer_id: source, + event: KademliaHandlerIn::FindNodeRes { + closer_peers, + request_id, + }, + }); } KademliaHandlerEvent::FindNodeRes { closer_peers, @@ -399,13 +475,36 @@ where ty: peer.connection_ty, })); } - if let Some((query, _, _)) = self.active_queries.get_mut(&user_data) { + if let Some(query) = self.active_queries.get_mut(&user_data) { + for peer in closer_peers.iter() { + query.target_mut().untrusted_addresses + .insert(peer.node_id.clone(), peer.multiaddrs.iter().cloned().collect()); + } query.inject_rpc_result(&source, closer_peers.into_iter().map(|kp| kp.node_id)) } } KademliaHandlerEvent::GetProvidersReq { key, request_id } => { - self.remote_requests.push((source, request_id, QueryTarget::GetProviders(key))); - return; + let closer_peers = self.kbuckets + .find_closest(&key) + .take(self.num_results) + .map(|peer_id| build_kad_peer(peer_id, &self.kbuckets)) + .collect(); + + let provider_peers = self.values_providers + .get(&key) + .into_iter() + .flat_map(|peers| peers) + .map(|peer_id| build_kad_peer(peer_id.clone(), &self.kbuckets)) + .collect(); + + self.queued_events.push(NetworkBehaviourAction::SendEvent { + peer_id: source, + event: KademliaHandlerIn::GetProvidersRes { + closer_peers, + provider_peers, + request_id, + }, + }); } KademliaHandlerEvent::GetProvidersRes { closer_peers, @@ -422,9 +521,15 @@ where // It is possible that we obtain a response for a query that has finished, which is // why we may not find an entry in `self.active_queries`. - if let Some((query, _, providers)) = self.active_queries.get_mut(&user_data) { - for peer in provider_peers { - providers.push(peer.node_id); + if let Some(query) = self.active_queries.get_mut(&user_data) { + if let QueryInfoInner::GetProviders { pending_results, .. } = &mut query.target_mut().inner { + for peer in provider_peers { + pending_results.push(peer.node_id); + } + } + for peer in closer_peers.iter() { + query.target_mut().untrusted_addresses + .insert(peer.node_id.clone(), peer.multiaddrs.iter().cloned().collect()); } query.inject_rpc_result(&source, closer_peers.into_iter().map(|kp| kp.node_id)) } @@ -432,7 +537,7 @@ where KademliaHandlerEvent::QueryError { user_data, .. } => { // It is possible that we obtain a response for a query that has finished, which is // why we may not find an entry in `self.active_queries`. 
- if let Some((query, _, _)) = self.active_queries.get_mut(&user_data) { + if let Some(query) = self.active_queries.get_mut(&user_data) { query.inject_rpc_error(&source) } } @@ -474,50 +579,14 @@ where match self.refresh_add_providers.poll() { Ok(Async::NotReady) => {}, Ok(Async::Ready(Some(_))) => { - for provided in self.providing_keys.clone().into_iter() { - let purpose = QueryPurpose::AddProvider(provided.clone()); - // TODO: messy because of the PeerId/Multihash division - if let Ok(key_as_peer) = PeerId::from_multihash(provided) { - self.start_query(QueryTarget::FindPeer(key_as_peer), purpose); - } + for target in self.providing_keys.clone().into_iter() { + self.start_query(QueryInfoInner::AddProvider { target }); } }, // Ignore errors. Ok(Async::Ready(None)) | Err(_) => {}, } - // Start queries that are waiting to start. - for (query_id, query_target, query_purpose) in self.queries_to_starts.drain() { - let known_closest_peers = self.kbuckets - .find_closest(query_target.as_hash()) - .take(self.num_results); - self.active_queries.insert( - query_id, - ( - QueryState::new(QueryConfig { - target: query_target, - parallelism: self.parallelism, - num_results: self.num_results, - rpc_timeout: self.rpc_timeout, - known_closest_peers, - }), - query_purpose, - Vec::new() // TODO: insert ourselves if we provide the data? - ) - ); - } - self.queries_to_starts.shrink_to_fit(); - - // Handle remote queries. - if !self.remote_requests.is_empty() { - let (peer_id, request_id, query) = self.remote_requests.remove(0); - let result = self.build_result(query, request_id, parameters); - return Async::Ready(NetworkBehaviourAction::SendEvent { - peer_id, - event: result, - }); - } - loop { // Handle events queued by other parts of this struct if !self.queued_events.is_empty() { @@ -528,7 +597,7 @@ where // If iterating finds a query that is finished, stores it here and stops looping. let mut finished_query = None; - 'queries_iter: for (&query_id, (query, _, _)) in self.active_queries.iter_mut() { + 'queries_iter: for (&query_id, query) in self.active_queries.iter_mut() { loop { match query.poll() { Async::Ready(QueryStatePollOut::Finished) => { @@ -562,39 +631,37 @@ where } if let Some(finished_query) = finished_query { - let (query, purpose, provider_peers) = self + let (query_info, closer_peers) = self .active_queries .remove(&finished_query) - .expect("finished_query was gathered when iterating active_queries; QED."); - match purpose { - QueryPurpose::Initialization => {}, - QueryPurpose::UserRequest => { - let event = match query.target().clone() { - QueryTarget::FindPeer(key) => { - debug_assert!(provider_peers.is_empty()); - KademliaOut::FindNodeResult { - key, - closer_peers: query.into_closest_peers().collect(), - } - }, - QueryTarget::GetProviders(key) => { - KademliaOut::GetProvidersResult { - key, - closer_peers: query.into_closest_peers().collect(), - provider_peers, - } - }, + .expect("finished_query was gathered when iterating active_queries; QED.") + .into_target_and_closest_peers(); + + match query_info.inner { + QueryInfoInner::Initialization { .. 
} => {}, + QueryInfoInner::FindPeer(target) => { + let event = KademliaOut::FindNodeResult { + key: target, + closer_peers: closer_peers.collect(), + }; + break Async::Ready(NetworkBehaviourAction::GenerateEvent(event)); + }, + QueryInfoInner::GetProviders { target, pending_results } => { + let event = KademliaOut::GetProvidersResult { + key: target, + closer_peers: closer_peers.collect(), + provider_peers: pending_results, }; break Async::Ready(NetworkBehaviourAction::GenerateEvent(event)); }, - QueryPurpose::AddProvider(key) => { - for closest in query.into_closest_peers() { + QueryInfoInner::AddProvider { target } => { + for closest in closer_peers { let event = NetworkBehaviourAction::SendEvent { peer_id: closest, event: KademliaHandlerIn::AddProvider { - key: key.clone(), - provider_peer: build_kad_peer(parameters.local_peer_id().clone(), parameters, &self.kbuckets), + key: target.clone(), + provider_peer: build_kad_peer(parameters.local_peer_id().clone(), &self.kbuckets), }, }; @@ -675,25 +742,16 @@ fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result { } /// Builds a `KadPeer` struct corresponding to the given `PeerId`. -/// The `PeerId` can be the same as the local one. +/// The `PeerId` cannot be the same as the local one. /// /// > **Note**: This is just a convenience function that doesn't do anything note-worthy. fn build_kad_peer( peer_id: PeerId, - parameters: &mut PollParameters<'_>, kbuckets: &KBucketsTable ) -> KadPeer { - let is_self = peer_id == *parameters.local_peer_id(); + debug_assert_ne!(*kbuckets.my_id(), peer_id); - let (multiaddrs, connection_ty) = if is_self { - let mut addrs = parameters - .listened_addresses() - .cloned() - .collect::>(); - addrs.extend(parameters.external_addresses()); - (addrs, KadConnectionType::Connected) - - } else if let Some(addresses) = kbuckets.get(&peer_id) { + let (multiaddrs, connection_ty) = if let Some(addresses) = kbuckets.get(&peer_id) { let connected = if addresses.is_connected() { KadConnectionType::Connected } else { diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs new file mode 100644 index 00000000..8d5b5b5b --- /dev/null +++ b/protocols/kad/src/behaviour/test.rs @@ -0,0 +1,303 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. 
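// Note on the muxer used below: the new dev-dependencies in Cargo.toml pull in
// both libp2p-yamux and libp2p-mplex, but `build_nodes` only wires up yamux.
// As a minimal sketch -- assuming `MplexConfig` implements `Default` and the
// usual inbound/outbound upgrade traits -- the muxing half of the transport
// could instead be built with mplex:
//
//     let upgrade = libp2p_mplex::MplexConfig::default()
//         .map_inbound(move |muxer| (peer_id, muxer))
//         .map_outbound(move |muxer| (peer_id2, muxer));
//
// with the secio handshake and the `upgrade::apply` plumbing left unchanged.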
+ +#![cfg(test)] + +use crate::{Kademlia, KademliaOut}; +use futures::prelude::*; +use libp2p_core::{upgrade, upgrade::InboundUpgradeExt, upgrade::OutboundUpgradeExt, PeerId, Swarm, Transport}; +use libp2p_core::{nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox}; +use std::io; + +/// Builds swarms, each listening on a port. Does *not* connect the nodes together. +/// This is to be used only for testing, and a panic will happen if something goes wrong. +fn build_nodes(num: usize) + -> Vec, Kademlia>>> +{ + let mut result: Vec> = Vec::with_capacity(num); + + for _ in 0..num { + // TODO: make creating the transport more elegant ; literaly half of the code of the test + // is about creating the transport + let local_key = libp2p_core::identity::Keypair::generate_ed25519(); + let local_public_key = local_key.public(); + let transport = libp2p_tcp::TcpConfig::new() + .with_upgrade(libp2p_secio::SecioConfig::new(local_key)) + .and_then(move |out, endpoint| { + let peer_id = out.remote_key.into_peer_id(); + let peer_id2 = peer_id.clone(); + let upgrade = libp2p_yamux::Config::default() + .map_inbound(move |muxer| (peer_id, muxer)) + .map_outbound(move |muxer| (peer_id2, muxer)); + upgrade::apply(out.stream, upgrade, endpoint) + .map(|(id, muxer)| (id, StreamMuxerBox::new(muxer))) + }) + .map_err(|_| panic!()) + .boxed(); + + let kad = Kademlia::without_init(local_public_key.clone().into_peer_id()); + result.push(Swarm::new(transport, kad, local_public_key.into_peer_id())); + } + + for s in result.iter_mut() { + Swarm::listen_on(s, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); + } + + result +} + +#[test] +fn basic_find_node() { + // Build two nodes. Node #2 only knows about node #1. Node #2 is asked for a random peer ID. + // Node #2 must return the identity of node #1. + + let mut swarms = build_nodes(2); + let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone(); + + // Connect second to first. + { + let listen_addr = Swarm::listeners(&swarms[0]).next().unwrap().clone(); + swarms[1].add_not_connected_address(&first_peer_id, listen_addr); + } + + let search_target = PeerId::random(); + swarms[1].find_node(search_target.clone()); + + tokio::runtime::Runtime::new() + .unwrap() + .block_on(futures::future::poll_fn(move || -> Result<_, io::Error> { + for swarm in &mut swarms { + loop { + match swarm.poll().unwrap() { + Async::Ready(Some(KademliaOut::FindNodeResult { key, closer_peers })) => { + assert_eq!(key, search_target); + assert_eq!(closer_peers.len(), 1); + assert_eq!(closer_peers[0], first_peer_id); + return Ok(Async::Ready(())); + } + Async::Ready(_) => (), + Async::NotReady => break, + } + } + } + + Ok(Async::NotReady) + })) + .unwrap(); +} + +#[test] +fn direct_query() { + // Build three nodes. Node #2 knows about node #1. Node #3 knows about node #2. Node #3 is + // asked about a random peer and should return nodes #1 and #2. + + let mut swarms = build_nodes(3); + + let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone(); + let second_peer_id = Swarm::local_peer_id(&swarms[1]).clone(); + + // Connect second to first. + { + let listen_addr = Swarm::listeners(&swarms[0]).next().unwrap().clone(); + swarms[1].add_not_connected_address(&first_peer_id, listen_addr); + } + + // Connect third to second. + { + let listen_addr = Swarm::listeners(&swarms[1]).next().unwrap().clone(); + swarms[2].add_not_connected_address(&second_peer_id, listen_addr); + } + + // Ask third to search a random value. 
+ let search_target = PeerId::random(); + swarms[2].find_node(search_target.clone()); + + tokio::runtime::Runtime::new() + .unwrap() + .block_on(futures::future::poll_fn(move || -> Result<_, io::Error> { + for swarm in &mut swarms { + loop { + match swarm.poll().unwrap() { + Async::Ready(Some(KademliaOut::FindNodeResult { key, closer_peers })) => { + assert_eq!(key, search_target); + assert_eq!(closer_peers.len(), 2); + assert!((closer_peers[0] == first_peer_id) != (closer_peers[1] == first_peer_id)); + assert!((closer_peers[0] == second_peer_id) != (closer_peers[1] == second_peer_id)); + return Ok(Async::Ready(())); + } + Async::Ready(_) => (), + Async::NotReady => break, + } + } + } + + Ok(Async::NotReady) + })) + .unwrap(); +} + +#[test] +fn indirect_query() { + // Build four nodes. Node #2 knows about node #1. Node #3 knows about node #2. Node #4 knows + // about node #3. Node #4 is asked about a random peer and should return nodes #1, #2 and #3. + + let mut swarms = build_nodes(4); + + let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone(); + let second_peer_id = Swarm::local_peer_id(&swarms[1]).clone(); + let third_peer_id = Swarm::local_peer_id(&swarms[2]).clone(); + + // Connect second to first. + { + let listen_addr = Swarm::listeners(&swarms[0]).next().unwrap().clone(); + swarms[1].add_not_connected_address(&first_peer_id, listen_addr); + } + + // Connect third to second. + { + let listen_addr = Swarm::listeners(&swarms[1]).next().unwrap().clone(); + swarms[2].add_not_connected_address(&second_peer_id, listen_addr); + } + + // Connect fourth to third. + { + let listen_addr = Swarm::listeners(&swarms[2]).next().unwrap().clone(); + swarms[3].add_not_connected_address(&third_peer_id, listen_addr); + } + + // Ask fourth to search a random value. + let search_target = PeerId::random(); + swarms[3].find_node(search_target.clone()); + + tokio::runtime::Runtime::new() + .unwrap() + .block_on(futures::future::poll_fn(move || -> Result<_, io::Error> { + for swarm in &mut swarms { + loop { + match swarm.poll().unwrap() { + Async::Ready(Some(KademliaOut::FindNodeResult { key, closer_peers })) => { + assert_eq!(key, search_target); + assert_eq!(closer_peers.len(), 3); + assert_eq!(closer_peers.iter().filter(|p| **p == first_peer_id).count(), 1); + assert_eq!(closer_peers.iter().filter(|p| **p == second_peer_id).count(), 1); + assert_eq!(closer_peers.iter().filter(|p| **p == third_peer_id).count(), 1); + return Ok(Async::Ready(())); + } + Async::Ready(_) => (), + Async::NotReady => break, + } + } + } + + Ok(Async::NotReady) + })) + .unwrap(); +} + +#[test] +fn unresponsive_not_returned_direct() { + // Build one node. It contains fake addresses to non-existing nodes. We ask it to find a + // random peer. We make sure that no fake address is returned. + + let mut swarms = build_nodes(1); + + // Add fake addresses. + for _ in 0 .. 10 { + swarms[0].add_not_connected_address( + &PeerId::random(), + libp2p_core::multiaddr::multiaddr![Udp(10u16)] + ); + } + + // Ask first to search a random value. 
+ let search_target = PeerId::random(); + swarms[0].find_node(search_target.clone()); + + tokio::runtime::Runtime::new() + .unwrap() + .block_on(futures::future::poll_fn(move || -> Result<_, io::Error> { + for swarm in &mut swarms { + loop { + match swarm.poll().unwrap() { + Async::Ready(Some(KademliaOut::FindNodeResult { key, closer_peers })) => { + assert_eq!(key, search_target); + assert_eq!(closer_peers.len(), 0); + return Ok(Async::Ready(())); + } + Async::Ready(_) => (), + Async::NotReady => break, + } + } + } + + Ok(Async::NotReady) + })) + .unwrap(); +} + +#[test] +fn unresponsive_not_returned_indirect() { + // Build two nodes. Node #2 knows about node #1. Node #1 contains fake addresses to + // non-existing nodes. We ask node #1 to find a random peer. We make sure that no fake address + // is returned. + + let mut swarms = build_nodes(2); + + // Add fake addresses to first. + let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone(); + for _ in 0 .. 10 { + swarms[0].add_not_connected_address( + &PeerId::random(), + libp2p_core::multiaddr::multiaddr![Udp(10u16)] + ); + } + + // Connect second to first. + { + let listen_addr = Swarm::listeners(&swarms[0]).next().unwrap().clone(); + swarms[1].add_not_connected_address(&first_peer_id, listen_addr); + } + + // Ask second to search a random value. + let search_target = PeerId::random(); + swarms[1].find_node(search_target.clone()); + + tokio::runtime::Runtime::new() + .unwrap() + .block_on(futures::future::poll_fn(move || -> Result<_, io::Error> { + for swarm in &mut swarms { + loop { + match swarm.poll().unwrap() { + Async::Ready(Some(KademliaOut::FindNodeResult { key, closer_peers })) => { + assert_eq!(key, search_target); + assert_eq!(closer_peers.len(), 1); + assert_eq!(closer_peers[0], first_peer_id); + return Ok(Async::Ready(())); + } + Async::Ready(_) => (), + Async::NotReady => break, + } + } + } + + Ok(Async::NotReady) + })) + .unwrap(); +} diff --git a/protocols/kad/src/query.rs b/protocols/kad/src/query.rs index d55fcc3c..ae7a3efb 100644 --- a/protocols/kad/src/query.rs +++ b/protocols/kad/src/query.rs @@ -23,13 +23,10 @@ //! This allows one to create queries that iterate on the DHT on nodes that become closer and //! closer to the target. -use crate::handler::KademliaHandlerIn; use crate::kbucket::KBucketsPeerId; use futures::prelude::*; -use libp2p_core::PeerId; -use multihash::Multihash; use smallvec::SmallVec; -use std::time::{Duration, Instant}; +use std::{cmp::PartialEq, time::Duration, time::Instant}; use tokio_timer::Delay; /// State of a query iterative process. @@ -43,9 +40,9 @@ use tokio_timer::Delay; /// `GET_PROVIDERS`, you need to extract yourself the value or list of providers from RPC requests /// received by remotes as this is not handled by the `QueryState`. #[derive(Debug)] -pub struct QueryState { +pub struct QueryState { /// Target we're looking for. - target: QueryTarget, + target: TTarget, /// Stage of the query. See the documentation of `QueryStage`. stage: QueryStage, @@ -53,7 +50,7 @@ pub struct QueryState { /// Ordered list of the peers closest to the result we're looking for. /// Entries that are `InProgress` shouldn't be removed from the list before they complete. /// Must never contain two entries with the same peer IDs. - closest_peers: SmallVec<[(PeerId, QueryPeerState); 32]>, + closest_peers: SmallVec<[(TPeerId, QueryPeerState); 32]>, /// Allowed level of parallelism. parallelism: usize, @@ -67,9 +64,9 @@ pub struct QueryState { /// Configuration for a query. 
#[derive(Debug, Clone)] -pub struct QueryConfig { +pub struct QueryConfig { /// Target of the query. - pub target: QueryTarget, + pub target: TTarget, /// Iterator to a list of `num_results` nodes that we know of whose distance is close to the /// target. @@ -101,11 +98,15 @@ enum QueryStage { Frozen, } -impl QueryState { +impl QueryState +where + TPeerId: Eq, + TTarget: KBucketsPeerId +{ /// Creates a new query. /// /// You should call `poll()` this function returns in order to know what to do. - pub fn new(config: QueryConfig>) -> QueryState { + pub fn new(config: QueryConfig, TTarget>) -> Self { let mut closest_peers: SmallVec<[_; 32]> = config .known_closest_peers .into_iter() @@ -113,7 +114,7 @@ impl QueryState { .take(config.num_results) .collect(); let target = config.target; - closest_peers.sort_by_key(|e| target.as_hash().distance_with(e.0.as_ref())); + closest_peers.sort_by_key(|e| target.distance_with(&e.0)); closest_peers.dedup_by(|a, b| a.0 == b.0); QueryState { @@ -130,10 +131,19 @@ impl QueryState { /// Returns the target of the query. Always the same as what was passed to `new()`. #[inline] - pub fn target(&self) -> &QueryTarget { + pub fn target(&self) -> &TTarget { &self.target } + /// Returns the target of the query. Always the same as what was passed to `new()`. + /// + /// You shouldn't modify the target in such a way that modifies the target of the query, + /// otherwise logic errors will likely happen. + #[inline] + pub fn target_mut(&mut self) -> &mut TTarget { + &mut self.target + } + /// After `poll()` returned `SendRpc`, this method should be called when the node sends back /// the result of the query. /// @@ -143,12 +153,12 @@ impl QueryState { /// After this function returns, you should call `poll()` again. pub fn inject_rpc_result( &mut self, - result_source: &PeerId, - closer_peers: impl IntoIterator, + result_source: &impl PartialEq, + closer_peers: impl IntoIterator, ) { // Mark the peer as succeeded. for (peer_id, state) in self.closest_peers.iter_mut() { - if peer_id == result_source { + if result_source == peer_id { if let state @ QueryPeerState::InProgress(_) = state { *state = QueryPeerState::Succeeded; } @@ -165,9 +175,9 @@ impl QueryState { for elem_to_add in closer_peers { let target = &self.target; - let elem_to_add_distance = target.as_hash().distance_with(elem_to_add.as_ref()); + let elem_to_add_distance = target.distance_with(&elem_to_add); let insert_pos_start = self.closest_peers.iter().position(|(id, _)| { - target.as_hash().distance_with(id.as_ref()) >= elem_to_add_distance + target.distance_with(&id) >= elem_to_add_distance }); if let Some(insert_pos_start) = insert_pos_start { @@ -176,7 +186,7 @@ impl QueryState { let insert_pos_size = self.closest_peers.iter() .skip(insert_pos_start) .position(|(id, _)| { - target.as_hash().distance_with(id.as_ref()) > elem_to_add_distance + target.distance_with(&id) > elem_to_add_distance }); // Make sure we don't insert duplicates. @@ -216,31 +226,26 @@ impl QueryState { } } + /// Returns the list of peers for which we are waiting for an answer. + pub fn waiting(&self) -> impl Iterator { + self.closest_peers + .iter() + .filter(|(_, state)| { + match state { + QueryPeerState::InProgress(_) => true, + QueryPeerState::NotContacted => false, + QueryPeerState::Succeeded => false, + QueryPeerState::Failed => false, + } + }) + .map(|(id, _)| id) + } + /// Returns true if we are waiting for a query answer from that peer. /// /// After `poll()` returned `SendRpc`, this function will return `true`. 
- pub fn is_waiting(&self, id: &PeerId) -> bool { - let state = self - .closest_peers - .iter() - .filter_map( - |(peer_id, state)| { - if peer_id == id { - Some(state) - } else { - None - } - }, - ) - .next(); - - match state { - Some(&QueryPeerState::InProgress(_)) => true, - Some(&QueryPeerState::NotContacted) => false, - Some(&QueryPeerState::Succeeded) => false, - Some(&QueryPeerState::Failed) => false, - None => false, - } + pub fn is_waiting(&self, id: &impl PartialEq) -> bool { + self.waiting().any(|peer_id| id == peer_id) } /// After `poll()` returned `SendRpc`, this function should be called if we were unable to @@ -250,7 +255,7 @@ impl QueryState { /// function whenever an error happens on the network. /// /// After this function returns, you should call `poll()` again. - pub fn inject_rpc_error(&mut self, id: &PeerId) { + pub fn inject_rpc_error(&mut self, id: &TPeerId) { let state = self .closest_peers .iter_mut() @@ -275,7 +280,7 @@ impl QueryState { } /// Polls this individual query. - pub fn poll(&mut self) -> Async> { + pub fn poll(&mut self) -> Async> { // While iterating over peers, count the number of queries currently being processed. // This is used to not go over the limit of parallel requests. // If this is still 0 at the end of the function, that means the query is finished. @@ -331,10 +336,7 @@ impl QueryState { let need_connect = match state { QueryPeerState::NotContacted => match self.stage { QueryStage::Iterating { .. } => active_counter < self.parallelism, - QueryStage::Frozen => match self.target { - QueryTarget::FindPeer(_) => true, - QueryTarget::GetProviders(_) => false, - }, + QueryStage::Frozen => true, // TODO: as an optimization, could be false if we're not trying to find peers }, _ => false, }; @@ -358,12 +360,12 @@ impl QueryState { } } - /// Consumes the query and returns the known closest peers. + /// Consumes the query and returns the targe tand known closest peers. /// /// > **Note**: This can be called at any time, but you normally only do that once the query /// > is finished. - pub fn into_closest_peers(self) -> impl Iterator { - self.closest_peers + pub fn into_target_and_closest_peers(self) -> (TTarget, impl Iterator) { + let closest = self.closest_peers .into_iter() .filter_map(|(peer_id, state)| { if let QueryPeerState::Succeeded = state { @@ -372,13 +374,22 @@ impl QueryState { None } }) - .take(self.num_results) + .take(self.num_results); + (self.target, closest) + } + + /// Consumes the query and returns the known closest peers. + /// + /// > **Note**: This can be called at any time, but you normally only do that once the query + /// > is finished. + pub fn into_closest_peers(self) -> impl Iterator { + self.into_target_and_closest_peers().1 } } /// Outcome of polling a query. #[derive(Debug, Clone)] -pub enum QueryStatePollOut<'a> { +pub enum QueryStatePollOut<'a, TTarget, TPeerId> { /// The query is finished. /// /// If this is a `FindValue` query, the user is supposed to extract the record themselves from @@ -399,9 +410,9 @@ pub enum QueryStatePollOut<'a> { /// `inject_rpc_error` at a later point in time. SendRpc { /// The peer to send the RPC query to. - peer_id: &'a PeerId, + peer_id: &'a TPeerId, /// A reminder of the query target. Same as what you obtain by calling `target()`. - query_target: &'a QueryTarget, + query_target: &'a TTarget, }, /// We no longer need to send a query to this specific node. 
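// A minimal, self-contained sketch of the new `waiting()`/`is_waiting()`
// accessors, instantiating `QueryState` with a plain `PeerId` as both the
// target and the peer id, just like the unit tests at the bottom of this file.
// The helper name and the concrete numbers are illustrative only.
use futures::prelude::*;
use libp2p_core::PeerId;
use std::{iter, time::Duration};

fn waiting_after_first_poll(known_peer: PeerId) {
    let mut query = QueryState::new(QueryConfig {
        target: PeerId::random(),
        known_closest_peers: iter::once(known_peer.clone()),
        parallelism: 3,
        num_results: 20,
        rpc_timeout: Duration::from_secs(10),
    });

    // Nothing has been polled yet, so no RPC is outstanding.
    assert!(!query.is_waiting(&known_peer));

    // The first poll asks us to contact the only peer we know about ...
    match query.poll() {
        Async::Ready(QueryStatePollOut::SendRpc { peer_id, .. }) => assert_eq!(peer_id, &known_peer),
        _ => panic!("expected SendRpc"),
    }

    // ... and that peer is now reported as awaiting an answer.
    assert!(query.is_waiting(&known_peer));
    assert_eq!(query.waiting().count(), 1);

    // A transport error marks the peer as failed, so it is no longer waited upon.
    query.inject_rpc_error(&known_peer);
    assert!(!query.is_waiting(&known_peer));
}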
@@ -409,49 +420,10 @@ pub enum QueryStatePollOut<'a> { /// It is guaranteed that an earlier polling returned `SendRpc` with this peer id. CancelRpc { /// The target. - peer_id: &'a PeerId, + peer_id: &'a TPeerId, }, } -/// What we're aiming for with our query. -#[derive(Debug, Clone)] -pub enum QueryTarget { - /// Finding a peer. - FindPeer(PeerId), - /// Find the peers that provide a certain value. - GetProviders(Multihash), -} - -impl QueryTarget { - /// Creates the corresponding RPC request to send to remote. - #[inline] - pub fn to_rpc_request(&self, user_data: TUserData) -> KademliaHandlerIn { - self.clone().into_rpc_request(user_data) - } - - /// Creates the corresponding RPC request to send to remote. - pub fn into_rpc_request(self, user_data: TUserData) -> KademliaHandlerIn { - match self { - QueryTarget::FindPeer(key) => KademliaHandlerIn::FindNodeReq { - key, - user_data, - }, - QueryTarget::GetProviders(key) => KademliaHandlerIn::GetProvidersReq { - key, - user_data, - }, - } - } - - /// Returns the hash of the thing we're looking for. - pub fn as_hash(&self) -> &Multihash { - match self { - QueryTarget::FindPeer(peer) => peer.as_ref(), - QueryTarget::GetProviders(key) => key, - } - } -} - /// State of peer in the context of a query. #[derive(Debug)] enum QueryPeerState { @@ -467,7 +439,7 @@ enum QueryPeerState { #[cfg(test)] mod tests { - use super::{QueryConfig, QueryState, QueryStatePollOut, QueryTarget}; + use super::{QueryConfig, QueryState, QueryStatePollOut}; use futures::{self, try_ready, prelude::*}; use libp2p_core::PeerId; use std::{iter, time::Duration, sync::Arc, sync::Mutex, thread}; @@ -476,7 +448,7 @@ mod tests { #[test] fn start_by_sending_rpc_to_known_peers() { let random_id = PeerId::random(); - let target = QueryTarget::FindPeer(PeerId::random()); + let target = PeerId::random(); let mut query = QueryState::new(QueryConfig { target, @@ -500,7 +472,7 @@ mod tests { fn continue_second_result() { let random_id = PeerId::random(); let random_id2 = PeerId::random(); - let target = QueryTarget::FindPeer(PeerId::random()); + let target = PeerId::random(); let query = Arc::new(Mutex::new(QueryState::new(QueryConfig { target, @@ -546,7 +518,7 @@ mod tests { let random_id = PeerId::random(); let query = Arc::new(Mutex::new(QueryState::new(QueryConfig { - target: QueryTarget::FindPeer(PeerId::random()), + target: PeerId::random(), known_closest_peers: iter::once(random_id.clone()), parallelism: 3, num_results: 100,
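// A minimal, self-contained sketch of driving a query to a result by hand and
// extracting it with the new `into_target_and_closest_peers()`, using the same
// `QueryState` instantiation as the tests above (a plain `PeerId` as both the
// target and the peer id). The helper name and the concrete numbers are
// illustrative only.
use futures::prelude::*;
use libp2p_core::PeerId;
use std::{iter, time::Duration};

fn collect_succeeded_peer(known_peer: PeerId) {
    let mut query = QueryState::new(QueryConfig {
        target: PeerId::random(),
        known_closest_peers: iter::once(known_peer.clone()),
        parallelism: 3,
        num_results: 20,
        rpc_timeout: Duration::from_secs(10),
    });

    // The first poll asks us to send an RPC to the only peer we know about.
    match query.poll() {
        Async::Ready(QueryStatePollOut::SendRpc { peer_id, .. }) => assert_eq!(peer_id, &known_peer),
        _ => panic!("expected SendRpc"),
    }

    // Report a reply that contains no closer peers; the peer is marked as
    // succeeded and therefore shows up in the final result set.
    query.inject_rpc_result(&known_peer, iter::empty::<PeerId>());

    let (_target, closest) = query.into_target_and_closest_peers();
    assert_eq!(closest.collect::<Vec<_>>(), vec![known_peer]);
}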