mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-28 09:11:34 +00:00
[libp2p-kad] Scope pending RPCs to queries. (#1217)
* Remove pending RPCs on query completion. Ensure that any still pending RPCs related to a query are removed once the query terminates (successfully or through timeout) by scoping pending RPCs to the lifetime of a query. * Cleanup.
This commit is contained in:
committed by
Pierre Krieger
parent
bcfb647e65
commit
5696b3eb4d
@ -22,6 +22,7 @@
|
|||||||
|
|
||||||
mod test;
|
mod test;
|
||||||
|
|
||||||
|
use crate::K_VALUE;
|
||||||
use crate::addresses::Addresses;
|
use crate::addresses::Addresses;
|
||||||
use crate::handler::{KademliaHandler, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn};
|
use crate::handler::{KademliaHandler, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn};
|
||||||
use crate::jobs::*;
|
use crate::jobs::*;
|
||||||
@ -58,10 +59,6 @@ pub struct Kademlia<TSubstream, TStore> {
|
|||||||
/// This is a superset of the connected peers currently in the routing table.
|
/// This is a superset of the connected peers currently in the routing table.
|
||||||
connected_peers: FnvHashSet<PeerId>,
|
connected_peers: FnvHashSet<PeerId>,
|
||||||
|
|
||||||
/// A list of pending request to peers that are not currently connected.
|
|
||||||
/// These requests are sent as soon as a connection to the peer is established.
|
|
||||||
pending_rpcs: SmallVec<[(PeerId, KademliaHandlerIn<QueryId>); 8]>,
|
|
||||||
|
|
||||||
/// Periodic job for re-publication of provider records for keys
|
/// Periodic job for re-publication of provider records for keys
|
||||||
/// provided by the local node.
|
/// provided by the local node.
|
||||||
add_provider_job: Option<AddProviderJob>,
|
add_provider_job: Option<AddProviderJob>,
|
||||||
@ -233,7 +230,6 @@ where
|
|||||||
/// Creates a new `Kademlia` network behaviour with the given configuration.
|
/// Creates a new `Kademlia` network behaviour with the given configuration.
|
||||||
pub fn with_config(id: PeerId, store: TStore, config: KademliaConfig) -> Self {
|
pub fn with_config(id: PeerId, store: TStore, config: KademliaConfig) -> Self {
|
||||||
let local_key = kbucket::Key::new(id.clone());
|
let local_key = kbucket::Key::new(id.clone());
|
||||||
let pending_rpcs = SmallVec::with_capacity(config.query_config.replication_factor.get());
|
|
||||||
|
|
||||||
let put_record_job = config
|
let put_record_job = config
|
||||||
.record_replication_interval
|
.record_replication_interval
|
||||||
@ -256,7 +252,6 @@ where
|
|||||||
queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
|
queued_events: VecDeque::with_capacity(config.query_config.replication_factor.get()),
|
||||||
queries: QueryPool::new(config.query_config),
|
queries: QueryPool::new(config.query_config),
|
||||||
connected_peers: Default::default(),
|
connected_peers: Default::default(),
|
||||||
pending_rpcs,
|
|
||||||
add_provider_job,
|
add_provider_job,
|
||||||
put_record_job,
|
put_record_job,
|
||||||
record_ttl: config.record_ttl,
|
record_ttl: config.record_ttl,
|
||||||
@ -1054,12 +1049,14 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn inject_connected(&mut self, peer: PeerId, endpoint: ConnectedPoint) {
|
fn inject_connected(&mut self, peer: PeerId, endpoint: ConnectedPoint) {
|
||||||
while let Some(pos) = self.pending_rpcs.iter().position(|(p, _)| p == &peer) {
|
// Queue events for sending pending RPCs to the connected peer.
|
||||||
let (_, rpc) = self.pending_rpcs.remove(pos);
|
// There can be only one pending RPC for a particular peer and query per definition.
|
||||||
self.queued_events.push_back(NetworkBehaviourAction::SendEvent {
|
for (peer_id, event) in self.queries.iter_mut().filter_map(|q|
|
||||||
peer_id: peer.clone(),
|
q.inner.pending_rpcs.iter()
|
||||||
event: rpc,
|
.position(|(p, _)| p == &peer)
|
||||||
});
|
.map(|p| q.inner.pending_rpcs.remove(p)))
|
||||||
|
{
|
||||||
|
self.queued_events.push_back(NetworkBehaviourAction::SendEvent { peer_id, event });
|
||||||
}
|
}
|
||||||
|
|
||||||
// The remote's address can only be put into the routing table,
|
// The remote's address can only be put into the routing table,
|
||||||
@ -1396,7 +1393,7 @@ where
|
|||||||
peer_id, event
|
peer_id, event
|
||||||
});
|
});
|
||||||
} else if &peer_id != self.kbuckets.local_key().preimage() {
|
} else if &peer_id != self.kbuckets.local_key().preimage() {
|
||||||
self.pending_rpcs.push((peer_id.clone(), event));
|
query.inner.pending_rpcs.push((peer_id.clone(), event));
|
||||||
self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
|
self.queued_events.push_back(NetworkBehaviourAction::DialPeer {
|
||||||
peer_id
|
peer_id
|
||||||
});
|
});
|
||||||
@ -1741,13 +1738,19 @@ struct QueryInner {
|
|||||||
info: QueryInfo,
|
info: QueryInfo,
|
||||||
/// Addresses of peers discovered during a query.
|
/// Addresses of peers discovered during a query.
|
||||||
addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
|
addresses: FnvHashMap<PeerId, SmallVec<[Multiaddr; 8]>>,
|
||||||
|
/// A map of pending requests to peers.
|
||||||
|
///
|
||||||
|
/// A request is pending if the targeted peer is not currently connected
|
||||||
|
/// and these requests are sent as soon as a connection to the peer is established.
|
||||||
|
pending_rpcs: SmallVec<[(PeerId, KademliaHandlerIn<QueryId>); K_VALUE.get()]>
|
||||||
}
|
}
|
||||||
|
|
||||||
impl QueryInner {
|
impl QueryInner {
|
||||||
fn new(info: QueryInfo) -> Self {
|
fn new(info: QueryInfo) -> Self {
|
||||||
QueryInner {
|
QueryInner {
|
||||||
info,
|
info,
|
||||||
addresses: Default::default()
|
addresses: Default::default(),
|
||||||
|
pending_rpcs: SmallVec::default()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -188,6 +188,7 @@ fn query_iter() {
|
|||||||
Async::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
|
Async::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
|
||||||
assert_eq!(ok.key, search_target);
|
assert_eq!(ok.key, search_target);
|
||||||
assert_eq!(swarm_ids[i], expected_swarm_id);
|
assert_eq!(swarm_ids[i], expected_swarm_id);
|
||||||
|
assert_eq!(swarm.queries.size(), 0);
|
||||||
assert!(expected_peer_ids.iter().all(|p| ok.peers.contains(p)));
|
assert!(expected_peer_ids.iter().all(|p| ok.peers.contains(p)));
|
||||||
let key = kbucket::Key::new(ok.key);
|
let key = kbucket::Key::new(ok.key);
|
||||||
assert_eq!(expected_distances, distances(&key, ok.peers));
|
assert_eq!(expected_distances, distances(&key, ok.peers));
|
||||||
@ -420,6 +421,7 @@ fn put_record() {
|
|||||||
|
|
||||||
if republished {
|
if republished {
|
||||||
assert_eq!(swarms[0].store.records().count(), records.len());
|
assert_eq!(swarms[0].store.records().count(), records.len());
|
||||||
|
assert_eq!(swarms[0].queries.size(), 0);
|
||||||
for k in records.keys() {
|
for k in records.keys() {
|
||||||
swarms[0].store.remove(&k);
|
swarms[0].store.remove(&k);
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user