mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-04-25 11:02:12 +00:00
Report the entire peers after a result (#467)
This commit is contained in:
parent
0a3d4cdfad
commit
304e9c72c8
@ -204,10 +204,10 @@ fn main() {
|
|||||||
})
|
})
|
||||||
.filter_map(move |event| {
|
.filter_map(move |event| {
|
||||||
match event {
|
match event {
|
||||||
KadQueryEvent::NewKnownMultiaddrs(peers) => {
|
KadQueryEvent::PeersReported(peers) => {
|
||||||
for (peer, addrs) in peers {
|
for peer in peers {
|
||||||
peer_store.peer_or_create(&peer)
|
peer_store.peer_or_create(&peer.node_id)
|
||||||
.add_addrs(addrs, Duration::from_secs(3600));
|
.add_addrs(peer.multiaddrs, Duration::from_secs(3600));
|
||||||
}
|
}
|
||||||
None
|
None
|
||||||
},
|
},
|
||||||
|
@ -23,7 +23,6 @@ use futures::{future, Future, IntoFuture, stream, Stream};
|
|||||||
use kad_server::KadConnecController;
|
use kad_server::KadConnecController;
|
||||||
use kbucket::{KBucketsTable, KBucketsPeerId};
|
use kbucket::{KBucketsTable, KBucketsPeerId};
|
||||||
use libp2p_core::PeerId;
|
use libp2p_core::PeerId;
|
||||||
use multiaddr::Multiaddr;
|
|
||||||
use protocol;
|
use protocol;
|
||||||
use rand;
|
use rand;
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
@ -64,7 +63,7 @@ pub struct KadSystem {
|
|||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub enum KadQueryEvent<TOut> {
|
pub enum KadQueryEvent<TOut> {
|
||||||
/// Learned about new mutiaddresses for the given peers.
|
/// Learned about new mutiaddresses for the given peers.
|
||||||
NewKnownMultiaddrs(Vec<(PeerId, Vec<Multiaddr>)>),
|
PeersReported(Vec<protocol::KadPeer>),
|
||||||
/// Finished the processing of the query. Contains the result.
|
/// Finished the processing of the query. Contains the result.
|
||||||
Finished(TOut),
|
Finished(TOut),
|
||||||
}
|
}
|
||||||
@ -180,7 +179,7 @@ where F: FnMut(&PeerId) -> Fut + Send + 'a,
|
|||||||
let stream = query(access, kbuckets, peer_id, parallelism, 20, request_timeout) // TODO: 20 is arbitrary
|
let stream = query(access, kbuckets, peer_id, parallelism, 20, request_timeout) // TODO: 20 is arbitrary
|
||||||
.map(|event| {
|
.map(|event| {
|
||||||
match event {
|
match event {
|
||||||
KadQueryEvent::NewKnownMultiaddrs(peers) => KadQueryEvent::NewKnownMultiaddrs(peers),
|
KadQueryEvent::PeersReported(peers) => KadQueryEvent::PeersReported(peers),
|
||||||
KadQueryEvent::Finished(_) => KadQueryEvent::Finished(()),
|
KadQueryEvent::Finished(_) => KadQueryEvent::Finished(()),
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -413,15 +412,12 @@ where F: FnMut(&PeerId) -> Fut + 'a,
|
|||||||
let mut local_nearest_node_updated = false;
|
let mut local_nearest_node_updated = false;
|
||||||
|
|
||||||
// Update `state` with the actual content of the message.
|
// Update `state` with the actual content of the message.
|
||||||
let mut new_known_multiaddrs = Vec::with_capacity(closer_peers.len());
|
let mut peers_reported = Vec::with_capacity(closer_peers.len());
|
||||||
for mut peer in closer_peers {
|
for mut peer in closer_peers {
|
||||||
// Update the peerstore with the information sent by
|
// Update the peerstore with the information sent by
|
||||||
// the remote.
|
// the remote.
|
||||||
{
|
trace!("Reporting multiaddresses for {:?}: {:?}", peer.node_id, peer.multiaddrs);
|
||||||
let multiaddrs = mem::replace(&mut peer.multiaddrs, Vec::new());
|
peers_reported.push(peer.clone());
|
||||||
trace!("Reporting multiaddresses for {:?}: {:?}", peer.node_id, multiaddrs);
|
|
||||||
new_known_multiaddrs.push((peer.node_id.clone(), multiaddrs));
|
|
||||||
}
|
|
||||||
|
|
||||||
if peer.node_id.distance_with(&searched_key)
|
if peer.node_id.distance_with(&searched_key)
|
||||||
<= state.result[0].distance_with(&searched_key)
|
<= state.result[0].distance_with(&searched_key)
|
||||||
@ -458,7 +454,7 @@ where F: FnMut(&PeerId) -> Fut + 'a,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
future::ok((Some(KadQueryEvent::NewKnownMultiaddrs(new_known_multiaddrs)), state))
|
future::ok((Some(KadQueryEvent::PeersReported(peers_reported)), state))
|
||||||
});
|
});
|
||||||
|
|
||||||
Some(future::Either::B(future))
|
Some(future::Either::B(future))
|
||||||
|
@ -49,7 +49,7 @@ pub struct QueryParams<FBuckets, FFindNode> {
|
|||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub enum QueryEvent<TOut> {
|
pub enum QueryEvent<TOut> {
|
||||||
/// Learned about new mutiaddresses for the given peers.
|
/// Learned about new mutiaddresses for the given peers.
|
||||||
NewKnownMultiaddrs(Vec<(PeerId, Vec<Multiaddr>)>),
|
PeersReported(Vec<(PeerId, Vec<Multiaddr>)>),
|
||||||
/// Finished the processing of the query. Contains the result.
|
/// Finished the processing of the query. Contains the result.
|
||||||
Finished(TOut),
|
Finished(TOut),
|
||||||
}
|
}
|
||||||
@ -86,7 +86,7 @@ where
|
|||||||
|
|
||||||
let stream = find_node(query_params, peer_id).map(|event| {
|
let stream = find_node(query_params, peer_id).map(|event| {
|
||||||
match event {
|
match event {
|
||||||
QueryEvent::NewKnownMultiaddrs(peers) => QueryEvent::NewKnownMultiaddrs(peers),
|
QueryEvent::PeersReported(peers) => QueryEvent::PeersReported(peers),
|
||||||
QueryEvent::Finished(_) => QueryEvent::Finished(()),
|
QueryEvent::Finished(_) => QueryEvent::Finished(()),
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -355,7 +355,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
future::ok((Some(QueryEvent::NewKnownMultiaddrs(new_known_multiaddrs)), state))
|
future::ok((Some(QueryEvent::PeersReported(new_known_multiaddrs)), state))
|
||||||
});
|
});
|
||||||
|
|
||||||
Some(future::Either::B(future))
|
Some(future::Either::B(future))
|
||||||
|
Loading…
x
Reference in New Issue
Block a user