Kademlia Records (#1144)

* initial implementation of the records

* move to multihash keys

* correctly process query results

* comments and formatting

* correctly return closer_peers in query

* Fix checking the wrong peer id in test

* Apply suggestions from code review

Co-Authored-By: Roman Borschel <romanb@users.noreply.github.com>

* Fix changes from suggestions

* Send responses to PUT_VALUE requests

* Shortcut in get_value

* Update protocols/kad/src/behaviour.rs

Co-Authored-By: Roman Borschel <romanb@users.noreply.github.com>

* Revert "Update protocols/kad/src/behaviour.rs"

This reverts commit 579ce742a7f4c94587f1e1f0866d2a3a37418efb.

* Remove duplicate insertion

* Adds a record to a PUT_VALUE response

* Fix a racy put_value test

* Store value ourselves only if we are in K closest

* Abstract over storage

* Revert "Abstract over storage": bad take

This reverts commit eaebf5b6d915712eaf3b05929577fdf697f204d8.

* Abstract over records storage using hashmap as default

* Constructor for custom records

* New Record type and its traits

* Fix outdated storage name

* Fixes returning an event

* Change FindNodeReq key type to Multihash

* WriteState for a second stage of a PUT_VALUE request

* GET_VALUE should not have a record

* Refactor a match arm

* Add success and failure counters to PutValueRes

* If value is found no need to return closer peers

* Remove a custom storage from tests

* Rename a test to get_value_not_found

* Adds a TODO to change FindNode request key to Multihash

Co-Authored-By: Roman Borschel <romanb@users.noreply.github.com>

* Move MemoryRecordStorage to record.rs

* Return a Cow-ed Record from get

* Fix incorrect GET_VALUE parsing

* Various fixes with review

* Fixes get_value_not_found

* Fix peerids names in test

* another fix

* PutValue correctly distributes values

* Simplify the test

* Check that results are actually the closest

* Reverts changes to tests

* Fix the test topology and checking the results

* Run put_value test ten times

* Adds a get_value test

* Apply suggestions from code review

Co-Authored-By: Roman Borschel <romanb@users.noreply.github.com>

* Make Record fields public

* Moves WriteState to write.rs

* A couple of minor fixes

* Another few fixes of review

* Simplify the put_value test

* Don't synchronously return an error from put_value

* Formatting fixes and comments

* Collect a bunch of results

* Take exactly as many elements as needed

* Check if the peer is still connected

* Adds a test for the number of results with multiple GetValue responses

* Remove unnecessary mut iterators in put_value

* Ask for num_results in get_value

* Don't allocate twice in get_value

* Don't count the same errored peer multiple times

* Apply suggestions from code review

Co-Authored-By: Roman Borschel <romanb@users.noreply.github.com>

* Fix another review

* Apply suggestions from code review

Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com>

* Bring back FromIterator and improve a panic message

* Update protocols/kad/src/behaviour.rs

Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com>
Authored by Fedor Sakharov on 2019-06-04 14:44:24 +03:00; committed by Pierre Krieger.
parent 603fd5744f
commit 22527e7eb6
8 changed files with 1037 additions and 29 deletions
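
As a quick orientation before the diff: the new public surface is `put_value`, `get_value`, and the `GetValueResult`/`PutValueResult` output events. A minimal sketch of issuing the two new queries (not part of the diff; the `store_and_fetch` helper and the key/value bytes are illustrative, and it assumes the behaviour is already part of a swarm):

use libp2p::kad::Kademlia;
use multihash::{encode, Hash};
use std::num::NonZeroU8;

// Hypothetical helper: kick off an iterative PUT_VALUE followed by a GET_VALUE.
fn store_and_fetch<TSubstream>(kad: &mut Kademlia<TSubstream>) {
    let key = encode(Hash::SHA2256, b"example-key").unwrap();
    // Stores the record locally, then runs FIND_NODE and, once the closest
    // peers are known, sends them PUT_VALUE requests (the second stage).
    kad.put_value(key.clone(), b"example-value".to_vec());
    // Ask the network for up to three copies; the outcome arrives later as a
    // KademliaOut::GetValueResult event.
    kad.get_value(&key, NonZeroU8::new(3).unwrap());
}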

View File

@ -28,6 +28,7 @@ use libp2p::{
PeerId,
identity
};
use libp2p::kad::Kademlia;
fn main() {
// Create a random key for ourselves.
@ -44,7 +45,7 @@ fn main() {
// to insert our local node in the DHT. However here we use `without_init` because this
// example is very ephemeral and we don't want to pollute the DHT. In a real world
// application, you want to use `new` instead.
let mut behaviour = libp2p::kad::Kademlia::new(local_peer_id.clone());
let mut behaviour: Kademlia<_> = libp2p::kad::Kademlia::new(local_peer_id.clone());
behaviour.add_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse().unwrap(), "/ip4/104.131.131.82/tcp/4001".parse().unwrap());
behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip4/104.236.179.241/tcp/4001".parse().unwrap());
behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip4/104.236.76.40/tcp/4001".parse().unwrap());

View File

@ -23,20 +23,22 @@ use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn};
use crate::kbucket::{self, KBucketsTable, NodeStatus};
use crate::protocol::{KadConnectionType, KadPeer};
use crate::query::{QueryConfig, QueryState, QueryStatePollOut};
use crate::write::WriteState;
use crate::record::{MemoryRecordStorage, RecordStore, Record, RecordStorageError};
use fnv::{FnvHashMap, FnvHashSet};
use futures::{prelude::*, stream};
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId};
use multihash::Multihash;
use smallvec::SmallVec;
use std::{borrow::Cow, error, marker::PhantomData, time::Duration};
use std::{borrow::Cow, error, iter::FromIterator, marker::PhantomData, num::NonZeroU8, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite};
use wasm_timer::{Instant, Interval};
mod test;
/// Network behaviour that handles Kademlia.
pub struct Kademlia<TSubstream> {
pub struct Kademlia<TSubstream, TRecordStorage: RecordStore = MemoryRecordStorage> {
/// Storage for the nodes. Contains the known multiaddresses for this node.
kbuckets: KBucketsTable<PeerId, Addresses>,
@ -47,6 +49,9 @@ pub struct Kademlia<TSubstream> {
/// is the list of accumulated providers for `GET_PROVIDERS` queries.
active_queries: FnvHashMap<QueryId, QueryState<QueryInfo, PeerId>>,
/// All the `PUT_VALUE` actions we are currently performing
active_writes: FnvHashMap<QueryId, WriteState<PeerId, Multihash>>,
/// List of peers the swarm is connected to.
connected_peers: FnvHashSet<PeerId>,
@ -89,6 +94,9 @@ pub struct Kademlia<TSubstream> {
/// Marker to pin the generics.
marker: PhantomData<TSubstream>,
/// The records that we keep.
records: TRecordStorage,
}
/// Opaque type. Each query that we start gets a unique number.
@ -131,6 +139,24 @@ enum QueryInfoInner {
/// Which hash we're targeting.
target: Multihash,
},
/// Put the value into the dht records
PutValue {
/// The key of the record being inserted
key: Multihash,
/// The value of the record being inserted
value: Vec<u8>,
},
/// Get a value from the dht records
GetValue {
/// The key we're looking for
key: Multihash,
/// The results from peers are stored here
results: Vec<Record>,
/// The number of results to look for.
num_results: usize,
},
}
impl Into<kbucket::Key<QueryInfo>> for QueryInfo {
@ -146,6 +172,8 @@ impl AsRef<[u8]> for QueryInfo {
QueryInfoInner::FindPeer(peer) => peer.as_ref(),
QueryInfoInner::GetProviders { target, .. } => target.as_bytes(),
QueryInfoInner::AddProvider { target } => target.as_bytes(),
QueryInfoInner::GetValue { key, .. } => key.as_bytes(),
QueryInfoInner::PutValue { key, .. } => key.as_bytes(),
}
}
}
@ -155,11 +183,12 @@ impl QueryInfo {
fn to_rpc_request<TUserData>(&self, user_data: TUserData) -> KademliaHandlerIn<TUserData> {
match &self.inner {
QueryInfoInner::Initialization { target } => KademliaHandlerIn::FindNodeReq {
key: target.clone(),
key: target.clone().into(),
user_data,
},
QueryInfoInner::FindPeer(key) => KademliaHandlerIn::FindNodeReq {
key: key.clone(),
// TODO: Change the `key` of `QueryInfoInner::FindPeer` to be a `Multihash`.
key: key.clone().into(),
user_data,
},
QueryInfoInner::GetProviders { target, .. } => KademliaHandlerIn::GetProvidersReq {
@ -170,35 +199,62 @@ impl QueryInfo {
key: unimplemented!(), // TODO: target.clone(),
user_data,
},
QueryInfoInner::GetValue { key, .. } => KademliaHandlerIn::GetValue {
key: key.clone(),
user_data,
},
QueryInfoInner::PutValue { key, .. } => KademliaHandlerIn::FindNodeReq {
key: key.clone(),
user_data,
}
}
}
}
impl<TSubstream> Kademlia<TSubstream> {
impl<TSubstream, TRecordStorage> Kademlia<TSubstream, TRecordStorage>
where
TRecordStorage: RecordStore
{
/// Creates a `Kademlia`.
#[inline]
pub fn new(local_peer_id: PeerId) -> Self {
Self::new_inner(local_peer_id)
pub fn new(local_peer_id: PeerId) -> Self
where
TRecordStorage: Default
{
Self::new_inner(local_peer_id, Default::default())
}
/// The same as `new`, but using a custom protocol name.
///
/// Kademlia nodes only communicate with other nodes using the same protocol name. Using a
/// custom name therefore allows one to segregate the DHT from others, if that is desired.
pub fn with_protocol_name(local_peer_id: PeerId, name: impl Into<Cow<'static, [u8]>>) -> Self {
let mut me = Kademlia::new_inner(local_peer_id);
pub fn with_protocol_name(local_peer_id: PeerId, name: impl Into<Cow<'static, [u8]>>) -> Self
where
TRecordStorage: Default
{
let mut me = Kademlia::new_inner(local_peer_id, Default::default());
me.protocol_name_override = Some(name.into());
me
}
/// The same as `new`, but with a custom storage.
///
/// The default record storage is in memory; this lets the user override the
/// storage with user-defined behaviour.
pub fn with_storage(local_peer_id: PeerId, records: TRecordStorage) -> Self {
Self::new_inner(local_peer_id, records)
}
/// Creates a `Kademlia`.
///
/// Contrary to `new`, doesn't perform the initialization queries that store our local ID into
/// the DHT and fill our buckets.
#[inline]
#[deprecated(note="this function is now equivalent to new() and will be removed in the future")]
pub fn without_init(local_peer_id: PeerId) -> Self {
Self::new_inner(local_peer_id)
pub fn without_init(local_peer_id: PeerId) -> Self
where TRecordStorage: Default
{
Self::new_inner(local_peer_id, Default::default())
}
/// Adds a known address of a peer participating in the Kademlia DHT to the
@ -242,7 +298,7 @@ impl<TSubstream> Kademlia<TSubstream> {
}
/// Inner implementation of the constructors.
fn new_inner(local_peer_id: PeerId) -> Self {
fn new_inner(local_peer_id: PeerId, records: TRecordStorage) -> Self {
let parallelism = 3;
Kademlia {
@ -250,6 +306,7 @@ impl<TSubstream> Kademlia<TSubstream> {
protocol_name_override: None,
queued_events: SmallVec::new(),
active_queries: Default::default(),
active_writes: Default::default(),
connected_peers: Default::default(),
pending_rpcs: SmallVec::with_capacity(parallelism),
next_query_id: QueryId(0),
@ -261,6 +318,7 @@ impl<TSubstream> Kademlia<TSubstream> {
rpc_timeout: Duration::from_secs(8),
add_provider: SmallVec::new(),
marker: PhantomData,
records,
}
}
@ -283,6 +341,45 @@ impl<TSubstream> Kademlia<TSubstream> {
self.start_query(QueryInfoInner::GetProviders { target, pending_results: Vec::new() });
}
/// Starts an iterative `GET_VALUE` request.
///
/// The number of results returned is in the interval [1, 20]; if the user
/// requests a larger amount of results it is capped at 20.
pub fn get_value(&mut self, key: &Multihash, num_results: NonZeroU8) {
let num_results = usize::min(num_results.get() as usize, kbucket::MAX_NODES_PER_BUCKET);
let mut results = Vec::with_capacity(num_results);
if let Some(record) = self.records.get(key) {
results.push(record.into_owned());
if num_results == 1 {
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(
KademliaOut::GetValueResult(
GetValueResult::Found { results }
)));
return;
}
}
self.start_query(QueryInfoInner::GetValue {
key: key.clone(),
results,
num_results
});
}
/// Starts an iterative `PUT_VALUE` request
pub fn put_value(&mut self, key: Multihash, value: Vec<u8>) {
if let Err(error) = self.records.put(Record { key: key.clone(), value: value.clone() }) {
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(
KademliaOut::PutValueResult(
PutValueResult::Err { key, cause: error }
)
));
} else {
self.start_query(QueryInfoInner::PutValue { key, value });
}
}
/// Register the local node as the provider for the given key.
///
/// This will periodically send `ADD_PROVIDER` messages to the nodes closest to the key. When
@ -454,9 +551,10 @@ impl<TSubstream> Kademlia<TSubstream> {
}
}
impl<TSubstream> NetworkBehaviour for Kademlia<TSubstream>
impl<TSubstream, TRecordStorage> NetworkBehaviour for Kademlia<TSubstream, TRecordStorage>
where
TSubstream: AsyncRead + AsyncWrite,
TRecordStorage: RecordStore,
{
type ProtocolsHandler = KademliaHandler<TSubstream, QueryId>;
type OutEvent = KademliaOut;
@ -532,12 +630,18 @@ where
for query in self.active_queries.values_mut() {
query.inject_rpc_error(peer_id);
}
for write in self.active_writes.values_mut() {
write.inject_write_error(peer_id);
}
}
fn inject_disconnected(&mut self, id: &PeerId, _old_endpoint: ConnectedPoint) {
for query in self.active_queries.values_mut() {
query.inject_rpc_error(id);
}
for write in self.active_writes.values_mut() {
write.inject_write_error(id);
}
self.connection_updated(id.clone(), None, NodeStatus::Disconnected);
self.connected_peers.remove(id);
}
@ -613,6 +717,10 @@ where
if let Some(query) = self.active_queries.get_mut(&user_data) {
query.inject_rpc_error(&source)
}
if let Some(write) = self.active_writes.get_mut(&user_data) {
write.inject_write_error(&source);
}
}
KademliaHandlerEvent::AddProvider { key, provider_peer } => {
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(KademliaOut::Discovered {
@ -623,6 +731,94 @@ where
self.add_provider.push((key, provider_peer.node_id));
return;
}
KademliaHandlerEvent::GetValue { key, request_id } => {
let (result, closer_peers) = match self.records.get(&key) {
Some(record) => {
(Some(record.into_owned()), Vec::new())
},
None => {
let closer_peers = self.find_closest(&kbucket::Key::from(key), &source);
(None, closer_peers)
}
};
self.queued_events.push(NetworkBehaviourAction::SendEvent {
peer_id: source,
event: KademliaHandlerIn::GetValueRes {
result,
closer_peers,
request_id,
},
});
}
KademliaHandlerEvent::GetValueRes {
result,
closer_peers,
user_data,
} => {
let mut finished_query = None;
if let Some(query) = self.active_queries.get_mut(&user_data) {
if let QueryInfoInner::GetValue {
key: _,
results,
num_results,
} = &mut query.target_mut().inner {
if let Some(result) = result {
results.push(result);
if results.len() == *num_results {
finished_query = Some(user_data);
}
}
}
}
if let Some(finished_query) = finished_query {
let (query_info, _) = self
.active_queries
.remove(&finished_query)
.expect("finished_query was gathered when peeking into active_queries; QED.")
.into_target_and_closest_peers();
match query_info.inner {
QueryInfoInner::GetValue { key: _, results, .. } => {
let result = GetValueResult::Found { results };
let event = KademliaOut::GetValueResult(result);
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event));
}
// TODO: write a better proof
_ => panic!("unexpected query_info.inner variant for a get_value result; QED.")
}
}
self.discovered(&user_data, &source, closer_peers.iter());
}
KademliaHandlerEvent::PutValue {
key,
value,
request_id
} => {
// TODO: Log errors and immediately reset the stream on error instead of letting the request time out.
if let Ok(()) = self.records.put(Record { key: key.clone(), value: value.clone() }) {
self.queued_events.push(NetworkBehaviourAction::SendEvent {
peer_id: source,
event: KademliaHandlerIn::PutValueRes {
key,
value,
request_id,
},
});
}
}
KademliaHandlerEvent::PutValueRes {
key: _,
user_data,
} => {
if let Some(write) = self.active_writes.get_mut(&user_data) {
write.inject_write_success(&source);
}
}
};
}
@ -712,6 +908,29 @@ where
}
}
let finished_write = self.active_writes.iter()
.find_map(|(&query_id, write)|
if write.done() {
Some(query_id)
} else {
None
});
if let Some(finished_write) = finished_write {
let (t, successes, failures) = self
.active_writes
.remove(&finished_write)
.expect("finished_write was gathered when iterating active_writes; QED.")
.into_inner();
let event = KademliaOut::PutValueResult(PutValueResult::Ok {
key: t,
successes,
failures,
});
break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
if let Some(finished_query) = finished_query {
let (query_info, closer_peers) = self
.active_queries
@ -753,6 +972,43 @@ where
self.queued_events.push(event);
}
},
QueryInfoInner::GetValue { key: _, results, .. } => {
let result = match results.len() {
0 => GetValueResult::NotFound{
closest_peers: closer_peers.collect()
},
_ => GetValueResult::Found{ results },
};
let event = KademliaOut::GetValueResult(result);
break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
},
QueryInfoInner::PutValue { key, value } => {
let closer_peers = Vec::from_iter(closer_peers);
for peer in &closer_peers {
let event = KademliaHandlerIn::PutValue {
key: key.clone(),
value: value.clone(),
user_data: finished_query,
};
if self.connected_peers.contains(peer) {
let event = NetworkBehaviourAction::SendEvent {
peer_id: peer.clone(),
event
};
self.queued_events.push(event);
} else {
self.pending_rpcs.push((peer.clone(), event));
self.queued_events.push(NetworkBehaviourAction::DialPeer {
peer_id: peer.clone(),
});
}
}
self.active_writes.insert(finished_query, WriteState::new(key, closer_peers));
},
}
} else {
break Async::NotReady;
@ -761,6 +1017,24 @@ where
}
}
/// The result of a `GET_VALUE` query.
#[derive(Debug, Clone, PartialEq)]
pub enum GetValueResult {
/// The results received from peers. Always contains a non-zero number of results.
Found { results: Vec<Record> },
/// The record wasn't found.
NotFound { closest_peers: Vec<PeerId> }
}
/// The result of a `PUT_VALUE` query.
#[derive(Debug, Clone, PartialEq)]
pub enum PutValueResult {
/// The value has been put successfully.
Ok { key: Multihash, successes: usize, failures: usize },
/// The put value failed.
Err { key: Multihash, cause: RecordStorageError }
}
/// Output event of the `Kademlia` behaviour.
#[derive(Debug, Clone)]
pub enum KademliaOut {
@ -802,6 +1076,12 @@ pub enum KademliaOut {
/// List of peers ordered from closest to furthest away.
closer_peers: Vec<PeerId>,
},
/// Result of a `GET_VALUE` query
GetValueResult(GetValueResult),
/// Result of a `PUT_VALUE` query
PutValueResult(PutValueResult),
}
impl From<kbucket::EntryView<PeerId, Addresses>> for KadPeer {
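
To round off the behaviour changes, a sketch of how a consumer might handle the two new `KademliaOut` variants. The `on_kad_event` function is illustrative only (real code would do this inside its swarm poll loop), and the paths assume the re-exports added in lib.rs further down:

use libp2p::kad::{GetValueResult, KademliaOut, PutValueResult};

// Hypothetical handler for the new record-related events.
fn on_kad_event(event: KademliaOut) {
    match event {
        KademliaOut::GetValueResult(GetValueResult::Found { results }) => {
            for record in results {
                println!("key {:?} -> {} bytes", record.key, record.value.len());
            }
        }
        KademliaOut::GetValueResult(GetValueResult::NotFound { closest_peers }) => {
            println!("record not found; {} closer peers returned", closest_peers.len());
        }
        KademliaOut::PutValueResult(PutValueResult::Ok { key, successes, failures }) => {
            println!("PUT_VALUE of {:?}: {} stored, {} failed", key, successes, failures);
        }
        KademliaOut::PutValueResult(PutValueResult::Err { key, cause }) => {
            println!("local store rejected {:?}: {:?}", key, cause);
        }
        // The pre-existing variants (Discovered, FindNodeResult, ...) are unchanged.
        _ => {}
    }
}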

View File

@ -20,7 +20,13 @@
#![cfg(test)]
use crate::{Kademlia, KademliaOut, kbucket::{self, Distance}};
use crate::{
GetValueResult,
Kademlia,
KademliaOut,
kbucket::{self, Distance},
record::{Record, RecordStore},
};
use futures::{future, prelude::*};
use libp2p_core::{
PeerId,
@ -36,8 +42,9 @@ use libp2p_core::{
use libp2p_secio::SecioConfig;
use libp2p_yamux as yamux;
use rand::random;
use std::{io, u64};
use std::{collections::HashSet, iter::FromIterator, io, num::NonZeroU8, u64};
use tokio::runtime::Runtime;
use multihash::Hash;
type TestSwarm = Swarm<
Boxed<(PeerId, StreamMuxerBox), io::Error>,
@ -223,3 +230,227 @@ fn unresponsive_not_returned_indirect() {
}))
.unwrap();
}
#[test]
fn get_value_not_found() {
let (port_base, mut swarms) = build_nodes(3);
let swarm_ids: Vec<_> = swarms.iter()
.map(|swarm| Swarm::local_peer_id(&swarm).clone()).collect();
swarms[0].add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into());
swarms[1].add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into());
let target_key = multihash::encode(Hash::SHA2256, &vec![1,2,3]).unwrap();
let num_results = NonZeroU8::new(1).unwrap();
swarms[0].get_value(&target_key, num_results);
Runtime::new().unwrap().block_on(
future::poll_fn(move || -> Result<_, io::Error> {
for swarm in &mut swarms {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaOut::GetValueResult(result))) => {
if let GetValueResult::NotFound { closest_peers} = result {
assert_eq!(closest_peers.len(), 2);
assert!(closest_peers.contains(&swarm_ids[1]));
assert!(closest_peers.contains(&swarm_ids[2]));
return Ok(Async::Ready(()));
} else {
panic!("Expected GetValueResult::NotFound event");
}
}
Async::Ready(_) => (),
Async::NotReady => break,
}
}
}
Ok(Async::NotReady)
}))
.unwrap()
}
#[test]
fn put_value() {
fn run() {
// Build a test that checks if PUT_VALUE gets correctly propagated in
// a nontrivial topology:
// [31]
// / \
// [29] [30]
// /|\ /|\
// [0]..[14] [15]..[28]
//
// Nodes [29] and [30] have less than kbuckets::MAX_NODES_PER_BUCKET
// peers to avoid the situation when the bucket may be overflowed and
// some of the connections are dropped from the routing table
let (port_base, mut swarms) = build_nodes(32);
let swarm_ids: Vec<_> = swarms.iter()
.map(|swarm| Swarm::local_peer_id(&swarm).clone()).collect();
// Connect swarm[30] to each swarm in swarms[..15]
for (i, peer) in swarm_ids.iter().take(15).enumerate() {
swarms[30].add_address(&peer, Protocol::Memory(port_base + i as u64).into());
}
// Connect swarm[29] to each swarm in swarms[15..29]
for (i, peer) in swarm_ids.iter().skip(15).take(14).enumerate() {
swarms[29].add_address(&peer, Protocol::Memory(port_base + (i + 15) as u64).into());
}
// Connect swarms[31] to swarms[29, 30]
swarms[31].add_address(&swarm_ids[30], Protocol::Memory(port_base + 30 as u64).into());
swarms[31].add_address(&swarm_ids[29], Protocol::Memory(port_base + 29 as u64).into());
let target_key = multihash::encode(Hash::SHA2256, &vec![1,2,3]).unwrap();
let mut sorted_peer_ids: Vec<_> = swarm_ids
.iter()
.map(|id| (id.clone(), kbucket::Key::from(id.clone()).distance(&kbucket::Key::from(target_key.clone()))))
.collect();
sorted_peer_ids.sort_by(|(_, d1), (_, d2)| d1.cmp(d2));
let closest: HashSet<PeerId> = HashSet::from_iter(sorted_peer_ids.into_iter().map(|(id, _)| id));
swarms[31].put_value(target_key.clone(), vec![4,5,6]);
Runtime::new().unwrap().block_on(
future::poll_fn(move || -> Result<_, io::Error> {
let mut check_results = false;
for swarm in &mut swarms {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaOut::PutValueResult{ .. })) => {
check_results = true;
}
Async::Ready(_) => (),
Async::NotReady => break,
}
}
}
if check_results {
let mut have: HashSet<_> = Default::default();
for (i, swarm) in swarms.iter().take(31).enumerate() {
if swarm.records.get(&target_key).is_some() {
have.insert(swarm_ids[i].clone());
}
}
let intersection: HashSet<_> = have.intersection(&closest).collect();
assert_eq!(have.len(), kbucket::MAX_NODES_PER_BUCKET);
assert_eq!(intersection.len(), kbucket::MAX_NODES_PER_BUCKET);
return Ok(Async::Ready(()));
}
Ok(Async::NotReady)
}))
.unwrap()
}
for _ in 0 .. 10 {
run();
}
}
#[test]
fn get_value() {
let (port_base, mut swarms) = build_nodes(3);
let swarm_ids: Vec<_> = swarms.iter()
.map(|swarm| Swarm::local_peer_id(&swarm).clone()).collect();
swarms[0].add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into());
swarms[1].add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into());
let target_key = multihash::encode(Hash::SHA2256, &vec![1,2,3]).unwrap();
let target_value = vec![4,5,6];
let num_results = NonZeroU8::new(1).unwrap();
swarms[1].records.put(Record {
key: target_key.clone(),
value: target_value.clone()
}).unwrap();
swarms[0].get_value(&target_key, num_results);
Runtime::new().unwrap().block_on(
future::poll_fn(move || -> Result<_, io::Error> {
for swarm in &mut swarms {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaOut::GetValueResult(result))) => {
if let GetValueResult::Found { results } = result {
assert_eq!(results.len(), 1);
let record = results.first().unwrap();
assert_eq!(record.key, target_key);
assert_eq!(record.value, target_value);
return Ok(Async::Ready(()));
} else {
panic!("Expected GetValueResult::Found event");
}
}
Async::Ready(_) => (),
Async::NotReady => break,
}
}
}
Ok(Async::NotReady)
}))
.unwrap()
}
#[test]
fn get_value_multiple() {
// Check that if we have responses from multiple peers, a correct number of
// results is returned.
let num_results = NonZeroU8::new(10).unwrap();
let (port_base, mut swarms) = build_nodes(2 + num_results.get() as usize);
let swarm_ids: Vec<_> = swarms.iter()
.map(|swarm| Swarm::local_peer_id(&swarm).clone()).collect();
let target_key = multihash::encode(Hash::SHA2256, &vec![1,2,3]).unwrap();
let target_value = vec![4,5,6];
for (i, swarm_id) in swarm_ids.iter().skip(1).enumerate() {
swarms[i + 1].records.put(Record {
key: target_key.clone(),
value: target_value.clone()
}).unwrap();
swarms[0].add_address(&swarm_id, Protocol::Memory(port_base + (i + 1) as u64).into());
}
swarms[0].records.put(Record { key: target_key.clone(), value: target_value.clone() }).unwrap();
swarms[0].get_value(&target_key, num_results);
Runtime::new().unwrap().block_on(
future::poll_fn(move || -> Result<_, io::Error> {
for swarm in &mut swarms {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaOut::GetValueResult(result))) => {
if let GetValueResult::Found { results } = result {
assert_eq!(results.len(), num_results.get() as usize);
let record = results.first().unwrap();
assert_eq!(record.key, target_key);
assert_eq!(record.value, target_value);
return Ok(Async::Ready(()));
} else {
panic!("Expected GetValueResult::Found event");
}
}
Async::Ready(_) => (),
Async::NotReady => break,
}
}
}
Ok(Async::NotReady)
}))
.unwrap()
}

View File

@ -22,6 +22,7 @@ use crate::protocol::{
KadInStreamSink, KadOutStreamSink, KadPeer, KadRequestMsg, KadResponseMsg,
KademliaProtocolConfig,
};
use crate::record::Record;
use futures::prelude::*;
use libp2p_core::protocols_handler::{
KeepAlive,
@ -182,6 +183,42 @@ pub enum KademliaHandlerEvent<TUserData> {
/// Known provider for this key.
provider_peer: KadPeer,
},
/// Request to get a value from the dht records
GetValue {
/// Key for which we should look in the dht
key: Multihash,
/// Identifier of the request. Needs to be passed back when answering.
request_id: KademliaRequestId,
},
/// Response to a `KademliaHandlerIn::GetValue`.
GetValueRes {
/// The result is present if the key has been found
result: Option<Record>,
/// Nodes closest to the key.
closer_peers: Vec<KadPeer>,
/// The user data passed to the `GetValue`.
user_data: TUserData,
},
/// Request to put a value in the dht records
PutValue {
/// The key of the record
key: Multihash,
/// The value of the record
value: Vec<u8>,
/// Identifier of the request. Needs to be passed back when answering.
request_id: KademliaRequestId,
},
/// Response to a request to put a value
PutValueRes {
/// The key we were putting in
key: Multihash,
/// The user data passed to the `PutValue`.
user_data: TUserData,
}
}
/// Error that can happen when requesting an RPC query.
@ -229,12 +266,13 @@ impl From<ProtocolsHandlerUpgrErr<io::Error>> for KademliaHandlerQueryErr {
}
/// Event to send to the handler.
#[derive(Debug)]
pub enum KademliaHandlerIn<TUserData> {
/// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes
/// returned is not specified, but should be around 20.
FindNodeReq {
/// Identifier of the node.
key: PeerId,
key: Multihash,
/// Custom user data. Passed back in the out event when the results arrive.
user_data: TUserData,
},
@ -280,6 +318,44 @@ pub enum KademliaHandlerIn<TUserData> {
/// Known provider for this key.
provider_peer: KadPeer,
},
/// Request to get a value from the dht records
GetValue {
/// The key of the value we are looking for
key: Multihash,
/// Custom data. Passed back in the out event when the results arrive.
user_data: TUserData,
},
/// Response to a `GetValue`.
GetValueRes {
/// The value that might have been found in our storage.
result: Option<Record>,
/// Nodes that are closer to the key we were searching for.
closer_peers: Vec<KadPeer>,
/// Identifier of the request that was made by the remote.
request_id: KademliaRequestId,
},
/// Put a value into the dht records.
PutValue {
/// The key of the record.
key: Multihash,
/// The value of the record.
value: Vec<u8>,
/// Custom data. Passed back in the out event when the results arrive.
user_data: TUserData,
},
/// Response to a `PutValue`.
PutValueRes {
/// Key of the value that was put.
key: Multihash,
/// Value that was put.
value: Vec<u8>,
/// Identifier of the request that was made by the remote.
request_id: KademliaRequestId,
}
}
/// Unique identifier for a request. Must be passed back in order to answer a request from
@ -397,9 +473,15 @@ where
fn inject_event(&mut self, message: KademliaHandlerIn<TUserData>) {
match message {
KademliaHandlerIn::FindNodeReq { key, user_data } => {
let msg = KadRequestMsg::FindNode { key: key.clone() };
// FIXME: Change `KadRequestMsg::FindNode::key` to be a `Multihash`.
match PeerId::from_multihash(key.clone()) {
Ok(key) => {
let msg = KadRequestMsg::FindNode { key };
self.substreams
.push(SubstreamState::OutPendingOpen(msg, Some(user_data.clone())));
},
Err(_) => (),
}
}
KademliaHandlerIn::FindNodeRes {
closer_peers,
@ -465,6 +547,74 @@ where
self.substreams
.push(SubstreamState::OutPendingOpen(msg, None));
}
KademliaHandlerIn::GetValue { key, user_data } => {
let msg = KadRequestMsg::GetValue { key };
self.substreams
.push(SubstreamState::OutPendingOpen(msg, Some(user_data)));
}
KademliaHandlerIn::PutValue { key, value, user_data } => {
let msg = KadRequestMsg::PutValue {
key,
value,
};
self.substreams
.push(SubstreamState::OutPendingOpen(msg, Some(user_data)));
}
KademliaHandlerIn::GetValueRes {
result,
closer_peers,
request_id,
} => {
let pos = self.substreams.iter().position(|state| match state {
SubstreamState::InWaitingUser(ref conn_id, _)
=> conn_id == &request_id.connec_unique_id,
_ => false,
});
if let Some(pos) = pos {
let (conn_id, substream) = match self.substreams.remove(pos) {
SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream),
_ => unreachable!(),
};
let msg = KadResponseMsg::GetValue {
result,
closer_peers: closer_peers.clone(),
};
self.substreams
.push(SubstreamState::InPendingSend(conn_id, substream, msg));
}
}
KademliaHandlerIn::PutValueRes {
key,
request_id,
value,
} => {
let pos = self.substreams.iter().position(|state| match state {
SubstreamState::InWaitingUser(ref conn_id, _)
if conn_id == &request_id.connec_unique_id =>
{
true
}
_ => false,
});
if let Some(pos) = pos {
let (conn_id, substream) = match self.substreams.remove(pos) {
SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream),
_ => unreachable!(),
};
let msg = KadResponseMsg::PutValue {
key,
value,
};
self.substreams
.push(SubstreamState::InPendingSend(conn_id, substream, msg));
}
}
}
}
@ -740,6 +890,15 @@ fn process_kad_request<TUserData>(
KadRequestMsg::AddProvider { key, provider_peer } => {
Ok(KademliaHandlerEvent::AddProvider { key, provider_peer })
}
KadRequestMsg::GetValue { key } => Ok(KademliaHandlerEvent::GetValue {
key,
request_id: KademliaRequestId { connec_unique_id },
}),
KadRequestMsg::PutValue { key, value } => Ok(KademliaHandlerEvent::PutValue {
key,
value,
request_id: KademliaRequestId { connec_unique_id },
})
}
}
@ -771,5 +930,19 @@ fn process_kad_response<TUserData>(
provider_peers,
user_data,
},
KadResponseMsg::GetValue {
result,
closer_peers,
} => KademliaHandlerEvent::GetValueRes {
result,
closer_peers,
user_data,
},
KadResponseMsg::PutValue { key, .. } => {
KademliaHandlerEvent::PutValueRes {
key,
user_data,
}
}
}
}

View File

@ -24,14 +24,17 @@
// be useful later for record store
#![allow(dead_code)]
pub use self::behaviour::{Kademlia, KademliaOut};
pub use self::behaviour::{Kademlia, KademliaOut, GetValueResult, PutValueResult};
pub use self::protocol::KadConnectionType;
pub use self::record::{RecordStore, RecordStorageError, MemoryRecordStorage};
pub mod handler;
pub mod kbucket;
pub mod protocol;
pub mod record;
mod addresses;
mod behaviour;
mod protobuf_structs;
mod query;
mod write;

View File

@ -33,6 +33,7 @@
use bytes::BytesMut;
use codec::UviBytes;
use crate::protobuf_structs::dht as proto;
use crate::record::Record;
use futures::{future::{self, FutureResult}, sink, stream, Sink, Stream};
use libp2p_core::{Multiaddr, PeerId};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated};
@ -273,6 +274,20 @@ pub enum KadRequestMsg {
/// Known provider for this key.
provider_peer: KadPeer,
},
/// Request to get a value from the dht records.
GetValue {
/// The key we are searching for.
key: Multihash,
},
/// Request to put a value into the dht records.
PutValue {
/// The key of the record.
key: Multihash,
/// The value of the record.
value: Vec<u8>,
}
}
/// Response that we can send to a peer or that we received from a peer.
@ -294,6 +309,22 @@ pub enum KadResponseMsg {
/// Known providers for this key.
provider_peers: Vec<KadPeer>,
},
/// Response to a `GetValue`.
GetValue {
/// Result that might have been found
result: Option<Record>,
/// Nodes closest to the key
closer_peers: Vec<KadPeer>,
},
/// Response to a `PutValue`.
PutValue {
/// The key of the record.
key: Multihash,
/// Value of the record.
value: Vec<u8>,
},
}
/// Converts a `KadRequestMsg` into the corresponding protobuf message for sending.
@ -326,6 +357,24 @@ fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message {
msg.mut_providerPeers().push(provider_peer.into());
msg
}
KadRequestMsg::GetValue { key } => {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::GET_VALUE);
msg.set_clusterLevelRaw(10);
msg.set_key(key.into_bytes());
msg
}
KadRequestMsg::PutValue { key, value} => {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::PUT_VALUE);
let mut record = proto::Record::new();
record.set_value(value);
record.set_key(key.into_bytes());
msg.set_record(record);
msg
}
}
}
@ -361,6 +410,41 @@ fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message {
}
msg
}
KadResponseMsg::GetValue {
result,
closer_peers,
} => {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::GET_VALUE);
msg.set_clusterLevelRaw(9);
for peer in closer_peers {
msg.mut_closerPeers().push(peer.into());
}
if let Some(Record{ key, value }) = result {
let mut record = proto::Record::new();
record.set_key(key.into_bytes());
record.set_value(value);
msg.set_record(record);
}
msg
}
KadResponseMsg::PutValue {
key,
value,
} => {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::PUT_VALUE);
msg.set_key(key.clone().into_bytes());
let mut record = proto::Record::new();
record.set_key(key.into_bytes());
record.set_value(value);
msg.set_record(record);
msg
}
}
}
@ -371,11 +455,16 @@ fn proto_to_req_msg(mut message: proto::Message) -> Result<KadRequestMsg, io::Er
match message.get_field_type() {
proto::Message_MessageType::PING => Ok(KadRequestMsg::Ping),
proto::Message_MessageType::PUT_VALUE =>
Err(invalid_data("Unsupported: PUT_VALUE message.")),
proto::Message_MessageType::PUT_VALUE => {
let record = message.mut_record();
let key = Multihash::from_bytes(record.take_key()).map_err(invalid_data)?;
Ok(KadRequestMsg::PutValue { key, value: record.take_value() })
}
proto::Message_MessageType::GET_VALUE =>
Err(invalid_data("Unsupported: GET_VALUE message.")),
proto::Message_MessageType::GET_VALUE => {
let key = Multihash::from_bytes(message.take_key()).map_err(invalid_data)?;
Ok(KadRequestMsg::GetValue { key })
}
proto::Message_MessageType::FIND_NODE => {
let key = PeerId::from_bytes(message.take_key())
@ -414,8 +503,24 @@ fn proto_to_resp_msg(mut message: proto::Message) -> Result<KadResponseMsg, io::
match message.get_field_type() {
proto::Message_MessageType::PING => Ok(KadResponseMsg::Pong),
proto::Message_MessageType::GET_VALUE =>
Err(invalid_data("Unsupported: GET_VALUE message")),
proto::Message_MessageType::GET_VALUE => {
let result = match message.has_record() {
true => {
let mut record = message.take_record();
let key = Multihash::from_bytes(record.take_key()).map_err(invalid_data)?;
Some(Record { key, value: record.take_value() })
}
false => None,
};
let closer_peers = message
.mut_closerPeers()
.iter_mut()
.filter_map(|peer| KadPeer::try_from(peer).ok())
.collect::<Vec<_>>();
Ok(KadResponseMsg::GetValue { result, closer_peers })
},
proto::Message_MessageType::FIND_NODE => {
let closer_peers = message
@ -446,8 +551,18 @@ fn proto_to_resp_msg(mut message: proto::Message) -> Result<KadResponseMsg, io::
})
}
proto::Message_MessageType::PUT_VALUE =>
Err(invalid_data("received an unexpected PUT_VALUE message")),
proto::Message_MessageType::PUT_VALUE => {
let key = Multihash::from_bytes(message.take_key()).map_err(invalid_data)?;
if !message.has_record() {
return Err(invalid_data("received PUT_VALUE message with no record"));
}
let mut record = message.take_record();
Ok(KadResponseMsg::PutValue {
key,
value: record.take_value(),
})
}
proto::Message_MessageType::ADD_PROVIDER =>
Err(invalid_data("received an unexpected ADD_PROVIDER message"))

protocols/kad/src/record.rs (new file, 102 lines)
View File

@ -0,0 +1,102 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Abstracts the Kademlia record store behaviour and provides default in-memory store
use fnv::FnvHashMap;
use multihash::Multihash;
use std::borrow::Cow;
/// The error a record store may return
#[derive(Clone, Debug, PartialEq)]
pub enum RecordStorageError {
/// Store reached the capacity limit.
AtCapacity,
/// Value being put is larger than the limit.
ValueTooLarge,
}
/// The records that are kept in the dht.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Record {
/// Key of the record.
pub key: Multihash,
/// Value of the record.
pub value: Vec<u8>,
}
/// Trait for a record store.
pub trait RecordStore {
fn get(&self, k: &Multihash) -> Option<Cow<Record>>;
fn put(&mut self, r: Record) -> Result<(), RecordStorageError>;
}
/// In-memory implementation of the record store.
pub struct MemoryRecordStorage {
/// Maximum number of records we will store.
max_records: usize,
/// Maximum size of the record we will store.
max_record_size: usize,
/// The records.
records: FnvHashMap<Multihash, Record>
}
impl MemoryRecordStorage {
const MAX_RECORDS: usize = 1024;
const MAX_RECORD_SIZE: usize = 65535;
/// Creates a new `MemoryRecordStorage`.
pub fn new(max_records: usize, max_record_size: usize) -> Self {
MemoryRecordStorage{
max_records,
max_record_size,
records: FnvHashMap::default()
}
}
}
impl Default for MemoryRecordStorage {
fn default() -> Self {
MemoryRecordStorage::new(Self::MAX_RECORDS, Self::MAX_RECORD_SIZE)
}
}
impl RecordStore for MemoryRecordStorage {
fn get(&self, k: &Multihash) -> Option<Cow<Record>> {
match self.records.get(k) {
Some(rec) => Some(Cow::Borrowed(rec)),
None => None,
}
}
fn put(&mut self, r: Record) -> Result<(), RecordStorageError> {
if self.records.len() >= self.max_records {
return Err(RecordStorageError::AtCapacity);
}
if r.value.len() >= self.max_record_size {
return Err(RecordStorageError::ValueTooLarge)
}
self.records.insert(r.key.clone(), r);
Ok(())
}
}
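
`MemoryRecordStorage` is only the default; anything implementing `RecordStore` can be handed to `Kademlia::with_storage`. A sketch of a user-defined store (the `BoundedStore` type and its capacity policy are made up for illustration):

use libp2p_kad::record::{Record, RecordStore, RecordStorageError};
use multihash::Multihash;
use std::borrow::Cow;
use std::collections::HashMap;

// Hypothetical store that keeps at most `capacity` records and rejects the rest.
struct BoundedStore {
    capacity: usize,
    records: HashMap<Multihash, Record>,
}

impl RecordStore for BoundedStore {
    fn get(&self, k: &Multihash) -> Option<Cow<Record>> {
        self.records.get(k).map(Cow::Borrowed)
    }

    fn put(&mut self, r: Record) -> Result<(), RecordStorageError> {
        if self.records.len() >= self.capacity {
            return Err(RecordStorageError::AtCapacity);
        }
        self.records.insert(r.key.clone(), r);
        Ok(())
    }
}

// Wired in through the new constructor, e.g.:
// let kad: Kademlia<_, BoundedStore> = Kademlia::with_storage(local_peer_id, store);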

protocols/kad/src/write.rs (new file, 103 lines)
View File

@ -0,0 +1,103 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Contains the state of the second stage of PUT_VALUE process of Kademlia.
use fnv::FnvHashMap;
/// The state of the single peer.
#[derive(Clone)]
enum PeerState {
/// We don't know yet.
Unknown,
/// Putting a value failed.
Failed,
/// Putting a value succeeded.
Succeeded,
}
/// State of the `PUT_VALUE` second stage
///
/// Here we are gathering the results of all `PUT_VALUE` requests that we've
/// sent to the appropriate peers. We keep track of the set of peers that we've
/// sent the requests to, together with the counts of successful and failed responses.
pub struct WriteState<TPeerId, TTarget> {
/// The key that we're inserting into the dht.
target: TTarget,
/// The peers that we are asking to store our value.
peers: FnvHashMap<TPeerId, PeerState>,
/// The count of successful stores.
successes: usize,
/// The count of errors.
failures: usize,
}
impl<TPeerId, TTarget> WriteState<TPeerId, TTarget>
where
TPeerId: std::hash::Hash + Clone + Eq
{
/// Creates a new WriteState.
///
/// Stores the state of an ongoing second stage of a PUT_VALUE process
pub fn new(target: TTarget, peers: Vec<TPeerId>) -> Self {
use std::iter::FromIterator;
WriteState {
target,
peers: FnvHashMap::from_iter(peers
.into_iter()
.zip(std::iter::repeat(PeerState::Unknown))
),
successes: 0,
failures: 0,
}
}
/// Inform the state that writing to one of the target peers has succeeded
pub fn inject_write_success(&mut self, peer: &TPeerId) {
if let Some(state @ PeerState::Unknown) = self.peers.get_mut(peer) {
*state = PeerState::Succeeded;
self.successes += 1;
}
}
/// Inform the state that writing to one of the target peers has failed
pub fn inject_write_error(&mut self, peer: &TPeerId) {
if let Some(state @ PeerState::Unknown) = self.peers.get_mut(peer) {
*state = PeerState::Failed;
self.failures += 1;
}
}
/// Ask the state if it is done
// TODO: probably it should also be a poll() in the fashion of QueryState and have a timeout
pub fn done(&self) -> bool {
self.peers.len() == self.successes + self.failures
}
/// Consume the state and return the target together with the success and failure counters
pub fn into_inner(self) -> (TTarget, usize, usize) {
(self.target, self.successes, self.failures)
}
}
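
Since `write` is a private module, the state machine can only be exercised from inside the crate; a small in-crate sketch of its lifecycle (the peer ids, key bytes, and counts are illustrative):

use crate::write::WriteState;
use libp2p_core::PeerId;
use multihash::{encode, Hash};

// One success and two failures from three target peers completes the write.
fn write_state_lifecycle() {
    let peers: Vec<PeerId> = (0..3).map(|_| PeerId::random()).collect();
    let target = encode(Hash::SHA2256, b"key").unwrap();

    let mut write = WriteState::new(target, peers.clone());
    write.inject_write_success(&peers[0]);
    write.inject_write_error(&peers[1]);
    write.inject_write_error(&peers[2]);
    assert!(write.done());

    let (_key, successes, failures) = write.into_inner();
    assert_eq!((successes, failures), (1, 2));
}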