protocols/kad: Improve options to efficiently retrieve (#2712)

This commit is contained in:
Friedel Ziegelmayer
2022-11-25 09:29:46 +01:00
committed by GitHub
parent cff84f1897
commit a99718162b
9 changed files with 611 additions and 364 deletions

View File

@@ -52,6 +52,7 @@ use libp2p::{
swarm::{NetworkBehaviour, SwarmEvent}, swarm::{NetworkBehaviour, SwarmEvent},
PeerId, Swarm, PeerId, Swarm,
}; };
use libp2p_kad::{GetProvidersOk, GetRecordOk};
use std::error::Error; use std::error::Error;
#[async_std::main] #[async_std::main]
@@ -120,33 +121,32 @@ async fn main() -> Result<(), Box<dyn Error>> {
swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr); swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);
} }
} }
SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, ..})) => { SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryProgressed { result, ..})) => {
match result { match result {
QueryResult::GetProviders(Ok(ok)) => { QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { key, providers, .. })) => {
for peer in ok.providers { for peer in providers {
println!( println!(
"Peer {:?} provides key {:?}", "Peer {peer:?} provides key {:?}",
peer, std::str::from_utf8(key.as_ref()).unwrap()
std::str::from_utf8(ok.key.as_ref()).unwrap()
); );
} }
} }
QueryResult::GetProviders(Err(err)) => { QueryResult::GetProviders(Err(err)) => {
eprintln!("Failed to get providers: {err:?}"); eprintln!("Failed to get providers: {err:?}");
} }
QueryResult::GetRecord(Ok(ok)) => { QueryResult::GetRecord(Ok(
for PeerRecord { GetRecordOk::FoundRecord(PeerRecord {
record: Record { key, value, .. }, record: Record { key, value, .. },
.. ..
} in ok.records })
{ )) => {
println!( println!(
"Got record {:?} {:?}", "Got record {:?} {:?}",
std::str::from_utf8(key.as_ref()).unwrap(), std::str::from_utf8(key.as_ref()).unwrap(),
std::str::from_utf8(&value).unwrap(), std::str::from_utf8(&value).unwrap(),
); );
}
} }
QueryResult::GetRecord(Ok(_)) => {}
QueryResult::GetRecord(Err(err)) => { QueryResult::GetRecord(Err(err)) => {
eprintln!("Failed to get record: {err:?}"); eprintln!("Failed to get record: {err:?}");
} }
@@ -191,7 +191,7 @@ fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
} }
} }
}; };
kademlia.get_record(key, Quorum::One); kademlia.get_record(key);
} }
Some("GET_PROVIDERS") => { Some("GET_PROVIDERS") => {
let key = { let key = {

View File

@@ -413,7 +413,7 @@ mod network {
) { ) {
match event { match event {
SwarmEvent::Behaviour(ComposedEvent::Kademlia( SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryCompleted { KademliaEvent::OutboundQueryProgressed {
id, id,
result: QueryResult::StartProviding(_), result: QueryResult::StartProviding(_),
.. ..
@@ -426,18 +426,37 @@ mod network {
let _ = sender.send(()); let _ = sender.send(());
} }
SwarmEvent::Behaviour(ComposedEvent::Kademlia( SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryCompleted { KademliaEvent::OutboundQueryProgressed {
id, id,
result: QueryResult::GetProviders(Ok(GetProvidersOk { providers, .. })), result:
QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
providers,
..
})),
.. ..
}, },
)) => { )) => {
let _ = self if let Some(sender) = self.pending_get_providers.remove(&id) {
.pending_get_providers sender.send(providers).expect("Receiver not to be dropped");
.remove(&id)
.expect("Completed query to be previously pending.") // Finish the query. We are only interested in the first result.
.send(providers); self.swarm
.behaviour_mut()
.kademlia
.query_mut(&id)
.unwrap()
.finish();
}
} }
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryProgressed {
result:
QueryResult::GetProviders(Ok(
GetProvidersOk::FinishedWithNoAdditionalRecord { .. },
)),
..
},
)) => {}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {} SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {}
SwarmEvent::Behaviour(ComposedEvent::RequestResponse( SwarmEvent::Behaviour(ComposedEvent::RequestResponse(
RequestResponseEvent::Message { message, .. }, RequestResponseEvent::Message { message, .. },

View File

@@ -81,7 +81,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
loop { loop {
let event = swarm.select_next_some().await; let event = swarm.select_next_some().await;
if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted { if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed {
result: QueryResult::GetClosestPeers(result), result: QueryResult::GetClosestPeers(result),
.. ..
}) = event }) = event

View File

@@ -20,8 +20,12 @@
- Update `rust-version` to reflect the actual MSRV: 1.62.0. See [PR 3090]. - Update `rust-version` to reflect the actual MSRV: 1.62.0. See [PR 3090].
- Changed `Metrics::query_result_get_record_ok` from `Histogram` to a `Counter`.
See [PR 2712].
[PR 2982]: https://github.com/libp2p/rust-libp2p/pull/2982/ [PR 2982]: https://github.com/libp2p/rust-libp2p/pull/2982/
[PR 3090]: https://github.com/libp2p/rust-libp2p/pull/3090 [PR 3090]: https://github.com/libp2p/rust-libp2p/pull/3090
[PR 2712]: https://github.com/libp2p/rust-libp2p/pull/2712
# 0.10.0 # 0.10.0

View File

@@ -25,7 +25,7 @@ use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::{Registry, Unit}; use prometheus_client::registry::{Registry, Unit};
pub struct Metrics { pub struct Metrics {
query_result_get_record_ok: Histogram, query_result_get_record_ok: Counter,
query_result_get_record_error: Family<GetRecordResult, Counter>, query_result_get_record_error: Family<GetRecordResult, Counter>,
query_result_get_closest_peers_ok: Histogram, query_result_get_closest_peers_ok: Histogram,
@@ -48,7 +48,7 @@ impl Metrics {
pub fn new(registry: &mut Registry) -> Self { pub fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("kad"); let sub_registry = registry.sub_registry_with_prefix("kad");
let query_result_get_record_ok = Histogram::new(exponential_buckets(1.0, 2.0, 10)); let query_result_get_record_ok = Counter::default();
sub_registry.register( sub_registry.register(
"query_result_get_record_ok", "query_result_get_record_ok",
"Number of records returned by a successful Kademlia get record query.", "Number of records returned by a successful Kademlia get record query.",
@@ -162,7 +162,7 @@ impl Metrics {
impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics { impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
fn record(&self, event: &libp2p_kad::KademliaEvent) { fn record(&self, event: &libp2p_kad::KademliaEvent) {
match event { match event {
libp2p_kad::KademliaEvent::OutboundQueryCompleted { result, stats, .. } => { libp2p_kad::KademliaEvent::OutboundQueryProgressed { result, stats, .. } => {
self.query_result_num_requests self.query_result_num_requests
.get_or_create(&result.into()) .get_or_create(&result.into())
.observe(stats.num_requests().into()); .observe(stats.num_requests().into());
@@ -180,9 +180,10 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
match result { match result {
libp2p_kad::QueryResult::GetRecord(result) => match result { libp2p_kad::QueryResult::GetRecord(result) => match result {
Ok(ok) => self Ok(libp2p_kad::GetRecordOk::FoundRecord(_)) => {
.query_result_get_record_ok self.query_result_get_record_ok.inc();
.observe(ok.records.len() as f64), }
Ok(libp2p_kad::GetRecordOk::FinishedWithNoAdditionalRecord { .. }) => {}
Err(error) => { Err(error) => {
self.query_result_get_record_error self.query_result_get_record_error
.get_or_create(&error.into()) .get_or_create(&error.into())
@@ -200,9 +201,13 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
} }
}, },
libp2p_kad::QueryResult::GetProviders(result) => match result { libp2p_kad::QueryResult::GetProviders(result) => match result {
Ok(ok) => self Ok(libp2p_kad::GetProvidersOk::FoundProviders { providers, .. }) => {
.query_result_get_providers_ok self.query_result_get_providers_ok
.observe(ok.providers.len() as f64), .observe(providers.len() as f64);
}
Ok(libp2p_kad::GetProvidersOk::FinishedWithNoAdditionalRecord {
..
}) => {}
Err(error) => { Err(error) => {
self.query_result_get_providers_error self.query_result_get_providers_error
.get_or_create(&error.into()) .get_or_create(&error.into())

View File

@@ -16,10 +16,18 @@
This would eventually lead to warning that says: "New inbound substream to PeerId exceeds inbound substream limit. No older substream waiting to be reused." This would eventually lead to warning that says: "New inbound substream to PeerId exceeds inbound substream limit. No older substream waiting to be reused."
See [PR 3152]. See [PR 3152].
- Refactor APIs to be streaming.
- Renamed `KademliaEvent::OutboundQueryCompleted` to `KademliaEvent::OutboundQueryProgressed`
- Instead of a single event `OutboundQueryCompleted`, there are now multiple events emitted, allowing the user to process them as they come in (via the new `OutboundQueryProgressed`). See `ProgressStep` to identify the final `OutboundQueryProgressed` of a single query.
- To finish a query early, i.e. before the final `OutboundQueryProgressed` of the query, a caller needs to call `query.finish()`.
- There is no more automatic caching of records. The user has to manually call `put_record_to` on the `QueryInfo::GetRecord.cache_candidates` to cache a record to a close peer that did not return the record on the foregone query.
See [PR 2712].
[PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085 [PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085
[PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011 [PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011
[PR 3090]: https://github.com/libp2p/rust-libp2p/pull/3090 [PR 3090]: https://github.com/libp2p/rust-libp2p/pull/3090
[PR 3152]: https://github.com/libp2p/rust-libp2p/pull/3152 [PR 3152]: https://github.com/libp2p/rust-libp2p/pull/3152
[PR 2712]: https://github.com/libp2p/rust-libp2p/pull/2712
# 0.41.0 # 0.41.0

View File

@@ -179,23 +179,6 @@ pub struct KademliaConfig {
caching: KademliaCaching, caching: KademliaCaching,
} }
/// The configuration for Kademlia "write-back" caching after successful
/// lookups via [`Kademlia::get_record`].
#[derive(Debug, Clone)]
pub enum KademliaCaching {
/// Caching is disabled and the peers closest to records being looked up
/// that do not return a record are not tracked, i.e.
/// [`GetRecordOk::cache_candidates`] is always empty.
Disabled,
/// Up to `max_peers` peers not returning a record that are closest to the key
/// being looked up are tracked and returned in [`GetRecordOk::cache_candidates`].
/// Furthermore, if [`Kademlia::get_record`] is used with a quorum of 1, the
/// found record is automatically sent to (i.e. cached at) these peers. For lookups with a
/// quorum > 1, the write-back operation must be performed explicitly, if
/// desired and after choosing a record from the results, via [`Kademlia::put_record_to`].
Enabled { max_peers: u16 },
}
impl Default for KademliaConfig { impl Default for KademliaConfig {
fn default() -> Self { fn default() -> Self {
KademliaConfig { KademliaConfig {
@@ -215,6 +198,21 @@ impl Default for KademliaConfig {
} }
} }
/// The configuration for Kademlia "write-back" caching after successful
/// lookups via [`Kademlia::get_record`].
#[derive(Debug, Clone)]
pub enum KademliaCaching {
/// Caching is disabled and the peers closest to records being looked up
/// that do not return a record are not tracked, i.e.
/// [`GetRecordOk::FinishedWithNoAdditionalRecord`] is always empty.
Disabled,
/// Up to `max_peers` peers not returning a record that are closest to the key
/// being looked up are tracked and returned in [`GetRecordOk::FinishedWithNoAdditionalRecord`].
/// The write-back operation must be performed explicitly, if
/// desired and after choosing a record from the results, via [`Kademlia::put_record_to`].
Enabled { max_peers: u16 },
}
impl KademliaConfig { impl KademliaConfig {
/// Sets custom protocol names. /// Sets custom protocol names.
/// ///
@@ -435,6 +433,7 @@ where
Kademlia { Kademlia {
store, store,
caching: config.caching,
kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout), kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout),
kbucket_inserts: config.kbucket_inserts, kbucket_inserts: config.kbucket_inserts,
protocol_config: config.protocol_config, protocol_config: config.protocol_config,
@@ -448,7 +447,6 @@ where
provider_record_ttl: config.provider_record_ttl, provider_record_ttl: config.provider_record_ttl,
connection_idle_timeout: config.connection_idle_timeout, connection_idle_timeout: config.connection_idle_timeout,
local_addrs: HashSet::new(), local_addrs: HashSet::new(),
caching: config.caching,
} }
} }
@@ -661,13 +659,15 @@ where
where where
K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone, K: Into<kbucket::Key<K>> + Into<Vec<u8>> + Clone,
{ {
let target: kbucket::Key<K> = key.clone().into();
let key: Vec<u8> = key.into();
let info = QueryInfo::GetClosestPeers { let info = QueryInfo::GetClosestPeers {
key: key.clone().into(), key,
step: ProgressStep::first(),
}; };
let target: kbucket::Key<K> = key.into(); let peer_keys: Vec<kbucket::Key<PeerId>> = self.kbuckets.closest_keys(&target).collect();
let peers = self.kbuckets.closest_keys(&target);
let inner = QueryInner::new(info); let inner = QueryInner::new(info);
self.queries.add_iter_closest(target.clone(), peers, inner) self.queries.add_iter_closest(target, peer_keys, inner)
} }
/// Returns closest peers to the given key; takes peers from local routing table only. /// Returns closest peers to the given key; takes peers from local routing table only.
@@ -682,36 +682,56 @@ where
/// ///
/// The result of this operation is delivered in a /// The result of this operation is delivered in a
/// [`KademliaEvent::OutboundQueryCompleted{QueryResult::GetRecord}`]. /// [`KademliaEvent::OutboundQueryCompleted{QueryResult::GetRecord}`].
pub fn get_record(&mut self, key: record::Key, quorum: Quorum) -> QueryId { pub fn get_record(&mut self, key: record::Key) -> QueryId {
let quorum = quorum.eval(self.queries.config().replication_factor); let record = if let Some(record) = self.store.get(&key) {
let mut records = Vec::with_capacity(quorum.get());
if let Some(record) = self.store.get(&key) {
if record.is_expired(Instant::now()) { if record.is_expired(Instant::now()) {
self.store.remove(&key) self.store.remove(&key);
None
} else { } else {
records.push(PeerRecord { Some(PeerRecord {
peer: None, peer: None,
record: record.into_owned(), record: record.into_owned(),
}); })
} }
} } else {
None
};
let step = ProgressStep::first();
let done = records.len() >= quorum.get();
let target = kbucket::Key::new(key.clone()); let target = kbucket::Key::new(key.clone());
let info = QueryInfo::GetRecord { let info = if record.is_some() {
key, QueryInfo::GetRecord {
records, key,
quorum, step: step.next(),
cache_candidates: BTreeMap::new(), found_a_record: true,
cache_candidates: BTreeMap::new(),
}
} else {
QueryInfo::GetRecord {
key,
step: step.clone(),
found_a_record: false,
cache_candidates: BTreeMap::new(),
}
}; };
let peers = self.kbuckets.closest_keys(&target); let peers = self.kbuckets.closest_keys(&target);
let inner = QueryInner::new(info); let inner = QueryInner::new(info);
let id = self.queries.add_iter_closest(target.clone(), peers, inner); // (*) let id = self.queries.add_iter_closest(target.clone(), peers, inner);
// Instantly finish the query if we already have enough records. // No queries were actually done for the results yet.
if done { let stats = QueryStats::empty();
self.queries.get_mut(&id).expect("by (*)").finish();
if let Some(record) = record {
self.queued_events
.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))),
step,
stats,
},
));
} }
id id
@@ -766,19 +786,16 @@ where
/// ///
/// If the record's expiration is `None`, the configured record TTL is used. /// If the record's expiration is `None`, the configured record TTL is used.
/// ///
/// > **Note**: This is not a regular Kademlia DHT operation. It may be /// > **Note**: This is not a regular Kademlia DHT operation. It needs to be
/// > used to selectively update or store a record to specific peers /// > used to selectively update or store a record to specific peers
/// > for the purpose of e.g. making sure these peers have the latest /// > for the purpose of e.g. making sure these peers have the latest
/// > "version" of a record or to "cache" a record at further peers /// > "version" of a record or to "cache" a record at further peers
/// > to increase the lookup success rate on the DHT for other peers. /// > to increase the lookup success rate on the DHT for other peers.
/// > /// >
/// > In particular, if lookups are performed with a quorum > 1 multiple /// > In particular, there is no automatic storing of records performed, and this
/// > possibly different records may be returned and the standard Kademlia /// > method must be used to ensure the standard Kademlia
/// > procedure of "caching" (i.e. storing) a found record at the closest /// > procedure of "caching" (i.e. storing) a found record at the closest
/// > node to the key that _did not_ return it cannot be employed /// > node to the key that _did not_ return it.
/// > transparently. In that case, client code can explicitly choose
/// > which record to store at which peers for analogous write-back
/// > caching or for other reasons.
pub fn put_record_to<I>(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId pub fn put_record_to<I>(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId
where where
I: ExactSizeIterator<Item = PeerId>, I: ExactSizeIterator<Item = PeerId>,
@@ -854,6 +871,7 @@ where
let info = QueryInfo::Bootstrap { let info = QueryInfo::Bootstrap {
peer: *local_key.preimage(), peer: *local_key.preimage(),
remaining: None, remaining: None,
step: ProgressStep::first(),
}; };
let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>(); let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
if peers.is_empty() { if peers.is_empty() {
@@ -924,21 +942,49 @@ where
/// The result of this operation is delivered in a /// The result of this operation is delivered in a
/// reported via [`KademliaEvent::OutboundQueryCompleted{QueryResult::GetProviders}`]. /// reported via [`KademliaEvent::OutboundQueryCompleted{QueryResult::GetProviders}`].
pub fn get_providers(&mut self, key: record::Key) -> QueryId { pub fn get_providers(&mut self, key: record::Key) -> QueryId {
let providers = self let providers: HashSet<_> = self
.store .store
.providers(&key) .providers(&key)
.into_iter() .into_iter()
.filter(|p| !p.is_expired(Instant::now())) .filter(|p| !p.is_expired(Instant::now()))
.map(|p| p.provider) .map(|p| p.provider)
.collect(); .collect();
let step = ProgressStep::first();
let info = QueryInfo::GetProviders { let info = QueryInfo::GetProviders {
key: key.clone(), key: key.clone(),
providers, providers_found: providers.len(),
step: if providers.is_empty() {
step.clone()
} else {
step.next()
},
}; };
let target = kbucket::Key::new(key);
let target = kbucket::Key::new(key.clone());
let peers = self.kbuckets.closest_keys(&target); let peers = self.kbuckets.closest_keys(&target);
let inner = QueryInner::new(info); let inner = QueryInner::new(info);
self.queries.add_iter_closest(target.clone(), peers, inner) let id = self.queries.add_iter_closest(target.clone(), peers, inner);
// No queries were actually done for the results yet.
let stats = QueryStats::empty();
if !providers.is_empty() {
self.queued_events
.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
key,
providers,
})),
step,
stats,
},
));
}
id
} }
/// Processes discovered peers from a successful request in an iterative `Query`. /// Processes discovered peers from a successful request in an iterative `Query`.
@@ -1189,7 +1235,11 @@ where
log::trace!("Query {:?} finished.", query_id); log::trace!("Query {:?} finished.", query_id);
let result = q.into_result(); let result = q.into_result();
match result.inner.info { match result.inner.info {
QueryInfo::Bootstrap { peer, remaining } => { QueryInfo::Bootstrap {
peer,
remaining,
mut step,
} => {
let local_key = self.kbuckets.local_key().clone(); let local_key = self.kbuckets.local_key().clone();
let mut remaining = remaining.unwrap_or_else(|| { let mut remaining = remaining.unwrap_or_else(|| {
debug_assert_eq!(&peer, local_key.preimage()); debug_assert_eq!(&peer, local_key.preimage());
@@ -1235,41 +1285,53 @@ where
let info = QueryInfo::Bootstrap { let info = QueryInfo::Bootstrap {
peer: *target.preimage(), peer: *target.preimage(),
remaining: Some(remaining), remaining: Some(remaining),
step: step.next(),
}; };
let peers = self.kbuckets.closest_keys(&target); let peers = self.kbuckets.closest_keys(&target);
let inner = QueryInner::new(info); let inner = QueryInner::new(info);
self.queries self.queries
.continue_iter_closest(query_id, target.clone(), peers, inner); .continue_iter_closest(query_id, target.clone(), peers, inner);
} } else {
step.last = true;
};
Some(KademliaEvent::OutboundQueryCompleted { Some(KademliaEvent::OutboundQueryProgressed {
id: query_id, id: query_id,
stats: result.stats, stats: result.stats,
result: QueryResult::Bootstrap(Ok(BootstrapOk { result: QueryResult::Bootstrap(Ok(BootstrapOk {
peer, peer,
num_remaining, num_remaining,
})), })),
step,
}) })
} }
QueryInfo::GetClosestPeers { key, .. } => Some(KademliaEvent::OutboundQueryCompleted { QueryInfo::GetClosestPeers { key, mut step } => {
id: query_id, step.last = true;
stats: result.stats,
result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk {
key,
peers: result.peers.collect(),
})),
}),
QueryInfo::GetProviders { key, providers } => { Some(KademliaEvent::OutboundQueryProgressed {
Some(KademliaEvent::OutboundQueryCompleted {
id: query_id, id: query_id,
stats: result.stats, stats: result.stats,
result: QueryResult::GetProviders(Ok(GetProvidersOk { result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk {
key, key,
providers, peers: result.peers.collect(),
closest_peers: result.peers.collect(),
})), })),
step,
})
}
QueryInfo::GetProviders { mut step, .. } => {
step.last = true;
Some(KademliaEvent::OutboundQueryProgressed {
id: query_id,
stats: result.stats,
result: QueryResult::GetProviders(Ok(
GetProvidersOk::FinishedWithNoAdditionalRecord {
closest_peers: result.peers.collect(),
},
)),
step,
}) })
} }
@@ -1302,65 +1364,41 @@ where
.. ..
}, },
} => match context { } => match context {
AddProviderContext::Publish => Some(KademliaEvent::OutboundQueryCompleted { AddProviderContext::Publish => Some(KademliaEvent::OutboundQueryProgressed {
id: query_id, id: query_id,
stats: get_closest_peers_stats.merge(result.stats), stats: get_closest_peers_stats.merge(result.stats),
result: QueryResult::StartProviding(Ok(AddProviderOk { key })), result: QueryResult::StartProviding(Ok(AddProviderOk { key })),
step: ProgressStep::first_and_last(),
}), }),
AddProviderContext::Republish => Some(KademliaEvent::OutboundQueryCompleted { AddProviderContext::Republish => Some(KademliaEvent::OutboundQueryProgressed {
id: query_id, id: query_id,
stats: get_closest_peers_stats.merge(result.stats), stats: get_closest_peers_stats.merge(result.stats),
result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })), result: QueryResult::RepublishProvider(Ok(AddProviderOk { key })),
step: ProgressStep::first_and_last(),
}), }),
}, },
QueryInfo::GetRecord { QueryInfo::GetRecord {
key, key,
records, mut step,
quorum, found_a_record,
cache_candidates, cache_candidates,
} => { } => {
let results = if records.len() >= quorum.get() { step.last = true;
// [not empty]
if quorum.get() == 1 && !cache_candidates.is_empty() { let results = if found_a_record {
// Cache the record at the closest node(s) to the key that Ok(GetRecordOk::FinishedWithNoAdditionalRecord { cache_candidates })
// did not return the record. } else {
let record = records.first().expect("[not empty]").record.clone();
let quorum = NonZeroUsize::new(1).expect("1 > 0");
let context = PutRecordContext::Cache;
let info = QueryInfo::PutRecord {
context,
record,
quorum,
phase: PutRecordPhase::PutRecord {
success: vec![],
get_closest_peers_stats: QueryStats::empty(),
},
};
let inner = QueryInner::new(info);
self.queries
.add_fixed(cache_candidates.values().copied(), inner);
}
Ok(GetRecordOk {
records,
cache_candidates,
})
} else if records.is_empty() {
Err(GetRecordError::NotFound { Err(GetRecordError::NotFound {
key, key,
closest_peers: result.peers.collect(), closest_peers: result.peers.collect(),
}) })
} else {
Err(GetRecordError::QuorumFailed {
key,
records,
quorum,
})
}; };
Some(KademliaEvent::OutboundQueryCompleted { Some(KademliaEvent::OutboundQueryProgressed {
id: query_id, id: query_id,
stats: result.stats, stats: result.stats,
result: QueryResult::GetRecord(results), result: QueryResult::GetRecord(results),
step,
}) })
} }
@@ -1407,25 +1445,23 @@ where
}; };
match context { match context {
PutRecordContext::Publish | PutRecordContext::Custom => { PutRecordContext::Publish | PutRecordContext::Custom => {
Some(KademliaEvent::OutboundQueryCompleted { Some(KademliaEvent::OutboundQueryProgressed {
id: query_id, id: query_id,
stats: get_closest_peers_stats.merge(result.stats), stats: get_closest_peers_stats.merge(result.stats),
result: QueryResult::PutRecord(mk_result(record.key)), result: QueryResult::PutRecord(mk_result(record.key)),
step: ProgressStep::first_and_last(),
}) })
} }
PutRecordContext::Republish => Some(KademliaEvent::OutboundQueryCompleted { PutRecordContext::Republish => Some(KademliaEvent::OutboundQueryProgressed {
id: query_id, id: query_id,
stats: get_closest_peers_stats.merge(result.stats), stats: get_closest_peers_stats.merge(result.stats),
result: QueryResult::RepublishRecord(mk_result(record.key)), result: QueryResult::RepublishRecord(mk_result(record.key)),
step: ProgressStep::first_and_last(),
}), }),
PutRecordContext::Replicate => { PutRecordContext::Replicate => {
debug!("Record replicated: {:?}", record.key); debug!("Record replicated: {:?}", record.key);
None None
} }
PutRecordContext::Cache => {
debug!("Record cached: {:?}", record.key);
None
}
} }
} }
} }
@@ -1440,54 +1476,66 @@ where
QueryInfo::Bootstrap { QueryInfo::Bootstrap {
peer, peer,
mut remaining, mut remaining,
mut step,
} => { } => {
let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32); let num_remaining = remaining.as_ref().map(|r| r.len().saturating_sub(1) as u32);
if let Some(mut remaining) = remaining.take() { // Continue with the next bootstrap query if `remaining` is not empty.
// Continue with the next bootstrap query if `remaining` is not empty. if let Some((target, remaining)) =
if let Some(target) = remaining.next() { remaining.take().and_then(|mut r| Some((r.next()?, r)))
let info = QueryInfo::Bootstrap { {
peer: target.clone().into_preimage(), let info = QueryInfo::Bootstrap {
remaining: Some(remaining), peer: target.clone().into_preimage(),
}; remaining: Some(remaining),
let peers = self.kbuckets.closest_keys(&target); step: step.next(),
let inner = QueryInner::new(info); };
self.queries let peers = self.kbuckets.closest_keys(&target);
.continue_iter_closest(query_id, target.clone(), peers, inner); let inner = QueryInner::new(info);
} self.queries
.continue_iter_closest(query_id, target.clone(), peers, inner);
} else {
step.last = true;
} }
Some(KademliaEvent::OutboundQueryCompleted { Some(KademliaEvent::OutboundQueryProgressed {
id: query_id, id: query_id,
stats: result.stats, stats: result.stats,
result: QueryResult::Bootstrap(Err(BootstrapError::Timeout { result: QueryResult::Bootstrap(Err(BootstrapError::Timeout {
peer, peer,
num_remaining, num_remaining,
})), })),
step,
}) })
} }
QueryInfo::AddProvider { context, key, .. } => Some(match context { QueryInfo::AddProvider { context, key, .. } => Some(match context {
AddProviderContext::Publish => KademliaEvent::OutboundQueryCompleted { AddProviderContext::Publish => KademliaEvent::OutboundQueryProgressed {
id: query_id, id: query_id,
stats: result.stats, stats: result.stats,
result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })), result: QueryResult::StartProviding(Err(AddProviderError::Timeout { key })),
step: ProgressStep::first_and_last(),
}, },
AddProviderContext::Republish => KademliaEvent::OutboundQueryCompleted { AddProviderContext::Republish => KademliaEvent::OutboundQueryProgressed {
id: query_id, id: query_id,
stats: result.stats, stats: result.stats,
result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })), result: QueryResult::RepublishProvider(Err(AddProviderError::Timeout { key })),
step: ProgressStep::first_and_last(),
}, },
}), }),
QueryInfo::GetClosestPeers { key } => Some(KademliaEvent::OutboundQueryCompleted { QueryInfo::GetClosestPeers { key, mut step } => {
id: query_id, step.last = true;
stats: result.stats,
result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout { Some(KademliaEvent::OutboundQueryProgressed {
key, id: query_id,
peers: result.peers.collect(), stats: result.stats,
})), result: QueryResult::GetClosestPeers(Err(GetClosestPeersError::Timeout {
}), key,
peers: result.peers.collect(),
})),
step,
})
}
QueryInfo::PutRecord { QueryInfo::PutRecord {
record, record,
@@ -1505,16 +1553,18 @@ where
}); });
match context { match context {
PutRecordContext::Publish | PutRecordContext::Custom => { PutRecordContext::Publish | PutRecordContext::Custom => {
Some(KademliaEvent::OutboundQueryCompleted { Some(KademliaEvent::OutboundQueryProgressed {
id: query_id, id: query_id,
stats: result.stats, stats: result.stats,
result: QueryResult::PutRecord(err), result: QueryResult::PutRecord(err),
step: ProgressStep::first_and_last(),
}) })
} }
PutRecordContext::Republish => Some(KademliaEvent::OutboundQueryCompleted { PutRecordContext::Republish => Some(KademliaEvent::OutboundQueryProgressed {
id: query_id, id: query_id,
stats: result.stats, stats: result.stats,
result: QueryResult::RepublishRecord(err), result: QueryResult::RepublishRecord(err),
step: ProgressStep::first_and_last(),
}), }),
PutRecordContext::Replicate => match phase { PutRecordContext::Replicate => match phase {
PutRecordPhase::GetClosestPeers => { PutRecordPhase::GetClosestPeers => {
@@ -1526,45 +1576,31 @@ where
None None
} }
}, },
PutRecordContext::Cache => match phase {
PutRecordPhase::GetClosestPeers => {
// Caching a record at the closest peer to a key that did not return
// a record is never preceded by a lookup for the closest peers, i.e.
// it is a direct query to a single peer.
unreachable!()
}
PutRecordPhase::PutRecord { .. } => {
debug!("Caching record failed: {:?}", err);
None
}
},
} }
} }
QueryInfo::GetRecord { QueryInfo::GetRecord { key, mut step, .. } => {
key, step.last = true;
records,
quorum,
..
} => Some(KademliaEvent::OutboundQueryCompleted {
id: query_id,
stats: result.stats,
result: QueryResult::GetRecord(Err(GetRecordError::Timeout {
key,
records,
quorum,
})),
}),
QueryInfo::GetProviders { key, providers } => { Some(KademliaEvent::OutboundQueryProgressed {
Some(KademliaEvent::OutboundQueryCompleted { id: query_id,
stats: result.stats,
result: QueryResult::GetRecord(Err(GetRecordError::Timeout { key })),
step,
})
}
QueryInfo::GetProviders { key, mut step, .. } => {
step.last = true;
Some(KademliaEvent::OutboundQueryProgressed {
id: query_id, id: query_id,
stats: result.stats, stats: result.stats,
result: QueryResult::GetProviders(Err(GetProvidersError::Timeout { result: QueryResult::GetProviders(Err(GetProvidersError::Timeout {
key, key,
providers,
closest_peers: result.peers.collect(), closest_peers: result.peers.collect(),
})), })),
step,
}) })
} }
} }
@@ -2063,10 +2099,32 @@ where
let peers = closer_peers.iter().chain(provider_peers.iter()); let peers = closer_peers.iter().chain(provider_peers.iter());
self.discovered(&user_data, &source, peers); self.discovered(&user_data, &source, peers);
if let Some(query) = self.queries.get_mut(&user_data) { if let Some(query) = self.queries.get_mut(&user_data) {
if let QueryInfo::GetProviders { providers, .. } = &mut query.inner.info { let stats = query.stats().clone();
for peer in provider_peers { if let QueryInfo::GetProviders {
providers.insert(peer.node_id); ref key,
} ref mut providers_found,
ref mut step,
..
} = query.inner.info
{
*providers_found += provider_peers.len();
let providers = provider_peers.iter().map(|p| p.node_id).collect();
self.queued_events
.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::OutboundQueryProgressed {
id: user_data,
result: QueryResult::GetProviders(Ok(
GetProvidersOk::FoundProviders {
key: key.clone(),
providers,
},
)),
step: step.clone(),
stats,
},
));
*step = step.next();
} }
} }
} }
@@ -2138,40 +2196,34 @@ where
user_data, user_data,
} => { } => {
if let Some(query) = self.queries.get_mut(&user_data) { if let Some(query) = self.queries.get_mut(&user_data) {
let stats = query.stats().clone();
if let QueryInfo::GetRecord { if let QueryInfo::GetRecord {
key, key,
records, ref mut step,
quorum, ref mut found_a_record,
cache_candidates, cache_candidates,
} = &mut query.inner.info } = &mut query.inner.info
{ {
if let Some(record) = record { if let Some(record) = record {
records.push(PeerRecord { *found_a_record = true;
let record = PeerRecord {
peer: Some(source), peer: Some(source),
record, record,
}); };
let quorum = quorum.get(); self.queued_events
if records.len() >= quorum { .push_back(NetworkBehaviourAction::GenerateEvent(
// Desired quorum reached. The query may finish. See KademliaEvent::OutboundQueryProgressed {
// [`Query::try_finish`] for details. id: user_data,
let peers = records result: QueryResult::GetRecord(Ok(
.iter() GetRecordOk::FoundRecord(record),
.filter_map(|PeerRecord { peer, .. }| peer.as_ref()) )),
.cloned() step: step.clone(),
.collect::<Vec<_>>(); stats,
let finished = query.try_finish(peers.iter()); },
if !finished { ));
debug!(
"GetRecord query ({:?}) reached quorum ({}/{}) with \ *step = step.next();
response from peer {} but could not yet finish.",
user_data,
peers.len(),
quorum,
source,
);
}
}
} else { } else {
log::trace!("Record with key {:?} not found at {}", key, source); log::trace!("Record with key {:?} not found at {}", key, source);
if let KademliaCaching::Enabled { max_peers } = self.caching { if let KademliaCaching::Enabled { max_peers } = self.caching {
@@ -2323,6 +2375,7 @@ where
{ {
query.on_success(&peer_id, vec![]) query.on_success(&peer_id, vec![])
} }
if self.connected_peers.contains(&peer_id) { if self.connected_peers.contains(&peer_id) {
self.queued_events self.queued_events
.push_back(NetworkBehaviourAction::NotifyHandler { .push_back(NetworkBehaviourAction::NotifyHandler {
@@ -2431,14 +2484,16 @@ pub enum KademliaEvent {
// is made of multiple requests across multiple remote peers. // is made of multiple requests across multiple remote peers.
InboundRequest { request: InboundRequest }, InboundRequest { request: InboundRequest },
/// An outbound query has produced a result. /// An outbound query has made progress.
OutboundQueryCompleted { OutboundQueryProgressed {
/// The ID of the query that finished. /// The ID of the query that finished.
id: QueryId, id: QueryId,
/// The result of the query. /// The intermediate result of the query.
result: QueryResult, result: QueryResult,
/// Execution statistics from the query. /// Execution statistics from the query.
stats: QueryStats, stats: QueryStats,
/// Indicates which event this is, if therer are multiple responses for a single query.
step: ProgressStep,
}, },
/// The routing table has been updated with a new peer and / or /// The routing table has been updated with a new peer and / or
@@ -2492,6 +2547,37 @@ pub enum KademliaEvent {
PendingRoutablePeer { peer: PeerId, address: Multiaddr }, PendingRoutablePeer { peer: PeerId, address: Multiaddr },
} }
/// Information about progress events.
#[derive(Debug, Clone)]
pub struct ProgressStep {
/// The index into the event
pub count: NonZeroUsize,
/// Is this the final event?
pub last: bool,
}
impl ProgressStep {
fn first() -> Self {
Self {
count: NonZeroUsize::new(1).expect("1 to be greater than 0."),
last: false,
}
}
fn first_and_last() -> Self {
let mut first = ProgressStep::first();
first.last = true;
first
}
fn next(&self) -> Self {
assert!(!self.last);
let count = NonZeroUsize::new(self.count.get() + 1).expect("Adding 1 not to result in 0.");
Self { count, last: false }
}
}
/// Information about a received and handled inbound request. /// Information about a received and handled inbound request.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum InboundRequest { pub enum InboundRequest {
@@ -2558,19 +2644,20 @@ pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
/// The successful result of [`Kademlia::get_record`]. /// The successful result of [`Kademlia::get_record`].
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct GetRecordOk { pub enum GetRecordOk {
/// The records found, including the peer that returned them. FoundRecord(PeerRecord),
pub records: Vec<PeerRecord>, FinishedWithNoAdditionalRecord {
/// If caching is enabled, these are the peers closest /// If caching is enabled, these are the peers closest
/// _to the record key_ (not the local node) that were queried but /// _to the record key_ (not the local node) that were queried but
/// did not return the record, sorted by distance to the record key /// did not return the record, sorted by distance to the record key
/// from closest to farthest. How many of these are tracked is configured /// from closest to farthest. How many of these are tracked is configured
/// by [`KademliaConfig::set_caching`]. If the lookup used a quorum of /// by [`KademliaConfig::set_caching`]. If the lookup used a quorum of
/// 1, these peers will be sent the record as a means of caching. /// 1, these peers will be sent the record as a means of caching.
/// If the lookup used a quorum > 1, you may wish to use these /// If the lookup used a quorum > 1, you may wish to use these
/// candidates with [`Kademlia::put_record_to`] after selecting /// candidates with [`Kademlia::put_record_to`] after selecting
/// one of the returned records. /// one of the returned records.
pub cache_candidates: BTreeMap<kbucket::Distance, PeerId>, cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
},
} }
/// The error result of [`Kademlia::get_record`]. /// The error result of [`Kademlia::get_record`].
@@ -2588,11 +2675,7 @@ pub enum GetRecordError {
quorum: NonZeroUsize, quorum: NonZeroUsize,
}, },
#[error("the request timed out")] #[error("the request timed out")]
Timeout { Timeout { key: record::Key },
key: record::Key,
records: Vec<PeerRecord>,
quorum: NonZeroUsize,
},
} }
impl GetRecordError { impl GetRecordError {
@@ -2722,10 +2805,15 @@ pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;
/// The successful result of [`Kademlia::get_providers`]. /// The successful result of [`Kademlia::get_providers`].
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct GetProvidersOk { pub enum GetProvidersOk {
pub key: record::Key, FoundProviders {
pub providers: HashSet<PeerId>, key: record::Key,
pub closest_peers: Vec<PeerId>, /// The new set of providers discovered.
providers: HashSet<PeerId>,
},
FinishedWithNoAdditionalRecord {
closest_peers: Vec<PeerId>,
},
} }
/// The error result of [`Kademlia::get_providers`]. /// The error result of [`Kademlia::get_providers`].
@@ -2734,7 +2822,6 @@ pub enum GetProvidersError {
#[error("the request timed out")] #[error("the request timed out")]
Timeout { Timeout {
key: record::Key, key: record::Key,
providers: HashSet<PeerId>,
closest_peers: Vec<PeerId>, closest_peers: Vec<PeerId>,
}, },
} }
@@ -2847,11 +2934,6 @@ pub enum PutRecordContext {
/// The context is periodic replication (i.e. without extending /// The context is periodic replication (i.e. without extending
/// the record TTL) of stored records received earlier from another peer. /// the record TTL) of stored records received earlier from another peer.
Replicate, Replicate,
/// The context is an automatic write-back caching operation of a
/// record found via [`Kademlia::get_record`] at the closest node
/// to the key queried that did not return a record. This only
/// occurs after a lookup quorum of 1 as per standard Kademlia.
Cache,
/// The context is a custom store operation targeting specific /// The context is a custom store operation targeting specific
/// peers initiated by [`Kademlia::put_record_to`]. /// peers initiated by [`Kademlia::put_record_to`].
Custom, Custom,
@@ -2871,17 +2953,25 @@ pub enum QueryInfo {
/// yet completed and `Some` with an exhausted iterator /// yet completed and `Some` with an exhausted iterator
/// if bootstrapping is complete. /// if bootstrapping is complete.
remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>, remaining: Option<vec::IntoIter<kbucket::Key<PeerId>>>,
step: ProgressStep,
}, },
/// A query initiated by [`Kademlia::get_closest_peers`]. /// A (repeated) query initiated by [`Kademlia::get_closest_peers`].
GetClosestPeers { key: Vec<u8> }, GetClosestPeers {
/// The key being queried (the preimage).
key: Vec<u8>,
/// Current index of events.
step: ProgressStep,
},
/// A query initiated by [`Kademlia::get_providers`]. /// A (repeated) query initiated by [`Kademlia::get_providers`].
GetProviders { GetProviders {
/// The key for which to search for providers. /// The key for which to search for providers.
key: record::Key, key: record::Key,
/// The found providers. /// The number of providers found so far.
providers: HashSet<PeerId>, providers_found: usize,
/// Current index of events.
step: ProgressStep,
}, },
/// A (repeated) query initiated by [`Kademlia::start_providing`]. /// A (repeated) query initiated by [`Kademlia::start_providing`].
@@ -2905,15 +2995,14 @@ pub enum QueryInfo {
context: PutRecordContext, context: PutRecordContext,
}, },
/// A query initiated by [`Kademlia::get_record`]. /// A (repeated) query initiated by [`Kademlia::get_record`].
GetRecord { GetRecord {
/// The key to look for. /// The key to look for.
key: record::Key, key: record::Key,
/// The records with the id of the peer that returned them. `None` when /// Current index of events.
/// the record was found in the local store. step: ProgressStep,
records: Vec<PeerRecord>, /// Did we find at least one record?
/// The number of records to look for. found_a_record: bool,
quorum: NonZeroUsize,
/// The peers closest to the `key` that were queried but did not return a record, /// The peers closest to the `key` that were queried but did not return a record,
/// i.e. the peers that are candidates for caching the record. /// i.e. the peers that are candidates for caching the record.
cache_candidates: BTreeMap<kbucket::Distance, PeerId>, cache_candidates: BTreeMap<kbucket::Distance, PeerId>,

View File

@@ -188,7 +188,7 @@ fn bootstrap() {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour( Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryCompleted { KademliaEvent::OutboundQueryProgressed {
id, id,
result: QueryResult::Bootstrap(Ok(ok)), result: QueryResult::Bootstrap(Ok(ok)),
.. ..
@@ -257,8 +257,9 @@ fn query_iter() {
match swarms[0].behaviour_mut().query(&qid) { match swarms[0].behaviour_mut().query(&qid) {
Some(q) => match q.info() { Some(q) => match q.info() {
QueryInfo::GetClosestPeers { key } => { QueryInfo::GetClosestPeers { key, step } => {
assert_eq!(&key[..], search_target.to_bytes().as_slice()) assert_eq!(&key[..], search_target.to_bytes().as_slice());
assert_eq!(usize::from(step.count), 1);
} }
i => panic!("Unexpected query info: {:?}", i), i => panic!("Unexpected query info: {:?}", i),
}, },
@@ -277,7 +278,7 @@ fn query_iter() {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour( Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryCompleted { KademliaEvent::OutboundQueryProgressed {
id, id,
result: QueryResult::GetClosestPeers(Ok(ok)), result: QueryResult::GetClosestPeers(Ok(ok)),
.. ..
@@ -336,7 +337,7 @@ fn unresponsive_not_returned_direct() {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour( Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryCompleted { KademliaEvent::OutboundQueryProgressed {
result: QueryResult::GetClosestPeers(Ok(ok)), result: QueryResult::GetClosestPeers(Ok(ok)),
.. ..
}, },
@@ -396,7 +397,7 @@ fn unresponsive_not_returned_indirect() {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour( Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryCompleted { KademliaEvent::OutboundQueryProgressed {
result: QueryResult::GetClosestPeers(Ok(ok)), result: QueryResult::GetClosestPeers(Ok(ok)),
.. ..
}, },
@@ -444,16 +445,14 @@ fn get_record_not_found() {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let target_key = record::Key::from(random_multihash()); let target_key = record::Key::from(random_multihash());
let qid = swarms[0] let qid = swarms[0].behaviour_mut().get_record(target_key.clone());
.behaviour_mut()
.get_record(target_key.clone(), Quorum::One);
block_on(poll_fn(move |ctx| { block_on(poll_fn(move |ctx| {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour( Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryCompleted { KademliaEvent::OutboundQueryProgressed {
id, id,
result: QueryResult::GetRecord(Err(e)), result: QueryResult::GetRecord(Err(e)),
.. ..
@@ -573,17 +572,19 @@ fn put_record() {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour( Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryCompleted { KademliaEvent::OutboundQueryProgressed {
id, id,
result: QueryResult::PutRecord(res), result: QueryResult::PutRecord(res),
stats, stats,
step: index,
}, },
))) )))
| Poll::Ready(Some(SwarmEvent::Behaviour( | Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryCompleted { KademliaEvent::OutboundQueryProgressed {
id, id,
result: QueryResult::RepublishRecord(res), result: QueryResult::RepublishRecord(res),
stats, stats,
step: index,
}, },
))) => { ))) => {
assert!(qids.is_empty() || qids.remove(&id)); assert!(qids.is_empty() || qids.remove(&id));
@@ -591,6 +592,8 @@ fn put_record() {
assert!(stats.num_successes() >= replication_factor.get() as u32); assert!(stats.num_successes() >= replication_factor.get() as u32);
assert!(stats.num_requests() >= stats.num_successes()); assert!(stats.num_requests() >= stats.num_successes());
assert_eq!(stats.num_failures(), 0); assert_eq!(stats.num_failures(), 0);
assert_eq!(usize::from(index.count), 1);
assert!(index.last);
match res { match res {
Err(e) => panic!("{:?}", e), Err(e) => panic!("{:?}", e),
Ok(ok) => { Ok(ok) => {
@@ -755,36 +758,35 @@ fn get_record() {
let record = Record::new(random_multihash(), vec![4, 5, 6]); let record = Record::new(random_multihash(), vec![4, 5, 6]);
let expected_cache_candidate = *Swarm::local_peer_id(&swarms[1]);
swarms[2].behaviour_mut().store.put(record.clone()).unwrap(); swarms[2].behaviour_mut().store.put(record.clone()).unwrap();
let qid = swarms[0] let qid = swarms[0].behaviour_mut().get_record(record.key.clone());
.behaviour_mut()
.get_record(record.key.clone(), Quorum::One);
block_on(poll_fn(move |ctx| { block_on(poll_fn(move |ctx| {
for swarm in &mut swarms { for swarm in &mut swarms {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour( Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryCompleted { KademliaEvent::OutboundQueryProgressed {
id, id,
result: result: QueryResult::GetRecord(Ok(r)),
QueryResult::GetRecord(Ok(GetRecordOk { step: ProgressStep { count, last },
records,
cache_candidates,
})),
.. ..
}, },
))) => { ))) => {
assert_eq!(id, qid); assert_eq!(id, qid);
assert_eq!(records.len(), 1); if usize::from(count) == 1 {
assert_eq!(records.first().unwrap().record, record); assert!(!last);
assert_eq!(cache_candidates.len(), 1); assert!(matches!(r, GetRecordOk::FoundRecord(_)));
assert_eq!( if let GetRecordOk::FoundRecord(r) = r {
cache_candidates.values().next(), assert_eq!(r.record, record);
Some(&expected_cache_candidate) }
); } else if last {
assert_eq!(usize::from(count), 2);
assert!(matches!(
r,
GetRecordOk::FinishedWithNoAdditionalRecord { .. }
));
}
return Poll::Ready(()); return Poll::Ready(());
} }
// Ignore any other event. // Ignore any other event.
@@ -816,25 +818,34 @@ fn get_record_many() {
} }
let quorum = Quorum::N(NonZeroUsize::new(num_results).unwrap()); let quorum = Quorum::N(NonZeroUsize::new(num_results).unwrap());
let qid = swarms[0] let qid = swarms[0].behaviour_mut().get_record(record.key.clone());
.behaviour_mut()
.get_record(record.key.clone(), quorum);
block_on(poll_fn(move |ctx| { block_on(poll_fn(move |ctx| {
for swarm in &mut swarms { for (i, swarm) in swarms.iter_mut().enumerate() {
let mut records = Vec::new();
let quorum = quorum.eval(swarm.behaviour().queries.config().replication_factor);
loop { loop {
if i == 0 && records.len() >= quorum.get() {
swarm.behaviour_mut().query_mut(&qid).unwrap().finish();
}
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour( Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryCompleted { KademliaEvent::OutboundQueryProgressed {
id, id,
result: QueryResult::GetRecord(Ok(GetRecordOk { records, .. })), result: QueryResult::GetRecord(Ok(r)),
step: ProgressStep { count: _, last },
.. ..
}, },
))) => { ))) => {
assert_eq!(id, qid); assert_eq!(id, qid);
assert!(records.len() >= num_results); if let GetRecordOk::FoundRecord(r) = r {
assert!(records.into_iter().all(|r| r.record == record)); assert_eq!(r.record, record);
return Poll::Ready(()); records.push(r);
}
if last {
return Poll::Ready(());
}
} }
// Ignore any other event. // Ignore any other event.
Poll::Ready(Some(_)) => (), Poll::Ready(Some(_)) => (),
@@ -913,14 +924,14 @@ fn add_provider() {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour( Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryCompleted { KademliaEvent::OutboundQueryProgressed {
id, id,
result: QueryResult::StartProviding(res), result: QueryResult::StartProviding(res),
.. ..
}, },
))) )))
| Poll::Ready(Some(SwarmEvent::Behaviour( | Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryCompleted { KademliaEvent::OutboundQueryProgressed {
id, id,
result: QueryResult::RepublishProvider(res), result: QueryResult::RepublishProvider(res),
.. ..
@@ -1051,7 +1062,7 @@ fn exceed_jobs_max_queries() {
loop { loop {
if let Poll::Ready(Some(e)) = swarm.poll_next_unpin(ctx) { if let Poll::Ready(Some(e)) = swarm.poll_next_unpin(ctx) {
match e { match e {
SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted { SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed {
result: QueryResult::GetClosestPeers(Ok(r)), result: QueryResult::GetClosestPeers(Ok(r)),
.. ..
}) => break assert!(r.peers.is_empty()), }) => break assert!(r.peers.is_empty()),
@@ -1097,12 +1108,7 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
// Make `bob` and `trudy` aware of their version of the record searched by // Make `bob` and `trudy` aware of their version of the record searched by
// `alice`. // `alice`.
bob.1.behaviour_mut().store.put(record_bob.clone()).unwrap(); bob.1.behaviour_mut().store.put(record_bob.clone()).unwrap();
trudy trudy.1.behaviour_mut().store.put(record_trudy).unwrap();
.1
.behaviour_mut()
.store
.put(record_trudy.clone())
.unwrap();
// Make `trudy` and `bob` known to `alice`. // Make `trudy` and `bob` known to `alice`.
alice alice
@@ -1118,7 +1124,7 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
let (mut alice, mut bob, mut trudy) = (alice.1, bob.1, trudy.1); let (mut alice, mut bob, mut trudy) = (alice.1, bob.1, trudy.1);
// Have `alice` query the Dht for `key` with a quorum of 1. // Have `alice` query the Dht for `key` with a quorum of 1.
alice.behaviour_mut().get_record(key, Quorum::One); alice.behaviour_mut().get_record(key);
// The default peer timeout is 10 seconds. Choosing 1 seconds here should // The default peer timeout is 10 seconds. Choosing 1 seconds here should
// give enough head room to prevent connections to `bob` to time out. // give enough head room to prevent connections to `bob` to time out.
@@ -1126,25 +1132,32 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
// Poll only `alice` and `trudy` expecting `alice` not yet to return a query // Poll only `alice` and `trudy` expecting `alice` not yet to return a query
// result as it is not able to connect to `bob` just yet. // result as it is not able to connect to `bob` just yet.
let addr_trudy = *Swarm::local_peer_id(&trudy);
block_on(poll_fn(|ctx| { block_on(poll_fn(|ctx| {
for (i, swarm) in [&mut alice, &mut trudy].iter_mut().enumerate() { for (i, swarm) in [&mut alice, &mut trudy].iter_mut().enumerate() {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour( Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryCompleted { KademliaEvent::OutboundQueryProgressed {
result: QueryResult::GetRecord(result), result: QueryResult::GetRecord(result),
step,
.. ..
}, },
))) => { ))) => {
if i != 0 { if i != 0 {
panic!("Expected `QueryResult` from Alice.") panic!("Expected `QueryResult` from Alice.")
} }
if step.last {
match result { panic!(
Ok(_) => panic!(
"Expected query not to finish until all \ "Expected query not to finish until all \
disjoint paths have been explored.", disjoint paths have been explored.",
), );
}
match result {
Ok(GetRecordOk::FoundRecord(r)) => {
assert_eq!(r.peer, Some(addr_trudy));
}
Ok(_) => {}
Err(e) => panic!("{:?}", e), Err(e) => panic!("{:?}", e),
} }
} }
@@ -1162,19 +1175,14 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
// Make sure `alice` has exactly one query with `trudy`'s record only. // Make sure `alice` has exactly one query with `trudy`'s record only.
assert_eq!(1, alice.behaviour().queries.iter().count()); assert_eq!(1, alice.behaviour().queries.iter().count());
alice alice
.behaviour() .behaviour()
.queries .queries
.iter() .iter()
.for_each(|q| match &q.inner.info { .for_each(|q| match &q.inner.info {
QueryInfo::GetRecord { records, .. } => { QueryInfo::GetRecord { step, .. } => {
assert_eq!( assert_eq!(usize::from(step.count), 2);
*records,
vec![PeerRecord {
peer: Some(*Swarm::local_peer_id(&trudy)),
record: record_trudy.clone(),
}],
);
} }
i => panic!("Unexpected query info: {:?}", i), i => panic!("Unexpected query info: {:?}", i),
}); });
@@ -1182,21 +1190,32 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
// Poll `alice` and `bob` expecting `alice` to return a successful query // Poll `alice` and `bob` expecting `alice` to return a successful query
// result as it is now able to explore the second disjoint path. // result as it is now able to explore the second disjoint path.
let records = block_on(poll_fn(|ctx| { let records = block_on(poll_fn(|ctx| {
let mut records = Vec::new();
for (i, swarm) in [&mut alice, &mut bob].iter_mut().enumerate() { for (i, swarm) in [&mut alice, &mut bob].iter_mut().enumerate() {
loop { loop {
match swarm.poll_next_unpin(ctx) { match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour( Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryCompleted { KademliaEvent::OutboundQueryProgressed {
result: QueryResult::GetRecord(result), result: QueryResult::GetRecord(result),
step,
.. ..
}, },
))) => { ))) => {
if i != 0 { if i != 0 {
panic!("Expected `QueryResult` from Alice.") panic!("Expected `QueryResult` from Alice.")
} }
match result { match result {
Ok(ok) => return Poll::Ready(ok.records), Ok(ok) => {
if let GetRecordOk::FoundRecord(record) = ok {
records.push(record);
}
if records.len() == 1 {
return Poll::Ready(records);
}
if step.last {
break;
}
}
Err(e) => unreachable!("{:?}", e), Err(e) => unreachable!("{:?}", e),
} }
} }
@@ -1211,15 +1230,11 @@ fn disjoint_query_does_not_finish_before_all_paths_did() {
Poll::Pending Poll::Pending
})); }));
assert_eq!(2, records.len()); assert_eq!(1, records.len());
assert!(records.contains(&PeerRecord { assert!(records.contains(&PeerRecord {
peer: Some(*Swarm::local_peer_id(&bob)), peer: Some(*Swarm::local_peer_id(&bob)),
record: record_bob, record: record_bob,
})); }));
assert!(records.contains(&PeerRecord {
peer: Some(*Swarm::local_peer_id(&trudy)),
record: record_trudy,
}));
} }
/// Tests that peers are not automatically inserted into /// Tests that peers are not automatically inserted into
@@ -1339,7 +1354,7 @@ fn network_behaviour_on_address_change() {
} }
#[test] #[test]
fn get_providers() { fn get_providers_single() {
fn prop(key: record::Key) { fn prop(key: record::Key) {
let (_, mut single_swarm) = build_node(); let (_, mut single_swarm) = build_node();
single_swarm single_swarm
@@ -1349,7 +1364,7 @@ fn get_providers() {
block_on(async { block_on(async {
match single_swarm.next().await.unwrap() { match single_swarm.next().await.unwrap() {
SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted { SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed {
result: QueryResult::StartProviding(Ok(_)), result: QueryResult::StartProviding(Ok(_)),
.. ..
}) => {} }) => {}
@@ -1358,30 +1373,137 @@ fn get_providers() {
} }
}); });
let query_id = single_swarm.behaviour_mut().get_providers(key.clone()); let query_id = single_swarm.behaviour_mut().get_providers(key);
block_on(async { block_on(async {
match single_swarm.next().await.unwrap() { loop {
SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted { match single_swarm.next().await.unwrap() {
id, SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed {
result: id,
QueryResult::GetProviders(Ok(GetProvidersOk { result: QueryResult::GetProviders(Ok(ok)),
key: found_key, step: index,
providers, ..
.. }) if id == query_id => {
})), if index.last {
.. assert!(matches!(
}) if id == query_id => { ok,
assert_eq!(key, found_key); GetProvidersOk::FinishedWithNoAdditionalRecord { .. }
assert_eq!( ));
single_swarm.local_peer_id(), break;
providers.iter().next().unwrap() } else {
); assert!(matches!(ok, GetProvidersOk::FoundProviders { .. }));
if let GetProvidersOk::FoundProviders { providers, .. } = ok {
assert_eq!(providers.len(), 1);
assert!(providers.contains(single_swarm.local_peer_id()));
}
}
}
SwarmEvent::Behaviour(e) => panic!("Unexpected event: {:?}", e),
_ => {}
} }
SwarmEvent::Behaviour(e) => panic!("Unexpected event: {:?}", e),
_ => {}
} }
}); });
} }
QuickCheck::new().tests(10).quickcheck(prop as fn(_)) QuickCheck::new().tests(10).quickcheck(prop as fn(_))
} }
fn get_providers_limit<const N: usize>() {
fn prop<const N: usize>(key: record::Key) {
let mut swarms = build_nodes(3);
// Let first peer know of second peer and second peer know of third peer.
for i in 0..2 {
let (peer_id, address) = (
*Swarm::local_peer_id(&swarms[i + 1].1),
swarms[i + 1].0.clone(),
);
swarms[i].1.behaviour_mut().add_address(&peer_id, address);
}
// Drop the swarm addresses.
let mut swarms = swarms
.into_iter()
.map(|(_addr, swarm)| swarm)
.collect::<Vec<_>>();
// Provide the content on peer 2 and 3.
for swarm in swarms.iter_mut().take(3).skip(1) {
swarm
.behaviour_mut()
.start_providing(key.clone())
.expect("could not provide");
}
// Query with expecting a single provider.
let query_id = swarms[0].behaviour_mut().get_providers(key.clone());
let mut all_providers: Vec<PeerId> = vec![];
block_on(poll_fn(move |ctx| {
for (i, swarm) in swarms.iter_mut().enumerate() {
loop {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(SwarmEvent::Behaviour(
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::GetProviders(Ok(ok)),
step: index,
..
},
))) if i == 0 && id == query_id => {
if index.last {
assert!(matches!(
ok,
GetProvidersOk::FinishedWithNoAdditionalRecord { .. }
));
assert_eq!(all_providers.len(), N);
return Poll::Ready(());
} else {
assert!(matches!(ok, GetProvidersOk::FoundProviders { .. }));
if let GetProvidersOk::FoundProviders {
key: found_key,
providers,
} = ok
{
// There are a total of 2 providers.
assert_eq!(key, found_key);
for provider in &providers {
// Providers should be either 2 or 3
assert_ne!(swarm.local_peer_id(), provider);
}
all_providers.extend(providers);
// If we have all providers, finish.
if all_providers.len() == N {
swarm.behaviour_mut().query_mut(&id).unwrap().finish();
}
}
return Poll::Ready(());
}
}
Poll::Ready(..) => {}
Poll::Pending => break,
}
}
}
Poll::Pending
}));
}
QuickCheck::new().tests(10).quickcheck(prop::<N> as fn(_))
}
#[test]
fn get_providers_limit_n_1() {
get_providers_limit::<1>();
}
#[test]
fn get_providers_limit_n_2() {
get_providers_limit::<2>();
}
#[test]
fn get_providers_limit_n_5() {
get_providers_limit::<5>();
}

View File

@@ -64,7 +64,7 @@ pub use behaviour::{
}; };
pub use behaviour::{ pub use behaviour::{
Kademlia, KademliaBucketInserts, KademliaCaching, KademliaConfig, KademliaEvent, Kademlia, KademliaBucketInserts, KademliaCaching, KademliaConfig, KademliaEvent,
KademliaStoreInserts, Quorum, KademliaStoreInserts, ProgressStep, Quorum,
}; };
pub use protocol::KadConnectionType; pub use protocol::KadConnectionType;
pub use query::QueryId; pub use query::QueryId;