mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-05-28 10:11:19 +00:00
add iterlog
This commit is contained in:
parent
91c94f5537
commit
81f15b5ec5
@ -49,6 +49,8 @@ pub struct ClosestPeersIter {
|
||||
|
||||
/// The number of peers for which the iterator is currently waiting for results.
|
||||
num_waiting: usize,
|
||||
|
||||
created_at: Instant,
|
||||
}
|
||||
|
||||
/// Configuration for a `ClosestPeersIter`.
|
||||
@ -110,7 +112,8 @@ impl ClosestPeersIter {
|
||||
.map(|key| {
|
||||
let distance = key.distance(&target);
|
||||
let state = PeerState::NotContacted;
|
||||
(distance, Peer { key, state })
|
||||
let log = vec![(Instant::now(), state.clone())];
|
||||
(distance, Peer { key, state, log })
|
||||
})
|
||||
.take(K_VALUE.into()));
|
||||
|
||||
@ -122,7 +125,8 @@ impl ClosestPeersIter {
|
||||
target,
|
||||
state,
|
||||
closest_peers,
|
||||
num_waiting: 0
|
||||
num_waiting: 0,
|
||||
created_at: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -159,10 +163,10 @@ impl ClosestPeersIter {
|
||||
PeerState::Waiting(..) => {
|
||||
debug_assert!(self.num_waiting > 0);
|
||||
self.num_waiting -= 1;
|
||||
e.get_mut().state = PeerState::Succeeded;
|
||||
e.get_mut().set_state(PeerState::Succeeded);
|
||||
}
|
||||
PeerState::Unresponsive => {
|
||||
e.get_mut().state = PeerState::Succeeded;
|
||||
e.get_mut().set_state(PeerState::Succeeded);
|
||||
}
|
||||
PeerState::NotContacted
|
||||
| PeerState::Failed
|
||||
@ -177,7 +181,7 @@ impl ClosestPeersIter {
|
||||
for peer in closer_peers {
|
||||
let key = peer.into();
|
||||
let distance = self.target.distance(&key);
|
||||
let peer = Peer { key, state: PeerState::NotContacted };
|
||||
let peer = Peer { key, state: PeerState::NotContacted, log: vec![(Instant::now(), PeerState::NotContacted)] };
|
||||
self.closest_peers.entry(distance).or_insert(peer);
|
||||
// The iterator makes progress if the new peer is either closer to the target
|
||||
// than any peer seen so far (i.e. is the first entry), or the iterator did
|
||||
@ -229,10 +233,10 @@ impl ClosestPeersIter {
|
||||
PeerState::Waiting(_) => {
|
||||
debug_assert!(self.num_waiting > 0);
|
||||
self.num_waiting -= 1;
|
||||
e.get_mut().state = PeerState::Failed
|
||||
e.get_mut().set_state(PeerState::Failed);
|
||||
}
|
||||
PeerState::Unresponsive => {
|
||||
e.get_mut().state = PeerState::Failed
|
||||
e.get_mut().set_state(PeerState::Failed);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
@ -276,6 +280,9 @@ impl ClosestPeersIter {
|
||||
// Check if the iterator is at capacity w.r.t. the allowed parallelism.
|
||||
let at_capacity = self.at_capacity();
|
||||
|
||||
let closest = self.closest_peers.values().cloned().collect::<Vec<Peer>>();
|
||||
let created_at = self.created_at;
|
||||
|
||||
for peer in self.closest_peers.values_mut() {
|
||||
match peer.state {
|
||||
PeerState::Waiting(timeout) => {
|
||||
@ -285,7 +292,7 @@ impl ClosestPeersIter {
|
||||
// their results can still be delivered to the iterator.
|
||||
debug_assert!(self.num_waiting > 0);
|
||||
self.num_waiting -= 1;
|
||||
peer.state = PeerState::Unresponsive;
|
||||
peer.set_state(PeerState::Unresponsive);
|
||||
trace!(
|
||||
"ClosestPeerIter: target = {}; peer {} timed out",
|
||||
bs58::encode(&self.target).into_string(),
|
||||
@ -317,8 +324,17 @@ impl ClosestPeersIter {
|
||||
// If `num_results` successful results have been delivered for the
|
||||
// closest peers, the iterator is done.
|
||||
if *cnt >= self.config.num_results {
|
||||
let log = closest.into_iter().map(|p| {
|
||||
let log = p.log.iter().map(|(i, s)| {
|
||||
let elapsed = i.checked_duration_since(created_at).map_or("negative".to_string(), |d| d.as_millis().to_string());
|
||||
format!("[iterlog] \t{: <25?} - +{}ms\n", s, elapsed)
|
||||
}).collect::<String>();
|
||||
|
||||
format!("[iterlog] {}:\n{}", p.key.into_preimage(), log)
|
||||
}).collect::<String>();
|
||||
|
||||
trace!(
|
||||
"ClosestPeerIter: target = {}; Got all {} results, finished.",
|
||||
"[iterlog] ClosestPeerIter: target = {}; Got all {} results, finished. Log:\n",
|
||||
bs58::encode(&self.target).into_string(),
|
||||
*cnt
|
||||
);
|
||||
@ -336,7 +352,7 @@ impl ClosestPeersIter {
|
||||
);
|
||||
if !at_capacity {
|
||||
let timeout = now + self.config.peer_timeout;
|
||||
peer.state = PeerState::Waiting(timeout);
|
||||
peer.set_state(PeerState::Waiting(timeout));
|
||||
self.num_waiting += 1;
|
||||
return PeersIterState::Waiting(Some(Cow::Borrowed(peer.key.preimage())))
|
||||
} else {
|
||||
@ -454,7 +470,15 @@ enum State {
|
||||
#[derive(Debug, Clone)]
|
||||
struct Peer {
|
||||
key: Key<PeerId>,
|
||||
state: PeerState
|
||||
state: PeerState,
|
||||
log: Vec<(Instant, PeerState)>
|
||||
}
|
||||
|
||||
impl Peer {
|
||||
pub fn set_state(&mut self, state: PeerState) {
|
||||
self.state = state;
|
||||
self.log.push((Instant::now(), self.state.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
/// The state of a single `Peer`.
|
||||
@ -704,7 +728,7 @@ mod tests {
|
||||
// Advancing the iterator again should mark the first peer as unresponsive.
|
||||
let _ = iter.next(now);
|
||||
match &iter.closest_peers.values().next().unwrap() {
|
||||
Peer { key, state: PeerState::Unresponsive } => {
|
||||
Peer { key, state: PeerState::Unresponsive, .. } => {
|
||||
assert_eq!(key.preimage(), &peer);
|
||||
},
|
||||
Peer { state, .. } => panic!("Unexpected peer state: {:?}", state)
|
||||
|
Loading…
x
Reference in New Issue
Block a user