diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 22436811..afeda26a 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -22,7 +22,7 @@ use super::*; -use crate::K_VALUE; +use crate::{ALPHA_VALUE, K_VALUE}; use crate::kbucket::Distance; use crate::record::store::MemoryStore; use futures::{ @@ -35,7 +35,7 @@ use libp2p_core::{ Transport, identity, transport::MemoryTransport, - multiaddr::{Protocol, multiaddr}, + multiaddr::{Protocol, Multiaddr, multiaddr}, muxing::StreamMuxerBox, upgrade }; @@ -49,61 +49,83 @@ use multihash::{wrap, Code, Multihash}; type TestSwarm = Swarm>; +fn build_node() -> (Multiaddr, TestSwarm) { + build_node_with_config(Default::default()) +} + +fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) { + let local_key = identity::Keypair::generate_ed25519(); + let local_public_key = local_key.public(); + let transport = MemoryTransport::default() + .upgrade(upgrade::Version::V1) + .authenticate(SecioConfig::new(local_key)) + .multiplex(yamux::Config::default()) + .map(|(p, m), _| (p, StreamMuxerBox::new(m))) + .map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); }) + .boxed(); + + let local_id = local_public_key.clone().into_peer_id(); + let store = MemoryStore::new(local_id.clone()); + let behaviour = Kademlia::with_config(local_id.clone(), store, cfg.clone()); + + let mut swarm = Swarm::new(transport, behaviour, local_id); + + let address: Multiaddr = Protocol::Memory(random::()).into(); + Swarm::listen_on(&mut swarm, address.clone()).unwrap(); + + (address, swarm) +} + /// Builds swarms, each listening on a port. Does *not* connect the nodes together. -fn build_nodes(num: usize) -> (u64, Vec) { +fn build_nodes(num: usize) -> Vec<(Multiaddr, TestSwarm)> { build_nodes_with_config(num, Default::default()) } /// Builds swarms, each listening on a port. Does *not* connect the nodes together. -fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> (u64, Vec) { - let port_base = 1 + random::() % (u64::MAX - num as u64); - let mut result: Vec> = Vec::with_capacity(num); - - for _ in 0 .. num { - let local_key = identity::Keypair::generate_ed25519(); - let local_public_key = local_key.public(); - let transport = MemoryTransport::default() - .upgrade(upgrade::Version::V1) - .authenticate(SecioConfig::new(local_key)) - .multiplex(yamux::Config::default()) - .map(|(p, m), _| (p, StreamMuxerBox::new(m))) - .map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); }) - .boxed(); - - let local_id = local_public_key.clone().into_peer_id(); - let store = MemoryStore::new(local_id.clone()); - let behaviour = Kademlia::with_config(local_id.clone(), store, cfg.clone()); - result.push(Swarm::new(transport, behaviour, local_id)); - } - - for (i, s) in result.iter_mut().enumerate() { - Swarm::listen_on(s, Protocol::Memory(port_base + i as u64).into()).unwrap(); - } - - (port_base, result) +fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> Vec<(Multiaddr, TestSwarm)> { + (0..num).map(|_| build_node_with_config(cfg.clone())).collect() } -fn build_connected_nodes(total: usize, step: usize) -> (Vec, Vec) { +fn build_connected_nodes(total: usize, step: usize) -> Vec<(Multiaddr, TestSwarm)> { build_connected_nodes_with_config(total, step, Default::default()) } fn build_connected_nodes_with_config(total: usize, step: usize, cfg: KademliaConfig) - -> (Vec, Vec) + -> Vec<(Multiaddr, TestSwarm)> { - let (port_base, mut swarms) = build_nodes_with_config(total, cfg); - let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect(); + let mut swarms = build_nodes_with_config(total, cfg); + let swarm_ids: Vec<_> = swarms.iter() + .map(|(addr, swarm)| (addr.clone(), Swarm::local_peer_id(swarm).clone())) + .collect(); let mut i = 0; - for (j, peer) in swarm_ids.iter().enumerate().skip(1) { + for (j, (addr, peer_id)) in swarm_ids.iter().enumerate().skip(1) { if i < swarm_ids.len() { - swarms[i].add_address(&peer, Protocol::Memory(port_base + j as u64).into()); + swarms[i].1.add_address(peer_id, addr.clone()); } if j % step == 0 { i += step; } } - (swarm_ids, swarms) + swarms +} + +fn build_fully_connected_nodes_with_config(total: usize, cfg: KademliaConfig) + -> Vec<(Multiaddr, TestSwarm)> +{ + let mut swarms = build_nodes_with_config(total, cfg); + let swarm_addr_and_peer_id: Vec<_> = swarms.iter() + .map(|(addr, swarm)| (addr.clone(), Swarm::local_peer_id(swarm).clone())) + .collect(); + + for (_addr, swarm) in swarms.iter_mut() { + for (addr, peer) in &swarm_addr_and_peer_id { + swarm.add_address(&peer, addr.clone()); + } + } + + swarms } fn random_multihash() -> Multihash { @@ -114,8 +136,17 @@ fn random_multihash() -> Multihash { fn bootstrap() { fn run(rng: &mut impl Rng) { let num_total = rng.gen_range(2, 20); - let num_group = rng.gen_range(1, num_total); - let (swarm_ids, mut swarms) = build_connected_nodes(num_total, num_group); + // When looking for the closest node to a key, Kademlia considers ALPHA_VALUE nodes to query + // at initialization. If `num_groups` is larger than ALPHA_VALUE the remaining locally known + // nodes will not be considered. Given that no other node is aware of them, they would be + // lost entirely. To prevent the above restrict `num_groups` to be equal or smaller than + // ALPHA_VALUE. + let num_group = rng.gen_range(1, (num_total % ALPHA_VALUE.get()) + 2); + + let mut swarms = build_connected_nodes(num_total, num_group).into_iter() + .map(|(_a, s)| s) + .collect::>(); + let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect(); swarms[0].bootstrap(); @@ -166,7 +197,10 @@ fn query_iter() { fn run(rng: &mut impl Rng) { let num_total = rng.gen_range(2, 20); - let (swarm_ids, mut swarms) = build_connected_nodes(num_total, 1); + let mut swarms = build_connected_nodes(num_total, 1).into_iter() + .map(|(_a, s)| s) + .collect::>(); + let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect(); // Ask the first peer in the list to search a random peer. The search should // propagate forwards through the list of peers. @@ -218,7 +252,9 @@ fn unresponsive_not_returned_direct() { // Build one node. It contains fake addresses to non-existing nodes. We ask it to find a // random peer. We make sure that no fake address is returned. - let (_, mut swarms) = build_nodes(1); + let mut swarms = build_nodes(1).into_iter() + .map(|(_a, s)| s) + .collect::>(); // Add fake addresses. for _ in 0 .. 10 { @@ -258,16 +294,20 @@ fn unresponsive_not_returned_indirect() { // non-existing nodes. We ask node #2 to find a random peer. We make sure that no fake address // is returned. - let (port_base, mut swarms) = build_nodes(2); + let mut swarms = build_nodes(2); // Add fake addresses to first. - let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone(); for _ in 0 .. 10 { - swarms[0].add_address(&PeerId::random(), multiaddr![Udp(10u16)]); + swarms[0].1.add_address(&PeerId::random(), multiaddr![Udp(10u16)]); } // Connect second to first. - swarms[1].add_address(&first_peer_id, Protocol::Memory(port_base).into()); + let first_peer_id = Swarm::local_peer_id(&swarms[0].1).clone(); + let first_address = swarms[0].0.clone(); + swarms[1].1.add_address(&first_peer_id, first_address); + + // Drop the swarm addresses. + let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::>(); // Ask second to search a random value. let search_target = PeerId::random(); @@ -299,12 +339,19 @@ fn unresponsive_not_returned_indirect() { #[test] fn get_record_not_found() { - let (port_base, mut swarms) = build_nodes(3); + let mut swarms = build_nodes(3); - let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect(); + let swarm_ids: Vec<_> = swarms.iter() + .map(|(_addr, swarm)| Swarm::local_peer_id(swarm)) + .cloned() + .collect(); - swarms[0].add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into()); - swarms[1].add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into()); + let (second, third) = (swarms[1].0.clone(), swarms[2].0.clone()); + swarms[0].1.add_address(&swarm_ids[1], second); + swarms[1].1.add_address(&swarm_ids[2], third); + + // Drop the swarm addresses. + let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::>(); let target_key = record::Key::from(random_multihash()); swarms[0].get_record(&target_key, Quorum::One); @@ -338,16 +385,35 @@ fn get_record_not_found() { ) } +/// A node joining a fully connected network via a single bootnode should be able to put a record to +/// the X closest nodes of the network where X is equal to the configured replication factor. #[test] fn put_record() { fn prop(replication_factor: usize, records: Vec) { let replication_factor = NonZeroUsize::new(replication_factor % (K_VALUE.get() / 2) + 1).unwrap(); let num_total = replication_factor.get() * 2; - let num_group = replication_factor.get(); let mut config = KademliaConfig::default(); config.set_replication_factor(replication_factor); - let (swarm_ids, mut swarms) = build_connected_nodes_with_config(num_total, num_group, config); + + let mut swarms = { + let mut fully_connected_swarms = build_fully_connected_nodes_with_config( + num_total - 1, + config.clone(), + ); + + let mut single_swarm = build_node_with_config(config); + single_swarm.1.add_address( + Swarm::local_peer_id(&fully_connected_swarms[0].1), + fully_connected_swarms[0].0.clone(), + ); + + let mut swarms = vec![single_swarm]; + swarms.append(&mut fully_connected_swarms); + + // Drop the swarm addresses. + swarms.into_iter().map(|(_addr, swarm)| swarm).collect::>() + }; let records = records.into_iter() .take(num_total) @@ -378,7 +444,7 @@ fn put_record() { Poll::Ready(Some(KademliaEvent::PutRecordResult(res))) | Poll::Ready(Some(KademliaEvent::RepublishRecordResult(res))) => { match res { - Err(e) => panic!(e), + Err(e) => panic!("{:?}", e), Ok(ok) => { assert!(records.contains_key(&ok.key)); let record = swarm.store.get(&ok.key).unwrap(); @@ -408,10 +474,14 @@ fn put_record() { assert_eq!(r.key, expected.key); assert_eq!(r.value, expected.value); assert_eq!(r.expires, expected.expires); - assert_eq!(r.publisher.as_ref(), Some(&swarm_ids[0])); + assert_eq!(r.publisher.as_ref(), Some(Swarm::local_peer_id(&swarms[0]))); let key = kbucket::Key::new(r.key.clone()); - let mut expected = swarm_ids.clone().split_off(1); + let mut expected = swarms.iter() + .skip(1) + .map(Swarm::local_peer_id) + .cloned() + .collect::>(); expected.sort_by(|id1, id2| kbucket::Key::new(id1.clone()).distance(&key).cmp( &kbucket::Key::new(id2.clone()).distance(&key))); @@ -421,17 +491,32 @@ fn put_record() { .take(replication_factor.get()) .collect::>(); - let actual = swarms.iter().enumerate().skip(1) - .filter_map(|(i, s)| - if s.store.get(key.preimage()).is_some() { - Some(swarm_ids[i].clone()) + let actual = swarms.iter() + .skip(1) + .filter_map(|swarm| + if swarm.store.get(key.preimage()).is_some() { + Some(Swarm::local_peer_id(swarm).clone()) } else { None }) .collect::>(); assert_eq!(actual.len(), replication_factor.get()); - assert_eq!(actual, expected); + + let actual_not_expected = actual.difference(&expected) + .collect::>(); + assert!( + actual_not_expected.is_empty(), + "Did not expect records to be stored on nodes {:?}.", + actual_not_expected, + ); + + let expected_not_actual = expected.difference(&actual) + .collect::>(); + assert!(expected_not_actual.is_empty(), + "Expected record to be stored on nodes {:?}.", + expected_not_actual, + ); } if republished { @@ -452,17 +537,21 @@ fn put_record() { ) } - QuickCheck::new().tests(3).quickcheck(prop as fn(_,_)) + QuickCheck::new().tests(3).quickcheck(prop as fn(_,_) -> _) } #[test] fn get_value() { - let (port_base, mut swarms) = build_nodes(3); + let mut swarms = build_nodes(3); - let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect(); + // Let first peer know of second peer and second peer know of third peer. + for i in 0..2 { + let (peer_id, address) = (Swarm::local_peer_id(&swarms[i+1].1).clone(), swarms[i+1].0.clone()); + swarms[i].1.add_address(&peer_id, address); + } - swarms[0].add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into()); - swarms[1].add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into()); + // Drop the swarm addresses. + let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::>(); let record = Record::new(random_multihash(), vec![4,5,6]); @@ -496,7 +585,9 @@ fn get_value() { fn get_value_many() { // TODO: Randomise let num_nodes = 12; - let (_, mut swarms) = build_connected_nodes(num_nodes, num_nodes); + let mut swarms = build_connected_nodes(num_nodes, 3).into_iter() + .map(|(_addr, swarm)| swarm) + .collect::>(); let num_results = 10; let record = Record::new(random_multihash(), vec![4,5,6]); @@ -530,17 +621,36 @@ fn get_value_many() { ) } +/// A node joining a fully connected network via a single bootnode should be able to add itself as a +/// provider to the X closest nodes of the network where X is equal to the configured replication +/// factor. #[test] fn add_provider() { fn prop(replication_factor: usize, keys: Vec) { let replication_factor = NonZeroUsize::new(replication_factor % (K_VALUE.get() / 2) + 1).unwrap(); let num_total = replication_factor.get() * 2; - let num_group = replication_factor.get(); let mut config = KademliaConfig::default(); config.set_replication_factor(replication_factor); - let (swarm_ids, mut swarms) = build_connected_nodes_with_config(num_total, num_group, config); + let mut swarms = { + let mut fully_connected_swarms = build_fully_connected_nodes_with_config( + num_total - 1, + config.clone(), + ); + + let mut single_swarm = build_node_with_config(config); + single_swarm.1.add_address( + Swarm::local_peer_id(&fully_connected_swarms[0].1), + fully_connected_swarms[0].0.clone(), + ); + + let mut swarms = vec![single_swarm]; + swarms.append(&mut fully_connected_swarms); + + // Drop addresses before returning. + swarms.into_iter().map(|(_addr, swarm)| swarm).collect::>() + }; let keys: HashSet<_> = keys.into_iter().take(num_total).collect(); @@ -594,10 +704,10 @@ fn add_provider() { // each key was published to the `replication_factor` closest peers. while let Some(key) = results.pop() { // Collect the nodes that have a provider record for `key`. - let actual = swarms.iter().enumerate().skip(1) - .filter_map(|(i, s)| - if s.store.providers(&key).len() == 1 { - Some(swarm_ids[i].clone()) + let actual = swarms.iter().skip(1) + .filter_map(|swarm| + if swarm.store.providers(&key).len() == 1 { + Some(Swarm::local_peer_id(&swarm).clone()) } else { None }) @@ -609,7 +719,11 @@ fn add_provider() { return Poll::Pending } - let mut expected = swarm_ids.clone().split_off(1); + let mut expected = swarms.iter() + .skip(1) + .map(Swarm::local_peer_id) + .cloned() + .collect::>(); let kbucket_key = kbucket::Key::new(key); expected.sort_by(|id1, id2| kbucket::Key::new(id1.clone()).distance(&kbucket_key).cmp( @@ -625,8 +739,8 @@ fn add_provider() { // One round of publishing is complete. assert!(results.is_empty()); - for s in &swarms { - assert_eq!(s.queries.size(), 0); + for swarm in &swarms { + assert_eq!(swarm.queries.size(), 0); } if republished { @@ -656,19 +770,19 @@ fn add_provider() { /// arithmetic overflow, see https://github.com/libp2p/rust-libp2p/issues/1290. #[test] fn exceed_jobs_max_queries() { - let (_, mut swarms) = build_nodes(1); + let (_addr, mut swarm) = build_node(); let num = JOBS_MAX_QUERIES + 1; for _ in 0 .. num { - swarms[0].bootstrap(); + swarm.bootstrap(); } - assert_eq!(swarms[0].queries.size(), num); + assert_eq!(swarm.queries.size(), num); block_on( poll_fn(move |ctx| { for _ in 0 .. num { // There are no other nodes, so the queries finish instantly. - if let Poll::Ready(Some(e)) = swarms[0].poll_next_unpin(ctx) { + if let Poll::Ready(Some(e)) = swarm.poll_next_unpin(ctx) { if let KademliaEvent::BootstrapResult(r) = e { assert!(r.is_ok(), "Unexpected error") } else { diff --git a/protocols/kad/src/query/peers/closest.rs b/protocols/kad/src/query/peers/closest.rs index 987d0c36..3e9b6a62 100644 --- a/protocols/kad/src/query/peers/closest.rs +++ b/protocols/kad/src/query/peers/closest.rs @@ -108,7 +108,7 @@ impl ClosestPeersIter { let state = PeerState::NotContacted; (distance, Peer { key, state }) }) - .take(config.num_results)); + .take(K_VALUE.into())); // The iterator initially makes progress by iterating towards the target. let state = State::Iterating { no_progress : 0 }; @@ -695,6 +695,26 @@ mod tests { } #[test] + fn without_success_try_up_to_k_peers() { + fn prop(mut iter: ClosestPeersIter) { + let now = Instant::now(); + + for _ in 0..(usize::min(iter.closest_peers.len(), K_VALUE.get())) { + match iter.next(now) { + PeersIterState::Waiting(Some(p)) => { + let peer = p.clone().into_owned(); + iter.on_failure(&peer); + }, + _ => panic!("Expected iterator to yield another peer to query."), + } + } + + assert_eq!(PeersIterState::Finished, iter.next(now)); + } + + QuickCheck::new().tests(10).quickcheck(prop as fn(_)) + } + fn stalled_at_capacity() { fn prop(mut iter: ClosestPeersIter) { iter.state = State::Stalled;