From 3e0528fdcb22227ed6d2d9f05d4cfd8ab53cadfa Mon Sep 17 00:00:00 2001 From: folex <0xdxdy@gmail.com> Date: Wed, 25 Mar 2020 19:26:24 +0300 Subject: [PATCH] merge --- protocols/kad/src/behaviour/test.rs | 122 +++++++++++++++------------- 1 file changed, 66 insertions(+), 56 deletions(-) diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 22436811..9dd843ec 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -46,21 +46,23 @@ use quickcheck::*; use rand::{Rng, random, thread_rng}; use std::{collections::{HashSet, HashMap}, io, num::NonZeroUsize, u64}; use multihash::{wrap, Code, Multihash}; +use libp2p_core::identity::ed25519; type TestSwarm = Swarm>; /// Builds swarms, each listening on a port. Does *not* connect the nodes together. -fn build_nodes(num: usize) -> (u64, Vec) { +fn build_nodes(num: usize) -> (u64, Vec<(ed25519::Keypair, TestSwarm)>) { build_nodes_with_config(num, Default::default()) } /// Builds swarms, each listening on a port. Does *not* connect the nodes together. -fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> (u64, Vec) { +fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> (u64, Vec<(ed25519::Keypair, TestSwarm)>) { let port_base = 1 + random::() % (u64::MAX - num as u64); - let mut result: Vec> = Vec::with_capacity(num); + let mut result: Vec<(ed25519::Keypair, Swarm<_, _>)> = Vec::with_capacity(num); for _ in 0 .. num { - let local_key = identity::Keypair::generate_ed25519(); + let ed25519_key = ed25519::Keypair::generate(); + let local_key = identity::Keypair::Ed25519(ed25519_key.clone()); let local_public_key = local_key.public(); let transport = MemoryTransport::default() .upgrade(upgrade::Version::V1) @@ -72,31 +74,32 @@ fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> (u64, Vec (Vec, Vec) { +fn build_connected_nodes(total: usize, step: usize) -> (Vec, Vec<(ed25519::Keypair, TestSwarm)>) { build_connected_nodes_with_config(total, step, Default::default()) } fn build_connected_nodes_with_config(total: usize, step: usize, cfg: KademliaConfig) - -> (Vec, Vec) + -> (Vec, Vec<(ed25519::Keypair, TestSwarm)>) { let (port_base, mut swarms) = build_nodes_with_config(total, cfg); - let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect(); + let swarm_ids: Vec<_> = swarms.iter().map(|(_, s)| s).map(Swarm::local_peer_id).cloned().collect(); let mut i = 0; for (j, peer) in swarm_ids.iter().enumerate().skip(1) { if i < swarm_ids.len() { - swarms[i].add_address(&peer, Protocol::Memory(port_base + j as u64).into()); + let public = swarms[i].0.public(); + swarms[i].1.add_address(&peer, Protocol::Memory(port_base + j as u64).into(), public); } if j % step == 0 { i += step; @@ -117,7 +120,7 @@ fn bootstrap() { let num_group = rng.gen_range(1, num_total); let (swarm_ids, mut swarms) = build_connected_nodes(num_total, num_group); - swarms[0].bootstrap(); + swarms[0].1.bootstrap(); // Expected known peers let expected_known = swarm_ids.iter().skip(1).cloned().collect::>(); @@ -125,7 +128,7 @@ fn bootstrap() { // Run test block_on( poll_fn(move |ctx| { - for (i, swarm) in swarms.iter_mut().enumerate() { + for (i, (_, swarm)) in swarms.iter_mut().enumerate() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(KademliaEvent::BootstrapResult(Ok(ok)))) => { @@ -172,7 +175,7 @@ fn query_iter() { // propagate forwards through the list of peers. let search_target = PeerId::random(); let search_target_key = kbucket::Key::new(search_target.clone()); - swarms[0].get_closest_peers(search_target.clone()); + swarms[0].1.get_closest_peers(search_target.clone()); // Set up expectations. let expected_swarm_id = swarm_ids[0].clone(); @@ -183,7 +186,7 @@ fn query_iter() { // Run test block_on( poll_fn(move |ctx| { - for (i, swarm) in swarms.iter_mut().enumerate() { + for (i, (_, swarm)) in swarms.iter_mut().enumerate() { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => { @@ -222,16 +225,17 @@ fn unresponsive_not_returned_direct() { // Add fake addresses. for _ in 0 .. 10 { - swarms[0].add_address(&PeerId::random(), Protocol::Udp(10u16).into()); + let public0 = swarms[0].0.public(); + swarms[0].1.add_address(&PeerId::random(), Protocol::Udp(10u16).into(), public0); } // Ask first to search a random value. let search_target = PeerId::random(); - swarms[0].get_closest_peers(search_target.clone()); + swarms[0].1.get_closest_peers(search_target.clone()); block_on( poll_fn(move |ctx| { - for swarm in &mut swarms { + for (_, swarm) in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => { @@ -261,21 +265,23 @@ fn unresponsive_not_returned_indirect() { let (port_base, mut swarms) = build_nodes(2); // Add fake addresses to first. - let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone(); + let first_peer_id = Swarm::local_peer_id(&swarms[0].1).clone(); for _ in 0 .. 10 { - swarms[0].add_address(&PeerId::random(), multiaddr![Udp(10u16)]); + let public0 = swarms[0].0.public(); + swarms[0].1.add_address(&PeerId::random(), multiaddr![Udp(10u16)], public0); } // Connect second to first. - swarms[1].add_address(&first_peer_id, Protocol::Memory(port_base).into()); + let public1 = swarms[1].0.public(); + swarms[1].1.add_address(&first_peer_id, Protocol::Memory(port_base).into(), public1); // Ask second to search a random value. let search_target = PeerId::random(); - swarms[1].get_closest_peers(search_target.clone()); + swarms[1].1.get_closest_peers(search_target.clone()); block_on( poll_fn(move |ctx| { - for swarm in &mut swarms { + for (_, swarm) in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => { @@ -301,17 +307,19 @@ fn unresponsive_not_returned_indirect() { fn get_record_not_found() { let (port_base, mut swarms) = build_nodes(3); - let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect(); + let swarm_ids: Vec<_> = swarms.iter().map(|(_, s)| s).map(Swarm::local_peer_id).cloned().collect(); - swarms[0].add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into()); - swarms[1].add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into()); + let public0 = swarms[0].0.public(); + swarms[0].1.add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into(), public0); + let public1 = swarms[1].0.public(); + swarms[1].1.add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into(), public1); let target_key = record::Key::from(random_multihash()); - swarms[0].get_record(&target_key, Quorum::One); + swarms[0].1.get_record(&target_key, Quorum::One); block_on( poll_fn(move |ctx| { - for swarm in &mut swarms { + for (_, swarm) in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(KademliaEvent::GetRecordResult(Err(e)))) => { @@ -361,7 +369,7 @@ fn put_record() { .collect::>(); for r in records.values() { - swarms[0].put_record(r.clone(), Quorum::All); + swarms[0].1.put_record(r.clone(), Quorum::All); } // Each test run republishes all records once. @@ -372,7 +380,7 @@ fn put_record() { block_on( poll_fn(move |ctx| loop { // Poll all swarms until they are "Pending". - for swarm in &mut swarms { + for (_, swarm) in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(KademliaEvent::PutRecordResult(res))) | @@ -422,7 +430,7 @@ fn put_record() { .collect::>(); let actual = swarms.iter().enumerate().skip(1) - .filter_map(|(i, s)| + .filter_map(|(i, (_, s))| if s.store.get(key.preimage()).is_some() { Some(swarm_ids[i].clone()) } else { @@ -435,18 +443,18 @@ fn put_record() { } if republished { - assert_eq!(swarms[0].store.records().count(), records.len()); - assert_eq!(swarms[0].queries.size(), 0); + assert_eq!(swarms[0].1.store.records().count(), records.len()); + assert_eq!(swarms[0].1.queries.size(), 0); for k in records.keys() { - swarms[0].store.remove(&k); + swarms[0].1.store.remove(&k); } - assert_eq!(swarms[0].store.records().count(), 0); + assert_eq!(swarms[0].1.store.records().count(), 0); // All records have been republished, thus the test is complete. return Poll::Ready(()); } // Tell the replication job to republish asap. - swarms[0].put_record_job.as_mut().unwrap().asap(true); + swarms[0].1.put_record_job.as_mut().unwrap().asap(true); republished = true; }) ) @@ -459,19 +467,21 @@ fn put_record() { fn get_value() { let (port_base, mut swarms) = build_nodes(3); - let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect(); + let swarm_ids: Vec<_> = swarms.iter().map(|(_, s)| s).map(Swarm::local_peer_id).cloned().collect(); - swarms[0].add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into()); - swarms[1].add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into()); + let public0 = swarms[0].0.public(); + swarms[0].1.add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into(), public0); + let public1 = swarms[1].0.public(); + swarms[1].1.add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into(), public1); let record = Record::new(random_multihash(), vec![4,5,6]); - swarms[1].store.put(record.clone()).unwrap(); - swarms[0].get_record(&record.key, Quorum::One); + swarms[1].1.store.put(record.clone()).unwrap(); + swarms[0].1.get_record(&record.key, Quorum::One); block_on( poll_fn(move |ctx| { - for swarm in &mut swarms { + for (_, swarm) in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => { @@ -502,15 +512,15 @@ fn get_value_many() { let record = Record::new(random_multihash(), vec![4,5,6]); for i in 0 .. num_nodes { - swarms[i].store.put(record.clone()).unwrap(); + swarms[i].1.store.put(record.clone()).unwrap(); } let quorum = Quorum::N(NonZeroUsize::new(num_results).unwrap()); - swarms[0].get_record(&record.key, quorum); + swarms[0].1.get_record(&record.key, quorum); block_on( poll_fn(move |ctx| { - for swarm in &mut swarms { + for (_, swarm) in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => { @@ -552,13 +562,13 @@ fn add_provider() { // Initiate the first round of publishing. for k in &keys { - swarms[0].start_providing(k.clone()); + swarms[0].1.start_providing(k.clone()); } block_on( poll_fn(move |ctx| loop { // Poll all swarms until they are "Pending". - for swarm in &mut swarms { + for (_, swarm) in &mut swarms { loop { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(KademliaEvent::StartProvidingResult(res))) | @@ -595,7 +605,7 @@ fn add_provider() { while let Some(key) = results.pop() { // Collect the nodes that have a provider record for `key`. let actual = swarms.iter().enumerate().skip(1) - .filter_map(|(i, s)| + .filter_map(|(i, (_, s))| if s.store.providers(&key).len() == 1 { Some(swarm_ids[i].clone()) } else { @@ -625,23 +635,23 @@ fn add_provider() { // One round of publishing is complete. assert!(results.is_empty()); - for s in &swarms { + for (_, s) in &swarms { assert_eq!(s.queries.size(), 0); } if republished { - assert_eq!(swarms[0].store.provided().count(), keys.len()); + assert_eq!(swarms[0].1.store.provided().count(), keys.len()); for k in &keys { - swarms[0].stop_providing(&k); + swarms[0].1.stop_providing(&k); } - assert_eq!(swarms[0].store.provided().count(), 0); + assert_eq!(swarms[0].1.store.provided().count(), 0); // All records have been republished, thus the test is complete. return Poll::Ready(()); } // Initiate the second round of publishing by telling the // periodic provider job to run asap. - swarms[0].add_provider_job.as_mut().unwrap().asap(); + swarms[0].1.add_provider_job.as_mut().unwrap().asap(); published = false; republished = true; }) @@ -659,16 +669,16 @@ fn exceed_jobs_max_queries() { let (_, mut swarms) = build_nodes(1); let num = JOBS_MAX_QUERIES + 1; for _ in 0 .. num { - swarms[0].bootstrap(); + swarms[0].1.bootstrap(); } - assert_eq!(swarms[0].queries.size(), num); + assert_eq!(swarms[0].1.queries.size(), num); block_on( poll_fn(move |ctx| { for _ in 0 .. num { // There are no other nodes, so the queries finish instantly. - if let Poll::Ready(Some(e)) = swarms[0].poll_next_unpin(ctx) { + if let Poll::Ready(Some(e)) = swarms[0].1.poll_next_unpin(ctx) { if let KademliaEvent::BootstrapResult(r) = e { assert!(r.is_ok(), "Unexpected error") } else {