More insight into Kademlia queries. (#1567)

* [libp2p-kad] Provide more insight and control into Kademlia queries.

More insight: The API allows iterating over the active queries and
inspecting their state and execution statistics.

More control: The API allows aborting queries prematurely
at any time.

To that end, API operations that initiate new queries return the query ID
and multi-phase queries such as `put_record` retain the query ID across all
phases, each phase being executed by a new (internal) query.

* Cleanup

* Cleanup

* Update examples and re-exports.

* Incorporate review feedback.

* Update CHANGELOG

* Update CHANGELOG

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Roman Borschel 2020-05-16 10:43:09 +02:00 committed by GitHub
parent c271f6f56b
commit 3a96ebf57f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 925 additions and 366 deletions

View File

@ -27,6 +27,14 @@
has no effect. has no effect.
[PR 1536](https://github.com/libp2p/rust-libp2p/pull/1536) [PR 1536](https://github.com/libp2p/rust-libp2p/pull/1536)
- `libp2p-kad`: Provide more insight into, and control of, the execution of
queries. All query results are now wrapped in `KademliaEvent::QueryResult`.
As a side-effect of these changes and for as long as the record storage
API is not asynchronous, local storage errors on `put_record` are reported
synchronously in a `Result`, instead of being reported asynchronously by
an event.
[PR 1567](https://github.com/libp2p/rust-libp2p/pull/1567)
- `libp2p-tcp`: On listeners started with an IPv6 multi-address the socket - `libp2p-tcp`: On listeners started with an IPv6 multi-address the socket
option `IPV6_V6ONLY` is set to true. Instead of relying on IPv4-mapped IPv6 option `IPV6_V6ONLY` is set to true. Instead of relying on IPv4-mapped IPv6
address support, two listeners can be started if IPv4 and IPv6 should both address support, two listeners can be started if IPv4 and IPv6 should both

View File

@ -32,7 +32,15 @@
use async_std::{io, task}; use async_std::{io, task};
use futures::prelude::*; use futures::prelude::*;
use libp2p::kad::record::store::MemoryStore; use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{record::Key, Kademlia, KademliaEvent, PutRecordOk, Quorum, Record}; use libp2p::kad::{
record::Key,
Kademlia,
KademliaEvent,
PutRecordOk,
QueryResult,
Quorum,
Record
};
use libp2p::{ use libp2p::{
NetworkBehaviour, NetworkBehaviour,
PeerId, PeerId,
@ -76,8 +84,9 @@ fn main() -> Result<(), Box<dyn Error>> {
// Called when `kademlia` produces an event. // Called when `kademlia` produces an event.
fn inject_event(&mut self, message: KademliaEvent) { fn inject_event(&mut self, message: KademliaEvent) {
match message { match message {
KademliaEvent::GetRecordResult(Ok(result)) => { KademliaEvent::QueryResult { result, .. } => match result {
for Record { key, value, .. } in result.records { QueryResult::GetRecord(Ok(ok)) => {
for Record { key, value, .. } in ok.records {
println!( println!(
"Got record {:?} {:?}", "Got record {:?} {:?}",
std::str::from_utf8(key.as_ref()).unwrap(), std::str::from_utf8(key.as_ref()).unwrap(),
@ -85,20 +94,22 @@ fn main() -> Result<(), Box<dyn Error>> {
); );
} }
} }
KademliaEvent::GetRecordResult(Err(err)) => { QueryResult::GetRecord(Err(err)) => {
eprintln!("Failed to get record: {:?}", err); eprintln!("Failed to get record: {:?}", err);
} }
KademliaEvent::PutRecordResult(Ok(PutRecordOk { key })) => { QueryResult::PutRecord(Ok(PutRecordOk { key })) => {
println!( println!(
"Successfully put record {:?}", "Successfully put record {:?}",
std::str::from_utf8(key.as_ref()).unwrap() std::str::from_utf8(key.as_ref()).unwrap()
); );
} }
KademliaEvent::PutRecordResult(Err(err)) => { QueryResult::PutRecord(Err(err)) => {
eprintln!("Failed to put record: {:?}", err); eprintln!("Failed to put record: {:?}", err);
} }
_ => {} _ => {}
} }
_ => {}
}
} }
} }
@ -188,7 +199,7 @@ fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
publisher: None, publisher: None,
expires: None, expires: None,
}; };
kademlia.put_record(record, Quorum::One); kademlia.put_record(record, Quorum::One).expect("Failed to store record locally.");
} }
_ => { _ => {
eprintln!("expected GET or PUT"); eprintln!("expected GET or PUT");

View File

@ -30,7 +30,13 @@ use libp2p::{
identity, identity,
build_development_transport build_development_transport
}; };
use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, GetClosestPeersError}; use libp2p::kad::{
Kademlia,
KademliaConfig,
KademliaEvent,
GetClosestPeersError,
QueryResult,
};
use libp2p::kad::record::store::MemoryStore; use libp2p::kad::record::store::MemoryStore;
use std::{env, error::Error, time::Duration}; use std::{env, error::Error, time::Duration};
@ -91,7 +97,10 @@ fn main() -> Result<(), Box<dyn Error>> {
task::block_on(async move { task::block_on(async move {
loop { loop {
let event = swarm.next().await; let event = swarm.next().await;
if let KademliaEvent::GetClosestPeersResult(result) = event { if let KademliaEvent::QueryResult {
result: QueryResult::GetClosestPeers(result),
..
} = event {
match result { match result {
Ok(ok) => Ok(ok) =>
if !ok.peers.is_empty() { if !ok.peers.is_empty() {

File diff suppressed because it is too large Load Diff

View File

@ -148,10 +148,11 @@ fn bootstrap() {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect(); let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
swarms[0].bootstrap(); let qid = swarms[0].bootstrap().unwrap();
// Expected known peers // Expected known peers
let expected_known = swarm_ids.iter().skip(1).cloned().collect::<HashSet<_>>(); let expected_known = swarm_ids.iter().skip(1).cloned().collect::<HashSet<_>>();
let mut first = true;
// Run test // Run test
block_on( block_on(
@ -159,15 +160,24 @@ fn bootstrap() {
for (i, swarm) in swarms.iter_mut().enumerate() { for (i, swarm) in swarms.iter_mut().enumerate() {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::BootstrapResult(Ok(ok)))) => { Poll::Ready(Some(KademliaEvent::QueryResult {
id, result: QueryResult::Bootstrap(Ok(ok)), ..
})) => {
assert_eq!(id, qid);
assert_eq!(i, 0); assert_eq!(i, 0);
if first {
// Bootstrapping must start with a self-lookup.
assert_eq!(ok.peer, swarm_ids[0]); assert_eq!(ok.peer, swarm_ids[0]);
}
first = false;
if ok.num_remaining == 0 {
let known = swarm.kbuckets.iter() let known = swarm.kbuckets.iter()
.map(|e| e.node.key.preimage().clone()) .map(|e| e.node.key.preimage().clone())
.collect::<HashSet<_>>(); .collect::<HashSet<_>>();
assert_eq!(expected_known, known); assert_eq!(expected_known, known);
return Poll::Ready(()) return Poll::Ready(())
} }
}
// Ignore any other event. // Ignore any other event.
Poll::Ready(Some(_)) => (), Poll::Ready(Some(_)) => (),
e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e), e @ Poll::Ready(_) => panic!("Unexpected return value: {:?}", e),
@ -206,7 +216,17 @@ fn query_iter() {
// propagate forwards through the list of peers. // propagate forwards through the list of peers.
let search_target = PeerId::random(); let search_target = PeerId::random();
let search_target_key = kbucket::Key::new(search_target.clone()); let search_target_key = kbucket::Key::new(search_target.clone());
swarms[0].get_closest_peers(search_target.clone()); let qid = swarms[0].get_closest_peers(search_target.clone());
match swarms[0].query(&qid) {
Some(q) => match q.info() {
QueryInfo::GetClosestPeers { key } => {
assert_eq!(&key[..], search_target.borrow() as &[u8])
},
i => panic!("Unexpected query info: {:?}", i)
}
None => panic!("Query not found: {:?}", qid)
}
// Set up expectations. // Set up expectations.
let expected_swarm_id = swarm_ids[0].clone(); let expected_swarm_id = swarm_ids[0].clone();
@ -220,7 +240,10 @@ fn query_iter() {
for (i, swarm) in swarms.iter_mut().enumerate() { for (i, swarm) in swarms.iter_mut().enumerate() {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => { Poll::Ready(Some(KademliaEvent::QueryResult {
id, result: QueryResult::GetClosestPeers(Ok(ok)), ..
})) => {
assert_eq!(id, qid);
assert_eq!(&ok.key[..], search_target.as_bytes()); assert_eq!(&ok.key[..], search_target.as_bytes());
assert_eq!(swarm_ids[i], expected_swarm_id); assert_eq!(swarm_ids[i], expected_swarm_id);
assert_eq!(swarm.queries.size(), 0); assert_eq!(swarm.queries.size(), 0);
@ -270,7 +293,9 @@ fn unresponsive_not_returned_direct() {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => { Poll::Ready(Some(KademliaEvent::QueryResult {
result: QueryResult::GetClosestPeers(Ok(ok)), ..
})) => {
assert_eq!(&ok.key[..], search_target.as_bytes()); assert_eq!(&ok.key[..], search_target.as_bytes());
assert_eq!(ok.peers.len(), 0); assert_eq!(ok.peers.len(), 0);
return Poll::Ready(()); return Poll::Ready(());
@ -318,7 +343,9 @@ fn unresponsive_not_returned_indirect() {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => { Poll::Ready(Some(KademliaEvent::QueryResult {
result: QueryResult::GetClosestPeers(Ok(ok)), ..
})) => {
assert_eq!(&ok.key[..], search_target.as_bytes()); assert_eq!(&ok.key[..], search_target.as_bytes());
assert_eq!(ok.peers.len(), 1); assert_eq!(ok.peers.len(), 1);
assert_eq!(ok.peers[0], first_peer_id); assert_eq!(ok.peers[0], first_peer_id);
@ -354,14 +381,17 @@ fn get_record_not_found() {
let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::<Vec<_>>(); let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::<Vec<_>>();
let target_key = record::Key::from(random_multihash()); let target_key = record::Key::from(random_multihash());
swarms[0].get_record(&target_key, Quorum::One); let qid = swarms[0].get_record(&target_key, Quorum::One);
block_on( block_on(
poll_fn(move |ctx| { poll_fn(move |ctx| {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::GetRecordResult(Err(e)))) => { Poll::Ready(Some(KademliaEvent::QueryResult {
id, result: QueryResult::GetRecord(Err(e)), ..
})) => {
assert_eq!(id, qid);
if let GetRecordError::NotFound { key, closest_peers, } = e { if let GetRecordError::NotFound { key, closest_peers, } = e {
assert_eq!(key, target_key); assert_eq!(key, target_key);
assert_eq!(closest_peers.len(), 2); assert_eq!(closest_peers.len(), 2);
@ -426,8 +456,23 @@ fn put_record() {
}) })
.collect::<HashMap<_,_>>(); .collect::<HashMap<_,_>>();
// Initiate put_record queries.
let mut qids = HashSet::new();
for r in records.values() { for r in records.values() {
swarms[0].put_record(r.clone(), Quorum::All); let qid = swarms[0].put_record(r.clone(), Quorum::All).unwrap();
match swarms[0].query(&qid) {
Some(q) => match q.info() {
QueryInfo::PutRecord { phase, record, .. } => {
assert_eq!(phase, &PutRecordPhase::GetClosestPeers);
assert_eq!(record.key, r.key);
assert_eq!(record.value, r.value);
assert!(record.expires.is_some());
qids.insert(qid);
},
i => panic!("Unexpected query info: {:?}", i)
}
None => panic!("Query not found: {:?}", qid)
}
} }
// Each test run republishes all records once. // Each test run republishes all records once.
@ -441,8 +486,17 @@ fn put_record() {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::PutRecordResult(res))) | Poll::Ready(Some(KademliaEvent::QueryResult {
Poll::Ready(Some(KademliaEvent::RepublishRecordResult(res))) => { id, result: QueryResult::PutRecord(res), stats
})) |
Poll::Ready(Some(KademliaEvent::QueryResult {
id, result: QueryResult::RepublishRecord(res), stats
})) => {
assert!(qids.is_empty() || qids.remove(&id));
assert!(stats.duration().is_some());
assert!(stats.num_successes() >= replication_factor.get() as u32);
assert!(stats.num_requests() >= stats.num_successes());
assert_eq!(stats.num_failures(), 0);
match res { match res {
Err(e) => panic!("{:?}", e), Err(e) => panic!("{:?}", e),
Ok(ok) => { Ok(ok) => {
@ -541,7 +595,7 @@ fn put_record() {
} }
#[test] #[test]
fn get_value() { fn get_record() {
let mut swarms = build_nodes(3); let mut swarms = build_nodes(3);
// Let first peer know of second peer and second peer know of third peer. // Let first peer know of second peer and second peer know of third peer.
@ -556,14 +610,17 @@ fn get_value() {
let record = Record::new(random_multihash(), vec![4,5,6]); let record = Record::new(random_multihash(), vec![4,5,6]);
swarms[1].store.put(record.clone()).unwrap(); swarms[1].store.put(record.clone()).unwrap();
swarms[0].get_record(&record.key, Quorum::One); let qid = swarms[0].get_record(&record.key, Quorum::One);
block_on( block_on(
poll_fn(move |ctx| { poll_fn(move |ctx| {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => { Poll::Ready(Some(KademliaEvent::QueryResult {
id, result: QueryResult::GetRecord(Ok(ok)), ..
})) => {
assert_eq!(id, qid);
assert_eq!(ok.records.len(), 1); assert_eq!(ok.records.len(), 1);
assert_eq!(ok.records.first(), Some(&record)); assert_eq!(ok.records.first(), Some(&record));
return Poll::Ready(()); return Poll::Ready(());
@ -582,7 +639,7 @@ fn get_value() {
} }
#[test] #[test]
fn get_value_many() { fn get_record_many() {
// TODO: Randomise // TODO: Randomise
let num_nodes = 12; let num_nodes = 12;
let mut swarms = build_connected_nodes(num_nodes, 3).into_iter() let mut swarms = build_connected_nodes(num_nodes, 3).into_iter()
@ -597,14 +654,17 @@ fn get_value_many() {
} }
let quorum = Quorum::N(NonZeroUsize::new(num_results).unwrap()); let quorum = Quorum::N(NonZeroUsize::new(num_results).unwrap());
swarms[0].get_record(&record.key, quorum); let qid = swarms[0].get_record(&record.key, quorum);
block_on( block_on(
poll_fn(move |ctx| { poll_fn(move |ctx| {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::GetRecordResult(Ok(ok)))) => { Poll::Ready(Some(KademliaEvent::QueryResult {
id, result: QueryResult::GetRecord(Ok(ok)), ..
})) => {
assert_eq!(id, qid);
assert_eq!(ok.records.len(), num_results); assert_eq!(ok.records.len(), num_results);
assert_eq!(ok.records.first(), Some(&record)); assert_eq!(ok.records.first(), Some(&record));
return Poll::Ready(()); return Poll::Ready(());
@ -661,8 +721,10 @@ fn add_provider() {
let mut results = Vec::new(); let mut results = Vec::new();
// Initiate the first round of publishing. // Initiate the first round of publishing.
let mut qids = HashSet::new();
for k in &keys { for k in &keys {
swarms[0].start_providing(k.clone()); let qid = swarms[0].start_providing(k.clone()).unwrap();
qids.insert(qid);
} }
block_on( block_on(
@ -671,8 +733,13 @@ fn add_provider() {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::StartProvidingResult(res))) | Poll::Ready(Some(KademliaEvent::QueryResult {
Poll::Ready(Some(KademliaEvent::RepublishProviderResult(res))) => { id, result: QueryResult::StartProviding(res), ..
})) |
Poll::Ready(Some(KademliaEvent::QueryResult {
id, result: QueryResult::RepublishProvider(res), ..
})) => {
assert!(qids.is_empty() || qids.remove(&id));
match res { match res {
Err(e) => panic!(e), Err(e) => panic!(e),
Ok(ok) => { Ok(ok) => {
@ -773,7 +840,7 @@ fn exceed_jobs_max_queries() {
let (_addr, mut swarm) = build_node(); let (_addr, mut swarm) = build_node();
let num = JOBS_MAX_QUERIES + 1; let num = JOBS_MAX_QUERIES + 1;
for _ in 0 .. num { for _ in 0 .. num {
swarm.bootstrap(); swarm.get_closest_peers(PeerId::random());
} }
assert_eq!(swarm.queries.size(), num); assert_eq!(swarm.queries.size(), num);
@ -783,8 +850,10 @@ fn exceed_jobs_max_queries() {
for _ in 0 .. num { for _ in 0 .. num {
// There are no other nodes, so the queries finish instantly. // There are no other nodes, so the queries finish instantly.
if let Poll::Ready(Some(e)) = swarm.poll_next_unpin(ctx) { if let Poll::Ready(Some(e)) = swarm.poll_next_unpin(ctx) {
if let KademliaEvent::BootstrapResult(r) = e { if let KademliaEvent::QueryResult {
assert!(r.is_ok(), "Unexpected error") result: QueryResult::GetClosestPeers(Ok(r)), ..
} = e {
assert!(r.peers.is_empty())
} else { } else {
panic!("Unexpected event: {:?}", e) panic!("Unexpected event: {:?}", e)
} }

View File

@ -41,6 +41,10 @@ mod dht_proto {
pub use addresses::Addresses; pub use addresses::Addresses;
pub use behaviour::{Kademlia, KademliaConfig, KademliaEvent, Quorum}; pub use behaviour::{Kademlia, KademliaConfig, KademliaEvent, Quorum};
pub use behaviour::{ pub use behaviour::{
QueryResult,
QueryInfo,
QueryStats,
BootstrapResult, BootstrapResult,
BootstrapOk, BootstrapOk,
BootstrapError, BootstrapError,

View File

@ -91,13 +91,38 @@ impl<TInner> QueryPool<TInner> {
where where
I: IntoIterator<Item = PeerId> I: IntoIterator<Item = PeerId>
{ {
let id = self.next_query_id();
self.continue_fixed(id, peers, inner);
id
}
/// Continues an earlier query with a fixed set of peers, reusing
/// the given query ID, which must be from a query that finished
/// earlier.
pub fn continue_fixed<I>(&mut self, id: QueryId, peers: I, inner: TInner)
where
I: IntoIterator<Item = PeerId>
{
assert!(!self.queries.contains_key(&id));
let parallelism = self.config.replication_factor.get(); let parallelism = self.config.replication_factor.get();
let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism)); let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism));
self.add(peer_iter, inner) let query = Query::new(id, peer_iter, inner);
self.queries.insert(id, query);
} }
/// Adds a query to the pool that iterates towards the closest peers to the target. /// Adds a query to the pool that iterates towards the closest peers to the target.
pub fn add_iter_closest<T, I>(&mut self, target: T, peers: I, inner: TInner) -> QueryId pub fn add_iter_closest<T, I>(&mut self, target: T, peers: I, inner: TInner) -> QueryId
where
T: Into<KeyBytes>,
I: IntoIterator<Item = Key<PeerId>>
{
let id = self.next_query_id();
self.continue_iter_closest(id, target, peers, inner);
id
}
/// Adds a query to the pool that iterates towards the closest peers to the target.
pub fn continue_iter_closest<T, I>(&mut self, id: QueryId, target: T, peers: I, inner: TInner)
where where
T: Into<KeyBytes>, T: Into<KeyBytes>,
I: IntoIterator<Item = Key<PeerId>> I: IntoIterator<Item = Key<PeerId>>
@ -107,14 +132,13 @@ impl<TInner> QueryPool<TInner> {
.. ClosestPeersIterConfig::default() .. ClosestPeersIterConfig::default()
}; };
let peer_iter = QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers)); let peer_iter = QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers));
self.add(peer_iter, inner)
}
fn add(&mut self, peer_iter: QueryPeerIter, inner: TInner) -> QueryId {
let id = QueryId(self.next_id);
self.next_id = self.next_id.wrapping_add(1);
let query = Query::new(id, peer_iter, inner); let query = Query::new(id, peer_iter, inner);
self.queries.insert(id, query); self.queries.insert(id, query);
}
fn next_query_id(&mut self) -> QueryId {
let id = QueryId(self.next_id);
self.next_id = self.next_id.wrapping_add(1);
id id
} }
@ -135,7 +159,7 @@ impl<TInner> QueryPool<TInner> {
let mut waiting = None; let mut waiting = None;
for (&query_id, query) in self.queries.iter_mut() { for (&query_id, query) in self.queries.iter_mut() {
query.started = query.started.or(Some(now)); query.stats.start = query.stats.start.or(Some(now));
match query.next(now) { match query.next(now) {
PeersIterState::Finished => { PeersIterState::Finished => {
finished = Some(query_id); finished = Some(query_id);
@ -147,7 +171,7 @@ impl<TInner> QueryPool<TInner> {
break break
} }
PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => { PeersIterState::Waiting(None) | PeersIterState::WaitingAtCapacity => {
let elapsed = now - query.started.unwrap_or(now); let elapsed = now - query.stats.start.unwrap_or(now);
if elapsed >= self.config.timeout { if elapsed >= self.config.timeout {
timeout = Some(query_id); timeout = Some(query_id);
break break
@ -162,12 +186,14 @@ impl<TInner> QueryPool<TInner> {
} }
if let Some(query_id) = finished { if let Some(query_id) = finished {
let query = self.queries.remove(&query_id).expect("s.a."); let mut query = self.queries.remove(&query_id).expect("s.a.");
query.stats.end = Some(now);
return QueryPoolState::Finished(query) return QueryPoolState::Finished(query)
} }
if let Some(query_id) = timeout { if let Some(query_id) = timeout {
let query = self.queries.remove(&query_id).expect("s.a."); let mut query = self.queries.remove(&query_id).expect("s.a.");
query.stats.end = Some(now);
return QueryPoolState::Timeout(query) return QueryPoolState::Timeout(query)
} }
@ -205,9 +231,8 @@ pub struct Query<TInner> {
id: QueryId, id: QueryId,
/// The peer iterator that drives the query state. /// The peer iterator that drives the query state.
peer_iter: QueryPeerIter, peer_iter: QueryPeerIter,
/// The instant when the query started (i.e. began waiting for the first /// Execution statistics of the query.
/// result from a peer). stats: QueryStats,
started: Option<Instant>,
/// The opaque inner query state. /// The opaque inner query state.
pub inner: TInner, pub inner: TInner,
} }
@ -221,7 +246,7 @@ enum QueryPeerIter {
impl<TInner> Query<TInner> { impl<TInner> Query<TInner> {
/// Creates a new query without starting it. /// Creates a new query without starting it.
fn new(id: QueryId, peer_iter: QueryPeerIter, inner: TInner) -> Self { fn new(id: QueryId, peer_iter: QueryPeerIter, inner: TInner) -> Self {
Query { id, inner, peer_iter, started: None } Query { id, inner, peer_iter, stats: QueryStats::empty() }
} }
/// Gets the unique ID of the query. /// Gets the unique ID of the query.
@ -229,11 +254,19 @@ impl<TInner> Query<TInner> {
self.id self.id
} }
/// Gets the current execution statistics of the query.
pub fn stats(&self) -> &QueryStats {
&self.stats
}
/// Informs the query that the attempt to contact `peer` failed. /// Informs the query that the attempt to contact `peer` failed.
pub fn on_failure(&mut self, peer: &PeerId) { pub fn on_failure(&mut self, peer: &PeerId) {
match &mut self.peer_iter { let updated = match &mut self.peer_iter {
QueryPeerIter::Closest(iter) => iter.on_failure(peer), QueryPeerIter::Closest(iter) => iter.on_failure(peer),
QueryPeerIter::Fixed(iter) => iter.on_failure(peer) QueryPeerIter::Fixed(iter) => iter.on_failure(peer)
};
if updated {
self.stats.failure += 1;
} }
} }
@ -244,9 +277,12 @@ impl<TInner> Query<TInner> {
where where
I: IntoIterator<Item = PeerId> I: IntoIterator<Item = PeerId>
{ {
match &mut self.peer_iter { let updated = match &mut self.peer_iter {
QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers), QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers),
QueryPeerIter::Fixed(iter) => iter.on_success(peer) QueryPeerIter::Fixed(iter) => iter.on_success(peer)
};
if updated {
self.stats.success += 1;
} }
} }
@ -260,10 +296,16 @@ impl<TInner> Query<TInner> {
/// Advances the state of the underlying peer iterator. /// Advances the state of the underlying peer iterator.
fn next(&mut self, now: Instant) -> PeersIterState { fn next(&mut self, now: Instant) -> PeersIterState {
match &mut self.peer_iter { let state = match &mut self.peer_iter {
QueryPeerIter::Closest(iter) => iter.next(now), QueryPeerIter::Closest(iter) => iter.next(now),
QueryPeerIter::Fixed(iter) => iter.next() QueryPeerIter::Fixed(iter) => iter.next()
};
if let PeersIterState::Waiting(Some(_)) = state {
self.stats.requests += 1;
} }
state
} }
/// Finishes the query prematurely. /// Finishes the query prematurely.
@ -277,13 +319,24 @@ impl<TInner> Query<TInner> {
} }
} }
/// Checks whether the query has finished.
///
/// A finished query is eventually reported by `QueryPool::next()` and
/// removed from the pool.
pub fn is_finished(&self) -> bool {
match &self.peer_iter {
QueryPeerIter::Closest(iter) => iter.is_finished(),
QueryPeerIter::Fixed(iter) => iter.is_finished()
}
}
/// Consumes the query, producing the final `QueryResult`. /// Consumes the query, producing the final `QueryResult`.
pub fn into_result(self) -> QueryResult<TInner, impl Iterator<Item = PeerId>> { pub fn into_result(self) -> QueryResult<TInner, impl Iterator<Item = PeerId>> {
let peers = match self.peer_iter { let peers = match self.peer_iter {
QueryPeerIter::Closest(iter) => Either::Left(iter.into_result()), QueryPeerIter::Closest(iter) => Either::Left(iter.into_result()),
QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()) QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result())
}; };
QueryResult { inner: self.inner, peers } QueryResult { peers, inner: self.inner, stats: self.stats }
} }
} }
@ -292,5 +345,90 @@ pub struct QueryResult<TInner, TPeers> {
/// The opaque inner query state. /// The opaque inner query state.
pub inner: TInner, pub inner: TInner,
/// The successfully contacted peers. /// The successfully contacted peers.
pub peers: TPeers pub peers: TPeers,
/// The collected query statistics.
pub stats: QueryStats
}
/// Execution statistics of a query.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueryStats {
requests: u32,
success: u32,
failure: u32,
start: Option<Instant>,
end: Option<Instant>
}
impl QueryStats {
pub fn empty() -> Self {
QueryStats {
requests: 0,
success: 0,
failure: 0,
start: None,
end: None,
}
}
/// Gets the total number of requests initiated by the query.
pub fn num_requests(&self) -> u32 {
self.requests
}
/// Gets the number of successful requests.
pub fn num_successes(&self) -> u32 {
self.success
}
/// Gets the number of failed requests.
pub fn num_failures(&self) -> u32 {
self.failure
}
/// Gets the number of pending requests.
///
/// > **Note**: A query can finish while still having pending
/// > requests, if the termination conditions are already met.
pub fn num_pending(&self) -> u32 {
self.requests - (self.success + self.failure)
}
/// Gets the duration of the query.
///
/// If the query has not yet finished, the duration is measured from the
/// start of the query to the current instant.
///
/// If the query did not yet start (i.e. yield the first peer to contact),
/// `None` is returned.
pub fn duration(&self) -> Option<Duration> {
if let Some(s) = self.start {
if let Some(e) = self.end {
Some(e - s)
} else {
Some(Instant::now() - s)
}
} else {
None
}
}
/// Merges these stats with the given stats of another query,
/// e.g. to accumulate statistics from a multi-phase query.
///
/// Counters are merged cumulatively while the instants for
/// start and end of the queries are taken as the minimum and
/// maximum, respectively.
pub fn merge(self, other: QueryStats) -> Self {
QueryStats {
requests: self.requests + other.requests,
success: self.success + other.success,
failure: self.failure + other.failure,
start: match (self.start, other.start) {
(Some(a), Some(b)) => Some(std::cmp::min(a, b)),
(a, b) => a.or(b)
},
end: std::cmp::max(self.end, other.end)
}
}
} }

View File

@ -122,8 +122,7 @@ impl ClosestPeersIter {
} }
} }
/// Callback for delivering the result of a successful request to a peer /// Callback for delivering the result of a successful request to a peer.
/// that the iterator is waiting on.
/// ///
/// Delivering results of requests back to the iterator allows the iterator to make /// Delivering results of requests back to the iterator allows the iterator to make
/// progress. The iterator is said to make progress either when the given /// progress. The iterator is said to make progress either when the given
@ -131,18 +130,20 @@ impl ClosestPeersIter {
/// or when the iterator did not yet accumulate `num_results` closest peers and /// or when the iterator did not yet accumulate `num_results` closest peers and
/// `closer_peers` contains a new peer, regardless of its distance to the target. /// `closer_peers` contains a new peer, regardless of its distance to the target.
/// ///
/// After calling this function, `next` should eventually be called again /// If the iterator is currently waiting for a result from `peer`,
/// to advance the state of the iterator. /// the iterator state is updated and `true` is returned. In that
/// case, after calling this function, `next` should eventually be
/// called again to obtain the new state of the iterator.
/// ///
/// If the iterator is finished, it is not currently waiting for a /// If the iterator is finished, it is not currently waiting for a
/// result from `peer`, or a result for `peer` has already been reported, /// result from `peer`, or a result for `peer` has already been reported,
/// calling this function has no effect. /// calling this function has no effect and `false` is returned.
pub fn on_success<I>(&mut self, peer: &PeerId, closer_peers: I) pub fn on_success<I>(&mut self, peer: &PeerId, closer_peers: I) -> bool
where where
I: IntoIterator<Item = PeerId> I: IntoIterator<Item = PeerId>
{ {
if let State::Finished = self.state { if let State::Finished = self.state {
return return false
} }
let key = Key::from(peer.clone()); let key = Key::from(peer.clone());
@ -150,7 +151,7 @@ impl ClosestPeersIter {
// Mark the peer as succeeded. // Mark the peer as succeeded.
match self.closest_peers.entry(distance) { match self.closest_peers.entry(distance) {
Entry::Vacant(..) => return, Entry::Vacant(..) => return false,
Entry::Occupied(mut e) => match e.get().state { Entry::Occupied(mut e) => match e.get().state {
PeerState::Waiting(..) => { PeerState::Waiting(..) => {
debug_assert!(self.num_waiting > 0); debug_assert!(self.num_waiting > 0);
@ -162,7 +163,7 @@ impl ClosestPeersIter {
} }
PeerState::NotContacted PeerState::NotContacted
| PeerState::Failed | PeerState::Failed
| PeerState::Succeeded => return | PeerState::Succeeded => return false
} }
} }
@ -199,28 +200,31 @@ impl ClosestPeersIter {
State::Stalled State::Stalled
} }
State::Finished => State::Finished State::Finished => State::Finished
} };
true
} }
/// Callback for informing the iterator about a failed request to a peer /// Callback for informing the iterator about a failed request to a peer.
/// that the iterator is waiting on.
/// ///
/// After calling this function, `next` should eventually be called again /// If the iterator is currently waiting for a result from `peer`,
/// to advance the state of the iterator. /// the iterator state is updated and `true` is returned. In that
/// case, after calling this function, `next` should eventually be
/// called again to obtain the new state of the iterator.
/// ///
/// If the iterator is finished, it is not currently waiting for a /// If the iterator is finished, it is not currently waiting for a
/// result from `peer`, or a result for `peer` has already been reported, /// result from `peer`, or a result for `peer` has already been reported,
/// calling this function has no effect. /// calling this function has no effect and `false` is returned.
pub fn on_failure(&mut self, peer: &PeerId) { pub fn on_failure(&mut self, peer: &PeerId) -> bool {
if let State::Finished = self.state { if let State::Finished = self.state {
return return false
} }
let key = Key::from(peer.clone()); let key = Key::from(peer.clone());
let distance = key.distance(&self.target); let distance = key.distance(&self.target);
match self.closest_peers.entry(distance) { match self.closest_peers.entry(distance) {
Entry::Vacant(_) => return, Entry::Vacant(_) => return false,
Entry::Occupied(mut e) => match e.get().state { Entry::Occupied(mut e) => match e.get().state {
PeerState::Waiting(_) => { PeerState::Waiting(_) => {
debug_assert!(self.num_waiting > 0); debug_assert!(self.num_waiting > 0);
@ -230,9 +234,13 @@ impl ClosestPeersIter {
PeerState::Unresponsive => { PeerState::Unresponsive => {
e.get_mut().state = PeerState::Failed e.get_mut().state = PeerState::Failed
} }
_ => {} PeerState::NotContacted
| PeerState::Failed
| PeerState::Succeeded => return false
} }
} }
true
} }
/// Returns the list of peers for which the iterator is currently waiting /// Returns the list of peers for which the iterator is currently waiting
@ -343,7 +351,7 @@ impl ClosestPeersIter {
} }
/// Checks whether the iterator has finished. /// Checks whether the iterator has finished.
pub fn finished(&self) -> bool { pub fn is_finished(&self) -> bool {
self.state == State::Finished self.state == State::Finished
} }
@ -649,7 +657,7 @@ mod tests {
match iter.next(now) { match iter.next(now) {
PeersIterState::Waiting(Some(p)) => { PeersIterState::Waiting(Some(p)) => {
let peer2 = p.into_owned(); let peer2 = p.into_owned();
iter.on_success(&peer2, closer.clone()) assert!(iter.on_success(&peer2, closer.clone()))
} }
PeersIterState::Finished => {} PeersIterState::Finished => {}
_ => panic!("Unexpectedly iter state."), _ => panic!("Unexpectedly iter state."),
@ -689,7 +697,7 @@ mod tests {
Peer { state, .. } => panic!("Unexpected peer state: {:?}", state) Peer { state, .. } => panic!("Unexpected peer state: {:?}", state)
} }
let finished = iter.finished(); let finished = iter.is_finished();
iter.on_success(&peer, iter::empty()); iter.on_success(&peer, iter::empty());
let closest = iter.into_result().collect::<Vec<_>>(); let closest = iter.into_result().collect::<Vec<_>>();

View File

@ -39,6 +39,7 @@ pub struct FixedPeersIter {
state: State, state: State,
} }
#[derive(Debug, PartialEq, Eq)]
enum State { enum State {
Waiting { num_waiting: usize }, Waiting { num_waiting: usize },
Finished Finished
@ -71,22 +72,46 @@ impl FixedPeersIter {
} }
} }
pub fn on_success(&mut self, peer: &PeerId) { /// Callback for delivering the result of a successful request to a peer.
///
/// If the iterator is currently waiting for a result from `peer`,
/// the iterator state is updated and `true` is returned. In that
/// case, after calling this function, `next` should eventually be
/// called again to obtain the new state of the iterator.
///
/// If the iterator is finished, it is not currently waiting for a
/// result from `peer`, or a result for `peer` has already been reported,
/// calling this function has no effect and `false` is returned.
pub fn on_success(&mut self, peer: &PeerId) -> bool {
if let State::Waiting { num_waiting } = &mut self.state { if let State::Waiting { num_waiting } = &mut self.state {
if let Some(state @ PeerState::Waiting) = self.peers.get_mut(peer) { if let Some(state @ PeerState::Waiting) = self.peers.get_mut(peer) {
*state = PeerState::Succeeded; *state = PeerState::Succeeded;
*num_waiting -= 1; *num_waiting -= 1;
return true
} }
} }
false
} }
pub fn on_failure(&mut self, peer: &PeerId) { /// Callback for informing the iterator about a failed request to a peer.
///
/// If the iterator is currently waiting for a result from `peer`,
/// the iterator state is updated and `true` is returned. In that
/// case, after calling this function, `next` should eventually be
/// called again to obtain the new state of the iterator.
///
/// If the iterator is finished, it is not currently waiting for a
/// result from `peer`, or a result for `peer` has already been reported,
/// calling this function has no effect and `false` is returned.
pub fn on_failure(&mut self, peer: &PeerId) -> bool {
if let State::Waiting { num_waiting } = &mut self.state { if let State::Waiting { num_waiting } = &mut self.state {
if let Some(state @ PeerState::Waiting) = self.peers.get_mut(peer) { if let Some(state @ PeerState::Waiting) = self.peers.get_mut(peer) {
*state = PeerState::Failed; *state = PeerState::Failed;
*num_waiting -= 1; *num_waiting -= 1;
return true
} }
} }
false
} }
pub fn is_waiting(&self, peer: &PeerId) -> bool { pub fn is_waiting(&self, peer: &PeerId) -> bool {
@ -99,6 +124,11 @@ impl FixedPeersIter {
} }
} }
/// Checks whether the iterator has finished.
pub fn is_finished(&self) -> bool {
self.state == State::Finished
}
pub fn next(&mut self) -> PeersIterState { pub fn next(&mut self) -> PeersIterState {
match &mut self.state { match &mut self.state {
State::Finished => return PeersIterState::Finished, State::Finished => return PeersIterState::Finished,