From 26f58d20a8e7d21da8c78ae32f0dea49d424c275 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 28 Nov 2019 16:12:02 +0100 Subject: [PATCH] protocols/kad: Fix tests + minor fix in mdns and noise (#1320) * misc/mdns: Fix missleading error message * protocols/noise: Remove unneeded tokio-io import * protocols/kad: Update tests to use stable futures --- misc/mdns/src/behaviour.rs | 2 +- protocols/kad/src/behaviour/test.rs | 216 ++++++++++++++++------------ protocols/kad/src/jobs.rs | 54 +++---- protocols/noise/Cargo.toml | 1 - 4 files changed, 150 insertions(+), 123 deletions(-) diff --git a/misc/mdns/src/behaviour.rs b/misc/mdns/src/behaviour.rs index 54237c8f..61da92b9 100644 --- a/misc/mdns/src/behaviour.rs +++ b/misc/mdns/src/behaviour.rs @@ -236,7 +236,7 @@ where } }, Poll::Pending => (), - Poll::Ready(Err(err)) => warn!("tokio timer has errored: {:?}", err), + Poll::Ready(Err(err)) => warn!("timer has errored: {:?}", err), } } diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 7786762d..2be81cdf 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -25,7 +25,11 @@ use super::*; use crate::K_VALUE; use crate::kbucket::Distance; use crate::record::store::MemoryStore; -use futures::future; +use futures::{ + prelude::*, + executor::block_on, + future::poll_fn, +}; use libp2p_core::{ PeerId, Transport, @@ -42,7 +46,6 @@ use libp2p_yamux as yamux; use quickcheck::*; use rand::{Rng, random, thread_rng}; use std::{collections::{HashSet, HashMap}, io, num::NonZeroUsize, u64}; -use tokio::runtime::current_thread; use multihash::{Multihash, Hash::SHA2256}; type TestSwarm = Swarm< @@ -120,27 +123,30 @@ fn bootstrap() { let expected_known = swarm_ids.iter().skip(1).cloned().collect::>(); // Run test - current_thread::run( - future::poll_fn(move || { + block_on( + poll_fn(move |ctx| { for (i, swarm) in swarms.iter_mut().enumerate() { loop { - match swarm.poll().unwrap() { - Async::Ready(Some(KademliaEvent::BootstrapResult(Ok(ok)))) => { + match swarm.poll_next_unpin(ctx) { + Poll::Ready(Some(Ok(KademliaEvent::BootstrapResult(Ok(ok))))) => { assert_eq!(i, 0); assert_eq!(ok.peer, swarm_ids[0]); let known = swarm.kbuckets.iter() .map(|e| e.node.key.preimage().clone()) .collect::>(); assert_eq!(expected_known, known); - return Ok(Async::Ready(())); + return Poll::Ready(()) } - Async::Ready(_) => (), - Async::NotReady => break, + // Ignore any other event. + Poll::Ready(Some(Ok(_))) => (), + e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e), + Poll::Pending => break, } } } - Ok(Async::NotReady) - })) + Poll::Pending + }) + ) } let mut rng = thread_rng(); @@ -175,27 +181,30 @@ fn query_iter() { expected_distances.sort(); // Run test - current_thread::run( - future::poll_fn(move || { + block_on( + poll_fn(move |ctx| { for (i, swarm) in swarms.iter_mut().enumerate() { loop { - match swarm.poll().unwrap() { - Async::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => { + match swarm.poll_next_unpin(ctx) { + Poll::Ready(Some(Ok(KademliaEvent::GetClosestPeersResult(Ok(ok))))) => { assert_eq!(&ok.key[..], search_target.as_bytes()); assert_eq!(swarm_ids[i], expected_swarm_id); assert_eq!(swarm.queries.size(), 0); assert!(expected_peer_ids.iter().all(|p| ok.peers.contains(p))); let key = kbucket::Key::new(ok.key); assert_eq!(expected_distances, distances(&key, ok.peers)); - return Ok(Async::Ready(())); + return Poll::Ready(()); } - Async::Ready(_) => (), - Async::NotReady => break, + // Ignore any other event. + Poll::Ready(Some(Ok(_))) => (), + e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e), + Poll::Pending => break, } } } - Ok(Async::NotReady) - })) + Poll::Pending + }) + ) } let mut rng = thread_rng(); @@ -220,24 +229,27 @@ fn unresponsive_not_returned_direct() { let search_target = PeerId::random(); swarms[0].get_closest_peers(search_target.clone()); - current_thread::run( - future::poll_fn(move || { + block_on( + poll_fn(move |ctx| { for swarm in &mut swarms { loop { - match swarm.poll().unwrap() { - Async::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => { + match swarm.poll_next_unpin(ctx) { + Poll::Ready(Some(Ok(KademliaEvent::GetClosestPeersResult(Ok(ok))))) => { assert_eq!(&ok.key[..], search_target.as_bytes()); assert_eq!(ok.peers.len(), 0); - return Ok(Async::Ready(())); + return Poll::Ready(()); } - Async::Ready(_) => (), - Async::NotReady => break, + // Ignore any other event. + Poll::Ready(Some(Ok(_))) => (), + e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e), + Poll::Pending => break, } } } - Ok(Async::NotReady) - })) + Poll::Pending + }) + ) } #[test] @@ -261,25 +273,28 @@ fn unresponsive_not_returned_indirect() { let search_target = PeerId::random(); swarms[1].get_closest_peers(search_target.clone()); - current_thread::run( - future::poll_fn(move || { + block_on( + poll_fn(move |ctx| { for swarm in &mut swarms { loop { - match swarm.poll().unwrap() { - Async::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => { + match swarm.poll_next_unpin(ctx) { + Poll::Ready(Some(Ok(KademliaEvent::GetClosestPeersResult(Ok(ok))))) => { assert_eq!(&ok.key[..], search_target.as_bytes()); assert_eq!(ok.peers.len(), 1); assert_eq!(ok.peers[0], first_peer_id); - return Ok(Async::Ready(())); + return Poll::Ready(()); } - Async::Ready(_) => (), - Async::NotReady => break, + // Ignore any other event. + Poll::Ready(Some(Ok(_))) => (), + e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e), + Poll::Pending => break, } } } - Ok(Async::NotReady) - })) + Poll::Pending + }) + ) } #[test] @@ -294,30 +309,33 @@ fn get_record_not_found() { let target_key = record::Key::from(Multihash::random(SHA2256)); swarms[0].get_record(&target_key, Quorum::One); - current_thread::run( - future::poll_fn(move || { + block_on( + poll_fn(move |ctx| { for swarm in &mut swarms { loop { - match swarm.poll().unwrap() { - Async::Ready(Some(KademliaEvent::GetRecordResult(Err(e)))) => { + match swarm.poll_next_unpin(ctx) { + Poll::Ready(Some(Ok(KademliaEvent::GetRecordResult(Err(e))))) => { if let GetRecordError::NotFound { key, closest_peers, } = e { assert_eq!(key, target_key); assert_eq!(closest_peers.len(), 2); assert!(closest_peers.contains(&swarm_ids[1])); assert!(closest_peers.contains(&swarm_ids[2])); - return Ok(Async::Ready(())); + return Poll::Ready(()); } else { panic!("Unexpected error result: {:?}", e); } } - Async::Ready(_) => (), - Async::NotReady => break, + // Ignore any other event. + Poll::Ready(Some(Ok(_))) => (), + e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e), + Poll::Pending => break, } } } - Ok(Async::NotReady) - })) + Poll::Pending + }) + ) } #[test] @@ -351,14 +369,14 @@ fn put_record() { // The accumulated results for one round of publishing. let mut results = Vec::new(); - current_thread::run( - future::poll_fn(move || loop { - // Poll all swarms until they are "NotReady". + block_on( + poll_fn(move |ctx| loop { + // Poll all swarms until they are "Pending". for swarm in &mut swarms { loop { - match swarm.poll().unwrap() { - Async::Ready(Some(KademliaEvent::PutRecordResult(res))) | - Async::Ready(Some(KademliaEvent::RepublishRecordResult(res))) => { + match swarm.poll_next_unpin(ctx) { + Poll::Ready(Some(Ok(KademliaEvent::PutRecordResult(res)))) | + Poll::Ready(Some(Ok(KademliaEvent::RepublishRecordResult(res)))) => { match res { Err(e) => panic!(e), Ok(ok) => { @@ -368,16 +386,18 @@ fn put_record() { } } } - Async::Ready(_) => (), - Async::NotReady => break, + // Ignore any other event. + Poll::Ready(Some(Ok(_))) => (), + e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e), + Poll::Pending => break, } } } - // All swarms are NotReady and not enough results have been collected + // All swarms are Pending and not enough results have been collected // so far, thus wait to be polled again for further progress. if results.len() != records.len() { - return Ok(Async::NotReady) + return Poll::Pending } // Consume the results, checking that each record was replicated @@ -422,7 +442,7 @@ fn put_record() { } assert_eq!(swarms[0].store.records().count(), 0); // All records have been republished, thus the test is complete. - return Ok(Async::Ready(())); + return Poll::Ready(()); } // Tell the replication job to republish asap. @@ -449,24 +469,27 @@ fn get_value() { swarms[1].store.put(record.clone()).unwrap(); swarms[0].get_record(&record.key, Quorum::One); - current_thread::run( - future::poll_fn(move || { + block_on( + poll_fn(move |ctx| { for swarm in &mut swarms { loop { - match swarm.poll().unwrap() { - Async::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => { + match swarm.poll_next_unpin(ctx) { + Poll::Ready(Some(Ok(KademliaEvent::GetRecordResult(Ok(ok))))) => { assert_eq!(ok.records.len(), 1); assert_eq!(ok.records.first(), Some(&record)); - return Ok(Async::Ready(())); + return Poll::Ready(()); } - Async::Ready(_) => (), - Async::NotReady => break, + // Ignore any other event. + Poll::Ready(Some(Ok(_))) => (), + e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e), + Poll::Pending => break, } } } - Ok(Async::NotReady) - })) + Poll::Pending + }) + ) } #[test] @@ -485,23 +508,26 @@ fn get_value_many() { let quorum = Quorum::N(NonZeroUsize::new(num_results).unwrap()); swarms[0].get_record(&record.key, quorum); - current_thread::run( - future::poll_fn(move || { + block_on( + poll_fn(move |ctx| { for swarm in &mut swarms { loop { - match swarm.poll().unwrap() { - Async::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => { + match swarm.poll_next_unpin(ctx) { + Poll::Ready(Some(Ok(KademliaEvent::GetRecordResult(Ok(ok))))) => { assert_eq!(ok.records.len(), num_results); assert_eq!(ok.records.first(), Some(&record)); - return Ok(Async::Ready(())); + return Poll::Ready(()); } - Async::Ready(_) => (), - Async::NotReady => break, + // Ignore any other event. + Poll::Ready(Some(Ok(_))) => (), + e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e), + Poll::Pending => break, } } } - Ok(Async::NotReady) - })) + Poll::Pending + }) + ) } #[test] @@ -529,14 +555,14 @@ fn add_provider() { swarms[0].start_providing(k.clone()); } - current_thread::run( - future::poll_fn(move || loop { - // Poll all swarms until they are "NotReady". + block_on( + poll_fn(move |ctx| loop { + // Poll all swarms until they are "Pending". for swarm in &mut swarms { loop { - match swarm.poll().unwrap() { - Async::Ready(Some(KademliaEvent::StartProvidingResult(res))) | - Async::Ready(Some(KademliaEvent::RepublishProviderResult(res))) => { + match swarm.poll_next_unpin(ctx) { + Poll::Ready(Some(Ok(KademliaEvent::StartProvidingResult(res)))) | + Poll::Ready(Some(Ok(KademliaEvent::RepublishProviderResult(res)))) => { match res { Err(e) => panic!(e), Ok(ok) => { @@ -545,8 +571,10 @@ fn add_provider() { } } } - Async::Ready(_) => (), - Async::NotReady => break, + // Ignore any other event. + Poll::Ready(Some(Ok(_))) => (), + e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e), + Poll::Pending => break, } } } @@ -559,7 +587,7 @@ fn add_provider() { if !published { // Still waiting for all requests to be sent for one round // of publishing. - return Ok(Async::NotReady) + return Poll::Pending } // A round of publishing is complete. Consume the results, checking that @@ -578,7 +606,7 @@ fn add_provider() { if actual.len() != replication_factor.get() { // Still waiting for some nodes to process the request. results.push(key); - return Ok(Async::NotReady) + return Poll::Pending } let mut expected = swarm_ids.clone().split_off(1); @@ -608,7 +636,7 @@ fn add_provider() { } assert_eq!(swarms[0].store.provided().count(), 0); // All records have been republished, thus the test is complete. - return Ok(Async::Ready(())); + return Poll::Ready(()); } // Initiate the second round of publishing by telling the @@ -636,12 +664,12 @@ fn exceed_jobs_max_queries() { assert_eq!(swarms[0].queries.size(), num); - current_thread::run( - future::poll_fn(move || { + block_on( + poll_fn(move |ctx| { for _ in 0 .. num { // There are no other nodes, so the queries finish instantly. - if let Ok(Async::Ready(Some(e))) = swarms[0].poll() { - if let KademliaEvent::BootstrapResult(r) = e { + if let Poll::Ready(Some(e)) = swarms[0].poll_next_unpin(ctx) { + if let Ok(KademliaEvent::BootstrapResult(r)) = e { assert!(r.is_ok(), "Unexpected error") } else { panic!("Unexpected event: {:?}", e) @@ -650,7 +678,7 @@ fn exceed_jobs_max_queries() { panic!("Expected event") } } - Ok(Async::Ready(())) - })) + Poll::Ready(()) + }) + ) } - diff --git a/protocols/kad/src/jobs.rs b/protocols/kad/src/jobs.rs index 6d9ed399..9f5f8c67 100644 --- a/protocols/kad/src/jobs.rs +++ b/protocols/kad/src/jobs.rs @@ -326,6 +326,7 @@ impl AddProviderJob { #[cfg(test)] mod tests { use crate::record::store::MemoryStore; + use futures::{executor::block_on, future::poll_fn}; use quickcheck::*; use rand::Rng; use super::*; @@ -362,20 +363,20 @@ mod tests { for r in records { let _ = store.put(r); } - // Polling with an instant beyond the deadline for the next run - // is guaranteed to run the job, without the job needing to poll the `Delay` - // and thus without needing to run `poll` in the context of a task - // for testing purposes. - let now = Instant::now() + job.inner.interval; - // All (non-expired) records in the store must be yielded by the job. - for r in store.records().map(|r| r.into_owned()).collect::>() { - if !r.is_expired(now) { - assert_eq!(job.poll(&mut store, now), Poll::Ready(r)); - assert!(job.is_running()); + + block_on(poll_fn(|ctx| { + let now = Instant::now() + job.inner.interval; + // All (non-expired) records in the store must be yielded by the job. + for r in store.records().map(|r| r.into_owned()).collect::>() { + if !r.is_expired(now) { + assert_eq!(job.poll(ctx, &mut store, now), Poll::Ready(r)); + assert!(job.is_running()); + } } - } - assert_eq!(job.poll(&mut store, now), Poll::Pending); - assert!(!job.is_running()); + assert_eq!(job.poll(ctx, &mut store, now), Poll::Pending); + assert!(!job.is_running()); + Poll::Ready(()) + })); } quickcheck(prop as fn(_)) @@ -392,23 +393,22 @@ mod tests { r.provider = id.clone(); let _ = store.add_provider(r); } - // Polling with an instant beyond the deadline for the next run - // is guaranteed to run the job, without the job needing to poll the `Delay` - // and thus without needing to run `poll` in the context of a task - // for testing purposes. - let now = Instant::now() + job.inner.interval; - // All (non-expired) records in the store must be yielded by the job. - for r in store.provided().map(|r| r.into_owned()).collect::>() { - if !r.is_expired(now) { - assert_eq!(job.poll(&mut store, now), Poll::Ready(r)); - assert!(job.is_running()); + + block_on(poll_fn(|ctx| { + let now = Instant::now() + job.inner.interval; + // All (non-expired) records in the store must be yielded by the job. + for r in store.provided().map(|r| r.into_owned()).collect::>() { + if !r.is_expired(now) { + assert_eq!(job.poll(ctx, &mut store, now), Poll::Ready(r)); + assert!(job.is_running()); + } } - } - assert_eq!(job.poll(&mut store, now), Poll::Pending); - assert!(!job.is_running()); + assert_eq!(job.poll(ctx, &mut store, now), Poll::Pending); + assert!(!job.is_running()); + Poll::Ready(()) + })); } quickcheck(prop as fn(_)) } } - diff --git a/protocols/noise/Cargo.toml b/protocols/noise/Cargo.toml index 0d59bf34..cc236368 100644 --- a/protocols/noise/Cargo.toml +++ b/protocols/noise/Cargo.toml @@ -18,7 +18,6 @@ protobuf = "2.8" rand = "^0.7.2" ring = { version = "0.16.9", features = ["alloc"], default-features = false } snow = { version = "0.6.1", features = ["ring-resolver"], default-features = false } -tokio-io = "0.1" x25519-dalek = "0.5" zeroize = "1"