Merge branch 'fluence_master' into memory_store_set

This commit is contained in:
folex 2020-04-21 21:40:48 +03:00
commit 122bb921d7

View File

@ -50,11 +50,11 @@ use libp2p_core::identity::ed25519;
type TestSwarm = Swarm<Kademlia<MemoryStore>>; type TestSwarm = Swarm<Kademlia<MemoryStore>>;
fn build_node() -> (Ed25519::Keypair, Multiaddr, TestSwarm) { fn build_node() -> (ed25519::Keypair, Multiaddr, TestSwarm) {
build_node_with_config(Default::default()) build_node_with_config(Default::default())
} }
fn build_node_with_config(cfg: KademliaConfig) -> (Ed25519::Keypair, Multiaddr, TestSwarm) { fn build_node_with_config(cfg: KademliaConfig) -> (ed25519::Keypair, Multiaddr, TestSwarm) {
let ed25519_key = ed25519::Keypair::generate(); let ed25519_key = ed25519::Keypair::generate();
let local_key = identity::Keypair::Ed25519(ed25519_key.clone()); let local_key = identity::Keypair::Ed25519(ed25519_key.clone());
let local_public_key = local_key.public(); let local_public_key = local_key.public();
@ -80,21 +80,21 @@ fn build_node_with_config(cfg: KademliaConfig) -> (Ed25519::Keypair, Multiaddr,
} }
/// Builds swarms, each listening on a port. Does *not* connect the nodes together. /// Builds swarms, each listening on a port. Does *not* connect the nodes together.
fn build_nodes(num: usize) -> Vec<(Ed25519::Keypair, Multiaddr, TestSwarm)> { fn build_nodes(num: usize) -> Vec<(ed25519::Keypair, Multiaddr, TestSwarm)> {
build_nodes_with_config(num, Default::default()) build_nodes_with_config(num, Default::default())
} }
/// Builds swarms, each listening on a port. Does *not* connect the nodes together. /// Builds swarms, each listening on a port. Does *not* connect the nodes together.
fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> Vec<(Ed25519::Keypair, Multiaddr, TestSwarm)> { fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> Vec<(ed25519::Keypair, Multiaddr, TestSwarm)> {
(0..num).map(|_| build_node_with_config(cfg.clone())).collect() (0..num).map(|_| build_node_with_config(cfg.clone())).collect()
} }
fn build_connected_nodes(total: usize, step: usize) -> Vec<(Ed25519::Keypair, Multiaddr, TestSwarm)> { fn build_connected_nodes(total: usize, step: usize) -> Vec<(ed25519::Keypair, Multiaddr, TestSwarm)> {
build_connected_nodes_with_config(total, step, Default::default()) build_connected_nodes_with_config(total, step, Default::default())
} }
fn build_connected_nodes_with_config(total: usize, step: usize, cfg: KademliaConfig) fn build_connected_nodes_with_config(total: usize, step: usize, cfg: KademliaConfig)
-> Vec<(Ed25519::Keypair, Multiaddr, TestSwarm)> -> Vec<(ed25519::Keypair, Multiaddr, TestSwarm)>
{ {
let mut swarms = build_nodes_with_config(total, cfg); let mut swarms = build_nodes_with_config(total, cfg);
let swarm_ids: Vec<_> = swarms.iter() let swarm_ids: Vec<_> = swarms.iter()
@ -105,7 +105,7 @@ fn build_connected_nodes_with_config(total: usize, step: usize, cfg: KademliaCon
for (j, (addr, peer_id)) in swarm_ids.iter().enumerate().skip(1) { for (j, (addr, peer_id)) in swarm_ids.iter().enumerate().skip(1) {
if i < swarm_ids.len() { if i < swarm_ids.len() {
let public = swarms[i].0.public(); let public = swarms[i].0.public();
swarms[i].1.add_address(peer_id, addr.clone(), public); swarms[i].2.add_address(peer_id, addr.clone(), public);
} }
if j % step == 0 { if j % step == 0 {
i += step; i += step;
@ -120,12 +120,12 @@ fn build_fully_connected_nodes_with_config(total: usize, cfg: KademliaConfig)
{ {
let mut swarms = build_nodes_with_config(total, cfg); let mut swarms = build_nodes_with_config(total, cfg);
let swarm_addr_and_peer_id: Vec<_> = swarms.iter() let swarm_addr_and_peer_id: Vec<_> = swarms.iter()
.map(|(public, addr, swarm)| (public, addr.clone(), Swarm::local_peer_id(swarm).clone())) .map(|(kp, addr, swarm)| (kp.public().clone(), addr.clone(), Swarm::local_peer_id(swarm).clone()))
.collect(); .collect();
for (_addr, swarm) in swarms.iter_mut() { for (_, _addr, swarm) in swarms.iter_mut() {
for (public, addr, peer) in &swarm_addr_and_peer_id { for (public, addr, peer) in &swarm_addr_and_peer_id {
swarm.add_address(&peer, addr.clone(), public); swarm.add_address(&peer, addr.clone(), public.clone());
} }
} }
@ -148,11 +148,11 @@ fn bootstrap() {
let num_group = rng.gen_range(1, (num_total % ALPHA_VALUE.get()) + 2); let num_group = rng.gen_range(1, (num_total % ALPHA_VALUE.get()) + 2);
let mut swarms = build_connected_nodes(num_total, num_group).into_iter() let mut swarms = build_connected_nodes(num_total, num_group).into_iter()
.map(|(_a, s)| s) .map(|(_, _a, s)| s)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect(); let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
swarms[0].1.bootstrap(); swarms[0].bootstrap();
// Expected known peers // Expected known peers
let expected_known = swarm_ids.iter().skip(1).cloned().collect::<HashSet<_>>(); let expected_known = swarm_ids.iter().skip(1).cloned().collect::<HashSet<_>>();
@ -160,7 +160,7 @@ fn bootstrap() {
// Run test // Run test
block_on( block_on(
poll_fn(move |ctx| { poll_fn(move |ctx| {
for (i, (_, swarm)) in swarms.iter_mut().enumerate() { for (i, swarm) in swarms.iter_mut().enumerate() {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::BootstrapResult(Ok(ok)))) => { Poll::Ready(Some(KademliaEvent::BootstrapResult(Ok(ok)))) => {
@ -202,7 +202,7 @@ fn query_iter() {
fn run(rng: &mut impl Rng) { fn run(rng: &mut impl Rng) {
let num_total = rng.gen_range(2, 20); let num_total = rng.gen_range(2, 20);
let mut swarms = build_connected_nodes(num_total, 1).into_iter() let mut swarms = build_connected_nodes(num_total, 1).into_iter()
.map(|(_a, s)| s) .map(|(_, _a, s)| s)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect(); let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
@ -210,7 +210,7 @@ fn query_iter() {
// propagate forwards through the list of peers. // propagate forwards through the list of peers.
let search_target = PeerId::random(); let search_target = PeerId::random();
let search_target_key = kbucket::Key::new(search_target.clone()); let search_target_key = kbucket::Key::new(search_target.clone());
swarms[0].1.get_closest_peers(search_target.clone()); swarms[0].get_closest_peers(search_target.clone());
// Set up expectations. // Set up expectations.
let expected_swarm_id = swarm_ids[0].clone(); let expected_swarm_id = swarm_ids[0].clone();
@ -221,7 +221,7 @@ fn query_iter() {
// Run test // Run test
block_on( block_on(
poll_fn(move |ctx| { poll_fn(move |ctx| {
for (i, (_, swarm)) in swarms.iter_mut().enumerate() { for (i, swarm) in swarms.iter_mut().enumerate() {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => { Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
@ -257,7 +257,7 @@ fn unresponsive_not_returned_direct() {
// random peer. We make sure that no fake address is returned. // random peer. We make sure that no fake address is returned.
let mut swarms = build_nodes(1).into_iter() let mut swarms = build_nodes(1).into_iter()
.map(|(_a, s)| s) .map(|(kp, _a, s)| (kp, s))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// Add fake addresses. // Add fake addresses.
@ -304,25 +304,25 @@ fn unresponsive_not_returned_indirect() {
// Add fake addresses to first. // Add fake addresses to first.
for _ in 0 .. 10 { for _ in 0 .. 10 {
let public0 = swarms[0].0.public(); let public0 = swarms[0].0.public();
swarms[0].1.add_address(&PeerId::random(), multiaddr![Udp(10u16)]); swarms[0].2.add_address(&PeerId::random(), multiaddr![Udp(10u16)], public0);
} }
// Connect second to first. // Connect second to first.
let first_peer_id = Swarm::local_peer_id(&swarms[0].1).clone(); let first_peer_id = Swarm::local_peer_id(&swarms[0].2).clone();
let first_address = swarms[0].0.clone(); let first_address = swarms[0].1.clone();
let public1 = swarms[1].0.public(); let public1 = swarms[1].0.public();
swarms[1].1.add_address(&first_peer_id, first_address); swarms[1].2.add_address(&first_peer_id, first_address, public1);
// Drop the swarm addresses. // Drop the swarm addresses.
let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::<Vec<_>>(); let mut swarms = swarms.into_iter().map(|(_, _addr, swarm)| swarm).collect::<Vec<_>>();
// Ask second to search a random value. // Ask second to search a random value.
let search_target = PeerId::random(); let search_target = PeerId::random();
swarms[1].1.get_closest_peers(search_target.clone()); swarms[1].get_closest_peers(search_target.clone());
block_on( block_on(
poll_fn(move |ctx| { poll_fn(move |ctx| {
for (_, swarm) in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => { Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
@ -349,25 +349,25 @@ fn get_record_not_found() {
let mut swarms = build_nodes(3); let mut swarms = build_nodes(3);
let swarm_ids: Vec<_> = swarms.iter() let swarm_ids: Vec<_> = swarms.iter()
.map(|(_addr, swarm)| Swarm::local_peer_id(swarm)) .map(|(_, _addr, swarm)| Swarm::local_peer_id(swarm))
.cloned() .cloned()
.collect(); .collect();
let public0 = swarms[0].0.public(); let public0 = swarms[0].0.public();
let public1 = swarms[1].0.public(); let public1 = swarms[1].0.public();
let (second, third) = (swarms[1].0.clone(), swarms[2].0.clone()); let (second, third) = (swarms[1].1.clone(), swarms[2].1.clone());
swarms[0].1.add_address(&swarm_ids[1], second); swarms[0].2.add_address(&swarm_ids[1], second, public0);
swarms[1].1.add_address(&swarm_ids[2], third); swarms[1].2.add_address(&swarm_ids[2], third, public1);
// Drop the swarm addresses. // Drop the swarm addresses.
let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::<Vec<_>>(); let mut swarms = swarms.into_iter().map(|(_, _addr, swarm)| swarm).collect::<Vec<_>>();
let target_key = record::Key::from(random_multihash()); let target_key = record::Key::from(random_multihash());
swarms[0].1.get_record(&target_key, Quorum::One); swarms[0].get_record(&target_key, Quorum::One);
block_on( block_on(
poll_fn(move |ctx| { poll_fn(move |ctx| {
for (_, swarm) in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::GetRecordResult(Err(e)))) => { Poll::Ready(Some(KademliaEvent::GetRecordResult(Err(e)))) => {
@ -412,16 +412,17 @@ fn put_record() {
); );
let mut single_swarm = build_node_with_config(config); let mut single_swarm = build_node_with_config(config);
single_swarm.1.add_address( single_swarm.2.add_address(
Swarm::local_peer_id(&fully_connected_swarms[0].1), Swarm::local_peer_id(&fully_connected_swarms[0].2),
fully_connected_swarms[0].0.clone(), fully_connected_swarms[0].1.clone(),
fully_connected_swarms[0].0.public(),
); );
let mut swarms = vec![single_swarm]; let mut swarms = vec![single_swarm];
swarms.append(&mut fully_connected_swarms); swarms.append(&mut fully_connected_swarms);
// Drop the swarm addresses. // Drop the swarm addresses.
swarms.into_iter().map(|(_addr, swarm)| swarm).collect::<Vec<_>>() swarms.into_iter().map(|(_, _addr, swarm)| swarm).collect::<Vec<_>>()
}; };
let records = records.into_iter() let records = records.into_iter()
@ -436,7 +437,7 @@ fn put_record() {
.collect::<HashMap<_,_>>(); .collect::<HashMap<_,_>>();
for r in records.values() { for r in records.values() {
swarms[0].1.put_record(r.clone(), Quorum::All); swarms[0].put_record(r.clone(), Quorum::All);
} }
// Each test run republishes all records once. // Each test run republishes all records once.
@ -447,7 +448,7 @@ fn put_record() {
block_on( block_on(
poll_fn(move |ctx| loop { poll_fn(move |ctx| loop {
// Poll all swarms until they are "Pending". // Poll all swarms until they are "Pending".
for (_, swarm) in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::PutRecordResult(res))) | Poll::Ready(Some(KademliaEvent::PutRecordResult(res))) |
@ -502,7 +503,7 @@ fn put_record() {
let actual = swarms.iter() let actual = swarms.iter()
.skip(1) .skip(1)
.filter_map(|(_, swarm)| .filter_map(|swarm|
if swarm.store.get(key.preimage()).is_some() { if swarm.store.get(key.preimage()).is_some() {
Some(Swarm::local_peer_id(swarm).clone()) Some(Swarm::local_peer_id(swarm).clone())
} else { } else {
@ -529,18 +530,18 @@ fn put_record() {
} }
if republished { if republished {
assert_eq!(swarms[0].1.store.records().count(), records.len()); assert_eq!(swarms[0].store.records().count(), records.len());
assert_eq!(swarms[0].1.queries.size(), 0); assert_eq!(swarms[0].queries.size(), 0);
for k in records.keys() { for k in records.keys() {
swarms[0].1.store.remove(&k); swarms[0].store.remove(&k);
} }
assert_eq!(swarms[0].1.store.records().count(), 0); assert_eq!(swarms[0].store.records().count(), 0);
// All records have been republished, thus the test is complete. // All records have been republished, thus the test is complete.
return Poll::Ready(()); return Poll::Ready(());
} }
// Tell the replication job to republish asap. // Tell the replication job to republish asap.
swarms[0].1.put_record_job.as_mut().unwrap().asap(true); swarms[0].put_record_job.as_mut().unwrap().asap(true);
republished = true; republished = true;
}) })
) )
@ -555,18 +556,15 @@ fn get_value() {
// Let first peer know of second peer and second peer know of third peer. // Let first peer know of second peer and second peer know of third peer.
for i in 0..2 { for i in 0..2 {
let (peer_id, address) = (|(_, s)| s).map(Swarm::local_peer_id(&swarms[i+1].1).clone(), swarms[i+1].0.clone()); let (peer_id, public, address) = (Swarm::local_peer_id(&swarms[i+1].2).clone(), swarms[i+1].0.public(), swarms[i+1].1.clone());
swarms[i].1.add_address(&peer_id, address); swarms[i].2.add_address(&peer_id, address, public);
} }
// Drop the swarm addresses. // Drop the swarm addresses.
let mut swarms = swarms.into_iter().map(|(_key, _addr, swarm)| swarm).collect::<Vec<_>>(); let mut swarms = swarms.into_iter().map(|(kp, _addr, swarm)| (kp, swarm)).collect::<Vec<_>>();
let record = Record::new(random_multihash(), vec![4,5,6]); let record = Record::new(random_multihash(), vec![4,5,6]);
let public0 = swarms[0].0.public();
let public1 = swarms[1].0.public();
swarms[1].1.store.put(record.clone()).unwrap(); swarms[1].1.store.put(record.clone()).unwrap();
swarms[0].1.get_record(&record.key, Quorum::One); swarms[0].1.get_record(&record.key, Quorum::One);
@ -598,22 +596,22 @@ fn get_value_many() {
// TODO: Randomise // TODO: Randomise
let num_nodes = 12; let num_nodes = 12;
let mut swarms = build_connected_nodes(num_nodes, 3).into_iter() let mut swarms = build_connected_nodes(num_nodes, 3).into_iter()
.map(|(_addr, swarm)| swarm) .map(|(_kp, _addr, swarm)| swarm)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let num_results = 10; let num_results = 10;
let record = Record::new(random_multihash(), vec![4,5,6]); let record = Record::new(random_multihash(), vec![4,5,6]);
for i in 0 .. num_nodes { for i in 0 .. num_nodes {
swarms[i].1.store.put(record.clone()).unwrap(); swarms[i].store.put(record.clone()).unwrap();
} }
let quorum = Quorum::N(NonZeroUsize::new(num_results).unwrap()); let quorum = Quorum::N(NonZeroUsize::new(num_results).unwrap());
swarms[0].1.get_record(&record.key, quorum); swarms[0].get_record(&record.key, quorum);
block_on( block_on(
poll_fn(move |ctx| { poll_fn(move |ctx| {
for (_, swarm) in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => { Poll::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => {
@ -652,16 +650,17 @@ fn add_provider() {
); );
let mut single_swarm = build_node_with_config(config); let mut single_swarm = build_node_with_config(config);
single_swarm.1.add_address( single_swarm.2.add_address(
Swarm::local_peer_id(&fully_connected_swarms[0].1), Swarm::local_peer_id(&fully_connected_swarms[0].2),
fully_connected_swarms[0].0.clone(), fully_connected_swarms[0].1.clone(),
fully_connected_swarms[0].0.public(),
); );
let mut swarms = vec![single_swarm]; let mut swarms = vec![single_swarm];
swarms.append(&mut fully_connected_swarms); swarms.append(&mut fully_connected_swarms);
// Drop addresses before returning. // Drop addresses before returning.
swarms.into_iter().map(|(_addr, swarm)| swarm).collect::<Vec<_>>() swarms.into_iter().map(|(_, _addr, swarm)| swarm).collect::<Vec<_>>()
}; };
let keys: HashSet<_> = keys.into_iter().take(num_total).collect(); let keys: HashSet<_> = keys.into_iter().take(num_total).collect();
@ -674,13 +673,13 @@ fn add_provider() {
// Initiate the first round of publishing. // Initiate the first round of publishing.
for k in &keys { for k in &keys {
swarms[0].1.start_providing(k.clone()); swarms[0].start_providing(k.clone());
} }
block_on( block_on(
poll_fn(move |ctx| loop { poll_fn(move |ctx| loop {
// Poll all swarms until they are "Pending". // Poll all swarms until they are "Pending".
for (_, swarm) in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::StartProvidingResult(res))) | Poll::Ready(Some(KademliaEvent::StartProvidingResult(res))) |
@ -717,7 +716,7 @@ fn add_provider() {
while let Some(key) = results.pop() { while let Some(key) = results.pop() {
// Collect the nodes that have a provider record for `key`. // Collect the nodes that have a provider record for `key`.
let actual = swarms.iter().skip(1) let actual = swarms.iter().skip(1)
.filter_map(|(_, swarm)| .filter_map(|swarm|
if swarm.store.providers(&key).len() == 1 { if swarm.store.providers(&key).len() == 1 {
Some(Swarm::local_peer_id(&swarm).clone()) Some(Swarm::local_peer_id(&swarm).clone())
} else { } else {
@ -751,23 +750,23 @@ fn add_provider() {
// One round of publishing is complete. // One round of publishing is complete.
assert!(results.is_empty()); assert!(results.is_empty());
for (_, swarm) in &swarms { for swarm in &swarms {
assert_eq!(swarm.queries.size(), 0); assert_eq!(swarm.queries.size(), 0);
} }
if republished { if republished {
assert_eq!(swarms[0].1.store.provided().count(), keys.len()); assert_eq!(swarms[0].store.provided().count(), keys.len());
for k in &keys { for k in &keys {
swarms[0].1.stop_providing(&k); swarms[0].stop_providing(&k);
} }
assert_eq!(swarms[0].1.store.provided().count(), 0); assert_eq!(swarms[0].store.provided().count(), 0);
// All records have been republished, thus the test is complete. // All records have been republished, thus the test is complete.
return Poll::Ready(()); return Poll::Ready(());
} }
// Initiate the second round of publishing by telling the // Initiate the second round of publishing by telling the
// periodic provider job to run asap. // periodic provider job to run asap.
swarms[0].1.add_provider_job.as_mut().unwrap().asap(); swarms[0].add_provider_job.as_mut().unwrap().asap();
published = false; published = false;
republished = true; republished = true;
}) })
@ -782,19 +781,19 @@ fn add_provider() {
/// arithmetic overflow, see https://github.com/libp2p/rust-libp2p/issues/1290. /// arithmetic overflow, see https://github.com/libp2p/rust-libp2p/issues/1290.
#[test] #[test]
fn exceed_jobs_max_queries() { fn exceed_jobs_max_queries() {
let (_addr, mut swarm) = build_node(); let (_, _addr, mut swarm) = build_node();
let num = JOBS_MAX_QUERIES + 1; let num = JOBS_MAX_QUERIES + 1;
for _ in 0 .. num { for _ in 0 .. num {
swarm.1.bootstrap(); swarm.bootstrap();
} }
assert_eq!(swarm.1.queries.size(), num); assert_eq!(swarm.queries.size(), num);
block_on( block_on(
poll_fn(move |ctx| { poll_fn(move |ctx| {
for _ in 0 .. num { for _ in 0 .. num {
// There are no other nodes, so the queries finish instantly. // There are no other nodes, so the queries finish instantly.
if let Poll::Ready(Some(e)) = swarm.1.poll_next_unpin(ctx) { if let Poll::Ready(Some(e)) = swarm.poll_next_unpin(ctx) {
if let KademliaEvent::BootstrapResult(r) = e { if let KademliaEvent::BootstrapResult(r) = e {
assert!(r.is_ok(), "Unexpected error") assert!(r.is_ok(), "Unexpected error")
} else { } else {