Generalise record keys. (#1215)

Generalise record keys from Multihash to a new opaque record::Key type.
This commit is contained in:
Roman Borschel
2019-08-15 11:36:47 +02:00
committed by GitHub
parent c154771de0
commit 56c14071d8
8 changed files with 190 additions and 144 deletions

View File

@ -29,13 +29,12 @@ use crate::jobs::*;
use crate::kbucket::{self, KBucketsTable, NodeStatus};
use crate::protocol::{KadConnectionType, KadPeer};
use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState};
use crate::record::{store::{self, RecordStore}, Record, ProviderRecord};
use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord};
use fnv::{FnvHashMap, FnvHashSet};
use futures::prelude::*;
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler};
use log::{info, debug, warn};
use multihash::Multihash;
use smallvec::SmallVec;
use std::{borrow::Cow, error, iter, marker::PhantomData, time::Duration};
use std::collections::VecDeque;
@ -337,11 +336,11 @@ where
/// The result of this operation is delivered in [`KademliaEvent::GetClosestPeersResult`].
pub fn get_closest_peers<K>(&mut self, key: K)
where
K: Into<Multihash> + Clone
K: AsRef<[u8]> + Clone
{
let multihash = key.into();
let info = QueryInfo::GetClosestPeers { key: multihash.clone() };
let target = kbucket::Key::new(multihash);
let key = key.as_ref().to_vec();
let info = QueryInfo::GetClosestPeers { key: key.clone() };
let target = kbucket::Key::new(key);
let peers = self.kbuckets.closest_keys(&target);
let inner = QueryInner::new(info);
self.queries.add_iter_closest(target.clone(), peers, inner);
@ -350,7 +349,7 @@ where
/// Performs a lookup for a record in the DHT.
///
/// The result of this operation is delivered in [`KademliaEvent::GetRecordResult`].
pub fn get_record(&mut self, key: &Multihash, quorum: Quorum) {
pub fn get_record(&mut self, key: &record::Key, quorum: Quorum) {
let quorum = quorum.eval(self.queries.config().replication_factor);
let mut records = Vec::with_capacity(quorum.get());
@ -368,7 +367,7 @@ where
}
}
let target = kbucket::Key::from(key.clone());
let target = kbucket::Key::new(key.clone());
let info = QueryInfo::GetRecord { key: key.clone(), records, quorum, cache_at: None };
let peers = self.kbuckets.closest_keys(&target);
let inner = QueryInner::new(info);
@ -404,7 +403,7 @@ where
record.expires = record.expires.or_else(||
self.record_ttl.map(|ttl| Instant::now() + ttl));
let quorum = quorum.eval(self.queries.config().replication_factor);
let target = kbucket::Key::from(record.key.clone());
let target = kbucket::Key::new(record.key.clone());
let peers = self.kbuckets.closest_keys(&target);
let context = PutRecordContext::Publish;
let info = QueryInfo::PreparePutRecord { record, quorum, context };
@ -422,7 +421,7 @@ where
/// This is a _local_ operation. However, it also has the effect that
/// the record will no longer be periodically re-published, allowing the
/// record to eventually expire throughout the DHT.
pub fn remove_record(&mut self, key: &Multihash) {
pub fn remove_record(&mut self, key: &record::Key) {
if let Some(r) = self.store.get(key) {
if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
self.store.remove(key)
@ -478,7 +477,7 @@ where
///
/// The results of the (repeated) provider announcements sent by this node are
/// delivered in [`KademliaEvent::AddProviderResult`].
pub fn start_providing(&mut self, key: Multihash) {
pub fn start_providing(&mut self, key: record::Key) {
let record = ProviderRecord::new(key.clone(), self.kbuckets.local_key().preimage().clone());
if let Err(err) = self.store.add_provider(record) {
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
@ -487,7 +486,7 @@ where
))
));
} else {
let target = kbucket::Key::from(key.clone());
let target = kbucket::Key::new(key.clone());
let peers = self.kbuckets.closest_keys(&target);
let context = AddProviderContext::Publish;
let info = QueryInfo::PrepareAddProvider { key, context };
@ -500,19 +499,19 @@ where
///
/// This is a local operation. The local node will still be considered as a
/// provider for the key by other nodes until these provider records expire.
pub fn stop_providing(&mut self, key: &Multihash) {
pub fn stop_providing(&mut self, key: &record::Key) {
self.store.remove_provider(key, self.kbuckets.local_key().preimage());
}
/// Performs a lookup for providers of a value to the given key.
///
/// The result of this operation is delivered in [`KademliaEvent::GetProvidersResult`].
pub fn get_providers(&mut self, key: Multihash) {
pub fn get_providers(&mut self, key: record::Key) {
let info = QueryInfo::GetProviders {
key: key.clone(),
providers: Vec::new(),
};
let target = kbucket::Key::from(key);
let target = kbucket::Key::new(key);
let peers = self.kbuckets.closest_keys(&target);
let inner = QueryInner::new(info);
self.queries.add_iter_closest(target.clone(), peers, inner);
@ -562,7 +561,7 @@ where
}
/// Collects all peers who are known to be providers of the value for a given `Multihash`.
fn provider_peers(&mut self, key: &Multihash, source: &PeerId) -> Vec<KadPeer> {
fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
let kbuckets = &mut self.kbuckets;
self.store.providers(key)
.into_iter()
@ -578,9 +577,9 @@ where
}
/// Starts an iterative `ADD_PROVIDER` query for the given key.
fn start_add_provider(&mut self, key: Multihash, context: AddProviderContext) {
fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
let info = QueryInfo::PrepareAddProvider { key: key.clone(), context };
let target = kbucket::Key::from(key);
let target = kbucket::Key::new(key);
let peers = self.kbuckets.closest_keys(&target);
let inner = QueryInner::new(info);
self.queries.add_iter_closest(target.clone(), peers, inner);
@ -589,7 +588,7 @@ where
/// Starts an iterative `PUT_VALUE` query for the given record.
fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
let quorum = quorum.eval(self.queries.config().replication_factor);
let target = kbucket::Key::from(record.key.clone());
let target = kbucket::Key::new(record.key.clone());
let peers = self.kbuckets.closest_keys(&target);
let info = QueryInfo::PreparePutRecord { record, quorum, context };
let inner = QueryInner::new(info);
@ -791,7 +790,7 @@ where
}
QueryInfo::PutRecord { record, quorum, num_results, context } => {
let result = |key: Multihash| {
let result = |key: record::Key| {
if num_results >= quorum.get() {
Ok(PutRecordOk { key })
} else {
@ -933,7 +932,7 @@ where
// number of nodes between the local node and the closest node to the key
// (beyond the replication factor). This ensures avoiding over-caching
// outside of the k closest nodes to a key.
let target = kbucket::Key::from(record.key.clone());
let target = kbucket::Key::new(record.key.clone());
let num_between = self.kbuckets.count_nodes_between(&target);
let k = self.queries.config().replication_factor.get();
let num_beyond_k = (usize::max(k, num_between) - k) as u32;
@ -988,7 +987,7 @@ where
}
/// Processes a provider record received from a peer.
fn provider_received(&mut self, key: Multihash, provider: KadPeer) {
fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::Discovered {
peer_id: provider.node_id.clone(),
@ -1167,7 +1166,7 @@ where
KademliaHandlerEvent::GetProvidersReq { key, request_id } => {
let provider_peers = self.provider_peers(&key, &source);
let closer_peers = self.find_closest(&kbucket::Key::from(key), &source);
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
self.queued_events.push_back(NetworkBehaviourAction::SendEvent {
peer_id: source,
event: KademliaHandlerIn::GetProvidersRes {
@ -1230,7 +1229,7 @@ where
// If no record is found, at least report known closer peers.
let closer_peers =
if record.is_none() {
self.find_closest(&kbucket::Key::from(key), &source)
self.find_closest(&kbucket::Key::new(key), &source)
} else {
Vec::new()
};
@ -1266,7 +1265,7 @@ where
// that node if the query turns out to be successful.
let source_key = kbucket::Key::from(source.clone());
if let Some(cache_key) = cache_at {
let key = kbucket::Key::from(key.clone());
let key = kbucket::Key::new(key.clone());
if source_key.distance(&key) < cache_key.distance(&key) {
*cache_at = Some(source_key)
}
@ -1511,14 +1510,25 @@ pub struct GetRecordOk {
/// The error result of [`Kademlia::get_record`].
#[derive(Debug, Clone)]
pub enum GetRecordError {
NotFound { key: Multihash, closest_peers: Vec<PeerId> },
QuorumFailed { key: Multihash, records: Vec<Record>, quorum: NonZeroUsize },
Timeout { key: Multihash, records: Vec<Record>, quorum: NonZeroUsize }
NotFound {
key: record::Key,
closest_peers: Vec<PeerId>
},
QuorumFailed {
key: record::Key,
records: Vec<Record>,
quorum: NonZeroUsize
},
Timeout {
key: record::Key,
records: Vec<Record>,
quorum: NonZeroUsize
}
}
impl GetRecordError {
/// Gets the key of the record for which the operation failed.
pub fn key(&self) -> &Multihash {
pub fn key(&self) -> &record::Key {
match self {
GetRecordError::QuorumFailed { key, .. } => key,
GetRecordError::Timeout { key, .. } => key,
@ -1528,7 +1538,7 @@ impl GetRecordError {
/// Extracts the key of the record for which the operation failed,
/// consuming the error.
pub fn into_key(self) -> Multihash {
pub fn into_key(self) -> record::Key {
match self {
GetRecordError::QuorumFailed { key, .. } => key,
GetRecordError::Timeout { key, .. } => key,
@ -1543,31 +1553,31 @@ pub type PutRecordResult = Result<PutRecordOk, PutRecordError>;
/// The successful result of [`Kademlia::put_record`].
#[derive(Debug, Clone)]
pub struct PutRecordOk {
pub key: Multihash
pub key: record::Key
}
/// The error result of [`Kademlia::put_record`].
#[derive(Debug)]
pub enum PutRecordError {
QuorumFailed {
key: Multihash,
key: record::Key,
num_results: usize,
quorum: NonZeroUsize
},
Timeout {
key: Multihash,
key: record::Key,
num_results: usize,
quorum: NonZeroUsize
},
LocalStorageError {
key: Multihash,
key: record::Key,
cause: store::Error
}
}
impl PutRecordError {
/// Gets the key of the record for which the operation failed.
pub fn key(&self) -> &Multihash {
pub fn key(&self) -> &record::Key {
match self {
PutRecordError::QuorumFailed { key, .. } => key,
PutRecordError::Timeout { key, .. } => key,
@ -1577,7 +1587,7 @@ impl PutRecordError {
/// Extracts the key of the record for which the operation failed,
/// consuming the error.
pub fn into_key(self) -> Multihash {
pub fn into_key(self) -> record::Key {
match self {
PutRecordError::QuorumFailed { key, .. } => key,
PutRecordError::Timeout { key, .. } => key,
@ -1607,7 +1617,7 @@ pub type GetClosestPeersResult = Result<GetClosestPeersOk, GetClosestPeersError>
/// The successful result of [`Kademlia::get_closest_peers`].
#[derive(Debug, Clone)]
pub struct GetClosestPeersOk {
pub key: Multihash,
pub key: Vec<u8>,
pub peers: Vec<PeerId>
}
@ -1615,14 +1625,14 @@ pub struct GetClosestPeersOk {
#[derive(Debug, Clone)]
pub enum GetClosestPeersError {
Timeout {
key: Multihash,
key: Vec<u8>,
peers: Vec<PeerId>
}
}
impl GetClosestPeersError {
/// Gets the key for which the operation failed.
pub fn key(&self) -> &Multihash {
pub fn key(&self) -> &Vec<u8> {
match self {
GetClosestPeersError::Timeout { key, .. } => key,
}
@ -1630,7 +1640,7 @@ impl GetClosestPeersError {
/// Extracts the key for which the operation failed,
/// consuming the error.
pub fn into_key(self) -> Multihash {
pub fn into_key(self) -> Vec<u8> {
match self {
GetClosestPeersError::Timeout { key, .. } => key,
}
@ -1643,7 +1653,7 @@ pub type GetProvidersResult = Result<GetProvidersOk, GetProvidersError>;
/// The successful result of [`Kademlia::get_providers`].
#[derive(Debug, Clone)]
pub struct GetProvidersOk {
pub key: Multihash,
pub key: record::Key,
pub providers: Vec<PeerId>,
pub closest_peers: Vec<PeerId>
}
@ -1652,7 +1662,7 @@ pub struct GetProvidersOk {
#[derive(Debug, Clone)]
pub enum GetProvidersError {
Timeout {
key: Multihash,
key: record::Key,
providers: Vec<PeerId>,
closest_peers: Vec<PeerId>
}
@ -1660,7 +1670,7 @@ pub enum GetProvidersError {
impl GetProvidersError {
/// Gets the key for which the operation failed.
pub fn key(&self) -> &Multihash {
pub fn key(&self) -> &record::Key {
match self {
GetProvidersError::Timeout { key, .. } => key,
}
@ -1668,7 +1678,7 @@ impl GetProvidersError {
/// Extracts the key for which the operation failed,
/// consuming the error.
pub fn into_key(self) -> Multihash {
pub fn into_key(self) -> record::Key {
match self {
GetProvidersError::Timeout { key, .. } => key,
}
@ -1681,7 +1691,7 @@ pub type AddProviderResult = Result<AddProviderOk, AddProviderError>;
/// The successful result of publishing a provider record.
#[derive(Debug, Clone)]
pub struct AddProviderOk {
pub key: Multihash,
pub key: record::Key,
}
/// The possible errors when publishing a provider record.
@ -1689,18 +1699,18 @@ pub struct AddProviderOk {
pub enum AddProviderError {
/// The query timed out.
Timeout {
key: Multihash,
key: record::Key,
},
/// The provider record could not be stored.
LocalStorageError {
key: Multihash,
key: record::Key,
cause: store::Error
}
}
impl AddProviderError {
/// Gets the key for which the operation failed.
pub fn key(&self) -> &Multihash {
pub fn key(&self) -> &record::Key {
match self {
AddProviderError::Timeout { key, .. } => key,
AddProviderError::LocalStorageError { key, .. } => key,
@ -1709,7 +1719,7 @@ impl AddProviderError {
/// Extracts the key for which the operation failed,
/// consuming the error.
pub fn into_key(self) -> Multihash {
pub fn into_key(self) -> record::Key {
match self {
AddProviderError::Timeout { key, .. } => key,
AddProviderError::LocalStorageError { key, .. } => key,
@ -1779,12 +1789,12 @@ enum QueryInfo {
},
/// A query to find the closest peers to a key.
GetClosestPeers { key: Multihash },
GetClosestPeers { key: Vec<u8> },
/// A query for the providers of a key.
GetProviders {
/// The key for which to search for providers.
key: Multihash,
key: record::Key,
/// The found providers.
providers: Vec<PeerId>,
},
@ -1792,13 +1802,13 @@ enum QueryInfo {
/// A query that searches for the closest closest nodes to a key to be
/// used in a subsequent `AddProvider` query.
PrepareAddProvider {
key: Multihash,
key: record::Key,
context: AddProviderContext,
},
/// A query that advertises the local node as a provider for a key.
AddProvider {
key: Multihash,
key: record::Key,
provider_id: PeerId,
external_addresses: Vec<Multiaddr>,
context: AddProviderContext,
@ -1823,7 +1833,7 @@ enum QueryInfo {
/// A query that searches for values for a key.
GetRecord {
/// The key to look for.
key: Multihash,
key: record::Key,
/// The records found.
records: Vec<Record>,
/// The number of records to look for.
@ -1842,7 +1852,7 @@ impl QueryInfo {
fn to_request(&self, query_id: QueryId) -> KademliaHandlerIn<QueryId> {
match &self {
QueryInfo::Bootstrap { peer } => KademliaHandlerIn::FindNodeReq {
key: peer.clone().into(),
key: peer.clone().into_bytes(),
user_data: query_id,
},
QueryInfo::GetClosestPeers { key, .. } => KademliaHandlerIn::FindNodeReq {
@ -1854,7 +1864,7 @@ impl QueryInfo {
user_data: query_id,
},
QueryInfo::PrepareAddProvider { key, .. } => KademliaHandlerIn::FindNodeReq {
key: key.clone(),
key: key.to_vec(),
user_data: query_id,
},
QueryInfo::AddProvider {
@ -1875,7 +1885,7 @@ impl QueryInfo {
user_data: query_id,
},
QueryInfo::PreparePutRecord { record, .. } => KademliaHandlerIn::FindNodeReq {
key: record.key.clone(),
key: record.key.to_vec(),
user_data: query_id,
},
QueryInfo::PutRecord { record, .. } => KademliaHandlerIn::PutRecord {

View File

@ -43,7 +43,7 @@ use quickcheck::*;
use rand::{Rng, random, thread_rng};
use std::{collections::{HashSet, HashMap}, io, num::NonZeroUsize, u64};
use tokio::runtime::current_thread;
use multihash::Hash::SHA2256;
use multihash::{Multihash, Hash::SHA2256};
type TestSwarm = Swarm<
Boxed<(PeerId, StreamMuxerBox), io::Error>,
@ -170,7 +170,7 @@ fn query_iter() {
// Ask the first peer in the list to search a random peer. The search should
// propagate forwards through the list of peers.
let search_target = PeerId::random();
let search_target_key = kbucket::Key::from(search_target.clone());
let search_target_key = kbucket::Key::new(search_target.clone());
swarms[0].get_closest_peers(search_target.clone());
// Set up expectations.
@ -186,7 +186,7 @@ fn query_iter() {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
assert_eq!(ok.key, search_target);
assert_eq!(&ok.key[..], search_target.as_bytes());
assert_eq!(swarm_ids[i], expected_swarm_id);
assert_eq!(swarm.queries.size(), 0);
assert!(expected_peer_ids.iter().all(|p| ok.peers.contains(p)));
@ -231,7 +231,7 @@ fn unresponsive_not_returned_direct() {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
assert_eq!(ok.key, search_target);
assert_eq!(&ok.key[..], search_target.as_bytes());
assert_eq!(ok.peers.len(), 0);
return Ok(Async::Ready(()));
}
@ -272,7 +272,7 @@ fn unresponsive_not_returned_indirect() {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaEvent::GetClosestPeersResult(Ok(ok)))) => {
assert_eq!(ok.key, search_target);
assert_eq!(&ok.key[..], search_target.as_bytes());
assert_eq!(ok.peers.len(), 1);
assert_eq!(ok.peers[0], first_peer_id);
return Ok(Async::Ready(()));
@ -296,7 +296,7 @@ fn get_record_not_found() {
swarms[0].add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into());
swarms[1].add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into());
let target_key = multihash::encode(SHA2256, &vec![1,2,3]).unwrap();
let target_key = record::Key::from(Multihash::random(SHA2256));
swarms[0].get_record(&target_key, Quorum::One);
current_thread::run(
@ -449,7 +449,7 @@ fn get_value() {
swarms[0].add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into());
swarms[1].add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into());
let record = Record::new(multihash::encode(SHA2256, &vec![1,2,3]).unwrap(), vec![4,5,6]);
let record = Record::new(Multihash::random(SHA2256), vec![4,5,6]);
swarms[1].store.put(record.clone()).unwrap();
swarms[0].get_record(&record.key, Quorum::One);
@ -481,7 +481,7 @@ fn get_value_many() {
let (_, mut swarms) = build_connected_nodes(num_nodes, num_nodes);
let num_results = 10;
let record = Record::new(multihash::encode(SHA2256, &vec![1,2,3]).unwrap(), vec![4,5,6]);
let record = Record::new(Multihash::random(SHA2256), vec![4,5,6]);
for i in 0 .. num_nodes {
swarms[i].store.put(record.clone()).unwrap();
@ -511,7 +511,7 @@ fn get_value_many() {
#[test]
fn add_provider() {
fn prop(replication_factor: usize, keys: Vec<kbucket::Key<Multihash>>) {
fn prop(replication_factor: usize, keys: Vec<record::Key>) {
let replication_factor = NonZeroUsize::new(replication_factor % (K_VALUE.get() / 2) + 1).unwrap();
let num_total = replication_factor.get() * 2;
let num_group = replication_factor.get();
@ -531,7 +531,7 @@ fn add_provider() {
// Initiate the first round of publishing.
for k in &keys {
swarms[0].start_providing(k.preimage().clone());
swarms[0].start_providing(k.clone());
}
current_thread::run(
@ -545,9 +545,8 @@ fn add_provider() {
match res {
Err(e) => panic!(e),
Ok(ok) => {
let key = kbucket::Key::new(ok.key.clone());
assert!(keys.contains(&key));
results.push(key);
assert!(keys.contains(&ok.key));
results.push(ok.key);
}
}
}
@ -574,7 +573,7 @@ fn add_provider() {
// Collect the nodes that have a provider record for `key`.
let actual = swarms.iter().enumerate().skip(1)
.filter_map(|(i, s)|
if s.store.providers(key.preimage()).len() == 1 {
if s.store.providers(&key).len() == 1 {
Some(swarm_ids[i].clone())
} else {
None
@ -588,9 +587,10 @@ fn add_provider() {
}
let mut expected = swarm_ids.clone().split_off(1);
let kbucket_key = kbucket::Key::new(key);
expected.sort_by(|id1, id2|
kbucket::Key::new(id1).distance(&key).cmp(
&kbucket::Key::new(id2).distance(&key)));
kbucket::Key::new(id1).distance(&kbucket_key).cmp(
&kbucket::Key::new(id2).distance(&kbucket_key)));
let expected = expected
.into_iter()
@ -609,7 +609,7 @@ fn add_provider() {
if republished {
assert_eq!(swarms[0].store.provided().count(), keys.len());
for k in &keys {
swarms[0].stop_providing(k.preimage());
swarms[0].stop_providing(&k);
}
assert_eq!(swarms[0].store.provided().count(), 0);
// All records have been republished, thus the test is complete.

View File

@ -22,7 +22,7 @@ use crate::protocol::{
KadInStreamSink, KadOutStreamSink, KadPeer, KadRequestMsg, KadResponseMsg,
KademliaProtocolConfig,
};
use crate::record::Record;
use crate::record::{self, Record};
use futures::prelude::*;
use libp2p_swarm::{
KeepAlive,
@ -36,7 +36,6 @@ use libp2p_core::{
upgrade::{self, InboundUpgrade, OutboundUpgrade, Negotiated}
};
use log::trace;
use multihash::Multihash;
use std::{borrow::Cow, error, fmt, io, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite};
use wasm_timer::Instant;
@ -139,7 +138,7 @@ pub enum KademliaHandlerEvent<TUserData> {
/// returned is not specified, but should be around 20.
FindNodeReq {
/// The key for which to locate the closest nodes.
key: Multihash,
key: Vec<u8>,
/// Identifier of the request. Needs to be passed back when answering.
request_id: KademliaRequestId,
},
@ -155,8 +154,8 @@ pub enum KademliaHandlerEvent<TUserData> {
/// Same as `FindNodeReq`, but should also return the entries of the local providers list for
/// this key.
GetProvidersReq {
/// Identifier being searched.
key: Multihash,
/// The key for which providers are requested.
key: record::Key,
/// Identifier of the request. Needs to be passed back when answering.
request_id: KademliaRequestId,
},
@ -182,7 +181,7 @@ pub enum KademliaHandlerEvent<TUserData> {
/// The peer announced itself as a provider of a key.
AddProvider {
/// The key for which the peer is a provider of the associated value.
key: Multihash,
key: record::Key,
/// The peer that is the provider of the value for `key`.
provider: KadPeer,
},
@ -190,7 +189,7 @@ pub enum KademliaHandlerEvent<TUserData> {
/// Request to get a value from the dht records
GetRecord {
/// Key for which we should look in the dht
key: Multihash,
key: record::Key,
/// Identifier of the request. Needs to be passed back when answering.
request_id: KademliaRequestId,
},
@ -215,7 +214,7 @@ pub enum KademliaHandlerEvent<TUserData> {
/// Response to a request to store a record.
PutRecordRes {
/// The key of the stored record.
key: Multihash,
key: record::Key,
/// The value of the stored record.
value: Vec<u8>,
/// The user data passed to the `PutValue`.
@ -283,7 +282,7 @@ pub enum KademliaHandlerIn<TUserData> {
/// returned is not specified, but should be around 20.
FindNodeReq {
/// Identifier of the node.
key: Multihash,
key: Vec<u8>,
/// Custom user data. Passed back in the out event when the results arrive.
user_data: TUserData,
},
@ -302,7 +301,7 @@ pub enum KademliaHandlerIn<TUserData> {
/// this key.
GetProvidersReq {
/// Identifier being searched.
key: Multihash,
key: record::Key,
/// Custom user data. Passed back in the out event when the results arrive.
user_data: TUserData,
},
@ -325,7 +324,7 @@ pub enum KademliaHandlerIn<TUserData> {
/// succeeded.
AddProvider {
/// Key for which we should add providers.
key: Multihash,
key: record::Key,
/// Known provider for this key.
provider: KadPeer,
},
@ -333,7 +332,7 @@ pub enum KademliaHandlerIn<TUserData> {
/// Request to retrieve a record from the DHT.
GetRecord {
/// The key of the record.
key: Multihash,
key: record::Key,
/// Custom data. Passed back in the out event when the results arrive.
user_data: TUserData,
},
@ -358,7 +357,7 @@ pub enum KademliaHandlerIn<TUserData> {
/// Response to a `PutRecord`.
PutRecordRes {
/// Key of the value that was put.
key: Multihash,
key: record::Key,
/// Value that was put.
value: Vec<u8>,
/// Identifier of the request that was made by the remote.
@ -517,7 +516,7 @@ where
}
}
KademliaHandlerIn::GetProvidersReq { key, user_data } => {
let msg = KadRequestMsg::GetProviders { key: key.clone() };
let msg = KadRequestMsg::GetProviders { key };
self.substreams
.push(SubstreamState::OutPendingOpen(msg, Some(user_data.clone())));
}
@ -550,17 +549,12 @@ where
}
}
KademliaHandlerIn::AddProvider { key, provider } => {
let msg = KadRequestMsg::AddProvider {
key: key.clone(),
provider: provider.clone(),
};
self.substreams
.push(SubstreamState::OutPendingOpen(msg, None));
let msg = KadRequestMsg::AddProvider { key, provider };
self.substreams.push(SubstreamState::OutPendingOpen(msg, None));
}
KademliaHandlerIn::GetRecord { key, user_data } => {
let msg = KadRequestMsg::GetValue { key };
self.substreams
.push(SubstreamState::OutPendingOpen(msg, Some(user_data)));
self.substreams.push(SubstreamState::OutPendingOpen(msg, Some(user_data)));
}
KademliaHandlerIn::PutRecord { record, user_data } => {

View File

@ -61,11 +61,9 @@
//! > to the size of all stored records. As a job runs, the records are moved
//! > out of the job to the consumer, where they can be dropped after being sent.
use crate::record::{Record, ProviderRecord, store::RecordStore};
use crate::record::{self, Record, ProviderRecord, store::RecordStore};
use libp2p_core::PeerId;
use futures::prelude::*;
use multihash::Multihash;
use std::collections::HashSet;
use std::time::Duration;
use std::vec;
@ -131,7 +129,7 @@ pub struct PutRecordJob {
next_publish: Option<Instant>,
publish_interval: Option<Duration>,
record_ttl: Option<Duration>,
skipped: HashSet<Multihash>,
skipped: HashSet<record::Key>,
inner: PeriodicJob<vec::IntoIter<Record>>,
}
@ -162,7 +160,7 @@ impl PutRecordJob {
/// Adds the key of a record that is ignored on the current or
/// next run of the job.
pub fn skip(&mut self, key: Multihash) {
pub fn skip(&mut self, key: record::Key) {
self.skipped.insert(key);
}

View File

@ -33,11 +33,10 @@
use bytes::BytesMut;
use codec::UviBytes;
use crate::protobuf_structs::dht as proto;
use crate::record::Record;
use crate::record::{self, Record};
use futures::{future::{self, FutureResult}, sink, stream, Sink, Stream};
use libp2p_core::{Multiaddr, PeerId};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated};
use multihash::Multihash;
use protobuf::{self, Message};
use std::{borrow::Cow, convert::TryFrom, time::Duration};
use std::{io, iter};
@ -258,20 +257,20 @@ pub enum KadRequestMsg {
/// returned is not specified, but should be around 20.
FindNode {
/// The key for which to locate the closest nodes.
key: Multihash,
key: Vec<u8>,
},
/// Same as `FindNode`, but should also return the entries of the local providers list for
/// this key.
GetProviders {
/// Identifier being searched.
key: Multihash,
key: record::Key,
},
/// Indicates that this list of providers is known for this key.
AddProvider {
/// Key for which we should add providers.
key: Multihash,
key: record::Key,
/// Known provider for this key.
provider: KadPeer,
},
@ -279,7 +278,7 @@ pub enum KadRequestMsg {
/// Request to get a value from the dht records.
GetValue {
/// The key we are searching for.
key: Multihash,
key: record::Key,
},
/// Request to put a value into the dht records.
@ -319,7 +318,7 @@ pub enum KadResponseMsg {
/// Response to a `PutValue`.
PutValue {
/// The key of the record.
key: Multihash,
key: record::Key,
/// Value of the record.
value: Vec<u8>,
},
@ -336,14 +335,14 @@ fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message {
KadRequestMsg::FindNode { key } => {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::FIND_NODE);
msg.set_key(key.into_bytes());
msg.set_key(key);
msg.set_clusterLevelRaw(10);
msg
}
KadRequestMsg::GetProviders { key } => {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::GET_PROVIDERS);
msg.set_key(key.into_bytes());
msg.set_key(key.to_vec());
msg.set_clusterLevelRaw(10);
msg
}
@ -351,7 +350,7 @@ fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::ADD_PROVIDER);
msg.set_clusterLevelRaw(10);
msg.set_key(key.into_bytes());
msg.set_key(key.to_vec());
msg.mut_providerPeers().push(provider.into());
msg
}
@ -359,7 +358,7 @@ fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::GET_VALUE);
msg.set_clusterLevelRaw(10);
msg.set_key(key.into_bytes());
msg.set_key(key.to_vec());
msg
}
@ -426,10 +425,10 @@ fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message {
} => {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::PUT_VALUE);
msg.set_key(key.clone().into_bytes());
msg.set_key(key.to_vec());
let mut record = proto::Record::new();
record.set_key(key.into_bytes());
record.set_key(key.to_vec());
record.set_value(value);
msg.set_record(record);
@ -451,18 +450,17 @@ fn proto_to_req_msg(mut message: proto::Message) -> Result<KadRequestMsg, io::Er
}
proto::Message_MessageType::GET_VALUE => {
let key = Multihash::from_bytes(message.take_key()).map_err(invalid_data)?;
let key = record::Key::from(message.take_key());
Ok(KadRequestMsg::GetValue { key })
}
proto::Message_MessageType::FIND_NODE => {
let key = Multihash::from_bytes(message.take_key())
.map_err(|_| invalid_data("Invalid key in FIND_NODE"))?;
let key = message.take_key();
Ok(KadRequestMsg::FindNode { key })
}
proto::Message_MessageType::GET_PROVIDERS => {
let key = Multihash::from_bytes(message.take_key()).map_err(invalid_data)?;
let key = record::Key::from(message.take_key());
Ok(KadRequestMsg::GetProviders { key })
}
@ -476,7 +474,7 @@ fn proto_to_req_msg(mut message: proto::Message) -> Result<KadRequestMsg, io::Er
.find_map(|peer| KadPeer::try_from(peer).ok());
if let Some(provider) = provider {
let key = Multihash::from_bytes(message.take_key()).map_err(invalid_data)?;
let key = record::Key::from(message.take_key());
Ok(KadRequestMsg::AddProvider { key, provider })
} else {
Err(invalid_data("ADD_PROVIDER message with no valid peer."))
@ -539,7 +537,7 @@ fn proto_to_resp_msg(mut message: proto::Message) -> Result<KadResponseMsg, io::
}
proto::Message_MessageType::PUT_VALUE => {
let key = Multihash::from_bytes(message.take_key()).map_err(invalid_data)?;
let key = record::Key::from(message.take_key());
if !message.has_record() {
return Err(invalid_data("received PUT_VALUE message with no record"));
}
@ -557,7 +555,7 @@ fn proto_to_resp_msg(mut message: proto::Message) -> Result<KadResponseMsg, io::
}
fn record_from_proto(mut record: proto::Record) -> Result<Record, io::Error> {
let key = Multihash::from_bytes(record.take_key()).map_err(invalid_data)?;
let key = record::Key::from(record.take_key());
let value = record.take_value();
let publisher =
@ -581,7 +579,7 @@ fn record_from_proto(mut record: proto::Record) -> Result<Record, io::Error> {
fn record_to_proto(record: Record) -> proto::Record {
let mut pb_record = proto::Record::new();
pb_record.key = record.key.into_bytes();
pb_record.key = record.key.to_vec();
pb_record.value = record.value;
if let Some(p) = record.publisher {
pb_record.publisher = p.into_bytes();

View File

@ -22,16 +22,51 @@
pub mod store;
use bytes::Bytes;
use libp2p_core::PeerId;
use multihash::Multihash;
use std::hash::{Hash, Hasher};
use wasm_timer::Instant;
/// The (opaque) key of a record.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Key(Bytes);
impl Key {
/// Creates a new key from the bytes of the input.
pub fn new<K: AsRef<[u8]>>(key: &K) -> Self {
Key(Bytes::from(key.as_ref()))
}
/// Copies the bytes of the key into a new vector.
pub fn to_vec(&self) -> Vec<u8> {
Vec::from(&self.0[..])
}
}
impl AsRef<[u8]> for Key {
fn as_ref(&self) -> &[u8] {
&self.0[..]
}
}
impl From<Vec<u8>> for Key {
fn from(v: Vec<u8>) -> Key {
Key(Bytes::from(v))
}
}
impl From<Multihash> for Key {
fn from(m: Multihash) -> Key {
Key::from(m.into_bytes())
}
}
/// A record stored in the DHT.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Record {
/// Key of the record.
pub key: Multihash,
pub key: Key,
/// Value of the record.
pub value: Vec<u8>,
/// The (original) publisher of the record.
@ -42,9 +77,12 @@ pub struct Record {
impl Record {
/// Creates a new record for insertion into the DHT.
pub fn new(key: Multihash, value: Vec<u8>) -> Self {
pub fn new<K>(key: K, value: Vec<u8>) -> Self
where
K: Into<Key>
{
Record {
key,
key: key.into(),
value,
publisher: None,
expires: None,
@ -62,7 +100,7 @@ impl Record {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProviderRecord {
/// The key whose value is provided by the provider.
pub key: Multihash,
pub key: Key,
/// The provider of the value for the key.
pub provider: PeerId,
/// The expiration time as measured by a local, monotonic clock.
@ -78,9 +116,12 @@ impl Hash for ProviderRecord {
impl ProviderRecord {
/// Creates a new provider record for insertion into a `RecordStore`.
pub fn new(key: Multihash, provider: PeerId) -> Self {
pub fn new<K>(key: K, provider: PeerId) -> Self
where
K: Into<Key>
{
ProviderRecord {
key, provider, expires: None
key: key.into(), provider, expires: None
}
}
@ -98,10 +139,16 @@ mod tests {
use rand::Rng;
use std::time::Duration;
impl Arbitrary for Key {
fn arbitrary<G: Gen>(_: &mut G) -> Key {
Key::from(Multihash::random(SHA2256))
}
}
impl Arbitrary for Record {
fn arbitrary<G: Gen>(g: &mut G) -> Record {
Record {
key: Multihash::random(SHA2256),
key: Key::arbitrary(g),
value: Vec::arbitrary(g),
publisher: if g.gen() { Some(PeerId::random()) } else { None },
expires: if g.gen() {
@ -116,7 +163,7 @@ mod tests {
impl Arbitrary for ProviderRecord {
fn arbitrary<G: Gen>(g: &mut G) -> ProviderRecord {
ProviderRecord {
key: Multihash::random(SHA2256),
key: Key::arbitrary(g),
provider: PeerId::random(),
expires: if g.gen() {
Some(Instant::now() + Duration::from_secs(g.gen_range(0, 60)))

View File

@ -64,13 +64,13 @@ pub trait RecordStore<'a> {
type ProvidedIter: Iterator<Item = Cow<'a, ProviderRecord>>;
/// Gets a record from the store, given its key.
fn get(&'a self, k: &Multihash) -> Option<Cow<Record>>;
fn get(&'a self, k: &Key) -> Option<Cow<Record>>;
/// Puts a record into the store.
fn put(&'a mut self, r: Record) -> Result<()>;
/// Removes the record with the given key from the store.
fn remove(&'a mut self, k: &Multihash);
fn remove(&'a mut self, k: &Key);
/// Gets an iterator over all (value-) records currently stored.
fn records(&'a self) -> Self::RecordsIter;
@ -83,13 +83,13 @@ pub trait RecordStore<'a> {
fn add_provider(&'a mut self, record: ProviderRecord) -> Result<()>;
/// Gets a copy of the stored provider records for the given key.
fn providers(&'a self, key: &Multihash) -> Vec<ProviderRecord>;
fn providers(&'a self, key: &Key) -> Vec<ProviderRecord>;
/// Gets an iterator over all stored provider records for which the
/// node owning the store is itself the provider.
fn provided(&'a self) -> Self::ProvidedIter;
/// Removes a provider record from the store.
fn remove_provider(&'a mut self, k: &Multihash, p: &PeerId);
fn remove_provider(&'a mut self, k: &Key, p: &PeerId);
}

View File

@ -22,7 +22,6 @@ use super::*;
use crate::kbucket;
use libp2p_core::PeerId;
use multihash::Multihash;
use smallvec::SmallVec;
use std::borrow::Cow;
use std::collections::{hash_map, hash_set, HashMap, HashSet};
@ -35,9 +34,9 @@ pub struct MemoryStore {
/// The configuration of the store.
config: MemoryStoreConfig,
/// The stored (regular) records.
records: HashMap<Multihash, Record>,
records: HashMap<Key, Record>,
/// The stored provider records.
providers: HashMap<Multihash, SmallVec<[ProviderRecord; K_VALUE.get()]>>,
providers: HashMap<Key, SmallVec<[ProviderRecord; K_VALUE.get()]>>,
/// The set of all provider records for the node identified by `local_key`.
///
/// Must be kept in sync with `providers`.
@ -90,7 +89,7 @@ impl MemoryStore {
/// Retains the records satisfying a predicate.
pub fn retain<F>(&mut self, f: F)
where
F: FnMut(&Multihash, &mut Record) -> bool
F: FnMut(&Key, &mut Record) -> bool
{
self.records.retain(f);
}
@ -98,7 +97,7 @@ impl MemoryStore {
impl<'a> RecordStore<'a> for MemoryStore {
type RecordsIter = iter::Map<
hash_map::Values<'a, Multihash, Record>,
hash_map::Values<'a, Key, Record>,
fn(&'a Record) -> Cow<'a, Record>
>;
@ -107,7 +106,7 @@ impl<'a> RecordStore<'a> for MemoryStore {
fn(&'a ProviderRecord) -> Cow<'a, ProviderRecord>
>;
fn get(&'a self, k: &Multihash) -> Option<Cow<Record>> {
fn get(&'a self, k: &Key) -> Option<Cow<Record>> {
self.records.get(k).map(Cow::Borrowed)
}
@ -133,7 +132,7 @@ impl<'a> RecordStore<'a> for MemoryStore {
Ok(())
}
fn remove(&'a mut self, k: &Multihash) {
fn remove(&'a mut self, k: &Key) {
self.records.remove(k);
}
@ -191,7 +190,7 @@ impl<'a> RecordStore<'a> for MemoryStore {
Ok(())
}
fn providers(&'a self, key: &Multihash) -> Vec<ProviderRecord> {
fn providers(&'a self, key: &Key) -> Vec<ProviderRecord> {
self.providers.get(&key).map_or_else(Vec::new, |ps| ps.clone().into_vec())
}
@ -199,7 +198,7 @@ impl<'a> RecordStore<'a> for MemoryStore {
self.provided.iter().map(Cow::Borrowed)
}
fn remove_provider(&'a mut self, key: &Multihash, provider: &PeerId) {
fn remove_provider(&'a mut self, key: &Key, provider: &PeerId) {
if let hash_map::Entry::Occupied(mut e) = self.providers.entry(key.clone()) {
let providers = e.get_mut();
if let Some(i) = providers.iter().position(|p| &p.provider == provider) {
@ -252,7 +251,7 @@ mod tests {
fn providers_ordered_by_distance_to_key() {
fn prop(providers: Vec<kbucket::Key<PeerId>>) -> bool {
let mut store = MemoryStore::new(PeerId::random());
let key = Multihash::random(SHA2256);
let key = Key::from(Multihash::random(SHA2256));
let mut records = providers.into_iter().map(|p| {
ProviderRecord::new(key.clone(), p.into_preimage())