[kad] Store addresses of provider records. (#1708)

* Store addresses of provider records.

So far, provider records have been stored without their
addresses; the addresses of providers are instead obtained
from the routing table on demand. This has two shortcomings:

  1. We can only return provider records whose provider
  peers happen to currently be in the local routing table.

  2. The local node never returns itself as a provider for
  a key, even if it is indeed a provider.

These issues are addressed here by storing the addresses
together with the provider records, falling back to
addresses from the routing table only for backward compatibility
with existing implementations of `RecordStore` using persistent
storage.

Resolves https://github.com/libp2p/rust-libp2p/issues/1526.
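
For illustration only (not part of the commit): a minimal sketch of how the extended API might be used, assuming the `libp2p-kad`/`libp2p-core` crates at the version this PR targets (0.22). The `remember_provider` helper and the concrete key and address values are made up; `ProviderRecord`, `MemoryStore` and the `RecordStore` methods are the items touched by the diff below.

```rust
use libp2p_core::{Multiaddr, PeerId};
use libp2p_kad::record::store::{Error, MemoryStore, RecordStore};
use libp2p_kad::record::{Key, ProviderRecord};

/// Hypothetical helper: remember `peer` as a provider for `key`, keeping its
/// known addresses inside the provider record itself, so they can later be
/// returned even if `peer` is not (or no longer) in the routing table.
fn remember_provider(
    store: &mut MemoryStore,
    key: Key,
    peer: PeerId,
    addresses: Vec<Multiaddr>,
) -> Result<(), Error> {
    // `ProviderRecord::new` now takes the provider's addresses as a third argument.
    store.add_provider(ProviderRecord::new(key, peer, addresses))
}

fn main() -> Result<(), Error> {
    let local_id = PeerId::random();
    let mut store = MemoryStore::new(local_id.clone());

    let key = Key::new(&b"example-key".to_vec());
    let addr: Multiaddr = "/ip4/203.0.113.7/tcp/4001".parse().expect("valid multiaddr");
    remember_provider(&mut store, key.clone(), local_id, vec![addr.clone()])?;

    // The returned record carries the stored addresses; no routing-table lookup needed.
    let providers = store.providers(&key);
    assert_eq!(providers[0].addresses, vec![addr]);
    Ok(())
}
```

Since legacy provider records simply come back with empty `addresses`, the behaviour falls back to the routing table for those, which is what keeps existing persistent `RecordStore` implementations working.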

* Update protocols/kad/src/behaviour.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* Remove negligible use of with_capacity.

* Update changelog.

Co-authored-by: Max Inden <mail@max-inden.de>
commit e1df920703 (parent 269a2ac2a8)
Roman Borschel, 2020-08-18 14:14:31 +02:00, committed by GitHub
4 changed files with 82 additions and 12 deletions

protocols/kad/CHANGELOG.md

@@ -1,5 +1,8 @@
 # 0.22.0 [unreleased]
 
+- Store addresses in provider records.
+  See [PR 1708](https://github.com/libp2p/rust-libp2p/pull/1708).
+
 - Update `libp2p-core` and `libp2p-swarm` dependencies.
 
 - Add `KBucketRef::range` exposing the minimum inclusive and maximum inclusive

protocols/kad/src/behaviour.rs

@@ -92,6 +92,9 @@ pub struct Kademlia<TStore> {
     /// Queued events to return when the behaviour is being polled.
     queued_events: VecDeque<NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaEvent>>,
 
+    /// The currently known addresses of the local node.
+    local_addrs: HashSet<Multiaddr>,
+
     /// The record storage.
     store: TStore,
 }
@@ -358,6 +361,7 @@ where
             record_ttl: config.record_ttl,
             provider_record_ttl: config.provider_record_ttl,
             connection_idle_timeout: config.connection_idle_timeout,
+            local_addrs: HashSet::new()
         }
     }
 
@@ -708,7 +712,14 @@ where
     /// The results of the (repeated) provider announcements sent by this node are
     /// reported via [`KademliaEvent::QueryResult{QueryResult::StartProviding}`].
     pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
-        let record = ProviderRecord::new(key.clone(), self.kbuckets.local_key().preimage().clone());
+        // Note: We store our own provider records locally without local addresses
+        // to avoid redundant storage and outdated addresses. Instead these are
+        // acquired on demand when returning a `ProviderRecord` for the local node.
+        let local_addrs = Vec::new();
+        let record = ProviderRecord::new(
+            key.clone(),
+            self.kbuckets.local_key().preimage().clone(),
+            local_addrs);
         self.store.add_provider(record)?;
         let target = kbucket::Key::new(key.clone());
         let peers = self.kbuckets.closest_keys(&target);
@@ -784,12 +795,42 @@ where
     /// Collects all peers who are known to be providers of the value for a given `Multihash`.
     fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
         let kbuckets = &mut self.kbuckets;
+        let connected = &mut self.connected_peers;
+        let local_addrs = &self.local_addrs;
         self.store.providers(key)
             .into_iter()
             .filter_map(move |p|
                 if &p.provider != source {
-                    let key = kbucket::Key::new(p.provider.clone());
-                    kbuckets.entry(&key).view().map(|e| KadPeer::from(e.to_owned()))
+                    let node_id = p.provider;
+                    let multiaddrs = p.addresses;
+                    let connection_ty = if connected.contains(&node_id) {
+                        KadConnectionType::Connected
+                    } else {
+                        KadConnectionType::NotConnected
+                    };
+                    if multiaddrs.is_empty() {
+                        // The provider is either the local node and we fill in
+                        // the local addresses on demand, or it is a legacy
+                        // provider record without addresses, in which case we
+                        // try to find addresses in the routing table, as was
+                        // done before provider records were stored along with
+                        // their addresses.
+                        if &node_id == kbuckets.local_key().preimage() {
+                            Some(local_addrs.iter().cloned().collect::<Vec<_>>())
+                        } else {
+                            let key = kbucket::Key::new(node_id.clone());
+                            kbuckets.entry(&key).view().map(|e| e.node.value.clone().into_vec())
+                        }
+                    } else {
+                        Some(multiaddrs)
+                    }
+                    .map(|multiaddrs| {
+                        KadPeer {
+                            node_id,
+                            multiaddrs,
+                            connection_ty,
+                        }
+                    })
                 } else {
                     None
                 })
@@ -1367,7 +1408,8 @@ where
                 let record = ProviderRecord {
                     key,
                     provider: provider.node_id,
-                    expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl)
+                    expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
+                    addresses: provider.multiaddrs,
                 };
                 if let Err(e) = self.store.add_provider(record) {
                     info!("Provider record not stored: {:?}", e);
@@ -1746,6 +1788,20 @@ where
         };
     }
 
+    fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
+        self.local_addrs.insert(addr.clone());
+    }
+
+    fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
+        self.local_addrs.remove(addr);
+    }
+
+    fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
+        if self.local_addrs.len() < MAX_LOCAL_EXTERNAL_ADDRS {
+            self.local_addrs.insert(addr.clone());
+        }
+    }
+
     fn poll(&mut self, cx: &mut Context<'_>, parameters: &mut impl PollParameters) -> Poll<
         NetworkBehaviourAction<
             <KademliaHandler<QueryId> as ProtocolsHandler>::InEvent,
@@ -2497,3 +2553,8 @@ pub enum RoutingUpdate {
     /// peer ID).
     Failed,
 }
+
+/// The maximum number of local external addresses. When reached any
+/// further externally reported addresses are ignored. The behaviour always
+/// tracks all its listen addresses.
+const MAX_LOCAL_EXTERNAL_ADDRS: usize = 20;

protocols/kad/src/record.rs

@@ -23,7 +23,7 @@
 pub mod store;
 
 use bytes::Bytes;
-use libp2p_core::PeerId;
+use libp2p_core::{PeerId, Multiaddr};
 use multihash::Multihash;
 use std::borrow::Borrow;
 use std::hash::{Hash, Hasher};
@@ -112,6 +112,8 @@ pub struct ProviderRecord {
     pub provider: PeerId,
     /// The expiration time as measured by a local, monotonic clock.
     pub expires: Option<Instant>,
+    /// The known addresses that the provider may be listening on.
+    pub addresses: Vec<Multiaddr>
 }
 
 impl Hash for ProviderRecord {
@@ -123,12 +125,15 @@ impl Hash for ProviderRecord {
 
 impl ProviderRecord {
     /// Creates a new provider record for insertion into a `RecordStore`.
-    pub fn new<K>(key: K, provider: PeerId) -> Self
+    pub fn new<K>(key: K, provider: PeerId, addresses: Vec<Multiaddr>) -> Self
     where
         K: Into<Key>
     {
         ProviderRecord {
-            key: key.into(), provider, expires: None
+            key: key.into(),
+            provider,
+            expires: None,
+            addresses,
         }
     }
 
@@ -178,6 +183,7 @@ mod tests {
                 } else {
                     None
                 },
+                addresses: vec![],
             }
         }
     }

protocols/kad/src/record/store/memory.rs

@@ -259,7 +259,7 @@ mod tests {
             let key = Key::from(random_multihash());
 
             let mut records = providers.into_iter().map(|p| {
-                ProviderRecord::new(key.clone(), p.into_preimage())
+                ProviderRecord::new(key.clone(), p.into_preimage(), Vec::new())
             }).collect::<Vec<_>>();
 
             for r in &records {
@@ -280,7 +280,7 @@
         let id = PeerId::random();
         let mut store = MemoryStore::new(id.clone());
         let key = random_multihash();
-        let rec = ProviderRecord::new(key, id.clone());
+        let rec = ProviderRecord::new(key, id.clone(), Vec::new());
         assert!(store.add_provider(rec.clone()).is_ok());
         assert_eq!(vec![Cow::Borrowed(&rec)], store.provided().collect::<Vec<_>>());
         store.remove_provider(&rec.key, &id);
@@ -292,7 +292,7 @@
         let mut store = MemoryStore::new(PeerId::random());
         let key = random_multihash();
         let prv = PeerId::random();
-        let mut rec = ProviderRecord::new(key, prv);
+        let mut rec = ProviderRecord::new(key, prv, Vec::new());
        assert!(store.add_provider(rec.clone()).is_ok());
         assert_eq!(vec![rec.clone()], store.providers(&rec.key).to_vec());
         rec.expires = Some(Instant::now());
@@ -306,12 +306,12 @@
         for _ in 0 .. store.config.max_provided_keys {
             let key = random_multihash();
             let prv = PeerId::random();
-            let rec = ProviderRecord::new(key, prv);
+            let rec = ProviderRecord::new(key, prv, Vec::new());
             let _ = store.add_provider(rec);
         }
         let key = random_multihash();
         let prv = PeerId::random();
-        let rec = ProviderRecord::new(key, prv);
+        let rec = ProviderRecord::new(key, prv, Vec::new());
         match store.add_provider(rec) {
             Err(Error::MaxProvidedKeys) => {}
             _ => panic!("Unexpected result"),