mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-05-31 19:51:20 +00:00
Closest iterators: lots of logs
This commit is contained in:
parent
6ef8f2c299
commit
0a4cd14efa
@ -76,7 +76,7 @@ mod weighted;
|
|||||||
pub use entry::*;
|
pub use entry::*;
|
||||||
pub use sub_bucket::*;
|
pub use sub_bucket::*;
|
||||||
|
|
||||||
use arrayvec::{self, ArrayVec};
|
use arrayvec::self;
|
||||||
use bucket::KBucket;
|
use bucket::KBucket;
|
||||||
use libp2p_core::identity::ed25519::{Keypair, PublicKey};
|
use libp2p_core::identity::ed25519::{Keypair, PublicKey};
|
||||||
use std::collections::{VecDeque};
|
use std::collections::{VecDeque};
|
||||||
@ -241,7 +241,7 @@ where
|
|||||||
iter: None,
|
iter: None,
|
||||||
table: self,
|
table: self,
|
||||||
buckets_iter: ClosestBucketsIter::new(distance),
|
buckets_iter: ClosestBucketsIter::new(distance),
|
||||||
fmap: |b: &KBucket<TKey, _>| -> ArrayVec<_> {
|
fmap: |b: &KBucket<TKey, _>| -> Vec<_> {
|
||||||
b.iter().map(|(n, _)| n.key.clone()).collect()
|
b.iter().map(|(n, _)| n.key.clone()).collect()
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -263,7 +263,7 @@ where
|
|||||||
iter: None,
|
iter: None,
|
||||||
table: self,
|
table: self,
|
||||||
buckets_iter: ClosestBucketsIter::new(distance),
|
buckets_iter: ClosestBucketsIter::new(distance),
|
||||||
fmap: |b: &KBucket<_, TVal>| -> ArrayVec<_> {
|
fmap: |b: &KBucket<_, TVal>| -> Vec<_> {
|
||||||
b.iter()
|
b.iter()
|
||||||
.map(|(n, status)| EntryView {
|
.map(|(n, status)| EntryView {
|
||||||
node: n.clone(),
|
node: n.clone(),
|
||||||
@ -292,7 +292,14 @@ where
|
|||||||
.filter(|(n, _)| n.key.as_ref().distance(&local_key) <= distance)
|
.filter(|(n, _)| n.key.as_ref().distance(&local_key) <= distance)
|
||||||
.count();
|
.count();
|
||||||
let num_rest: usize = iter.map(|i| self.buckets[i.get()].num_entries()).sum();
|
let num_rest: usize = iter.map(|i| self.buckets[i.get()].num_entries()).sum();
|
||||||
num_first + num_rest
|
let result = num_first + num_rest;
|
||||||
|
println!(
|
||||||
|
"There are {} nodes between local {} and remote {}",
|
||||||
|
result,
|
||||||
|
bs58::encode(local_key.as_ref()).into_string(),
|
||||||
|
bs58::encode(target.as_ref()).into_string()
|
||||||
|
);
|
||||||
|
result
|
||||||
} else {
|
} else {
|
||||||
0
|
0
|
||||||
}
|
}
|
||||||
@ -313,7 +320,7 @@ struct ClosestIter<'a, TTarget, TKey, TVal, TMap, TOut> {
|
|||||||
/// distance of the local key to the target.
|
/// distance of the local key to the target.
|
||||||
buckets_iter: ClosestBucketsIter,
|
buckets_iter: ClosestBucketsIter,
|
||||||
/// The iterator over the entries in the currently traversed bucket.
|
/// The iterator over the entries in the currently traversed bucket.
|
||||||
iter: Option<arrayvec::IntoIter<[TOut; K_VALUE.get()]>>,
|
iter: Option<std::vec::IntoIter<TOut>>,
|
||||||
/// The projection function / mapping applied on each bucket as
|
/// The projection function / mapping applied on each bucket as
|
||||||
/// it is encountered, producing the next `iter`ator.
|
/// it is encountered, producing the next `iter`ator.
|
||||||
fmap: TMap,
|
fmap: TMap,
|
||||||
@ -386,14 +393,26 @@ impl Iterator for ClosestBucketsIter {
|
|||||||
fn next(&mut self) -> Option<Self::Item> {
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
match self.state {
|
match self.state {
|
||||||
ClosestBucketsIterState::Start(i) => {
|
ClosestBucketsIterState::Start(i) => {
|
||||||
|
println!(
|
||||||
|
"ClosestBucketsIter: distance = {}; Start({}) -> ZoomIn({})",
|
||||||
|
self.distance.0, i.0, i.0
|
||||||
|
);
|
||||||
self.state = ClosestBucketsIterState::ZoomIn(i);
|
self.state = ClosestBucketsIterState::ZoomIn(i);
|
||||||
Some(i)
|
Some(i)
|
||||||
}
|
}
|
||||||
ClosestBucketsIterState::ZoomIn(i) => {
|
ClosestBucketsIterState::ZoomIn(i) => {
|
||||||
if let Some(i) = self.next_in(i) {
|
if let Some(i) = self.next_in(i) {
|
||||||
|
println!(
|
||||||
|
"ClosestBucketsIter: distance = {}; ZoomIn({}) -> ZoomIn({})",
|
||||||
|
self.distance.0, i.0, i.0
|
||||||
|
);
|
||||||
self.state = ClosestBucketsIterState::ZoomIn(i);
|
self.state = ClosestBucketsIterState::ZoomIn(i);
|
||||||
Some(i)
|
Some(i)
|
||||||
} else {
|
} else {
|
||||||
|
println!(
|
||||||
|
"ClosestBucketsIter: distance = {}; ZoomIn({}) -> ZoomOut(0)",
|
||||||
|
self.distance.0, i.0
|
||||||
|
);
|
||||||
let i = BucketIndex(0);
|
let i = BucketIndex(0);
|
||||||
self.state = ClosestBucketsIterState::ZoomOut(i);
|
self.state = ClosestBucketsIterState::ZoomOut(i);
|
||||||
Some(i)
|
Some(i)
|
||||||
@ -401,14 +420,28 @@ impl Iterator for ClosestBucketsIter {
|
|||||||
}
|
}
|
||||||
ClosestBucketsIterState::ZoomOut(i) => {
|
ClosestBucketsIterState::ZoomOut(i) => {
|
||||||
if let Some(i) = self.next_out(i) {
|
if let Some(i) = self.next_out(i) {
|
||||||
|
println!(
|
||||||
|
"ClosestBucketsIter: distance = {}; ZoomOut({}) -> ZoomOut({})",
|
||||||
|
self.distance.0, i.0, i.0
|
||||||
|
);
|
||||||
self.state = ClosestBucketsIterState::ZoomOut(i);
|
self.state = ClosestBucketsIterState::ZoomOut(i);
|
||||||
Some(i)
|
Some(i)
|
||||||
} else {
|
} else {
|
||||||
|
println!(
|
||||||
|
"ClosestBucketsIter: distance = {}; ZoomOut({}) ->Done",
|
||||||
|
self.distance.0, i.0
|
||||||
|
);
|
||||||
self.state = ClosestBucketsIterState::Done;
|
self.state = ClosestBucketsIterState::Done;
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ClosestBucketsIterState::Done => None,
|
ClosestBucketsIterState::Done => {
|
||||||
|
println!(
|
||||||
|
"ClosestBucketsIter: distance = {}; Done",
|
||||||
|
self.distance.0
|
||||||
|
);
|
||||||
|
None
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -418,7 +451,7 @@ where
|
|||||||
TTarget: AsRef<KeyBytes>,
|
TTarget: AsRef<KeyBytes>,
|
||||||
TKey: Clone + AsRef<KeyBytes>,
|
TKey: Clone + AsRef<KeyBytes>,
|
||||||
TVal: Clone,
|
TVal: Clone,
|
||||||
TMap: Fn(&KBucket<TKey, TVal>) -> ArrayVec<[TOut; K_VALUE.get()]>,
|
TMap: Fn(&KBucket<TKey, TVal>) -> Vec<TOut>,
|
||||||
TOut: AsRef<KeyBytes>,
|
TOut: AsRef<KeyBytes>,
|
||||||
{
|
{
|
||||||
type Item = TOut;
|
type Item = TOut;
|
||||||
@ -427,7 +460,14 @@ where
|
|||||||
loop {
|
loop {
|
||||||
match &mut self.iter {
|
match &mut self.iter {
|
||||||
Some(iter) => match iter.next() {
|
Some(iter) => match iter.next() {
|
||||||
Some(k) => return Some(k),
|
Some(k) => {
|
||||||
|
println!(
|
||||||
|
"ClosestIter: target = {}; next node {}",
|
||||||
|
bs58::encode(&self.target.as_ref()).into_string(),
|
||||||
|
bs58::encode(k.as_ref()).into_string()
|
||||||
|
);
|
||||||
|
return Some(k)
|
||||||
|
},
|
||||||
None => self.iter = None,
|
None => self.iter = None,
|
||||||
},
|
},
|
||||||
None => {
|
None => {
|
||||||
@ -436,13 +476,22 @@ where
|
|||||||
self.table.applied_pending.extend(bucket.apply_pending());
|
self.table.applied_pending.extend(bucket.apply_pending());
|
||||||
let mut v = (self.fmap)(bucket);
|
let mut v = (self.fmap)(bucket);
|
||||||
v.sort_by(|a, b| {
|
v.sort_by(|a, b| {
|
||||||
self.target
|
Ord::cmp(
|
||||||
.as_ref()
|
&self.target.as_ref().distance(a.as_ref()),
|
||||||
.distance(a.as_ref())
|
&self.target.as_ref().distance(b.as_ref())
|
||||||
.cmp(&self.target.as_ref().distance(b.as_ref()))
|
)
|
||||||
});
|
});
|
||||||
|
println!(
|
||||||
|
"ClosestIter: target = {}; next bucket {} with {} nodes",
|
||||||
|
bs58::encode(&self.target.as_ref()).into_string(),
|
||||||
|
i.0, v.len()
|
||||||
|
);
|
||||||
self.iter = Some(v.into_iter());
|
self.iter = Some(v.into_iter());
|
||||||
} else {
|
} else {
|
||||||
|
println!(
|
||||||
|
"ClosestIter: target = {}; Finished.",
|
||||||
|
bs58::encode(&self.target.as_ref()).into_string()
|
||||||
|
);
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -281,7 +281,12 @@ impl ClosestPeersIter {
|
|||||||
// their results can still be delivered to the iterator.
|
// their results can still be delivered to the iterator.
|
||||||
debug_assert!(self.num_waiting > 0);
|
debug_assert!(self.num_waiting > 0);
|
||||||
self.num_waiting -= 1;
|
self.num_waiting -= 1;
|
||||||
peer.state = PeerState::Unresponsive
|
peer.state = PeerState::Unresponsive;
|
||||||
|
println!(
|
||||||
|
"ClosestPeerIter: target = {}; peer {} timed out",
|
||||||
|
bs58::encode(&self.target).into_string(),
|
||||||
|
peer.key.preimage().to_base58()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
else if at_capacity {
|
else if at_capacity {
|
||||||
// The iterator is still waiting for a result from a peer and is
|
// The iterator is still waiting for a result from a peer and is
|
||||||
@ -299,16 +304,32 @@ impl ClosestPeersIter {
|
|||||||
|
|
||||||
PeerState::Succeeded =>
|
PeerState::Succeeded =>
|
||||||
if let Some(ref mut cnt) = result_counter {
|
if let Some(ref mut cnt) = result_counter {
|
||||||
|
println!(
|
||||||
|
"ClosestPeerIter: target = {}; peer {} succeeded",
|
||||||
|
bs58::encode(&self.target).into_string(),
|
||||||
|
peer.key.preimage().to_base58()
|
||||||
|
);
|
||||||
*cnt += 1;
|
*cnt += 1;
|
||||||
// If `num_results` successful results have been delivered for the
|
// If `num_results` successful results have been delivered for the
|
||||||
// closest peers, the iterator is done.
|
// closest peers, the iterator is done.
|
||||||
if *cnt >= self.config.num_results {
|
if *cnt >= self.config.num_results {
|
||||||
|
println!(
|
||||||
|
"ClosestPeerIter: target = {}; Got all {} results, finished.",
|
||||||
|
bs58::encode(&self.target).into_string(),
|
||||||
|
*cnt
|
||||||
|
);
|
||||||
self.state = State::Finished;
|
self.state = State::Finished;
|
||||||
return PeersIterState::Finished
|
return PeersIterState::Finished
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
PeerState::NotContacted =>
|
PeerState::NotContacted => {
|
||||||
|
println!(
|
||||||
|
"ClosestPeerIter: target = {}; new peer {}. Capacity left? {}",
|
||||||
|
bs58::encode(&self.target).into_string(),
|
||||||
|
peer.key.preimage().to_base58(),
|
||||||
|
!at_capacity
|
||||||
|
);
|
||||||
if !at_capacity {
|
if !at_capacity {
|
||||||
let timeout = now + self.config.peer_timeout;
|
let timeout = now + self.config.peer_timeout;
|
||||||
peer.state = PeerState::Waiting(timeout);
|
peer.state = PeerState::Waiting(timeout);
|
||||||
@ -317,9 +338,16 @@ impl ClosestPeersIter {
|
|||||||
} else {
|
} else {
|
||||||
return PeersIterState::WaitingAtCapacity
|
return PeersIterState::WaitingAtCapacity
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
PeerState::Unresponsive | PeerState::Failed => {
|
PeerState::Unresponsive | PeerState::Failed => {
|
||||||
// Skip over unresponsive or failed peers.
|
// Skip over unresponsive or failed peers.
|
||||||
|
println!(
|
||||||
|
"ClosestPeerIter: target = {}; peer {} is {:?}",
|
||||||
|
bs58::encode(&self.target).into_string(),
|
||||||
|
peer.key.preimage().to_base58(),
|
||||||
|
peer.state
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user