Mirror of https://github.com/fluencelabs/rust-libp2p

commit 3e0528fdcb (merge)
parent 8ced6588e2
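Reader note: the hunks below adapt the Kademlia tests to this fork's API, which threads the node's ed25519 identity through the behaviour. A hedged sketch of the two call shapes that change, using only names that appear in the diff (the surrounding variables such as `local_id`, `store`, `cfg`, and the `kad` handle are assumed from context, not defined here):

    // Sketch, not upstream rust-libp2p: this fork's constructor takes the node's
    // ed25519 keypair ahead of the usual peer id, store and config.
    let behaviour = Kademlia::with_config(ed25519_key.clone(), local_id.clone(), store, cfg.clone());

    // add_address gains a public-key argument; in these tests it is the calling
    // node's own key, taken from the keypair half of the (keypair, swarm) tuple.
    kad.add_address(&peer_id, address, public_key);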
@@ -46,21 +46,23 @@ use quickcheck::*;
 use rand::{Rng, random, thread_rng};
 use std::{collections::{HashSet, HashMap}, io, num::NonZeroUsize, u64};
 use multihash::{wrap, Code, Multihash};
+use libp2p_core::identity::ed25519;

 type TestSwarm = Swarm<Kademlia<MemoryStore>>;

 /// Builds swarms, each listening on a port. Does *not* connect the nodes together.
-fn build_nodes(num: usize) -> (u64, Vec<TestSwarm>) {
+fn build_nodes(num: usize) -> (u64, Vec<(ed25519::Keypair, TestSwarm)>) {
     build_nodes_with_config(num, Default::default())
 }

 /// Builds swarms, each listening on a port. Does *not* connect the nodes together.
-fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> (u64, Vec<TestSwarm>) {
+fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> (u64, Vec<(ed25519::Keypair, TestSwarm)>) {
     let port_base = 1 + random::<u64>() % (u64::MAX - num as u64);
-    let mut result: Vec<Swarm<_, _>> = Vec::with_capacity(num);
+    let mut result: Vec<(ed25519::Keypair, Swarm<_, _>)> = Vec::with_capacity(num);

     for _ in 0 .. num {
-        let local_key = identity::Keypair::generate_ed25519();
+        let ed25519_key = ed25519::Keypair::generate();
+        let local_key = identity::Keypair::Ed25519(ed25519_key.clone());
         let local_public_key = local_key.public();
         let transport = MemoryTransport::default()
             .upgrade(upgrade::Version::V1)
@@ -72,31 +74,32 @@ fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> (u64, Vec<TestSwa

         let local_id = local_public_key.clone().into_peer_id();
         let store = MemoryStore::new(local_id.clone());
-        let behaviour = Kademlia::with_config(local_id.clone(), store, cfg.clone());
-        result.push(Swarm::new(transport, behaviour, local_id));
+        let behaviour = Kademlia::with_config(ed25519_key.clone(), local_id.clone(), store, cfg.clone());
+        result.push((ed25519_key, Swarm::new(transport, behaviour, local_id)));
     }

-    for (i, s) in result.iter_mut().enumerate() {
+    for (i, (_, s)) in result.iter_mut().enumerate() {
         Swarm::listen_on(s, Protocol::Memory(port_base + i as u64).into()).unwrap();
     }

     (port_base, result)
 }

-fn build_connected_nodes(total: usize, step: usize) -> (Vec<PeerId>, Vec<TestSwarm>) {
+fn build_connected_nodes(total: usize, step: usize) -> (Vec<PeerId>, Vec<(ed25519::Keypair, TestSwarm)>) {
     build_connected_nodes_with_config(total, step, Default::default())
 }

 fn build_connected_nodes_with_config(total: usize, step: usize, cfg: KademliaConfig)
-    -> (Vec<PeerId>, Vec<TestSwarm>)
+    -> (Vec<PeerId>, Vec<(ed25519::Keypair, TestSwarm)>)
 {
     let (port_base, mut swarms) = build_nodes_with_config(total, cfg);
-    let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
+    let swarm_ids: Vec<_> = swarms.iter().map(|(_, s)| s).map(Swarm::local_peer_id).cloned().collect();

     let mut i = 0;
     for (j, peer) in swarm_ids.iter().enumerate().skip(1) {
         if i < swarm_ids.len() {
-            swarms[i].add_address(&peer, Protocol::Memory(port_base + j as u64).into());
+            let public = swarms[i].0.public();
+            swarms[i].1.add_address(&peer, Protocol::Memory(port_base + j as u64).into(), public);
         }
         if j % step == 0 {
             i += step;
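With `build_nodes` and `build_connected_nodes` now handing back `(ed25519::Keypair, TestSwarm)` pairs, the call sites in the remaining hunks all follow one pattern: the keypair sits in `.0`, the swarm in `.1`, and the caller's own public key is passed along when adding an address. A minimal sketch of that pattern, assuming the test helpers and imports shown above (variable names are illustrative):

    let (port_base, mut swarms) = build_nodes(2);

    // First tuple field: the node's ed25519 keypair; second: the swarm itself.
    let public0 = swarms[0].0.public();
    let peer1 = Swarm::local_peer_id(&swarms[1].1).clone();

    // Routing-table updates and queries go through the swarm half, with the
    // local public key as the extra add_address argument.
    swarms[0].1.add_address(&peer1, Protocol::Memory(port_base + 1).into(), public0);
    swarms[0].1.bootstrap();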
@@ -117,7 +120,7 @@ fn bootstrap() {
     let num_group = rng.gen_range(1, num_total);
     let (swarm_ids, mut swarms) = build_connected_nodes(num_total, num_group);

-    swarms[0].bootstrap();
+    swarms[0].1.bootstrap();

     // Expected known peers
     let expected_known = swarm_ids.iter().skip(1).cloned().collect::<HashSet<_>>();
@@ -125,7 +128,7 @@ fn bootstrap() {
     // Run test
     block_on(
         poll_fn(move |ctx| {
-            for (i, swarm) in swarms.iter_mut().enumerate() {
+            for (i, (_, swarm)) in swarms.iter_mut().enumerate() {
                 loop {
                     match swarm.poll_next_unpin(ctx) {
                         Poll::Ready(Some(KademliaEvent::BootstrapResult(Ok(ok)))) => {
@@ -172,7 +175,7 @@ fn query_iter() {
     // propagate forwards through the list of peers.
     let search_target = PeerId::random();
     let search_target_key = kbucket::Key::new(search_target.clone());
-    swarms[0].get_closest_peers(search_target.clone());
+    swarms[0].1.get_closest_peers(search_target.clone());

     // Set up expectations.
     let expected_swarm_id = swarm_ids[0].clone();
@@ -183,7 +186,7 @@ fn query_iter() {
     // Run test
     block_on(
         poll_fn(move |ctx| {
-            for (i, swarm) in swarms.iter_mut().enumerate() {
+            for (i, (_, swarm)) in swarms.iter_mut().enumerate() {
                 loop {
                     match swarm.poll_next_unpin(ctx) {
                         Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
@@ -222,16 +225,17 @@ fn unresponsive_not_returned_direct() {

     // Add fake addresses.
     for _ in 0 .. 10 {
-        swarms[0].add_address(&PeerId::random(), Protocol::Udp(10u16).into());
+        let public0 = swarms[0].0.public();
+        swarms[0].1.add_address(&PeerId::random(), Protocol::Udp(10u16).into(), public0);
     }

     // Ask first to search a random value.
     let search_target = PeerId::random();
-    swarms[0].get_closest_peers(search_target.clone());
+    swarms[0].1.get_closest_peers(search_target.clone());

     block_on(
         poll_fn(move |ctx| {
-            for swarm in &mut swarms {
+            for (_, swarm) in &mut swarms {
                 loop {
                     match swarm.poll_next_unpin(ctx) {
                         Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
@@ -261,21 +265,23 @@ fn unresponsive_not_returned_indirect() {
     let (port_base, mut swarms) = build_nodes(2);

     // Add fake addresses to first.
-    let first_peer_id = Swarm::local_peer_id(&swarms[0]).clone();
+    let first_peer_id = Swarm::local_peer_id(&swarms[0].1).clone();
     for _ in 0 .. 10 {
-        swarms[0].add_address(&PeerId::random(), multiaddr![Udp(10u16)]);
+        let public0 = swarms[0].0.public();
+        swarms[0].1.add_address(&PeerId::random(), multiaddr![Udp(10u16)], public0);
     }

     // Connect second to first.
-    swarms[1].add_address(&first_peer_id, Protocol::Memory(port_base).into());
+    let public1 = swarms[1].0.public();
+    swarms[1].1.add_address(&first_peer_id, Protocol::Memory(port_base).into(), public1);

     // Ask second to search a random value.
     let search_target = PeerId::random();
-    swarms[1].get_closest_peers(search_target.clone());
+    swarms[1].1.get_closest_peers(search_target.clone());

     block_on(
         poll_fn(move |ctx| {
-            for swarm in &mut swarms {
+            for (_, swarm) in &mut swarms {
                 loop {
                     match swarm.poll_next_unpin(ctx) {
                         Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
@@ -301,17 +307,19 @@ fn unresponsive_not_returned_indirect() {
 fn get_record_not_found() {
     let (port_base, mut swarms) = build_nodes(3);

-    let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
+    let swarm_ids: Vec<_> = swarms.iter().map(|(_, s)| s).map(Swarm::local_peer_id).cloned().collect();

-    swarms[0].add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into());
-    swarms[1].add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into());
+    let public0 = swarms[0].0.public();
+    swarms[0].1.add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into(), public0);
+    let public1 = swarms[1].0.public();
+    swarms[1].1.add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into(), public1);

     let target_key = record::Key::from(random_multihash());
-    swarms[0].get_record(&target_key, Quorum::One);
+    swarms[0].1.get_record(&target_key, Quorum::One);

     block_on(
         poll_fn(move |ctx| {
-            for swarm in &mut swarms {
+            for (_, swarm) in &mut swarms {
                 loop {
                     match swarm.poll_next_unpin(ctx) {
                         Poll::Ready(Some(KademliaEvent::GetRecordResult(Err(e)))) => {
@@ -361,7 +369,7 @@ fn put_record() {
         .collect::<HashMap<_,_>>();

     for r in records.values() {
-        swarms[0].put_record(r.clone(), Quorum::All);
+        swarms[0].1.put_record(r.clone(), Quorum::All);
     }

     // Each test run republishes all records once.
@@ -372,7 +380,7 @@ fn put_record() {
     block_on(
         poll_fn(move |ctx| loop {
             // Poll all swarms until they are "Pending".
-            for swarm in &mut swarms {
+            for (_, swarm) in &mut swarms {
                 loop {
                     match swarm.poll_next_unpin(ctx) {
                         Poll::Ready(Some(KademliaEvent::PutRecordResult(res))) |
@@ -422,7 +430,7 @@ fn put_record() {
                     .collect::<HashSet<_>>();

                 let actual = swarms.iter().enumerate().skip(1)
-                    .filter_map(|(i, s)|
+                    .filter_map(|(i, (_, s))|
                         if s.store.get(key.preimage()).is_some() {
                             Some(swarm_ids[i].clone())
                         } else {
@@ -435,18 +443,18 @@ fn put_record() {
             }

             if republished {
-                assert_eq!(swarms[0].store.records().count(), records.len());
-                assert_eq!(swarms[0].queries.size(), 0);
+                assert_eq!(swarms[0].1.store.records().count(), records.len());
+                assert_eq!(swarms[0].1.queries.size(), 0);
                 for k in records.keys() {
-                    swarms[0].store.remove(&k);
+                    swarms[0].1.store.remove(&k);
                 }
-                assert_eq!(swarms[0].store.records().count(), 0);
+                assert_eq!(swarms[0].1.store.records().count(), 0);
                 // All records have been republished, thus the test is complete.
                 return Poll::Ready(());
             }

             // Tell the replication job to republish asap.
-            swarms[0].put_record_job.as_mut().unwrap().asap(true);
+            swarms[0].1.put_record_job.as_mut().unwrap().asap(true);
             republished = true;
         })
     )
@@ -459,19 +467,21 @@ fn put_record() {
 fn get_value() {
     let (port_base, mut swarms) = build_nodes(3);

-    let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
+    let swarm_ids: Vec<_> = swarms.iter().map(|(_, s)| s).map(Swarm::local_peer_id).cloned().collect();

-    swarms[0].add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into());
-    swarms[1].add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into());
+    let public0 = swarms[0].0.public();
+    swarms[0].1.add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into(), public0);
+    let public1 = swarms[1].0.public();
+    swarms[1].1.add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into(), public1);

     let record = Record::new(random_multihash(), vec![4,5,6]);

-    swarms[1].store.put(record.clone()).unwrap();
-    swarms[0].get_record(&record.key, Quorum::One);
+    swarms[1].1.store.put(record.clone()).unwrap();
+    swarms[0].1.get_record(&record.key, Quorum::One);

     block_on(
         poll_fn(move |ctx| {
-            for swarm in &mut swarms {
+            for (_, swarm) in &mut swarms {
                 loop {
                     match swarm.poll_next_unpin(ctx) {
                         Poll::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => {
@@ -502,15 +512,15 @@ fn get_value_many() {
     let record = Record::new(random_multihash(), vec![4,5,6]);

     for i in 0 .. num_nodes {
-        swarms[i].store.put(record.clone()).unwrap();
+        swarms[i].1.store.put(record.clone()).unwrap();
     }

     let quorum = Quorum::N(NonZeroUsize::new(num_results).unwrap());
-    swarms[0].get_record(&record.key, quorum);
+    swarms[0].1.get_record(&record.key, quorum);

     block_on(
         poll_fn(move |ctx| {
-            for swarm in &mut swarms {
+            for (_, swarm) in &mut swarms {
                 loop {
                     match swarm.poll_next_unpin(ctx) {
                         Poll::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => {
@@ -552,13 +562,13 @@ fn add_provider() {

     // Initiate the first round of publishing.
     for k in &keys {
-        swarms[0].start_providing(k.clone());
+        swarms[0].1.start_providing(k.clone());
     }

     block_on(
         poll_fn(move |ctx| loop {
             // Poll all swarms until they are "Pending".
-            for swarm in &mut swarms {
+            for (_, swarm) in &mut swarms {
                 loop {
                     match swarm.poll_next_unpin(ctx) {
                         Poll::Ready(Some(KademliaEvent::StartProvidingResult(res))) |
@@ -595,7 +605,7 @@ fn add_provider() {
             while let Some(key) = results.pop() {
                 // Collect the nodes that have a provider record for `key`.
                 let actual = swarms.iter().enumerate().skip(1)
-                    .filter_map(|(i, s)|
+                    .filter_map(|(i, (_, s))|
                         if s.store.providers(&key).len() == 1 {
                             Some(swarm_ids[i].clone())
                         } else {
@@ -625,23 +635,23 @@ fn add_provider() {

             // One round of publishing is complete.
             assert!(results.is_empty());
-            for s in &swarms {
+            for (_, s) in &swarms {
                 assert_eq!(s.queries.size(), 0);
             }

             if republished {
-                assert_eq!(swarms[0].store.provided().count(), keys.len());
+                assert_eq!(swarms[0].1.store.provided().count(), keys.len());
                 for k in &keys {
-                    swarms[0].stop_providing(&k);
+                    swarms[0].1.stop_providing(&k);
                 }
-                assert_eq!(swarms[0].store.provided().count(), 0);
+                assert_eq!(swarms[0].1.store.provided().count(), 0);
                 // All records have been republished, thus the test is complete.
                 return Poll::Ready(());
             }

             // Initiate the second round of publishing by telling the
             // periodic provider job to run asap.
-            swarms[0].add_provider_job.as_mut().unwrap().asap();
+            swarms[0].1.add_provider_job.as_mut().unwrap().asap();
             published = false;
             republished = true;
         })
@@ -659,16 +669,16 @@ fn exceed_jobs_max_queries() {
     let (_, mut swarms) = build_nodes(1);
     let num = JOBS_MAX_QUERIES + 1;
     for _ in 0 .. num {
-        swarms[0].bootstrap();
+        swarms[0].1.bootstrap();
     }

-    assert_eq!(swarms[0].queries.size(), num);
+    assert_eq!(swarms[0].1.queries.size(), num);

     block_on(
         poll_fn(move |ctx| {
             for _ in 0 .. num {
                 // There are no other nodes, so the queries finish instantly.
-                if let Poll::Ready(Some(e)) = swarms[0].poll_next_unpin(ctx) {
+                if let Poll::Ready(Some(e)) = swarms[0].1.poll_next_unpin(ctx) {
                     if let KademliaEvent::BootstrapResult(r) = e {
                         assert!(r.is_ok(), "Unexpected error")
                     } else {