Clean up directory structure (#426)

* Remove unused circular-buffer crate
* Move transports into subdirectory
* Move misc into subdirectory
* Move stores into subdirectory
* Move multiplexers
* Move protocols
* Move libp2p top layer
* Fix Test: skip doctest if secio isn't enabled
This commit is contained in:
Benjamin Kampmann
2018-08-29 11:24:44 +02:00
committed by GitHub
parent f5ce93c730
commit 2ea49718f3
131 changed files with 146 additions and 1023 deletions

View File

@ -0,0 +1,464 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use fnv::FnvHashSet;
use futures::{future, Future, IntoFuture, stream, Stream};
use kad_server::KadConnecController;
use kbucket::{KBucketsTable, KBucketsPeerId};
use libp2p_core::PeerId;
use multiaddr::Multiaddr;
use protocol;
use rand;
use smallvec::SmallVec;
use std::cmp::Ordering;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::mem;
use std::time::Duration;
use tokio_timer::Timeout;
/// Prototype for a future Kademlia protocol running on a socket.
#[derive(Debug, Clone)]
pub struct KadSystemConfig<I> {
    /// Degree of parallelism on the network. Often called `alpha` in technical papers.
    /// No more than this number of remotes will be used at a given time for any given operation.
    // TODO: ^ share this number between operations? or does each operation use `alpha` remotes?
    pub parallelism: u32,
    /// Id of the local peer.
    pub local_peer_id: PeerId,
    /// List of peers initially known.
    pub known_initial_peers: I,
    /// Duration after which a node in the k-buckets needs to be pinged again.
    pub kbuckets_timeout: Duration,
    /// When contacting a node, duration after which we consider it unresponsive.
    pub request_timeout: Duration,
}
/// System that drives the whole Kademlia process.
pub struct KadSystem {
    // The actual DHT: routing table keyed by `PeerId` with no associated value.
    kbuckets: KBucketsTable<PeerId, ()>,
    // Same as in the config (`KadSystemConfig::parallelism`).
    parallelism: u32,
    // Same as in the config (`KadSystemConfig::request_timeout`).
    request_timeout: Duration,
}
/// Event that happens during a query.
#[derive(Debug, Clone)]
pub enum KadQueryEvent<TOut> {
    /// Learned about new multiaddresses for the given peers.
    NewKnownMultiaddrs(Vec<(PeerId, Vec<Multiaddr>)>),
    /// Finished the processing of the query. Contains the result.
    Finished(TOut),
}
impl KadSystem {
/// Starts a new Kademlia system.
///
/// Also produces a `Future` that drives a Kademlia initialization process.
/// This future should be driven to completion by the caller.
pub fn start<'a, F, Fut>(config: KadSystemConfig<impl Iterator<Item = PeerId>>, access: F)
-> (KadSystem, impl Future<Item = (), Error = IoError> + 'a)
where F: FnMut(&PeerId) -> Fut + Clone + 'a,
Fut: IntoFuture<Item = KadConnecController, Error = IoError> + 'a,
{
let system = KadSystem::without_init(config);
let init_future = system.perform_initialization(access);
(system, init_future)
}
/// Same as `start`, but doesn't perform the initialization process.
pub fn without_init(config: KadSystemConfig<impl Iterator<Item = PeerId>>) -> KadSystem {
let kbuckets = KBucketsTable::new(config.local_peer_id.clone(), config.kbuckets_timeout);
for peer in config.known_initial_peers {
let _ = kbuckets.update(peer, ());
}
let system = KadSystem {
kbuckets: kbuckets,
parallelism: config.parallelism,
request_timeout: config.request_timeout,
};
system
}
/// Starts an initialization process.
pub fn perform_initialization<'a, F, Fut>(&self, access: F) -> impl Future<Item = (), Error = IoError> + 'a
where F: FnMut(&PeerId) -> Fut + Clone + 'a,
Fut: IntoFuture<Item = KadConnecController, Error = IoError> + 'a,
{
let futures: Vec<_> = (0..256) // TODO: 256 is arbitrary
.map(|n| {
refresh(n, access.clone(), &self.kbuckets,
self.parallelism as usize, self.request_timeout)
})
.map(|stream| stream.for_each(|_| Ok(())))
.collect();
future::loop_fn(futures, |futures| {
if futures.is_empty() {
let fut = future::ok(future::Loop::Break(()));
return future::Either::A(fut);
}
let fut = future::select_all(futures)
.map_err(|(err, _, _)| err)
.map(|(_, _, rest)| future::Loop::Continue(rest));
future::Either::B(fut)
})
}
/// Updates the k-buckets with the specific peer.
///
/// Should be called whenever we receive a message from a peer.
pub fn update_kbuckets(&self, peer: PeerId) {
// TODO: ping system
let _ = self.kbuckets.update(peer, ());
}
/// Returns the local peer ID, as passed in the configuration.
pub fn local_peer_id(&self) -> &PeerId {
self.kbuckets.my_id()
}
/// Finds the known nodes closest to `id`, ordered by distance.
pub fn known_closest_peers(&self, id: &PeerId) -> impl Iterator<Item = PeerId> {
self.kbuckets.find_closest_with_self(id)
}
/// Starts a query for an iterative `FIND_NODE` request.
pub fn find_node<'a, F, Fut>(&self, searched_key: PeerId, access: F)
-> impl Stream<Item = KadQueryEvent<Vec<PeerId>>, Error = IoError> + 'a
where F: FnMut(&PeerId) -> Fut + 'a,
Fut: IntoFuture<Item = KadConnecController, Error = IoError> + 'a,
{
query(access, &self.kbuckets, searched_key, self.parallelism as usize,
20, self.request_timeout) // TODO: arbitrary const
}
}
// Refreshes a specific bucket by performing an iterative `FIND_NODE` on a random ID of this
// bucket.
//
// Returns a dummy no-op future if `bucket_num` is out of range.
fn refresh<'a, F, Fut>(bucket_num: usize, access: F, kbuckets: &KBucketsTable<PeerId, ()>,
parallelism: usize, request_timeout: Duration)
-> impl Stream<Item = KadQueryEvent<()>, Error = IoError> + 'a
where F: FnMut(&PeerId) -> Fut + 'a,
Fut: IntoFuture<Item = KadConnecController, Error = IoError> + 'a,
{
let peer_id = match gen_random_id(kbuckets.my_id(), bucket_num) {
Ok(p) => p,
Err(()) => {
let stream = stream::once(Ok(KadQueryEvent::Finished(())));
return Box::new(stream) as Box<Stream<Item = _, Error = _>>;
},
};
let stream = query(access, kbuckets, peer_id, parallelism, 20, request_timeout) // TODO: 20 is arbitrary
.map(|event| {
match event {
KadQueryEvent::NewKnownMultiaddrs(peers) => KadQueryEvent::NewKnownMultiaddrs(peers),
KadQueryEvent::Finished(_) => KadQueryEvent::Finished(()),
}
});
Box::new(stream) as Box<Stream<Item = _, Error = _>>
}
// Generates a random `PeerId` that belongs to the given bucket.
//
// Returns an error if `bucket_num` is out of range.
fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result<PeerId, ()> {
    let id_len = my_id.as_bytes().len();
    // TODO: this 2 is magic here ; it is the length of the hash of the multihash
    let bits_diff = bucket_num + 1;
    if bits_diff > 8 * (id_len - 2) {
        return Err(());
    }

    // Index of the byte where the copied prefix of `my_id` ends and the random
    // suffix begins; that byte itself mixes both, controlled by a bit mask.
    let pivot = id_len - bits_diff / 8 - 1;

    let mut out = [0u8; 64];
    for (i, slot) in out.iter_mut().enumerate().take(id_len) {
        if i < pivot {
            // Prefix: identical to the local ID.
            *slot = my_id.as_bytes()[i];
        } else if i == pivot {
            // Boundary byte: keep the high bits of the local ID, randomize the low ones.
            let mask: u8 = (1 << (bits_diff % 8)) - 1;
            *slot = (my_id.as_bytes()[i] & !mask) | (rand::random::<u8>() & mask);
        } else {
            // Suffix: fully random.
            *slot = rand::random();
        }
    }

    let peer_id = PeerId::from_bytes(out[..id_len].to_owned())
        .expect("randomly-generated peer ID should always be valid");
    Ok(peer_id)
}
// Generic query-performing function.
//
// Drives an iterative Kademlia lookup for `searched_key`, contacting up to `parallelism`
// peers at a time during the first phase and up to `num_results` peers afterwards, and
// yielding `KadQueryEvent`s as the lookup progresses. Each individual RPC is bounded by
// `request_timeout`.
fn query<'a, F, Fut>(
    access: F,
    kbuckets: &KBucketsTable<PeerId, ()>,
    searched_key: PeerId,
    parallelism: usize,
    num_results: usize,
    request_timeout: Duration,
) -> impl Stream<Item = KadQueryEvent<Vec<PeerId>>, Error = IoError> + 'a
    where F: FnMut(&PeerId) -> Fut + 'a,
          Fut: IntoFuture<Item = KadConnecController, Error = IoError> + 'a,
{
    debug!("Start query for {:?} ; num results = {}", searched_key, num_results);

    // State of the current iterative process.
    struct State<'a, F> {
        // At which stage we are.
        stage: Stage,
        // The `access` parameter.
        access: F,
        // Final output of the iteration.
        result: Vec<PeerId>,
        // For each open connection, a future with the response of the remote.
        // Note that we don't use a `SmallVec` here because `select_all` produces a `Vec`.
        current_attempts_fut: Vec<Box<Future<Item = Vec<protocol::KadPeer>, Error = IoError> + 'a>>,
        // For each open connection, the peer ID that we are connected to.
        // Must always have the same length as `current_attempts_fut`.
        current_attempts_addrs: SmallVec<[PeerId; 32]>,
        // Nodes that need to be attempted.
        pending_nodes: Vec<PeerId>,
        // Peers that we tried to contact but failed.
        failed_to_contact: FnvHashSet<PeerId>,
    }

    // General stage of the state.
    #[derive(Copy, Clone, PartialEq, Eq)]
    enum Stage {
        // We are still in the first step of the algorithm where we try to find the closest node.
        FirstStep,
        // We are contacting the k closest nodes in order to fill the list with enough results.
        SecondStep,
        // The results are complete, and the next stream iteration will produce the outcome.
        FinishingNextIter,
        // We are finished and the stream shouldn't return anything anymore.
        Finished,
    }

    let initial_state = State {
        stage: Stage::FirstStep,
        access: access,
        result: Vec::with_capacity(num_results),
        current_attempts_fut: Vec::new(),
        current_attempts_addrs: SmallVec::new(),
        // Seed the lookup with the locally-known peers closest to the target.
        pending_nodes: kbuckets.find_closest(&searched_key).collect(),
        failed_to_contact: Default::default(),
    };

    // Start of the iterative process. Each `unfold` round either fires new RPCs, processes
    // one finished RPC, or terminates the stream.
    let stream = stream::unfold(initial_state, move |mut state| -> Option<_> {
        match state.stage {
            Stage::FinishingNextIter => {
                // Emit the final result and transition to `Finished` so that the next
                // round returns `None` and ends the stream.
                let result = mem::replace(&mut state.result, Vec::new());
                debug!("Query finished with {} results", result.len());
                state.stage = Stage::Finished;
                let future = future::ok((Some(KadQueryEvent::Finished(result)), state));
                return Some(future::Either::A(future));
            },
            Stage::Finished => {
                return None;
            },
            _ => ()
        };

        let searched_key = searched_key.clone();

        // Find out which nodes to contact at this iteration.
        let to_contact = {
            // During the first step we cap in-flight RPCs at `parallelism`; afterwards
            // we allow up to `num_results` simultaneous attempts.
            let wanted_len = if state.stage == Stage::FirstStep {
                parallelism.saturating_sub(state.current_attempts_fut.len())
            } else {
                num_results.saturating_sub(state.current_attempts_fut.len())
            };
            let mut to_contact = SmallVec::<[_; 16]>::new();
            while to_contact.len() < wanted_len && !state.pending_nodes.is_empty() {
                // Move the first element of `pending_nodes` to `to_contact`, but ignore nodes that
                // are already part of the results or of a current attempt or if we failed to
                // contact it before.
                let peer = state.pending_nodes.remove(0);
                if state.result.iter().any(|p| p == &peer) {
                    continue;
                }
                if state.current_attempts_addrs.iter().any(|p| p == &peer) {
                    continue;
                }
                if state.failed_to_contact.iter().any(|p| p == &peer) {
                    continue;
                }
                to_contact.push(peer);
            }
            to_contact
        };

        debug!("New query round ; {} queries in progress ; contacting {} new peers",
               state.current_attempts_fut.len(),
               to_contact.len());

        // For each node in `to_contact`, start an RPC query and a corresponding entry in the two
        // `state.current_attempts_*` fields.
        for peer in to_contact {
            let searched_key2 = searched_key.clone();
            let current_attempt = (state.access)(&peer)
                .into_future()
                .and_then(move |controller| {
                    controller.find_node(&searched_key2)
                });
            // A timed-out RPC is surfaced as a `ConnectionAborted` I/O error.
            let with_deadline = Timeout::new(current_attempt, request_timeout)
                .map_err(|err| {
                    if let Some(err) = err.into_inner() {
                        err
                    } else {
                        IoError::new(IoErrorKind::ConnectionAborted, "kademlia request timeout")
                    }
                });
            state.current_attempts_addrs.push(peer.clone());
            state
                .current_attempts_fut
                .push(Box::new(with_deadline) as Box<_>);
        }
        debug_assert_eq!(
            state.current_attempts_addrs.len(),
            state.current_attempts_fut.len()
        );

        // Extract `current_attempts_fut` so that we can pass it to `select_all`. We will push the
        // values back when inside the loop.
        let current_attempts_fut = mem::replace(&mut state.current_attempts_fut, Vec::new());
        if current_attempts_fut.is_empty() {
            // If `current_attempts_fut` is empty, then `select_all` would panic. It happens
            // when we have no additional node to query.
            debug!("Finishing query early because no additional node available");
            state.stage = Stage::FinishingNextIter;
            let future = future::ok((None, state));
            return Some(future::Either::A(future));
        }

        // This is the future that continues or breaks the `loop_fn`.
        let future = future::select_all(current_attempts_fut.into_iter()).then(move |result| {
            let (message, trigger_idx, other_current_attempts) = match result {
                Err((err, trigger_idx, other_current_attempts)) => {
                    (Err(err), trigger_idx, other_current_attempts)
                }
                Ok((message, trigger_idx, other_current_attempts)) => {
                    (Ok(message), trigger_idx, other_current_attempts)
                }
            };

            // Putting back the extracted elements in `state`.
            let remote_id = state.current_attempts_addrs.remove(trigger_idx);
            debug_assert!(state.current_attempts_fut.is_empty());
            state.current_attempts_fut = other_current_attempts;

            // `message` contains the reason why the current future was woken up.
            let closer_peers = match message {
                Ok(msg) => msg,
                Err(err) => {
                    trace!("RPC query failed for {:?}: {:?}", remote_id, err);
                    state.failed_to_contact.insert(remote_id);
                    return future::ok((None, state));
                }
            };

            // Inserting the node we received a response from into `state.result`.
            // The code is non-trivial because `state.result` is ordered by distance and is limited
            // by `num_results` elements.
            if let Some(insert_pos) = state.result.iter().position(|e| {
                e.distance_with(&searched_key) >= remote_id.distance_with(&searched_key)
            }) {
                if state.result[insert_pos] != remote_id {
                    if state.result.len() >= num_results {
                        state.result.pop();
                    }
                    state.result.insert(insert_pos, remote_id);
                }
            } else if state.result.len() < num_results {
                state.result.push(remote_id);
            }

            // The loop below will set this variable to `true` if we find a new element to put at
            // the top of the result. This would mean that we have to continue looping.
            let mut local_nearest_node_updated = false;

            // Update `state` with the actual content of the message.
            let mut new_known_multiaddrs = Vec::with_capacity(closer_peers.len());
            for mut peer in closer_peers {
                // Update the peerstore with the information sent by
                // the remote.
                {
                    let multiaddrs = mem::replace(&mut peer.multiaddrs, Vec::new());
                    trace!("Reporting multiaddresses for {:?}: {:?}", peer.node_id, multiaddrs);
                    new_known_multiaddrs.push((peer.node_id.clone(), multiaddrs));
                }

                // NOTE(review): `state.result[0]` is safe here because `remote_id` was inserted
                // above, so `result` is non-empty — but worth confirming this invariant holds
                // for every path that reaches this loop.
                if peer.node_id.distance_with(&searched_key)
                    <= state.result[0].distance_with(&searched_key)
                {
                    local_nearest_node_updated = true;
                }

                if state.result.iter().any(|ma| ma == &peer.node_id) {
                    continue;
                }

                // Insert the node into `pending_nodes` at the right position, or do not
                // insert it if it is already in there.
                if let Some(insert_pos) = state.pending_nodes.iter().position(|e| {
                    e.distance_with(&searched_key) >= peer.node_id.distance_with(&searched_key)
                }) {
                    if state.pending_nodes[insert_pos] != peer.node_id {
                        state.pending_nodes.insert(insert_pos, peer.node_id.clone());
                    }
                } else {
                    state.pending_nodes.push(peer.node_id.clone());
                }
            }

            if state.result.len() >= num_results
                || (state.stage != Stage::FirstStep && state.current_attempts_fut.is_empty())
            {
                state.stage = Stage::FinishingNextIter;
            } else {
                if !local_nearest_node_updated {
                    trace!("Loop didn't update closer node ; jumping to step 2");
                    state.stage = Stage::SecondStep;
                }
            }

            future::ok((Some(KadQueryEvent::NewKnownMultiaddrs(new_known_multiaddrs)), state))
        });

        Some(future::Either::B(future))
    }).filter_map(|val| val);

    // Boxing the stream is not necessary, but we do it in order to improve compilation time.
    Box::new(stream) as Box<_>
}

View File

@ -0,0 +1,508 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Contains a `ConnectionUpgrade` that makes it possible to send requests and receive responses
//! from nodes after the upgrade.
//!
//! # Usage
//!
//! - Create a `KadConnecConfig` object. This struct implements `ConnectionUpgrade`.
//!
//! - Update a connection through that `KadConnecConfig`. The output yields you a
//! `KadConnecController` and a stream that must be driven to completion. The controller
//! allows you to perform queries and receive responses. The stream produces incoming requests
//! from the remote.
//!
//! This `KadConnecController` is usually extracted and stored in some sort of hash map in an
//! `Arc` in order to be available whenever we need to request something from a node.
use bytes::Bytes;
use futures::sync::{mpsc, oneshot};
use futures::{future, Future, Sink, stream, Stream};
use libp2p_core::{ConnectionUpgrade, Endpoint, PeerId};
use protocol::{self, KadMsg, KademliaProtocolConfig, KadPeer};
use std::collections::VecDeque;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter;
use tokio_io::{AsyncRead, AsyncWrite};
/// Configuration for a Kademlia server.
///
/// Implements `ConnectionUpgrade`. On a successful upgrade, produces a `KadConnecController`
/// and a `Future`. The controller lets you send queries to the remote and receive answers, while
/// the `Future` must be driven to completion in order for things to work.
#[derive(Debug, Clone)]
pub struct KadConnecConfig {
    // The underlying wire-protocol upgrade; this type only wraps it to post-process its output.
    raw_proto: KademliaProtocolConfig,
}
impl KadConnecConfig {
    /// Builds a configuration object for an upcoming Kademlia server.
    #[inline]
    pub fn new() -> Self {
        Self {
            raw_proto: KademliaProtocolConfig,
        }
    }
}
// Delegates the raw protocol negotiation/upgrade to `KademliaProtocolConfig`, then wraps the
// resulting message sink/stream into a `KadConnecController` plus a stream of incoming requests.
impl<C, Maf> ConnectionUpgrade<C, Maf> for KadConnecConfig
where
    C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
{
    type Output = (
        KadConnecController,
        Box<Stream<Item = KadIncomingRequest, Error = IoError>>,
    );
    type MultiaddrFuture = Maf;
    // A plain `fn` pointer is used in the `Map` so the future type stays nameable.
    type Future = future::Map<<KademliaProtocolConfig as ConnectionUpgrade<C, Maf>>::Future, fn((<KademliaProtocolConfig as ConnectionUpgrade<C, Maf>>::Output, Maf)) -> (Self::Output, Maf)>;
    type NamesIter = iter::Once<(Bytes, ())>;
    type UpgradeIdentifier = ();

    #[inline]
    fn protocol_names(&self) -> Self::NamesIter {
        // Same protocol name(s) as the underlying raw protocol.
        ConnectionUpgrade::<C, Maf>::protocol_names(&self.raw_proto)
    }

    #[inline]
    fn upgrade(self, incoming: C, id: (), endpoint: Endpoint, addr: Maf) -> Self::Future {
        self.raw_proto
            .upgrade(incoming, id, endpoint, addr)
            .map::<fn(_) -> _, _>(move |(connec, addr)| {
                (build_from_sink_stream(connec), addr)
            })
    }
}
/// Allows sending Kademlia requests and receiving responses.
#[derive(Debug, Clone)]
pub struct KadConnecController {
    // In order to send a request, we use this sender to send a tuple. The first element of the
    // tuple is the message to send to the remote, and the second element is what is used to
    // receive the response. If the query doesn't expect a response (eg. `PUT_VALUE`), then the
    // one-shot sender will be dropped without being used.
    inner: mpsc::UnboundedSender<(KadMsg, oneshot::Sender<KadMsg>)>,
}
impl KadConnecController {
    /// Sends a `FIND_NODE` query to the node and provides a future that will contain the response.
    // TODO: future item could be `impl Iterator` instead
    pub fn find_node(
        &self,
        searched_key: &PeerId,
    ) -> impl Future<Item = Vec<KadPeer>, Error = IoError> {
        let message = protocol::KadMsg::FindNodeReq {
            key: searched_key.clone().into_bytes(),
        };
        let (tx, rx) = oneshot::channel();

        // If the background handler is gone, the channel is closed and the query can
        // never be answered; report that immediately.
        if self.inner.unbounded_send((message, tx)).is_err() {
            let fut = future::err(IoError::new(
                IoErrorKind::ConnectionAborted,
                "connection to remote has aborted",
            ));
            return future::Either::B(fut);
        }

        // Wait for the handler to forward the remote's answer, and reject any message
        // that isn't a `FindNodeRes`.
        let future = rx
            .map_err(|_| {
                IoError::new(
                    IoErrorKind::ConnectionAborted,
                    "connection to remote has aborted",
                )
            })
            .and_then(|msg| {
                if let KadMsg::FindNodeRes { closer_peers, .. } = msg {
                    Ok(closer_peers)
                } else {
                    Err(IoError::new(
                        IoErrorKind::InvalidData,
                        "invalid response type received from the remote",
                    ))
                }
            });
        future::Either::A(future)
    }

    /// Sends a `PING` query to the node. Because of the way the protocol is designed, there is
    /// no way to differentiate between a ping and a pong. Therefore this function doesn't return a
    /// future, and the only way to be notified of the result is through the stream.
    pub fn ping(&self) -> Result<(), IoError> {
        // Dummy channel, as the `tx` is going to be dropped anyway.
        let (tx, _rx) = oneshot::channel();
        self.inner
            .unbounded_send((protocol::KadMsg::Ping, tx))
            .map_err(|_| {
                IoError::new(
                    IoErrorKind::ConnectionAborted,
                    "connection to remote has aborted",
                )
            })
    }
}
/// Request received from the remote.
pub enum KadIncomingRequest {
    /// Find the nodes closest to `searched`.
    FindNode {
        /// The value being searched.
        searched: PeerId,
        /// Object to use to respond to the request.
        responder: KadFindNodeRespond,
    },
    // TODO: PutValue and FindValue
    /// Received either a ping or a pong.
    PingPong,
}
/// Object used to respond to `FindNode` queries from remotes.
pub struct KadFindNodeRespond {
    // One-shot channel whose receiving half lives in the connection handler; sending the
    // response message here makes the handler write it back to the remote.
    inner: oneshot::Sender<KadMsg>,
}
impl KadFindNodeRespond {
    /// Respond to the `FindNode` request.
    pub fn respond<I>(self, peers: I)
        where I: IntoIterator<Item = protocol::KadPeer>
    {
        // Errors are ignored: a closed channel means the connection is gone anyway.
        let closer_peers = peers.into_iter().collect();
        let _ = self.inner.send(KadMsg::FindNodeRes { closer_peers });
    }
}
// Builds a controller and stream from a stream/sink of raw messages.
fn build_from_sink_stream<'a, S>(connec: S) -> (KadConnecController, Box<Stream<Item = KadIncomingRequest, Error = IoError> + 'a>)
    where S: Sink<SinkItem = KadMsg, SinkError = IoError> + Stream<Item = KadMsg, Error = IoError> + 'a
{
    // Channel through which the controller forwards its queries to the handler.
    let (queries_tx, queries_rx) = mpsc::unbounded();
    let controller = KadConnecController { inner: queries_tx };
    let requests_stream = kademlia_handler(connec, queries_rx);
    (controller, requests_stream)
}
// Handles a newly-opened Kademlia stream with a remote peer.
//
// Takes a `Stream` and `Sink` of Kademlia messages representing the connection to the client,
// plus a `Receiver` that will receive messages to transmit to that connection.
//
// Returns a `Stream` that must be resolved in order for progress to work. The `Stream` will
// produce objects that represent the requests sent by the remote. These requests must be answered
// immediately before the stream continues to produce items.
fn kademlia_handler<'a, S>(
    kad_bistream: S,
    rq_rx: mpsc::UnboundedReceiver<(KadMsg, oneshot::Sender<KadMsg>)>,
) -> Box<Stream<Item = KadIncomingRequest, Error = IoError> + 'a>
where
    S: Stream<Item = KadMsg, Error = IoError> + Sink<SinkItem = KadMsg, SinkError = IoError> + 'a,
{
    let (kad_sink, kad_stream) = kad_bistream.split();

    // This is a stream of futures containing local responses.
    // Every time we receive a request from the remote, we create a `oneshot::channel()` and send
    // the receiving end to `responders_tx`.
    // This way, if a future is available on `responders_rx`, we block until it produces the
    // response.
    let (responders_tx, responders_rx) = mpsc::unbounded();

    // We combine all the streams into one so that the loop wakes up whenever any generates
    // something.
    enum EventSource {
        // A message arrived from the remote over the socket.
        Remote(KadMsg),
        // The local controller asked us to send a request (and to route back its answer).
        LocalRequest(KadMsg, oneshot::Sender<KadMsg>),
        // A locally-produced response to a remote request is (or will be) ready.
        LocalResponse(oneshot::Receiver<KadMsg>),
        // The remote side of the socket closed.
        Finished,
    }

    let events = {
        let responders = responders_rx
            .map(|m| EventSource::LocalResponse(m))
            .map_err(|_| unreachable!());
        let rq_rx = rq_rx
            .map(|(m, o)| EventSource::LocalRequest(m, o))
            .map_err(|_| unreachable!());
        let kad_stream = kad_stream
            .map(|m| EventSource::Remote(m))
            .chain(future::ok(EventSource::Finished).into_stream());
        responders.select(rq_rx).select(kad_stream)
    };

    // State carried across `unfold` rounds:
    // (event sources, message sink, responder channel, FIFO of pending local queries awaiting
    //  a response from the remote, number of outstanding local pings, finished flag).
    let stream = stream::unfold((events, kad_sink, responders_tx, VecDeque::new(), 0u32, false),
        move |(events, kad_sink, responders_tx, mut send_back_queue, expected_pongs, finished)| {
            if finished {
                return None;
            }

            Some(events
                .into_future()
                .map_err(|(err, _)| err)
                .and_then(move |(message, events)| -> Box<Future<Item = _, Error = _>> {
                    match message {
                        Some(EventSource::Finished) | None => {
                            // Socket closed (or event sources exhausted): flag `finished` so the
                            // next round ends the stream.
                            let future = future::ok({
                                let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, true);
                                (None, state)
                            });
                            Box::new(future)
                        },
                        Some(EventSource::LocalResponse(message)) => {
                            let future = message
                                .map_err(|_| {
                                    // The user destroyed the responder without responding.
                                    warn!("Kad responder object destroyed without responding");
                                    panic!() // TODO: what to do here? we have to close the connection
                                })
                                .and_then(move |message| {
                                    kad_sink
                                        .send(message)
                                        .map(move |kad_sink| {
                                            let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished);
                                            (None, state)
                                        })
                                });
                            Box::new(future)
                        },
                        Some(EventSource::LocalRequest(message @ KadMsg::PutValue { .. }, _)) => {
                            // A `PutValue` request. Contrary to other types of messages, this one
                            // doesn't expect any answer and therefore we ignore the sender.
                            let future = kad_sink
                                .send(message)
                                .map(move |kad_sink| {
                                    let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished);
                                    (None, state)
                                });
                            Box::new(future) as Box<_>
                        }
                        Some(EventSource::LocalRequest(message @ KadMsg::Ping { .. }, _)) => {
                            // A local `Ping` request. Track the outstanding ping so that a later
                            // incoming `Ping` can be interpreted as its pong.
                            let expected_pongs = expected_pongs.checked_add(1)
                                .expect("overflow in number of simultaneous pings");
                            let future = kad_sink
                                .send(message)
                                .map(move |kad_sink| {
                                    let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished);
                                    (None, state)
                                });
                            Box::new(future) as Box<_>
                        }
                        Some(EventSource::LocalRequest(message, send_back)) => {
                            // Any local request other than `PutValue` or `Ping`. The sender is
                            // queued so the next matching response can be routed back to it.
                            send_back_queue.push_back(send_back);
                            let future = kad_sink
                                .send(message)
                                .map(move |kad_sink| {
                                    let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished);
                                    (None, state)
                                });
                            Box::new(future) as Box<_>
                        }
                        Some(EventSource::Remote(KadMsg::Ping)) => {
                            // The way the protocol was designed, there is no way to differentiate
                            // between a ping and a pong.
                            if let Some(expected_pongs) = expected_pongs.checked_sub(1) {
                                // Maybe we received a PING, or maybe we received a PONG, no way
                                // to tell. If it was a PING and we expected a PONG, then the
                                // remote will see its PING answered only when it PONGs us.
                                let future = future::ok({
                                    let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished);
                                    let rq = KadIncomingRequest::PingPong;
                                    (Some(rq), state)
                                });
                                Box::new(future) as Box<_>
                            } else {
                                // No outstanding local ping: treat it as a PING and answer with
                                // a PONG (which is itself a `Ping` message on the wire).
                                let future = kad_sink
                                    .send(KadMsg::Ping)
                                    .map(move |kad_sink| {
                                        let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished);
                                        let rq = KadIncomingRequest::PingPong;
                                        (Some(rq), state)
                                    });
                                Box::new(future) as Box<_>
                            }
                        }
                        Some(EventSource::Remote(message @ KadMsg::FindNodeRes { .. }))
                        | Some(EventSource::Remote(message @ KadMsg::GetValueRes { .. })) => {
                            // `FindNodeRes` or `GetValueRes` received on the socket.
                            // Send it back through `send_back_queue`.
                            if let Some(send_back) = send_back_queue.pop_front() {
                                let _ = send_back.send(message);
                                let future = future::ok({
                                    let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished);
                                    (None, state)
                                });
                                Box::new(future)
                            } else {
                                debug!("Remote sent a Kad response but we didn't request anything");
                                let future = future::err(IoErrorKind::InvalidData.into());
                                Box::new(future)
                            }
                        }
                        Some(EventSource::Remote(KadMsg::FindNodeReq { key, .. })) => {
                            let peer_id = match PeerId::from_bytes(key) {
                                Ok(id) => id,
                                Err(key) => {
                                    debug!("Ignoring FIND_NODE request with invalid key: {:?}", key);
                                    let future = future::err(IoError::new(IoErrorKind::InvalidData, "invalid key in FIND_NODE"));
                                    return Box::new(future);
                                }
                            };
                            // Hand a responder to the API user; its receiving half goes through
                            // `responders_tx` so the answer is eventually written to the socket.
                            let (tx, rx) = oneshot::channel();
                            let _ = responders_tx.unbounded_send(rx);
                            let future = future::ok({
                                let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished);
                                let rq = KadIncomingRequest::FindNode {
                                    searched: peer_id,
                                    responder: KadFindNodeRespond {
                                        inner: tx
                                    }
                                };
                                (Some(rq), state)
                            });
                            Box::new(future)
                        }
                        Some(EventSource::Remote(KadMsg::GetValueReq { .. })) => {
                            warn!("GET_VALUE requests are not implemented yet");
                            let future = future::err(IoError::new(IoErrorKind::Other,
                                "GET_VALUE requests are not implemented yet"));
                            return Box::new(future);
                        }
                        Some(EventSource::Remote(KadMsg::PutValue { .. })) => {
                            warn!("PUT_VALUE requests are not implemented yet");
                            let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished);
                            let future = future::ok((None, state));
                            return Box::new(future);
                        }
                    }
                }))
        }).filter_map(|val| val);

    Box::new(stream) as Box<Stream<Item = _, Error = IoError>>
}
#[cfg(test)]
mod tests {
    use std::io::Error as IoError;
    use std::iter;
    use futures::{Future, Poll, Sink, StartSend, Stream};
    use futures::sync::mpsc;
    use kad_server::{self, KadIncomingRequest, KadConnecController};
    use libp2p_core::PublicKey;
    use protocol::{KadConnectionType, KadPeer};
    use rand;

    // This struct merges a stream and a sink and is quite useful for tests.
    struct Wrapper<St, Si>(St, Si);

    impl<St, Si> Stream for Wrapper<St, Si>
    where
        St: Stream,
    {
        type Item = St::Item;
        type Error = St::Error;
        fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
            // Reading goes through the wrapped stream.
            self.0.poll()
        }
    }

    impl<St, Si> Sink for Wrapper<St, Si>
    where
        Si: Sink,
    {
        type SinkItem = Si::SinkItem;
        type SinkError = Si::SinkError;
        fn start_send(
            &mut self,
            item: Self::SinkItem,
        ) -> StartSend<Self::SinkItem, Self::SinkError> {
            // Writing goes through the wrapped sink.
            self.1.start_send(item)
        }
        fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
            self.1.poll_complete()
        }
    }

    // Wires two in-memory Kademlia endpoints ("a" and "b") back to back through
    // unbounded channels and returns each side's controller and incoming-request stream.
    fn build_test() -> (KadConnecController, impl Stream<Item = KadIncomingRequest, Error = IoError>, KadConnecController, impl Stream<Item = KadIncomingRequest, Error = IoError>) {
        let (a_to_b, b_from_a) = mpsc::unbounded();
        let (b_to_a, a_from_b) = mpsc::unbounded();

        let sink_stream_a = Wrapper(a_from_b, a_to_b)
            .map_err(|_| panic!()).sink_map_err(|_| panic!());
        let sink_stream_b = Wrapper(b_from_a, b_to_a)
            .map_err(|_| panic!()).sink_map_err(|_| panic!());

        let (controller_a, stream_events_a) = kad_server::build_from_sink_stream(sink_stream_a);
        let (controller_b, stream_events_b) = kad_server::build_from_sink_stream(sink_stream_b);
        (controller_a, stream_events_a, controller_b, stream_events_b)
    }

    #[test]
    fn ping_response() {
        let (controller_a, stream_events_a, _controller_b, stream_events_b) = build_test();

        controller_a.ping().unwrap();

        // The ping sent by "a" must surface as a `PingPong` event on side "b".
        let streams = stream_events_a.map(|ev| (ev, "a"))
            .select(stream_events_b.map(|ev| (ev, "b")));
        match streams.into_future().map_err(|(err, _)| err).wait().unwrap() {
            (Some((KadIncomingRequest::PingPong, "b")), _) => {},
            _ => panic!()
        }
    }

    #[test]
    fn find_node_response() {
        let (controller_a, stream_events_a, _controller_b, stream_events_b) = build_test();

        // Random target key for the FIND_NODE request.
        let random_peer_id = {
            let buf = (0 .. 1024).map(|_| -> u8 { rand::random() }).collect::<Vec<_>>();
            PublicKey::Rsa(buf).into_peer_id()
        };

        let find_node_fut = controller_a.find_node(&random_peer_id);

        // Arbitrary peer record that "b" will send back as the response.
        let example_response = KadPeer {
            node_id: {
                let buf = (0 .. 1024).map(|_| -> u8 { rand::random() }).collect::<Vec<_>>();
                PublicKey::Rsa(buf).into_peer_id()
            },
            multiaddrs: Vec::new(),
            connection_ty: KadConnectionType::Connected,
        };

        let streams = stream_events_a.map(|ev| (ev, "a"))
            .select(stream_events_b.map(|ev| (ev, "b")));

        // Side "b" must receive the request with the searched key, and answers it.
        let streams = match streams.into_future().map_err(|(err, _)| err).wait().unwrap() {
            (Some((KadIncomingRequest::FindNode { searched, responder }, "b")), streams) => {
                assert_eq!(searched, random_peer_id);
                responder.respond(iter::once(example_response.clone()));
                streams
            },
            _ => panic!()
        };

        // Side "a"'s future must resolve with exactly the peer that "b" responded with.
        let resp = streams.into_future().map_err(|(err, _)| err).map(|_| unreachable!())
            .select(find_node_fut)
            .map_err(|_| -> IoError { panic!() });
        assert_eq!(resp.wait().unwrap().0, vec![example_response]);
    }
}

View File

@ -0,0 +1,498 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Key-value storage, with a refresh and a time-to-live system.
//!
//! A k-buckets table allows one to store a value identified by keys, ordered by their distance
//! to a reference key passed to the constructor.
//!
//! If the local ID has `N` bits, then the k-buckets table contains `N` *buckets* each containing
//! a constant number of entries. Storing a key in the k-buckets table adds it to the bucket
//! corresponding to its distance with the reference key.
use arrayvec::ArrayVec;
use bigint::U512;
use libp2p_core::PeerId;
use parking_lot::{Mutex, MutexGuard};
use std::mem;
use std::slice::Iter as SliceIter;
use std::time::{Duration, Instant};
use std::vec::IntoIter as VecIntoIter;
/// Maximum number of nodes in a bucket.
pub const MAX_NODES_PER_BUCKET: usize = 20;
/// Table of k-buckets with interior mutability.
#[derive(Debug)]
pub struct KBucketsTable<Id, Val> {
    // ID of the local node; bucket placement is derived from the XOR distance
    // between this ID and a peer's ID.
    my_id: Id,
    // One bucket per bit of the ID space, each behind its own mutex so that
    // operations on different buckets do not contend.
    tables: Vec<Mutex<KBucket<Id, Val>>>,
    // The timeout when pinging the first node after which we consider that it no longer responds.
    ping_timeout: Duration,
}
impl<Id, Val> Clone for KBucketsTable<Id, Val>
where
    Id: Clone,
    Val: Clone,
{
    /// Deep-clones the table.
    ///
    /// Each bucket's mutex is locked in turn so every `KBucket` is copied in
    /// a consistent state, and the copies are wrapped in fresh mutexes.
    #[inline]
    fn clone(&self) -> Self {
        KBucketsTable {
            my_id: self.my_id.clone(),
            tables: self.tables
                .iter()
                .map(|t| Mutex::new(t.lock().clone()))
                .collect(),
            // `Duration` is `Copy`; the previous explicit `.clone()` was
            // unnecessary (clippy: clone_on_copy).
            ping_timeout: self.ping_timeout,
        }
    }
}
// A single k-bucket: up to `MAX_NODES_PER_BUCKET` live entries plus at most
// one "pending" entry waiting for an eviction decision.
#[derive(Debug, Clone)]
struct KBucket<Id, Val> {
    // Nodes are always ordered from oldest to newest.
    // Note that we will very often move elements to the end of this. No benchmarking has been
    // performed, but it is very likely that a `ArrayVec` is the most performant data structure.
    nodes: ArrayVec<[Node<Id, Val>; MAX_NODES_PER_BUCKET]>,
    // Node received when the bucket was full. Will be added to the list if the first node doesn't
    // respond in time to our ping. The second element is the time when the pending node was added.
    // If it is too much in the past, then we drop the first node and add the pending node to the
    // end of the list.
    pending_node: Option<(Node<Id, Val>, Instant)>,
    // Last time this bucket was updated.
    last_update: Instant,
}
impl<Id, Val> KBucket<Id, Val> {
    /// Puts the kbucket into a coherent state.
    ///
    /// If a pending node has been waiting for longer than `timeout`, the
    /// oldest entry (front of `nodes`) is evicted and the pending node is
    /// appended as the newest entry. Otherwise the pending node stays pending.
    fn flush(&mut self, timeout: Duration) {
        let expired = self.pending_node
            .as_ref()
            .map_or(false, |&(_, added)| added.elapsed() >= timeout);
        if expired {
            let (node, _) = self.pending_node
                .take()
                .expect("expired is only true when pending_node is Some");
            let _ = self.nodes.remove(0);
            self.nodes.push(node);
        }
    }
}
// A single entry of a bucket: a peer's identifier together with the
// caller-supplied value stored alongside it.
#[derive(Debug, Clone)]
struct Node<Id, Val> {
    // Identifier of the peer.
    id: Id,
    // Opaque value associated with this peer by the user of the table.
    value: Val,
}
/// Trait that must be implemented on types that can be used as an identifier in a k-bucket.
pub trait KBucketsPeerId: Eq + Clone {
    /// Distance between two peer IDs.
    type Distance: Ord;
    /// Computes the XOR of this value and another one.
    fn distance_with(&self, other: &Self) -> Self::Distance;
    /// Returns the number of bits that are necessary to store the distance between peer IDs.
    /// Used for pre-allocations.
    ///
    /// > **Note**: Returning 0 would lead to a panic.
    fn num_bits() -> usize;
    /// Returns the number of leading zeroes of the distance between peer IDs.
    // The parameter was previously anonymous (`fn leading_zeros(Self::Distance)`),
    // which is deprecated and a hard error in the 2018 edition; it is now named.
    fn leading_zeros(distance: Self::Distance) -> u32;
}
impl KBucketsPeerId for PeerId {
    type Distance = U512;

    #[inline]
    fn distance_with(&self, other: &Self) -> Self::Distance {
        // Note that we don't compare the hash functions because there's no chance of collision
        // of the same value hashed with two different hash functions.
        let lhs = U512::from(self.digest());
        let rhs = U512::from(other.digest());
        lhs ^ rhs
    }

    #[inline]
    fn num_bits() -> usize {
        // A multihash digest is at most 512 bits wide.
        512
    }

    #[inline]
    fn leading_zeros(distance: Self::Distance) -> u32 {
        distance.leading_zeros()
    }
}
impl<Id, Val> KBucketsTable<Id, Val>
where
    Id: KBucketsPeerId,
{
    /// Builds a new routing table whose buckets are all empty.
    ///
    /// `ping_timeout` is how long a pending node waits for the oldest entry
    /// of its bucket to answer a ping before that entry is evicted.
    pub fn new(my_id: Id, ping_timeout: Duration) -> Self {
        KBucketsTable {
            my_id,
            tables: (0..Id::num_bits())
                .map(|_| KBucket {
                    nodes: ArrayVec::new(),
                    pending_node: None,
                    last_update: Instant::now(),
                })
                .map(Mutex::new)
                .collect(),
            ping_timeout,
        }
    }

    // Returns the id of the bucket that should contain the peer with the given ID.
    //
    // Returns `None` if out of range, which happens if `id` is the same as the local peer id.
    #[inline]
    fn bucket_num(&self, id: &Id) -> Option<usize> {
        (Id::num_bits() - 1).checked_sub(Id::leading_zeros(self.my_id.distance_with(id)) as usize)
    }

    /// Returns an iterator to all the buckets of this table.
    ///
    /// Ordered by proximity to the local node. Closest bucket (with max. one node in it) comes
    /// first.
    #[inline]
    pub fn buckets(&self) -> BucketsIter<Id, Val> {
        BucketsIter(self.tables.iter(), self.ping_timeout)
    }

    /// Returns the ID of the local node.
    #[inline]
    pub fn my_id(&self) -> &Id {
        &self.my_id
    }

    /// Finds the nodes closest to `id`, ordered by distance (closest first).
    pub fn find_closest(&self, id: &Id) -> VecIntoIter<Id>
    where
        Id: Clone,
    {
        // TODO: optimize
        let mut out = Vec::new();
        for table in self.tables.iter() {
            let mut table = table.lock();
            table.flush(self.ping_timeout);
            if table.last_update.elapsed() > self.ping_timeout {
                continue // ignore bucket with expired nodes
            }
            for node in table.nodes.iter() {
                out.push(node.id.clone());
            }
        }
        // Sort ascending by distance so that the closest node comes first. The previous
        // comparator (`b.cmp(&a)`) sorted *descending*, contradicting this method's
        // documentation and the ascending-order insertion logic of
        // `find_closest_with_self`.
        out.sort_by(|a, b| a.distance_with(id).cmp(&b.distance_with(id)));
        out.into_iter()
    }

    /// Same as `find_closest`, but includes the local peer as well.
    pub fn find_closest_with_self(&self, id: &Id) -> VecIntoIter<Id>
    where
        Id: Clone,
    {
        // TODO: optimize
        let mut intermediate: Vec<_> = self.find_closest(&id).collect();
        // Insert the local ID right before the first entry that is at least as far
        // away, preserving the ascending-by-distance ordering.
        if let Some(pos) = intermediate
            .iter()
            .position(|e| e.distance_with(&id) >= self.my_id.distance_with(&id))
        {
            if intermediate[pos] != self.my_id {
                intermediate.insert(pos, self.my_id.clone());
            }
        } else {
            intermediate.push(self.my_id.clone());
        }
        intermediate.into_iter()
    }

    /// Marks the node as "most recent" in its bucket and modifies the value associated to it.
    /// This function should be called whenever we receive a communication from a node.
    pub fn update(&self, id: Id, value: Val) -> UpdateOutcome<Id, Val> {
        let table = match self.bucket_num(&id) {
            Some(n) => &self.tables[n],
            None => return UpdateOutcome::FailSelfUpdate,
        };
        let mut table = table.lock();
        table.flush(self.ping_timeout);
        if let Some(pos) = table.nodes.iter().position(|n| n.id == id) {
            // Node is already in the bucket.
            let mut existing = table.nodes.remove(pos);
            let old_val = mem::replace(&mut existing.value, value);
            if pos == 0 {
                // If it's the first node of the bucket that we update, then we drop the node that
                // was waiting for a ping.
                table.nodes.truncate(MAX_NODES_PER_BUCKET - 1);
                table.pending_node = None;
            }
            table.nodes.push(existing);
            table.last_update = Instant::now();
            UpdateOutcome::Refreshed(old_val)
        } else if table.nodes.len() < MAX_NODES_PER_BUCKET {
            // Node not yet in the bucket, but there's plenty of space.
            table.nodes.push(Node { id, value });
            table.last_update = Instant::now();
            UpdateOutcome::Added
        } else {
            // Not enough space to put the node, but we can add it to the end as "pending". We
            // then need to tell the caller that we want it to ping the node at the top of the
            // list.
            if table.pending_node.is_none() {
                table.pending_node = Some((Node { id, value }, Instant::now()));
                UpdateOutcome::NeedPing(table.nodes[0].id.clone())
            } else {
                UpdateOutcome::Discarded
            }
        }
    }
}
/// Return value of the `update()` method.
///
/// Marked `#[must_use]` because some variants (notably `NeedPing`) require
/// the caller to take further action for the table to stay healthy.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[must_use]
pub enum UpdateOutcome<Id, Val> {
    /// The node has been added to the bucket.
    Added,
    /// The node was already in the bucket and has been refreshed.
    Refreshed(Val),
    /// The node wasn't added. Instead we need to ping the node passed as parameter, and call
    /// `update` if it responds.
    NeedPing(Id),
    /// The node wasn't added at all because a node was already pending.
    Discarded,
    /// Tried to update the local peer ID. This is an invalid operation.
    FailSelfUpdate,
}
/// Iterator giving access to a bucket.
pub struct BucketsIter<'a, Id: 'a, Val: 'a>(SliceIter<'a, Mutex<KBucket<Id, Val>>>, Duration);
impl<'a, Id: 'a, Val: 'a> Iterator for BucketsIter<'a, Id, Val> {
    type Item = Bucket<'a, Id, Val>;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        // `Duration` is `Copy`; hoist it so the closure doesn't capture `self`.
        let timeout = self.1;
        self.0.next().map(|locked| {
            let mut guard = locked.lock();
            // Resolve any expired pending node before handing out the bucket.
            guard.flush(timeout);
            Bucket(guard)
        })
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.0.size_hint()
    }
}
impl<'a, Id: 'a, Val: 'a> ExactSizeIterator for BucketsIter<'a, Id, Val> {}
/// Access to a bucket.
///
/// Holds the bucket's mutex guard for as long as this value is alive.
pub struct Bucket<'a, Id: 'a, Val: 'a>(MutexGuard<'a, KBucket<Id, Val>>);
impl<'a, Id: 'a, Val: 'a> Bucket<'a, Id, Val> {
    /// Returns the number of entries in that bucket.
    ///
    /// > **Note**: Keep in mind that this operation can be racy. If `update()` is called on the
    /// >           table while this function is running, the `update()` may or may not be taken
    /// >           into account.
    #[inline]
    pub fn num_entries(&self) -> usize {
        self.0.nodes.len()
    }
    /// Returns true if this bucket has a pending node.
    #[inline]
    pub fn has_pending(&self) -> bool {
        self.0.pending_node.is_some()
    }
    /// Returns the time when any of the values in this bucket was last updated.
    ///
    /// If the bucket is empty, this returns the time when the whole table was created.
    #[inline]
    pub fn last_update(&self) -> Instant {
        // `Instant` is `Copy`; the previous explicit `.clone()` was
        // unnecessary (clippy: clone_on_copy).
        self.0.last_update
    }
}
#[cfg(test)]
mod tests {
    extern crate rand;
    use self::rand::random;
    use kbucket::{KBucketsTable, UpdateOutcome, MAX_NODES_PER_BUCKET};
    use libp2p_core::PeerId;
    use std::thread;
    use std::time::Duration;

    // Builds a random, well-formed peer ID: 34 bytes of multihash — the
    // SHA-256 code (18) and digest length (32) followed by 32 random bytes.
    //
    // The previous code used `vec![random(); 34]`, which evaluates `random()`
    // a single time and repeats that one byte 34 times. Besides not being
    // random per byte, that made two "random" IDs collide with probability
    // 1/256 (a flaky-test hazard) instead of being practically impossible.
    fn random_peer_id() -> PeerId {
        let mut bytes: Vec<u8> = (0..34).map(|_| random()).collect();
        bytes[0] = 18; // multihash code for SHA-256
        bytes[1] = 32; // digest length in bytes
        PeerId::from_bytes(bytes).unwrap()
    }

    #[test]
    fn basic_closest() {
        let my_id = random_peer_id();
        let other_id = random_peer_id();
        let table = KBucketsTable::new(my_id, Duration::from_secs(5));
        let _ = table.update(other_id.clone(), ());
        let res = table.find_closest(&other_id).collect::<Vec<_>>();
        assert_eq!(res.len(), 1);
        assert_eq!(res[0], other_id);
    }

    #[test]
    fn update_local_id_fails() {
        let my_id = random_peer_id();
        let table = KBucketsTable::new(my_id.clone(), Duration::from_secs(5));
        match table.update(my_id, ()) {
            UpdateOutcome::FailSelfUpdate => (),
            _ => panic!()
        }
    }

    #[test]
    fn update_time_last_refresh() {
        let my_id = random_peer_id();
        // Generate some other IDs varying by just one bit.
        let other_ids = (0..random::<usize>() % 20)
            .map(|_| {
                let bit_num = random::<usize>() % 256;
                // `to_vec()` already yields an owned copy; the former extra
                // `.clone()` was redundant.
                let mut id = my_id.as_bytes().to_vec();
                id[33 - (bit_num / 8)] ^= 1 << (bit_num % 8);
                (PeerId::from_bytes(id).unwrap(), bit_num)
            })
            .collect::<Vec<_>>();
        let table = KBucketsTable::new(my_id, Duration::from_secs(5));
        let before_update = table.buckets().map(|b| b.last_update()).collect::<Vec<_>>();
        thread::sleep(Duration::from_secs(2));
        for &(ref id, _) in &other_ids {
            let _ = table.update(id.clone(), ());
        }
        let after_update = table.buckets().map(|b| b.last_update()).collect::<Vec<_>>();
        // Only the buckets whose bit was flipped must have been refreshed.
        for (offset, (bef, aft)) in before_update.iter().zip(after_update.iter()).enumerate() {
            if other_ids.iter().any(|&(_, bucket)| bucket == offset) {
                assert_ne!(bef, aft);
            } else {
                assert_eq!(bef, aft);
            }
        }
    }

    #[test]
    fn full_kbucket() {
        let my_id = random_peer_id();
        assert!(MAX_NODES_PER_BUCKET <= 251); // Test doesn't work otherwise.
        let mut fill_ids = (0..MAX_NODES_PER_BUCKET + 3)
            .map(|n| {
                let mut id = my_id.clone().into_bytes();
                id[2] ^= 0x80; // Flip the first bit so that we get in the most distant bucket.
                id[33] = id[33].wrapping_add(n as u8);
                PeerId::from_bytes(id).unwrap()
            })
            .collect::<Vec<_>>();
        let first_node = fill_ids[0].clone();
        let second_node = fill_ids[1].clone();
        let table = KBucketsTable::new(my_id.clone(), Duration::from_secs(1));
        // Fill the farthest bucket to capacity; every insertion must succeed.
        for (num, id) in fill_ids.drain(..MAX_NODES_PER_BUCKET).enumerate() {
            assert_eq!(table.update(id, ()), UpdateOutcome::Added);
            assert_eq!(table.buckets().nth(255).unwrap().num_entries(), num + 1);
        }
        assert_eq!(
            table.buckets().nth(255).unwrap().num_entries(),
            MAX_NODES_PER_BUCKET
        );
        assert!(!table.buckets().nth(255).unwrap().has_pending());
        // The bucket is full: the next insertion becomes pending and asks the
        // caller to ping the oldest entry.
        assert_eq!(
            table.update(fill_ids.remove(0), ()),
            UpdateOutcome::NeedPing(first_node)
        );
        assert_eq!(
            table.buckets().nth(255).unwrap().num_entries(),
            MAX_NODES_PER_BUCKET
        );
        assert!(table.buckets().nth(255).unwrap().has_pending());
        // Only a single node may be pending at any time.
        assert_eq!(
            table.update(fill_ids.remove(0), ()),
            UpdateOutcome::Discarded
        );
        // Once the ping timeout has elapsed, the pending node replaces the
        // oldest entry, making room for a new pending node.
        thread::sleep(Duration::from_secs(2));
        assert!(!table.buckets().nth(255).unwrap().has_pending());
        assert_eq!(
            table.update(fill_ids.remove(0), ()),
            UpdateOutcome::NeedPing(second_node)
        );
    }
}

89
protocols/kad/src/lib.rs Normal file
View File

@ -0,0 +1,89 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Kademlia protocol. Allows peer discovery, records store and records fetch.
//!
//! # Usage
//!
//! Usage is done in the following steps:
//!
//! - Build a `KadSystemConfig` and a `KadConnecConfig` object that contain the way you want the
//! Kademlia protocol to behave.
//!
//! - Create a swarm that upgrades incoming connections with the `KadConnecConfig`.
//!
//! - Build a `KadSystem` from the `KadSystemConfig`. This requires passing a closure that provides
//! the Kademlia controller of a peer.
//!
//! - You can perform queries using the `KadSystem`.
//!
// TODO: we allow dead_code for now because this library contains a lot of unused code that will
// be useful later for record store
#![allow(dead_code)]
// # Crate organization
//
// The crate contains three levels of abstractions over the Kademlia protocol.
//
// - The first level of abstraction is in `protocol`. The API of this module lets you turn a raw
// bytes stream (`AsyncRead + AsyncWrite`) into a `Sink + Stream` of raw but strongly-typed
// Kademlia messages.
//
// - The second level of abstraction is in `kad_server`. Its API lets you upgrade a connection and
// obtain a future (that must be driven to completion), plus a controller. Processing the future
// will automatically respond to Kad requests received by the remote. The controller lets you
// send your own requests to this remote and obtain strongly-typed responses.
//
// - The third level of abstraction is in `high_level`. This module only provides the
// `KademliaSystem`.
//
extern crate arrayvec;
extern crate bigint;
extern crate bs58;
extern crate bytes;
extern crate datastore;
extern crate fnv;
extern crate futures;
extern crate libp2p_identify;
extern crate libp2p_ping;
extern crate libp2p_core;
#[macro_use]
extern crate log;
extern crate multiaddr;
extern crate parking_lot;
extern crate protobuf;
extern crate rand;
extern crate smallvec;
extern crate tokio_codec;
extern crate tokio_io;
extern crate tokio_timer;
extern crate unsigned_varint;
pub use self::high_level::{KadSystemConfig, KadSystem, KadQueryEvent};
pub use self::kad_server::{KadConnecController, KadConnecConfig, KadIncomingRequest, KadFindNodeRespond};
pub use self::protocol::{KadConnectionType, KadPeer};
mod high_level;
mod kad_server;
mod kbucket;
mod protobuf_structs;
mod protocol;

View File

@ -0,0 +1,900 @@
// This file is generated by rust-protobuf 2.0.2. Do not edit
// @generated
// https://github.com/Manishearth/rust-clippy/issues/702
#![allow(unknown_lints)]
#![allow(clippy)]
#![cfg_attr(rustfmt, rustfmt_skip)]
#![allow(box_pointers)]
#![allow(dead_code)]
#![allow(missing_docs)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(non_upper_case_globals)]
#![allow(trivial_casts)]
#![allow(unsafe_code)]
#![allow(unused_imports)]
#![allow(unused_results)]
use protobuf::Message as Message_imported_for_functions;
use protobuf::ProtobufEnum as ProtobufEnum_imported_for_functions;
#[derive(PartialEq,Clone,Default)]
pub struct Message {
// message fields
field_type: ::std::option::Option<Message_MessageType>,
clusterLevelRaw: ::std::option::Option<i32>,
key: ::protobuf::SingularField<::std::vec::Vec<u8>>,
record: ::protobuf::SingularPtrField<super::record::Record>,
closerPeers: ::protobuf::RepeatedField<Message_Peer>,
providerPeers: ::protobuf::RepeatedField<Message_Peer>,
// special fields
unknown_fields: ::protobuf::UnknownFields,
cached_size: ::protobuf::CachedSize,
}
impl Message {
pub fn new() -> Message {
::std::default::Default::default()
}
// optional .dht.pb.Message.MessageType type = 1;
pub fn clear_field_type(&mut self) {
self.field_type = ::std::option::Option::None;
}
pub fn has_field_type(&self) -> bool {
self.field_type.is_some()
}
// Param is passed by value, moved
pub fn set_field_type(&mut self, v: Message_MessageType) {
self.field_type = ::std::option::Option::Some(v);
}
pub fn get_field_type(&self) -> Message_MessageType {
self.field_type.unwrap_or(Message_MessageType::PUT_VALUE)
}
// optional int32 clusterLevelRaw = 10;
pub fn clear_clusterLevelRaw(&mut self) {
self.clusterLevelRaw = ::std::option::Option::None;
}
pub fn has_clusterLevelRaw(&self) -> bool {
self.clusterLevelRaw.is_some()
}
// Param is passed by value, moved
pub fn set_clusterLevelRaw(&mut self, v: i32) {
self.clusterLevelRaw = ::std::option::Option::Some(v);
}
pub fn get_clusterLevelRaw(&self) -> i32 {
self.clusterLevelRaw.unwrap_or(0)
}
// optional bytes key = 2;
pub fn clear_key(&mut self) {
self.key.clear();
}
pub fn has_key(&self) -> bool {
self.key.is_some()
}
// Param is passed by value, moved
pub fn set_key(&mut self, v: ::std::vec::Vec<u8>) {
self.key = ::protobuf::SingularField::some(v);
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_key(&mut self) -> &mut ::std::vec::Vec<u8> {
if self.key.is_none() {
self.key.set_default();
}
self.key.as_mut().unwrap()
}
// Take field
pub fn take_key(&mut self) -> ::std::vec::Vec<u8> {
self.key.take().unwrap_or_else(|| ::std::vec::Vec::new())
}
pub fn get_key(&self) -> &[u8] {
match self.key.as_ref() {
Some(v) => &v,
None => &[],
}
}
// optional .record.pb.Record record = 3;
pub fn clear_record(&mut self) {
self.record.clear();
}
pub fn has_record(&self) -> bool {
self.record.is_some()
}
// Param is passed by value, moved
pub fn set_record(&mut self, v: super::record::Record) {
self.record = ::protobuf::SingularPtrField::some(v);
}
// Mutable pointer to the field.
// If field is not initialized, it is initialized with default value first.
pub fn mut_record(&mut self) -> &mut super::record::Record {
if self.record.is_none() {
self.record.set_default();
}
self.record.as_mut().unwrap()
}
// Take field
pub fn take_record(&mut self) -> super::record::Record {
self.record.take().unwrap_or_else(|| super::record::Record::new())
}
pub fn get_record(&self) -> &super::record::Record {
self.record.as_ref().unwrap_or_else(|| super::record::Record::default_instance())
}
// repeated .dht.pb.Message.Peer closerPeers = 8;
pub fn clear_closerPeers(&mut self) {
self.closerPeers.clear();
}
// Param is passed by value, moved
pub fn set_closerPeers(&mut self, v: ::protobuf::RepeatedField<Message_Peer>) {
self.closerPeers = v;
}
// Mutable pointer to the field.
pub fn mut_closerPeers(&mut self) -> &mut ::protobuf::RepeatedField<Message_Peer> {
&mut self.closerPeers
}
// Take field
pub fn take_closerPeers(&mut self) -> ::protobuf::RepeatedField<Message_Peer> {
::std::mem::replace(&mut self.closerPeers, ::protobuf::RepeatedField::new())
}
pub fn get_closerPeers(&self) -> &[Message_Peer] {
&self.closerPeers
}
// repeated .dht.pb.Message.Peer providerPeers = 9;
pub fn clear_providerPeers(&mut self) {
self.providerPeers.clear();
}
// Param is passed by value, moved
pub fn set_providerPeers(&mut self, v: ::protobuf::RepeatedField<Message_Peer>) {
self.providerPeers = v;
}
// Mutable pointer to the field.
pub fn mut_providerPeers(&mut self) -> &mut ::protobuf::RepeatedField<Message_Peer> {
&mut self.providerPeers
}
// Take field
pub fn take_providerPeers(&mut self) -> ::protobuf::RepeatedField<Message_Peer> {
::std::mem::replace(&mut self.providerPeers, ::protobuf::RepeatedField::new())
}
pub fn get_providerPeers(&self) -> &[Message_Peer] {
&self.providerPeers
}
}
impl ::protobuf::Message for Message {
fn is_initialized(&self) -> bool {
for v in &self.record {
if !v.is_initialized() {
return false;
}
};
for v in &self.closerPeers {
if !v.is_initialized() {
return false;
}
};
for v in &self.providerPeers {
if !v.is_initialized() {
return false;
}
};
true
}
fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream) -> ::protobuf::ProtobufResult<()> {
while !is.eof()? {
let (field_number, wire_type) = is.read_tag_unpack()?;
match field_number {
1 => {
::protobuf::rt::read_proto2_enum_with_unknown_fields_into(wire_type, is, &mut self.field_type, 1, &mut self.unknown_fields)?
},
10 => {
if wire_type != ::protobuf::wire_format::WireTypeVarint {
return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
}
let tmp = is.read_int32()?;
self.clusterLevelRaw = ::std::option::Option::Some(tmp);
},
2 => {
::protobuf::rt::read_singular_bytes_into(wire_type, is, &mut self.key)?;
},
3 => {
::protobuf::rt::read_singular_message_into(wire_type, is, &mut self.record)?;
},
8 => {
::protobuf::rt::read_repeated_message_into(wire_type, is, &mut self.closerPeers)?;
},
9 => {
::protobuf::rt::read_repeated_message_into(wire_type, is, &mut self.providerPeers)?;
},
_ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
},
};
}
::std::result::Result::Ok(())
}
// Compute sizes of nested messages
#[allow(unused_variables)]
fn compute_size(&self) -> u32 {
let mut my_size = 0;
if let Some(v) = self.field_type {
my_size += ::protobuf::rt::enum_size(1, v);
}
if let Some(v) = self.clusterLevelRaw {
my_size += ::protobuf::rt::value_size(10, v, ::protobuf::wire_format::WireTypeVarint);
}
if let Some(ref v) = self.key.as_ref() {
my_size += ::protobuf::rt::bytes_size(2, &v);
}
if let Some(ref v) = self.record.as_ref() {
let len = v.compute_size();
my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len;
}
for value in &self.closerPeers {
let len = value.compute_size();
my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len;
};
for value in &self.providerPeers {
let len = value.compute_size();
my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len;
};
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size);
my_size
}
fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream) -> ::protobuf::ProtobufResult<()> {
if let Some(v) = self.field_type {
os.write_enum(1, v.value())?;
}
if let Some(v) = self.clusterLevelRaw {
os.write_int32(10, v)?;
}
if let Some(ref v) = self.key.as_ref() {
os.write_bytes(2, &v)?;
}
if let Some(ref v) = self.record.as_ref() {
os.write_tag(3, ::protobuf::wire_format::WireTypeLengthDelimited)?;
os.write_raw_varint32(v.get_cached_size())?;
v.write_to_with_cached_sizes(os)?;
}
for v in &self.closerPeers {
os.write_tag(8, ::protobuf::wire_format::WireTypeLengthDelimited)?;
os.write_raw_varint32(v.get_cached_size())?;
v.write_to_with_cached_sizes(os)?;
};
for v in &self.providerPeers {
os.write_tag(9, ::protobuf::wire_format::WireTypeLengthDelimited)?;
os.write_raw_varint32(v.get_cached_size())?;
v.write_to_with_cached_sizes(os)?;
};
os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(())
}
fn get_cached_size(&self) -> u32 {
self.cached_size.get()
}
fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
&self.unknown_fields
}
fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
&mut self.unknown_fields
}
fn as_any(&self) -> &::std::any::Any {
self as &::std::any::Any
}
fn as_any_mut(&mut self) -> &mut ::std::any::Any {
self as &mut ::std::any::Any
}
fn into_any(self: Box<Self>) -> ::std::boxed::Box<::std::any::Any> {
self
}
fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
Self::descriptor_static()
}
fn new() -> Message {
Message::new()
}
fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const ::protobuf::reflect::MessageDescriptor,
};
unsafe {
descriptor.get(|| {
let mut fields = ::std::vec::Vec::new();
fields.push(::protobuf::reflect::accessor::make_option_accessor::<_, ::protobuf::types::ProtobufTypeEnum<Message_MessageType>>(
"type",
|m: &Message| { &m.field_type },
|m: &mut Message| { &mut m.field_type },
));
fields.push(::protobuf::reflect::accessor::make_option_accessor::<_, ::protobuf::types::ProtobufTypeInt32>(
"clusterLevelRaw",
|m: &Message| { &m.clusterLevelRaw },
|m: &mut Message| { &mut m.clusterLevelRaw },
));
fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
"key",
|m: &Message| { &m.key },
|m: &mut Message| { &mut m.key },
));
fields.push(::protobuf::reflect::accessor::make_singular_ptr_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage<super::record::Record>>(
"record",
|m: &Message| { &m.record },
|m: &mut Message| { &mut m.record },
));
fields.push(::protobuf::reflect::accessor::make_repeated_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage<Message_Peer>>(
"closerPeers",
|m: &Message| { &m.closerPeers },
|m: &mut Message| { &mut m.closerPeers },
));
fields.push(::protobuf::reflect::accessor::make_repeated_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage<Message_Peer>>(
"providerPeers",
|m: &Message| { &m.providerPeers },
|m: &mut Message| { &mut m.providerPeers },
));
::protobuf::reflect::MessageDescriptor::new::<Message>(
"Message",
fields,
file_descriptor_proto()
)
})
}
}
fn default_instance() -> &'static Message {
static mut instance: ::protobuf::lazy::Lazy<Message> = ::protobuf::lazy::Lazy {
lock: ::protobuf::lazy::ONCE_INIT,
ptr: 0 as *const Message,
};
unsafe {
instance.get(Message::new)
}
}
}
impl ::protobuf::Clear for Message {
fn clear(&mut self) {
self.clear_field_type();
self.clear_clusterLevelRaw();
self.clear_key();
self.clear_record();
self.clear_closerPeers();
self.clear_providerPeers();
self.unknown_fields.clear();
}
}
impl ::std::fmt::Debug for Message {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
::protobuf::text_format::fmt(self, f)
}
}
impl ::protobuf::reflect::ProtobufValue for Message {
fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef {
::protobuf::reflect::ProtobufValueRef::Message(self)
}
}
// Generated message type for the nested `Message.Peer` of dht.proto
// (rust-protobuf). Do not hand-edit the code itself; regenerate from the
// .proto schema instead. Fields are proto2 optional/repeated, hence the
// `SingularField`/`RepeatedField` wrappers.
#[derive(PartialEq,Clone,Default)]
pub struct Message_Peer {
    // message fields
    // Raw peer-id bytes (`optional bytes id = 1`).
    id: ::protobuf::SingularField<::std::vec::Vec<u8>>,
    // Serialized multiaddresses for the peer (`repeated bytes addrs = 2`).
    addrs: ::protobuf::RepeatedField<::std::vec::Vec<u8>>,
    // Sender's connection status towards this peer
    // (`optional .dht.pb.Message.ConnectionType connection = 3`).
    connection: ::std::option::Option<Message_ConnectionType>,
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

// Generated accessor boilerplate: each field gets the standard
// clear_x / has_x / set_x / mut_x / take_x / get_x family.
impl Message_Peer {
    pub fn new() -> Message_Peer {
        ::std::default::Default::default()
    }

    // optional bytes id = 1;

    pub fn clear_id(&mut self) {
        self.id.clear();
    }

    pub fn has_id(&self) -> bool {
        self.id.is_some()
    }

    // Param is passed by value, moved
    pub fn set_id(&mut self, v: ::std::vec::Vec<u8>) {
        self.id = ::protobuf::SingularField::some(v);
    }

    // Mutable pointer to the field.
    // If field is not initialized, it is initialized with default value first.
    pub fn mut_id(&mut self) -> &mut ::std::vec::Vec<u8> {
        if self.id.is_none() {
            self.id.set_default();
        }
        self.id.as_mut().unwrap()
    }

    // Take field
    pub fn take_id(&mut self) -> ::std::vec::Vec<u8> {
        self.id.take().unwrap_or_else(|| ::std::vec::Vec::new())
    }

    // Returns the field value, or an empty slice if unset.
    pub fn get_id(&self) -> &[u8] {
        match self.id.as_ref() {
            Some(v) => &v,
            None => &[],
        }
    }

    // repeated bytes addrs = 2;

    pub fn clear_addrs(&mut self) {
        self.addrs.clear();
    }

    // Param is passed by value, moved
    pub fn set_addrs(&mut self, v: ::protobuf::RepeatedField<::std::vec::Vec<u8>>) {
        self.addrs = v;
    }

    // Mutable pointer to the field.
    pub fn mut_addrs(&mut self) -> &mut ::protobuf::RepeatedField<::std::vec::Vec<u8>> {
        &mut self.addrs
    }

    // Take field
    pub fn take_addrs(&mut self) -> ::protobuf::RepeatedField<::std::vec::Vec<u8>> {
        ::std::mem::replace(&mut self.addrs, ::protobuf::RepeatedField::new())
    }

    pub fn get_addrs(&self) -> &[::std::vec::Vec<u8>] {
        &self.addrs
    }

    // optional .dht.pb.Message.ConnectionType connection = 3;

    pub fn clear_connection(&mut self) {
        self.connection = ::std::option::Option::None;
    }

    pub fn has_connection(&self) -> bool {
        self.connection.is_some()
    }

    // Param is passed by value, moved
    pub fn set_connection(&mut self, v: Message_ConnectionType) {
        self.connection = ::std::option::Option::Some(v);
    }

    // Falls back to the proto2 default (NOT_CONNECTED) when unset.
    pub fn get_connection(&self) -> Message_ConnectionType {
        self.connection.unwrap_or(Message_ConnectionType::NOT_CONNECTED)
    }
}
// Generated wire-format implementation for `Message_Peer`: parsing,
// size computation, serialization, and reflection plumbing.
impl ::protobuf::Message for Message_Peer {
    fn is_initialized(&self) -> bool {
        true
    }

    // Merges fields from the input stream into `self`; unrecognized field
    // numbers are preserved in `unknown_fields`.
    fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream) -> ::protobuf::ProtobufResult<()> {
        while !is.eof()? {
            let (field_number, wire_type) = is.read_tag_unpack()?;
            match field_number {
                1 => {
                    ::protobuf::rt::read_singular_bytes_into(wire_type, is, &mut self.id)?;
                },
                2 => {
                    ::protobuf::rt::read_repeated_bytes_into(wire_type, is, &mut self.addrs)?;
                },
                3 => {
                    // Unknown enum values are routed into `unknown_fields`.
                    ::protobuf::rt::read_proto2_enum_with_unknown_fields_into(wire_type, is, &mut self.connection, 3, &mut self.unknown_fields)?
                },
                _ => {
                    ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
                },
            };
        }
        ::std::result::Result::Ok(())
    }

    // Compute sizes of nested messages
    #[allow(unused_variables)]
    fn compute_size(&self) -> u32 {
        let mut my_size = 0;
        if let Some(ref v) = self.id.as_ref() {
            my_size += ::protobuf::rt::bytes_size(1, &v);
        }
        for value in &self.addrs {
            my_size += ::protobuf::rt::bytes_size(2, &value);
        };
        if let Some(v) = self.connection {
            my_size += ::protobuf::rt::enum_size(3, v);
        }
        my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
        // Cached so `write_to_with_cached_sizes` doesn't recompute.
        self.cached_size.set(my_size);
        my_size
    }

    fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream) -> ::protobuf::ProtobufResult<()> {
        if let Some(ref v) = self.id.as_ref() {
            os.write_bytes(1, &v)?;
        }
        for v in &self.addrs {
            os.write_bytes(2, &v)?;
        };
        if let Some(v) = self.connection {
            os.write_enum(3, v.value())?;
        }
        os.write_unknown_fields(self.get_unknown_fields())?;
        ::std::result::Result::Ok(())
    }

    fn get_cached_size(&self) -> u32 {
        self.cached_size.get()
    }

    fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
        &self.unknown_fields
    }

    fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
        &mut self.unknown_fields
    }

    fn as_any(&self) -> &::std::any::Any {
        self as &::std::any::Any
    }
    fn as_any_mut(&mut self) -> &mut ::std::any::Any {
        self as &mut ::std::any::Any
    }
    fn into_any(self: Box<Self>) -> ::std::boxed::Box<::std::any::Any> {
        self
    }

    fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
        Self::descriptor_static()
    }

    fn new() -> Message_Peer {
        Message_Peer::new()
    }

    fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
        // Lazy one-time init; `Lazy`/`ONCE_INIT` is the synchronization
        // scheme emitted by rust-protobuf 2.x, which guards the `unsafe`
        // static access internally.
        static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy {
            lock: ::protobuf::lazy::ONCE_INIT,
            ptr: 0 as *const ::protobuf::reflect::MessageDescriptor,
        };
        unsafe {
            descriptor.get(|| {
                let mut fields = ::std::vec::Vec::new();
                fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
                    "id",
                    |m: &Message_Peer| { &m.id },
                    |m: &mut Message_Peer| { &mut m.id },
                ));
                fields.push(::protobuf::reflect::accessor::make_repeated_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
                    "addrs",
                    |m: &Message_Peer| { &m.addrs },
                    |m: &mut Message_Peer| { &mut m.addrs },
                ));
                fields.push(::protobuf::reflect::accessor::make_option_accessor::<_, ::protobuf::types::ProtobufTypeEnum<Message_ConnectionType>>(
                    "connection",
                    |m: &Message_Peer| { &m.connection },
                    |m: &mut Message_Peer| { &mut m.connection },
                ));
                ::protobuf::reflect::MessageDescriptor::new::<Message_Peer>(
                    "Message_Peer",
                    fields,
                    file_descriptor_proto()
                )
            })
        }
    }

    fn default_instance() -> &'static Message_Peer {
        static mut instance: ::protobuf::lazy::Lazy<Message_Peer> = ::protobuf::lazy::Lazy {
            lock: ::protobuf::lazy::ONCE_INIT,
            ptr: 0 as *const Message_Peer,
        };
        unsafe {
            instance.get(Message_Peer::new)
        }
    }
}
// Housekeeping trait impls: `Clear` resets every field to its unset state,
// `Debug` delegates to protobuf text format, and `ProtobufValue` hooks the
// message into the reflection API.
impl ::protobuf::Clear for Message_Peer {
    fn clear(&mut self) {
        self.clear_id();
        self.clear_addrs();
        self.clear_connection();
        self.unknown_fields.clear();
    }
}

impl ::std::fmt::Debug for Message_Peer {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        ::protobuf::text_format::fmt(self, f)
    }
}

impl ::protobuf::reflect::ProtobufValue for Message_Peer {
    fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef {
        ::protobuf::reflect::ProtobufValueRef::Message(self)
    }
}
// Generated enum for `Message.MessageType` in dht.proto: the kind of RPC a
// Kademlia `Message` represents. Discriminants match the proto2 values.
#[derive(Clone,PartialEq,Eq,Debug,Hash)]
pub enum Message_MessageType {
    PUT_VALUE = 0,
    GET_VALUE = 1,
    ADD_PROVIDER = 2,
    GET_PROVIDERS = 3,
    FIND_NODE = 4,
    PING = 5,
}

// Wire (de)serialization support: maps between the i32 carried on the wire
// and the Rust variants; `from_i32` returns None for values this build
// doesn't know about.
impl ::protobuf::ProtobufEnum for Message_MessageType {
    fn value(&self) -> i32 {
        *self as i32
    }

    fn from_i32(value: i32) -> ::std::option::Option<Message_MessageType> {
        match value {
            0 => ::std::option::Option::Some(Message_MessageType::PUT_VALUE),
            1 => ::std::option::Option::Some(Message_MessageType::GET_VALUE),
            2 => ::std::option::Option::Some(Message_MessageType::ADD_PROVIDER),
            3 => ::std::option::Option::Some(Message_MessageType::GET_PROVIDERS),
            4 => ::std::option::Option::Some(Message_MessageType::FIND_NODE),
            5 => ::std::option::Option::Some(Message_MessageType::PING),
            _ => ::std::option::Option::None
        }
    }

    fn values() -> &'static [Self] {
        static values: &'static [Message_MessageType] = &[
            Message_MessageType::PUT_VALUE,
            Message_MessageType::GET_VALUE,
            Message_MessageType::ADD_PROVIDER,
            Message_MessageType::GET_PROVIDERS,
            Message_MessageType::FIND_NODE,
            Message_MessageType::PING,
        ];
        values
    }

    fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor {
        // Lazily built from the embedded file descriptor (see bottom of file).
        static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::EnumDescriptor> = ::protobuf::lazy::Lazy {
            lock: ::protobuf::lazy::ONCE_INIT,
            ptr: 0 as *const ::protobuf::reflect::EnumDescriptor,
        };
        unsafe {
            descriptor.get(|| {
                ::protobuf::reflect::EnumDescriptor::new("Message_MessageType", file_descriptor_proto())
            })
        }
    }
}

impl ::std::marker::Copy for Message_MessageType {
}

impl ::protobuf::reflect::ProtobufValue for Message_MessageType {
    fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef {
        ::protobuf::reflect::ProtobufValueRef::Enum(self.descriptor())
    }
}
// Generated enum for `Message.ConnectionType` in dht.proto: how the sender
// of a message is (or was) connected to the peer being described.
#[derive(Clone,PartialEq,Eq,Debug,Hash)]
pub enum Message_ConnectionType {
    NOT_CONNECTED = 0,
    CONNECTED = 1,
    CAN_CONNECT = 2,
    CANNOT_CONNECT = 3,
}

// Wire (de)serialization support; `from_i32` returns None for values this
// build doesn't know about.
impl ::protobuf::ProtobufEnum for Message_ConnectionType {
    fn value(&self) -> i32 {
        *self as i32
    }

    fn from_i32(value: i32) -> ::std::option::Option<Message_ConnectionType> {
        match value {
            0 => ::std::option::Option::Some(Message_ConnectionType::NOT_CONNECTED),
            1 => ::std::option::Option::Some(Message_ConnectionType::CONNECTED),
            2 => ::std::option::Option::Some(Message_ConnectionType::CAN_CONNECT),
            3 => ::std::option::Option::Some(Message_ConnectionType::CANNOT_CONNECT),
            _ => ::std::option::Option::None
        }
    }

    fn values() -> &'static [Self] {
        static values: &'static [Message_ConnectionType] = &[
            Message_ConnectionType::NOT_CONNECTED,
            Message_ConnectionType::CONNECTED,
            Message_ConnectionType::CAN_CONNECT,
            Message_ConnectionType::CANNOT_CONNECT,
        ];
        values
    }

    fn enum_descriptor_static() -> &'static ::protobuf::reflect::EnumDescriptor {
        // Lazily built from the embedded file descriptor (see bottom of file).
        static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::EnumDescriptor> = ::protobuf::lazy::Lazy {
            lock: ::protobuf::lazy::ONCE_INIT,
            ptr: 0 as *const ::protobuf::reflect::EnumDescriptor,
        };
        unsafe {
            descriptor.get(|| {
                ::protobuf::reflect::EnumDescriptor::new("Message_ConnectionType", file_descriptor_proto())
            })
        }
    }
}

impl ::std::marker::Copy for Message_ConnectionType {
}

impl ::protobuf::reflect::ProtobufValue for Message_ConnectionType {
    fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef {
        ::protobuf::reflect::ProtobufValueRef::Enum(self.descriptor())
    }
}
// Serialized `FileDescriptorProto` for dht.proto, embedded by the code
// generator and parsed lazily on first use by the reflection machinery.
// The byte blob is generated data — never edit it by hand.
static file_descriptor_proto_data: &'static [u8] = b"\
    \n\tdht.proto\x12\x06dht.pb\x1a\x0crecord.proto\"\xc7\x04\n\x07Message\
    \x12/\n\x04type\x18\x01\x20\x01(\x0e2\x1b.dht.pb.Message.MessageTypeR\
    \x04type\x12(\n\x0fclusterLevelRaw\x18\n\x20\x01(\x05R\x0fclusterLevelRa\
    w\x12\x10\n\x03key\x18\x02\x20\x01(\x0cR\x03key\x12)\n\x06record\x18\x03\
    \x20\x01(\x0b2\x11.record.pb.RecordR\x06record\x126\n\x0bcloserPeers\x18\
    \x08\x20\x03(\x0b2\x14.dht.pb.Message.PeerR\x0bcloserPeers\x12:\n\rprovi\
    derPeers\x18\t\x20\x03(\x0b2\x14.dht.pb.Message.PeerR\rproviderPeers\x1a\
    l\n\x04Peer\x12\x0e\n\x02id\x18\x01\x20\x01(\x0cR\x02id\x12\x14\n\x05add\
    rs\x18\x02\x20\x03(\x0cR\x05addrs\x12>\n\nconnection\x18\x03\x20\x01(\
    \x0e2\x1e.dht.pb.Message.ConnectionTypeR\nconnection\"i\n\x0bMessageType\
    \x12\r\n\tPUT_VALUE\x10\0\x12\r\n\tGET_VALUE\x10\x01\x12\x10\n\x0cADD_PR\
    OVIDER\x10\x02\x12\x11\n\rGET_PROVIDERS\x10\x03\x12\r\n\tFIND_NODE\x10\
    \x04\x12\x08\n\x04PING\x10\x05\"W\n\x0eConnectionType\x12\x11\n\rNOT_CON\
    NECTED\x10\0\x12\r\n\tCONNECTED\x10\x01\x12\x0f\n\x0bCAN_CONNECT\x10\x02\
    \x12\x12\n\x0eCANNOT_CONNECT\x10\x03J\xc9\x10\n\x06\x12\x04\0\0>\x01\n\
    \x08\n\x01\x0c\x12\x03\0\0\x12\n\x08\n\x01\x02\x12\x03\x01\x08\x0e\n\t\n\
    \x02\x03\0\x12\x03\x03\x07\x15\n\n\n\x02\x04\0\x12\x04\x05\0>\x01\n\n\n\
    \x03\x04\0\x01\x12\x03\x05\x08\x0f\n\x0c\n\x04\x04\0\x04\0\x12\x04\x06\
    \x08\r\t\n\x0c\n\x05\x04\0\x04\0\x01\x12\x03\x06\r\x18\n\r\n\x06\x04\0\
    \x04\0\x02\0\x12\x03\x07\x10\x1e\n\x0e\n\x07\x04\0\x04\0\x02\0\x01\x12\
    \x03\x07\x10\x19\n\x0e\n\x07\x04\0\x04\0\x02\0\x02\x12\x03\x07\x1c\x1d\n\
    \r\n\x06\x04\0\x04\0\x02\x01\x12\x03\x08\x10\x1e\n\x0e\n\x07\x04\0\x04\0\
    \x02\x01\x01\x12\x03\x08\x10\x19\n\x0e\n\x07\x04\0\x04\0\x02\x01\x02\x12\
    \x03\x08\x1c\x1d\n\r\n\x06\x04\0\x04\0\x02\x02\x12\x03\t\x10!\n\x0e\n\
    \x07\x04\0\x04\0\x02\x02\x01\x12\x03\t\x10\x1c\n\x0e\n\x07\x04\0\x04\0\
    \x02\x02\x02\x12\x03\t\x1f\x20\n\r\n\x06\x04\0\x04\0\x02\x03\x12\x03\n\
    \x10\"\n\x0e\n\x07\x04\0\x04\0\x02\x03\x01\x12\x03\n\x10\x1d\n\x0e\n\x07\
    \x04\0\x04\0\x02\x03\x02\x12\x03\n\x20!\n\r\n\x06\x04\0\x04\0\x02\x04\
    \x12\x03\x0b\x10\x1e\n\x0e\n\x07\x04\0\x04\0\x02\x04\x01\x12\x03\x0b\x10\
    \x19\n\x0e\n\x07\x04\0\x04\0\x02\x04\x02\x12\x03\x0b\x1c\x1d\n\r\n\x06\
    \x04\0\x04\0\x02\x05\x12\x03\x0c\x10\x19\n\x0e\n\x07\x04\0\x04\0\x02\x05\
    \x01\x12\x03\x0c\x10\x14\n\x0e\n\x07\x04\0\x04\0\x02\x05\x02\x12\x03\x0c\
    \x17\x18\n\x0c\n\x04\x04\0\x04\x01\x12\x04\x0f\x08\x1c\t\n\x0c\n\x05\x04\
    \0\x04\x01\x01\x12\x03\x0f\r\x1b\n^\n\x06\x04\0\x04\x01\x02\0\x12\x03\
    \x11\x10\"\x1aO\x20sender\x20does\x20not\x20have\x20a\x20connection\x20t\
    o\x20peer,\x20and\x20no\x20extra\x20information\x20(default)\n\n\x0e\n\
    \x07\x04\0\x04\x01\x02\0\x01\x12\x03\x11\x10\x1d\n\x0e\n\x07\x04\0\x04\
    \x01\x02\0\x02\x12\x03\x11\x20!\n5\n\x06\x04\0\x04\x01\x02\x01\x12\x03\
    \x14\x10\x1e\x1a&\x20sender\x20has\x20a\x20live\x20connection\x20to\x20p\
    eer\n\n\x0e\n\x07\x04\0\x04\x01\x02\x01\x01\x12\x03\x14\x10\x19\n\x0e\n\
    \x07\x04\0\x04\x01\x02\x01\x02\x12\x03\x14\x1c\x1d\n2\n\x06\x04\0\x04\
    \x01\x02\x02\x12\x03\x17\x10\x20\x1a#\x20sender\x20recently\x20connected\
    \x20to\x20peer\n\n\x0e\n\x07\x04\0\x04\x01\x02\x02\x01\x12\x03\x17\x10\
    \x1b\n\x0e\n\x07\x04\0\x04\x01\x02\x02\x02\x12\x03\x17\x1e\x1f\n\xa7\x01\
    \n\x06\x04\0\x04\x01\x02\x03\x12\x03\x1b\x10#\x1a\x97\x01\x20sender\x20r\
    ecently\x20tried\x20to\x20connect\x20to\x20peer\x20repeatedly\x20but\x20\
    failed\x20to\x20connect\n\x20(\"try\"\x20here\x20is\x20loose,\x20but\x20\
    this\x20should\x20signal\x20\"made\x20strong\x20effort,\x20failed\")\n\n\
    \x0e\n\x07\x04\0\x04\x01\x02\x03\x01\x12\x03\x1b\x10\x1e\n\x0e\n\x07\x04\
    \0\x04\x01\x02\x03\x02\x12\x03\x1b!\"\n\x0c\n\x04\x04\0\x03\0\x12\x04\
    \x1e\x08'\t\n\x0c\n\x05\x04\0\x03\0\x01\x12\x03\x1e\x10\x14\n$\n\x06\x04\
    \0\x03\0\x02\0\x12\x03\x20\x10&\x1a\x15\x20ID\x20of\x20a\x20given\x20pee\
    r.\n\n\x0e\n\x07\x04\0\x03\0\x02\0\x04\x12\x03\x20\x10\x18\n\x0e\n\x07\
    \x04\0\x03\0\x02\0\x05\x12\x03\x20\x19\x1e\n\x0e\n\x07\x04\0\x03\0\x02\0\
    \x01\x12\x03\x20\x1f!\n\x0e\n\x07\x04\0\x03\0\x02\0\x03\x12\x03\x20$%\n,\
    \n\x06\x04\0\x03\0\x02\x01\x12\x03#\x10)\x1a\x1d\x20multiaddrs\x20for\
    \x20a\x20given\x20peer\n\n\x0e\n\x07\x04\0\x03\0\x02\x01\x04\x12\x03#\
    \x10\x18\n\x0e\n\x07\x04\0\x03\0\x02\x01\x05\x12\x03#\x19\x1e\n\x0e\n\
    \x07\x04\0\x03\0\x02\x01\x01\x12\x03#\x1f$\n\x0e\n\x07\x04\0\x03\0\x02\
    \x01\x03\x12\x03#'(\nP\n\x06\x04\0\x03\0\x02\x02\x12\x03&\x107\x1aA\x20u\
    sed\x20to\x20signal\x20the\x20sender's\x20connection\x20capabilities\x20\
    to\x20the\x20peer\n\n\x0e\n\x07\x04\0\x03\0\x02\x02\x04\x12\x03&\x10\x18\
    \n\x0e\n\x07\x04\0\x03\0\x02\x02\x06\x12\x03&\x19'\n\x0e\n\x07\x04\0\x03\
    \0\x02\x02\x01\x12\x03&(2\n\x0e\n\x07\x04\0\x03\0\x02\x02\x03\x12\x03&56\
    \n2\n\x04\x04\0\x02\0\x12\x03*\x08&\x1a%\x20defines\x20what\x20type\x20o\
    f\x20message\x20it\x20is.\n\n\x0c\n\x05\x04\0\x02\0\x04\x12\x03*\x08\x10\
    \n\x0c\n\x05\x04\0\x02\0\x06\x12\x03*\x11\x1c\n\x0c\n\x05\x04\0\x02\0\
    \x01\x12\x03*\x1d!\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03*$%\nO\n\x04\x04\0\
    \x02\x01\x12\x03-\x08,\x1aB\x20defines\x20what\x20coral\x20cluster\x20le\
    vel\x20this\x20query/response\x20belongs\x20to.\n\n\x0c\n\x05\x04\0\x02\
    \x01\x04\x12\x03-\x08\x10\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03-\x11\x16\
    \n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03-\x17&\n\x0c\n\x05\x04\0\x02\x01\
    \x03\x12\x03-)+\nw\n\x04\x04\0\x02\x02\x12\x031\x08\x1f\x1aj\x20Used\x20\
    to\x20specify\x20the\x20key\x20associated\x20with\x20this\x20message.\n\
    \x20PUT_VALUE,\x20GET_VALUE,\x20ADD_PROVIDER,\x20GET_PROVIDERS\n\n\x0c\n\
    \x05\x04\0\x02\x02\x04\x12\x031\x08\x10\n\x0c\n\x05\x04\0\x02\x02\x05\
    \x12\x031\x11\x16\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x031\x17\x1a\n\x0c\n\
    \x05\x04\0\x02\x02\x03\x12\x031\x1d\x1e\n;\n\x04\x04\0\x02\x03\x12\x035\
    \x08-\x1a.\x20Used\x20to\x20return\x20a\x20value\n\x20PUT_VALUE,\x20GET_\
    VALUE\n\n\x0c\n\x05\x04\0\x02\x03\x04\x12\x035\x08\x10\n\x0c\n\x05\x04\0\
    \x02\x03\x06\x12\x035\x11!\n\x0c\n\x05\x04\0\x02\x03\x01\x12\x035\"(\n\
    \x0c\n\x05\x04\0\x02\x03\x03\x12\x035+,\nc\n\x04\x04\0\x02\x04\x12\x039\
    \x08&\x1aV\x20Used\x20to\x20return\x20peers\x20closer\x20to\x20a\x20key\
    \x20in\x20a\x20query\n\x20GET_VALUE,\x20GET_PROVIDERS,\x20FIND_NODE\n\n\
    \x0c\n\x05\x04\0\x02\x04\x04\x12\x039\x08\x10\n\x0c\n\x05\x04\0\x02\x04\
    \x06\x12\x039\x11\x15\n\x0c\n\x05\x04\0\x02\x04\x01\x12\x039\x16!\n\x0c\
    \n\x05\x04\0\x02\x04\x03\x12\x039$%\nO\n\x04\x04\0\x02\x05\x12\x03=\x08(\
    \x1aB\x20Used\x20to\x20return\x20Providers\n\x20GET_VALUE,\x20ADD_PROVID\
    ER,\x20GET_PROVIDERS\n\n\x0c\n\x05\x04\0\x02\x05\x04\x12\x03=\x08\x10\n\
    \x0c\n\x05\x04\0\x02\x05\x06\x12\x03=\x11\x15\n\x0c\n\x05\x04\0\x02\x05\
    \x01\x12\x03=\x16#\n\x0c\n\x05\x04\0\x02\x05\x03\x12\x03=&'\
";

static mut file_descriptor_proto_lazy: ::protobuf::lazy::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::lazy::Lazy {
    lock: ::protobuf::lazy::ONCE_INIT,
    ptr: 0 as *const ::protobuf::descriptor::FileDescriptorProto,
};

// Parses the embedded descriptor bytes; `unwrap` is acceptable here because
// the bytes are emitted by the code generator and cannot fail to parse
// unless the generated data itself is corrupted.
fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
    ::protobuf::parse_from_bytes(file_descriptor_proto_data).unwrap()
}

// Returns the lazily-parsed file descriptor shared by all reflection code
// in this module.
pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
    unsafe {
        file_descriptor_proto_lazy.get(|| {
            parse_descriptor_proto()
        })
    }
}

View File

@ -0,0 +1,22 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
// Protobuf structures generated from the `dht.proto` schema.
pub mod dht;
// Protobuf structures generated from the `record.proto` schema.
pub mod record;

View File

@ -0,0 +1,453 @@
// This file is generated by rust-protobuf 2.0.2. Do not edit
// @generated
// https://github.com/Manishearth/rust-clippy/issues/702
#![allow(unknown_lints)]
#![allow(clippy)]
#![cfg_attr(rustfmt, rustfmt_skip)]
#![allow(box_pointers)]
#![allow(dead_code)]
#![allow(missing_docs)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(non_upper_case_globals)]
#![allow(trivial_casts)]
#![allow(unsafe_code)]
#![allow(unused_imports)]
#![allow(unused_results)]
use protobuf::Message as Message_imported_for_functions;
use protobuf::ProtobufEnum as ProtobufEnum_imported_for_functions;
// Generated message type for `Record` in record.proto: a key/value entry
// stored in the DHT, together with an author id and a signature over it
// (per the schema comments in the embedded descriptor). All fields are
// proto2 `optional`, hence the `SingularField` wrappers.
#[derive(PartialEq,Clone,Default)]
pub struct Record {
    // message fields
    // The key that references this record.
    key: ::protobuf::SingularField<::std::string::String>,
    // The actual value this record is storing.
    value: ::protobuf::SingularField<::std::vec::Vec<u8>>,
    // Hash of the author's public key.
    author: ::protobuf::SingularField<::std::string::String>,
    // A PKI signature for the key+value+author.
    signature: ::protobuf::SingularField<::std::vec::Vec<u8>>,
    // Time the record was received, set by receiver.
    timeReceived: ::protobuf::SingularField<::std::string::String>,
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}
// Generated accessor boilerplate for `Record`: each field gets the standard
// clear_x / has_x / set_x / mut_x / take_x / get_x family operating on the
// underlying `SingularField`.
impl Record {
    pub fn new() -> Record {
        ::std::default::Default::default()
    }

    // optional string key = 1;

    pub fn clear_key(&mut self) {
        self.key.clear();
    }

    pub fn has_key(&self) -> bool {
        self.key.is_some()
    }

    // Param is passed by value, moved
    pub fn set_key(&mut self, v: ::std::string::String) {
        self.key = ::protobuf::SingularField::some(v);
    }

    // Mutable pointer to the field.
    // If field is not initialized, it is initialized with default value first.
    pub fn mut_key(&mut self) -> &mut ::std::string::String {
        if self.key.is_none() {
            self.key.set_default();
        }
        self.key.as_mut().unwrap()
    }

    // Take field
    pub fn take_key(&mut self) -> ::std::string::String {
        self.key.take().unwrap_or_else(|| ::std::string::String::new())
    }

    // Returns the field value, or "" if unset.
    pub fn get_key(&self) -> &str {
        match self.key.as_ref() {
            Some(v) => &v,
            None => "",
        }
    }

    // optional bytes value = 2;

    pub fn clear_value(&mut self) {
        self.value.clear();
    }

    pub fn has_value(&self) -> bool {
        self.value.is_some()
    }

    // Param is passed by value, moved
    pub fn set_value(&mut self, v: ::std::vec::Vec<u8>) {
        self.value = ::protobuf::SingularField::some(v);
    }

    // Mutable pointer to the field.
    // If field is not initialized, it is initialized with default value first.
    pub fn mut_value(&mut self) -> &mut ::std::vec::Vec<u8> {
        if self.value.is_none() {
            self.value.set_default();
        }
        self.value.as_mut().unwrap()
    }

    // Take field
    pub fn take_value(&mut self) -> ::std::vec::Vec<u8> {
        self.value.take().unwrap_or_else(|| ::std::vec::Vec::new())
    }

    // Returns the field value, or an empty slice if unset.
    pub fn get_value(&self) -> &[u8] {
        match self.value.as_ref() {
            Some(v) => &v,
            None => &[],
        }
    }

    // optional string author = 3;

    pub fn clear_author(&mut self) {
        self.author.clear();
    }

    pub fn has_author(&self) -> bool {
        self.author.is_some()
    }

    // Param is passed by value, moved
    pub fn set_author(&mut self, v: ::std::string::String) {
        self.author = ::protobuf::SingularField::some(v);
    }

    // Mutable pointer to the field.
    // If field is not initialized, it is initialized with default value first.
    pub fn mut_author(&mut self) -> &mut ::std::string::String {
        if self.author.is_none() {
            self.author.set_default();
        }
        self.author.as_mut().unwrap()
    }

    // Take field
    pub fn take_author(&mut self) -> ::std::string::String {
        self.author.take().unwrap_or_else(|| ::std::string::String::new())
    }

    pub fn get_author(&self) -> &str {
        match self.author.as_ref() {
            Some(v) => &v,
            None => "",
        }
    }

    // optional bytes signature = 4;

    pub fn clear_signature(&mut self) {
        self.signature.clear();
    }

    pub fn has_signature(&self) -> bool {
        self.signature.is_some()
    }

    // Param is passed by value, moved
    pub fn set_signature(&mut self, v: ::std::vec::Vec<u8>) {
        self.signature = ::protobuf::SingularField::some(v);
    }

    // Mutable pointer to the field.
    // If field is not initialized, it is initialized with default value first.
    pub fn mut_signature(&mut self) -> &mut ::std::vec::Vec<u8> {
        if self.signature.is_none() {
            self.signature.set_default();
        }
        self.signature.as_mut().unwrap()
    }

    // Take field
    pub fn take_signature(&mut self) -> ::std::vec::Vec<u8> {
        self.signature.take().unwrap_or_else(|| ::std::vec::Vec::new())
    }

    pub fn get_signature(&self) -> &[u8] {
        match self.signature.as_ref() {
            Some(v) => &v,
            None => &[],
        }
    }

    // optional string timeReceived = 5;

    pub fn clear_timeReceived(&mut self) {
        self.timeReceived.clear();
    }

    pub fn has_timeReceived(&self) -> bool {
        self.timeReceived.is_some()
    }

    // Param is passed by value, moved
    pub fn set_timeReceived(&mut self, v: ::std::string::String) {
        self.timeReceived = ::protobuf::SingularField::some(v);
    }

    // Mutable pointer to the field.
    // If field is not initialized, it is initialized with default value first.
    pub fn mut_timeReceived(&mut self) -> &mut ::std::string::String {
        if self.timeReceived.is_none() {
            self.timeReceived.set_default();
        }
        self.timeReceived.as_mut().unwrap()
    }

    // Take field
    pub fn take_timeReceived(&mut self) -> ::std::string::String {
        self.timeReceived.take().unwrap_or_else(|| ::std::string::String::new())
    }

    pub fn get_timeReceived(&self) -> &str {
        match self.timeReceived.as_ref() {
            Some(v) => &v,
            None => "",
        }
    }
}
// Generated wire-format implementation for `Record`: parsing, size
// computation, serialization, and reflection plumbing.
impl ::protobuf::Message for Record {
    fn is_initialized(&self) -> bool {
        true
    }

    // Merges fields from the input stream into `self`; unrecognized field
    // numbers are preserved in `unknown_fields`.
    fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream) -> ::protobuf::ProtobufResult<()> {
        while !is.eof()? {
            let (field_number, wire_type) = is.read_tag_unpack()?;
            match field_number {
                1 => {
                    ::protobuf::rt::read_singular_string_into(wire_type, is, &mut self.key)?;
                },
                2 => {
                    ::protobuf::rt::read_singular_bytes_into(wire_type, is, &mut self.value)?;
                },
                3 => {
                    ::protobuf::rt::read_singular_string_into(wire_type, is, &mut self.author)?;
                },
                4 => {
                    ::protobuf::rt::read_singular_bytes_into(wire_type, is, &mut self.signature)?;
                },
                5 => {
                    ::protobuf::rt::read_singular_string_into(wire_type, is, &mut self.timeReceived)?;
                },
                _ => {
                    ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
                },
            };
        }
        ::std::result::Result::Ok(())
    }

    // Compute sizes of nested messages
    #[allow(unused_variables)]
    fn compute_size(&self) -> u32 {
        let mut my_size = 0;
        if let Some(ref v) = self.key.as_ref() {
            my_size += ::protobuf::rt::string_size(1, &v);
        }
        if let Some(ref v) = self.value.as_ref() {
            my_size += ::protobuf::rt::bytes_size(2, &v);
        }
        if let Some(ref v) = self.author.as_ref() {
            my_size += ::protobuf::rt::string_size(3, &v);
        }
        if let Some(ref v) = self.signature.as_ref() {
            my_size += ::protobuf::rt::bytes_size(4, &v);
        }
        if let Some(ref v) = self.timeReceived.as_ref() {
            my_size += ::protobuf::rt::string_size(5, &v);
        }
        my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
        // Cached so `write_to_with_cached_sizes` doesn't recompute.
        self.cached_size.set(my_size);
        my_size
    }

    fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream) -> ::protobuf::ProtobufResult<()> {
        if let Some(ref v) = self.key.as_ref() {
            os.write_string(1, &v)?;
        }
        if let Some(ref v) = self.value.as_ref() {
            os.write_bytes(2, &v)?;
        }
        if let Some(ref v) = self.author.as_ref() {
            os.write_string(3, &v)?;
        }
        if let Some(ref v) = self.signature.as_ref() {
            os.write_bytes(4, &v)?;
        }
        if let Some(ref v) = self.timeReceived.as_ref() {
            os.write_string(5, &v)?;
        }
        os.write_unknown_fields(self.get_unknown_fields())?;
        ::std::result::Result::Ok(())
    }

    fn get_cached_size(&self) -> u32 {
        self.cached_size.get()
    }

    fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
        &self.unknown_fields
    }

    fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
        &mut self.unknown_fields
    }

    fn as_any(&self) -> &::std::any::Any {
        self as &::std::any::Any
    }
    fn as_any_mut(&mut self) -> &mut ::std::any::Any {
        self as &mut ::std::any::Any
    }
    fn into_any(self: Box<Self>) -> ::std::boxed::Box<::std::any::Any> {
        self
    }

    fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
        Self::descriptor_static()
    }

    fn new() -> Record {
        Record::new()
    }

    fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
        // Lazy one-time init; `Lazy`/`ONCE_INIT` is the synchronization
        // scheme emitted by rust-protobuf 2.x, which guards the `unsafe`
        // static access internally.
        static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy {
            lock: ::protobuf::lazy::ONCE_INIT,
            ptr: 0 as *const ::protobuf::reflect::MessageDescriptor,
        };
        unsafe {
            descriptor.get(|| {
                let mut fields = ::std::vec::Vec::new();
                fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
                    "key",
                    |m: &Record| { &m.key },
                    |m: &mut Record| { &mut m.key },
                ));
                fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
                    "value",
                    |m: &Record| { &m.value },
                    |m: &mut Record| { &mut m.value },
                ));
                fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
                    "author",
                    |m: &Record| { &m.author },
                    |m: &mut Record| { &mut m.author },
                ));
                fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
                    "signature",
                    |m: &Record| { &m.signature },
                    |m: &mut Record| { &mut m.signature },
                ));
                fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
                    "timeReceived",
                    |m: &Record| { &m.timeReceived },
                    |m: &mut Record| { &mut m.timeReceived },
                ));
                ::protobuf::reflect::MessageDescriptor::new::<Record>(
                    "Record",
                    fields,
                    file_descriptor_proto()
                )
            })
        }
    }

    fn default_instance() -> &'static Record {
        static mut instance: ::protobuf::lazy::Lazy<Record> = ::protobuf::lazy::Lazy {
            lock: ::protobuf::lazy::ONCE_INIT,
            ptr: 0 as *const Record,
        };
        unsafe {
            instance.get(Record::new)
        }
    }
}
// Housekeeping trait impls: `Clear` resets every field to its unset state,
// `Debug` delegates to protobuf text format, and `ProtobufValue` hooks the
// message into the reflection API.
impl ::protobuf::Clear for Record {
    fn clear(&mut self) {
        self.clear_key();
        self.clear_value();
        self.clear_author();
        self.clear_signature();
        self.clear_timeReceived();
        self.unknown_fields.clear();
    }
}

impl ::std::fmt::Debug for Record {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        ::protobuf::text_format::fmt(self, f)
    }
}

impl ::protobuf::reflect::ProtobufValue for Record {
    fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef {
        ::protobuf::reflect::ProtobufValueRef::Message(self)
    }
}
// Serialized `FileDescriptorProto` for record.proto, embedded by the code
// generator and parsed lazily on first use by the reflection machinery.
// The byte blob is generated data — never edit it by hand.
static file_descriptor_proto_data: &'static [u8] = b"\
    \n\x0crecord.proto\x12\trecord.pb\"\x8a\x01\n\x06Record\x12\x10\n\x03key\
    \x18\x01\x20\x01(\tR\x03key\x12\x14\n\x05value\x18\x02\x20\x01(\x0cR\x05\
    value\x12\x16\n\x06author\x18\x03\x20\x01(\tR\x06author\x12\x1c\n\tsigna\
    ture\x18\x04\x20\x01(\x0cR\tsignature\x12\"\n\x0ctimeReceived\x18\x05\
    \x20\x01(\tR\x0ctimeReceivedJ\xac\x05\n\x06\x12\x04\0\0\x14\x01\n\x08\n\
    \x01\x0c\x12\x03\0\0\x12\n\x08\n\x01\x02\x12\x03\x01\x08\x11\nX\n\x02\
    \x04\0\x12\x04\x05\0\x14\x01\x1aL\x20Record\x20represents\x20a\x20dht\
    \x20record\x20that\x20contains\x20a\x20value\n\x20for\x20a\x20key\x20val\
    ue\x20pair\n\n\n\n\x03\x04\0\x01\x12\x03\x05\x08\x0e\n2\n\x04\x04\0\x02\
    \0\x12\x03\x07\x08\x20\x1a%\x20The\x20key\x20that\x20references\x20this\
    \x20record\n\n\x0c\n\x05\x04\0\x02\0\x04\x12\x03\x07\x08\x10\n\x0c\n\x05\
    \x04\0\x02\0\x05\x12\x03\x07\x11\x17\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\
    \x07\x18\x1b\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x07\x1e\x1f\n6\n\x04\
    \x04\0\x02\x01\x12\x03\n\x08!\x1a)\x20The\x20actual\x20value\x20this\x20\
    record\x20is\x20storing\n\n\x0c\n\x05\x04\0\x02\x01\x04\x12\x03\n\x08\
    \x10\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\n\x11\x16\n\x0c\n\x05\x04\0\
    \x02\x01\x01\x12\x03\n\x17\x1c\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\n\
    \x1f\x20\n-\n\x04\x04\0\x02\x02\x12\x03\r\x08#\x1a\x20\x20hash\x20of\x20\
    the\x20authors\x20public\x20key\n\n\x0c\n\x05\x04\0\x02\x02\x04\x12\x03\
    \r\x08\x10\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\r\x11\x17\n\x0c\n\x05\
    \x04\0\x02\x02\x01\x12\x03\r\x18\x1e\n\x0c\n\x05\x04\0\x02\x02\x03\x12\
    \x03\r!\"\n7\n\x04\x04\0\x02\x03\x12\x03\x10\x08%\x1a*\x20A\x20PKI\x20si\
    gnature\x20for\x20the\x20key+value+author\n\n\x0c\n\x05\x04\0\x02\x03\
    \x04\x12\x03\x10\x08\x10\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x10\x11\
    \x16\n\x0c\n\x05\x04\0\x02\x03\x01\x12\x03\x10\x17\x20\n\x0c\n\x05\x04\0\
    \x02\x03\x03\x12\x03\x10#$\n<\n\x04\x04\0\x02\x04\x12\x03\x13\x08)\x1a/\
    \x20Time\x20the\x20record\x20was\x20received,\x20set\x20by\x20receiver\n\
    \n\x0c\n\x05\x04\0\x02\x04\x04\x12\x03\x13\x08\x10\n\x0c\n\x05\x04\0\x02\
    \x04\x05\x12\x03\x13\x11\x17\n\x0c\n\x05\x04\0\x02\x04\x01\x12\x03\x13\
    \x18$\n\x0c\n\x05\x04\0\x02\x04\x03\x12\x03\x13'(\
";

static mut file_descriptor_proto_lazy: ::protobuf::lazy::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::lazy::Lazy {
    lock: ::protobuf::lazy::ONCE_INIT,
    ptr: 0 as *const ::protobuf::descriptor::FileDescriptorProto,
};

// Parses the embedded descriptor bytes; `unwrap` is acceptable here because
// the bytes are emitted by the code generator and cannot fail to parse
// unless the generated data itself is corrupted.
fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto {
    ::protobuf::parse_from_bytes(file_descriptor_proto_data).unwrap()
}

// Returns the lazily-parsed file descriptor shared by all reflection code
// in this module.
pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto {
    unsafe {
        file_descriptor_proto_lazy.get(|| {
            parse_descriptor_proto()
        })
    }
}

View File

@ -0,0 +1,381 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Provides the `KadMsg` enum of all the possible messages transmitted with the Kademlia protocol,
//! and the `KademliaProtocolConfig` connection upgrade whose output is a
//! `Stream<Item = KadMsg> + Sink<SinkItem = KadMsg>`.
//!
//! The `Stream` component is used to poll the underlying transport, and the `Sink` component is
//! used to send messages.
use bytes::{Bytes, BytesMut};
use futures::{future, sink, Sink, stream, Stream};
use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr, PeerId};
use protobuf::{self, Message};
use protobuf_structs;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter;
use tokio_codec::Framed;
use tokio_io::{AsyncRead, AsyncWrite};
use unsigned_varint::codec;
/// Status of the sender's connection to the peer being described, as
/// carried in a Kademlia message. Discriminants match the
/// `Message.ConnectionType` values of dht.proto.
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
pub enum KadConnectionType {
    /// Sender hasn't tried to connect to peer.
    NotConnected = 0,
    /// Sender is currently connected to peer.
    Connected = 1,
    /// Sender was recently connected to peer.
    CanConnect = 2,
    /// Sender tried to connect to peer but failed.
    CannotConnect = 3,
}
/// Converts the raw protobuf connection-type enum into its typed local
/// equivalent. Infallible: every protobuf variant has a counterpart.
impl From<protobuf_structs::dht::Message_ConnectionType> for KadConnectionType {
    #[inline]
    fn from(proto: protobuf_structs::dht::Message_ConnectionType) -> KadConnectionType {
        use protobuf_structs::dht::Message_ConnectionType as Proto;
        match proto {
            Proto::NOT_CONNECTED => KadConnectionType::NotConnected,
            Proto::CONNECTED => KadConnectionType::Connected,
            Proto::CAN_CONNECT => KadConnectionType::CanConnect,
            Proto::CANNOT_CONNECT => KadConnectionType::CannotConnect,
        }
    }
}
// Implement `From` rather than a hand-written `Into`: the standard blanket
// impl (`impl<T, U: From<T>> Into<U> for T`) then provides `Into` for free,
// so existing `.into()` call sites keep compiling unchanged. Infallible:
// every local variant has a protobuf counterpart.
impl From<KadConnectionType> for protobuf_structs::dht::Message_ConnectionType {
    #[inline]
    fn from(val: KadConnectionType) -> protobuf_structs::dht::Message_ConnectionType {
        use protobuf_structs::dht::Message_ConnectionType::*;
        match val {
            KadConnectionType::NotConnected => NOT_CONNECTED,
            KadConnectionType::Connected => CONNECTED,
            KadConnectionType::CanConnect => CAN_CONNECT,
            KadConnectionType::CannotConnect => CANNOT_CONNECT,
        }
    }
}
/// Information about a peer, as known by the sender.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KadPeer {
    /// Identity of the peer being described.
    pub node_id: PeerId,
    /// The multiaddresses that are known for that peer.
    pub multiaddrs: Vec<Multiaddr>,
    /// Status of the connection between the message sender and this peer.
    pub connection_ty: KadConnectionType,
}
impl KadPeer {
    // Constructs a `KadPeer` from its raw protobuf representation.
    //
    // Fails with `InvalidData` if the peer id or any advertised address is malformed.
    // TODO: use TryFrom once stable
    fn from_peer(peer: &mut protobuf_structs::dht::Message_Peer) -> Result<KadPeer, IoError> {
        // TODO: this is in fact a CID ; not sure if this should be handled in `from_bytes` or
        // as a special case here
        let node_id = PeerId::from_bytes(peer.get_id().to_vec())
            .map_err(|_| IoError::new(IoErrorKind::InvalidData, "invalid peer id"))?;
        // Decode every advertised address, bailing out on the first malformed one.
        let multiaddrs = peer
            .take_addrs()
            .into_iter()
            .map(|bytes| {
                Multiaddr::from_bytes(bytes)
                    .map_err(|err| IoError::new(IoErrorKind::InvalidData, err))
            })
            .collect::<Result<Vec<_>, _>>()?;
        let connection_ty = peer.get_connection().into();
        Ok(KadPeer {
            node_id,
            multiaddrs,
            connection_ty,
        })
    }
}
impl Into<protobuf_structs::dht::Message_Peer> for KadPeer {
fn into(self) -> protobuf_structs::dht::Message_Peer {
let mut out = protobuf_structs::dht::Message_Peer::new();
out.set_id(self.node_id.into_bytes());
for addr in self.multiaddrs {
out.mut_addrs().push(addr.into_bytes());
}
out.set_connection(self.connection_ty.into());
out
}
}
/// Configuration for a Kademlia connection upgrade. When applied to a connection, turns this
/// connection into a `Stream + Sink` whose items are of type `KadMsg`.
///
/// Zero-sized marker type; all behaviour lives in its `ConnectionUpgrade` implementation.
#[derive(Debug, Default, Copy, Clone)]
pub struct KademliaProtocolConfig;
impl<C, Maf> ConnectionUpgrade<C, Maf> for KademliaProtocolConfig
where
    C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
{
    // The upgraded connection: a framed `Stream + Sink` of `KadMsg` values.
    type Output = KadStreamSink<C>;
    type MultiaddrFuture = Maf;
    // The upgrade itself never performs I/O, so it resolves immediately.
    type Future = future::FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>;
    type NamesIter = iter::Once<(Bytes, ())>;
    type UpgradeIdentifier = ();

    // Single protocol name advertised for negotiation.
    #[inline]
    fn protocol_names(&self) -> Self::NamesIter {
        iter::once(("/ipfs/kad/1.0.0".into(), ()))
    }

    // Wraps the raw socket into the Kademlia framed stream/sink; cannot fail.
    #[inline]
    fn upgrade(self, incoming: C, _: (), _: Endpoint, addr: Maf) -> Self::Future {
        future::ok((kademlia_protocol(incoming), addr))
    }
}
// Concrete type produced by `kademlia_protocol`: an unsigned-varint length-prefixed framed
// socket whose outgoing `KadMsg`s are serialized to protobuf bytes (`with`) and whose incoming
// frames are parsed back into `KadMsg`s (`and_then`). The nested combinator types must match
// the chain in `kademlia_protocol` exactly.
type KadStreamSink<S> = stream::AndThen<sink::With<stream::FromErr<Framed<S, codec::UviBytes<Vec<u8>>>, IoError>, KadMsg, fn(KadMsg) -> Result<Vec<u8>, IoError>, Result<Vec<u8>, IoError>>, fn(BytesMut) -> Result<KadMsg, IoError>, Result<KadMsg, IoError>>;
// Upgrades a socket to use the Kademlia protocol.
//
// The returned object implements `Stream<Item = KadMsg>` + `Sink<SinkItem = KadMsg>`:
// frames are length-prefixed with an unsigned varint, serialized to protobuf on send,
// and parsed from protobuf on receive.
fn kademlia_protocol<S>(
    socket: S,
) -> KadStreamSink<S>
where
    S: AsyncRead + AsyncWrite,
{
    Framed::new(socket, codec::UviBytes::default())
        .from_err::<IoError>()
        // Outgoing path: turn the `KadMsg` into raw protobuf bytes.
        .with::<_, fn(_) -> _, _>(|request| -> Result<_, IoError> {
            let proto_struct = msg_to_proto(request);
            Ok(proto_struct.write_to_bytes().unwrap()) // TODO: error?
        })
        // Incoming path: parse the raw frame back into a `KadMsg`.
        .and_then::<fn(_) -> _, _>(|bytes| {
            let response = protobuf::parse_from_bytes(&bytes)?;
            proto_to_msg(response)
        })
}
/// Message that we can send to a peer or received from a peer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KadMsg {
    /// Ping request or response.
    Ping,
    /// Target must save the given record, can be queried later with `GetValueReq`.
    PutValue {
        /// Identifier of the record.
        key: Vec<u8>,
        /// The record itself.
        record: (), //record: protobuf_structs::record::Record, // TODO: no
    },
    /// Request for the record associated with `key`.
    GetValueReq {
        /// Identifier of the record.
        key: Vec<u8>,
    },
    /// Response to a `GetValueReq`.
    GetValueRes {
        /// Identifier of the returned record.
        key: Vec<u8>,
        /// The record itself; currently stubbed out as unit.
        record: (), //record: Option<protobuf_structs::record::Record>, // TODO: no
        /// Nodes that are closer to the key.
        closer_peers: Vec<KadPeer>,
    },
    /// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes
    /// returned is not specified, but should be around 20.
    FindNodeReq {
        /// Identifier of the node.
        key: Vec<u8>,
    },
    /// Response to a `FindNodeReq`.
    FindNodeRes {
        /// Results of the request.
        closer_peers: Vec<KadPeer>,
    },
}
// Turns a type-safe Kademlia message into the corresponding raw protobuf message.
fn msg_to_proto(kad_msg: KadMsg) -> protobuf_structs::dht::Message {
    use protobuf_structs::dht::{Message, Message_MessageType};

    // Build one empty message up front, then fill in the fields relevant to the variant.
    let mut msg = Message::new();
    match kad_msg {
        KadMsg::Ping => {
            msg.set_field_type(Message_MessageType::PING);
        }
        KadMsg::PutValue { key, .. } => {
            msg.set_field_type(Message_MessageType::PUT_VALUE);
            msg.set_key(key);
            //msg.set_record(record); // TODO:
        }
        KadMsg::GetValueReq { key } => {
            msg.set_field_type(Message_MessageType::GET_VALUE);
            msg.set_key(key);
            msg.set_clusterLevelRaw(10);
        }
        KadMsg::GetValueRes { .. } => unimplemented!(), // TODO:
        KadMsg::FindNodeReq { key } => {
            msg.set_field_type(Message_MessageType::FIND_NODE);
            msg.set_key(key);
            msg.set_clusterLevelRaw(10);
        }
        KadMsg::FindNodeRes { closer_peers } => {
            // TODO: if empty, the remote will think it's a request
            // TODO: not good, possibly exposed in the API
            assert!(!closer_peers.is_empty());
            msg.set_field_type(Message_MessageType::FIND_NODE);
            msg.set_clusterLevelRaw(9);
            for peer in closer_peers {
                msg.mut_closerPeers().push(peer.into());
            }
        }
    }
    msg
}
/// Turns a raw Kademlia message into a type-safe message.
///
/// Fails with `InvalidData` for message types this implementation does not support.
fn proto_to_msg(mut message: protobuf_structs::dht::Message) -> Result<KadMsg, IoError> {
    use protobuf_structs::dht::Message_MessageType as MsgType;

    match message.get_field_type() {
        MsgType::PING => Ok(KadMsg::Ping),

        MsgType::PUT_VALUE => {
            let _record = message.take_record();
            Ok(KadMsg::PutValue {
                key: message.take_key(),
                record: (),
            })
        }

        MsgType::GET_VALUE => Ok(KadMsg::GetValueReq {
            key: message.take_key(),
        }),

        // A FIND_NODE message with peers attached is a response; without, a request.
        MsgType::FIND_NODE => {
            if !message.get_closerPeers().is_empty() {
                // TODO: for now we don't parse the peer properly, so it is possible that we get
                // parsing errors for peers even when they are valid ; we ignore these
                // errors for now, but ultimately we should just error altogether
                let closer_peers: Vec<_> = message
                    .mut_closerPeers()
                    .iter_mut()
                    .filter_map(|peer| KadPeer::from_peer(peer).ok())
                    .collect();
                Ok(KadMsg::FindNodeRes { closer_peers })
            } else {
                Ok(KadMsg::FindNodeReq {
                    key: message.take_key(),
                })
            }
        }

        // These messages don't seem to be used in the protocol in practice, so if we receive
        // them we suppose that it's a mistake in the protocol usage.
        MsgType::GET_PROVIDERS | MsgType::ADD_PROVIDER => Err(IoError::new(
            IoErrorKind::InvalidData,
            "received an unsupported kad message type",
        )),
    }
}
#[cfg(test)]
mod tests {
    extern crate libp2p_tcp_transport;
    extern crate tokio_current_thread;

    use self::libp2p_tcp_transport::TcpConfig;
    use futures::{Future, Sink, Stream};
    use libp2p_core::{Transport, PeerId, PublicKey};
    use protocol::{KadConnectionType, KadMsg, KademliaProtocolConfig, KadPeer};
    use std::sync::mpsc;
    use std::thread;

    #[test]
    fn correct_transfer() {
        // We open a server and a client, send a message between the two, and check that they were
        // successfully received.
        test_one(KadMsg::Ping);
        test_one(KadMsg::PutValue {
            key: vec![1, 2, 3, 4],
            record: (),
        });
        test_one(KadMsg::GetValueReq {
            key: vec![10, 11, 12],
        });
        test_one(KadMsg::FindNodeReq {
            key: vec![9, 12, 0, 245, 245, 201, 28, 95],
        });
        test_one(KadMsg::FindNodeRes {
            closer_peers: vec![
                KadPeer {
                    node_id: PeerId::from_public_key(PublicKey::Rsa(vec![93, 80, 12, 250])),
                    multiaddrs: vec!["/ip4/100.101.102.103/tcp/20105".parse().unwrap()],
                    connection_ty: KadConnectionType::Connected,
                },
            ],
        });
        // TODO: all messages

        // Round-trips a single message: a background thread listens on an ephemeral TCP port
        // (address handed back through an mpsc channel), the main thread dials it and sends
        // `msg_server`; the listener asserts it received an identical message.
        fn test_one(msg_server: KadMsg) {
            let msg_client = msg_server.clone();
            let (tx, rx) = mpsc::channel();

            let bg_thread = thread::spawn(move || {
                let transport = TcpConfig::new().with_upgrade(KademliaProtocolConfig);
                let (listener, addr) = transport
                    .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
                    .unwrap();
                // Publish the actual listen address (port was chosen by the OS).
                tx.send(addr).unwrap();
                let future = listener
                    .into_future()
                    .map_err(|(err, _)| err)
                    .and_then(|(client, _)| client.unwrap().map(|v| v.0))
                    .and_then(|proto| proto.into_future().map_err(|(err, _)| err).map(|(v, _)| v))
                    .map(|recv_msg| {
                        // The received message must round-trip byte-for-byte.
                        assert_eq!(recv_msg.unwrap(), msg_server);
                        ()
                    });
                let _ = tokio_current_thread::block_on_all(future).unwrap();
            });

            let transport = TcpConfig::new().with_upgrade(KademliaProtocolConfig);
            let future = transport
                .dial(rx.recv().unwrap())
                .unwrap_or_else(|_| panic!())
                .and_then(|proto| proto.0.send(msg_client))
                .map(|_| ());
            let _ = tokio_current_thread::block_on_all(future).unwrap();
            bg_thread.join().unwrap();
        }
    }
}

365
protocols/kad/src/query.rs Normal file
View File

@ -0,0 +1,365 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! This module handles performing iterative queries about the network.
use fnv::FnvHashSet;
use futures::{future, Future, stream, Stream};
use kbucket::KBucketsPeerId;
use libp2p_core::PeerId;
use multiaddr::{AddrComponent, Multiaddr};
use protocol;
use rand;
use smallvec::SmallVec;
use std::cmp::Ordering;
use std::io::Error as IoError;
use std::mem;
/// Parameters of a query. Allows plugging the query-related code with the rest of the
/// infrastructure.
pub struct QueryParams<FBuckets, FFindNode> {
    /// Identifier of the local peer.
    pub local_id: PeerId,
    /// Called whenever we need to obtain the peers closest to a certain peer.
    pub kbuckets_find_closest: FBuckets,
    /// Level of parallelism for networking. If this is `N`, then we can dial `N` nodes at a time.
    pub parallelism: usize,
    /// Called whenever we want to send a `FIND_NODE` RPC query.
    pub find_node: FFindNode,
}
/// Event that happens during a query.
#[derive(Debug, Clone)]
pub enum QueryEvent<TOut> {
    /// Learned about new multiaddresses for the given peers.
    NewKnownMultiaddrs(Vec<(PeerId, Vec<Multiaddr>)>),
    /// Finished the processing of the query. Contains the result.
    Finished(TOut),
}
/// Starts a query for an iterative `FIND_NODE` request.
///
/// The returned stream yields intermediary `QueryEvent::NewKnownMultiaddrs` events and ends
/// with a `QueryEvent::Finished` carrying up to 20 of the closest peers found.
#[inline]
pub fn find_node<'a, FBuckets, FFindNode>(
    query_params: QueryParams<FBuckets, FFindNode>,
    searched_key: PeerId,
) -> Box<Stream<Item = QueryEvent<Vec<PeerId>>, Error = IoError> + 'a>
where
    FBuckets: Fn(PeerId) -> Vec<PeerId> + 'a + Clone,
    FFindNode: Fn(Multiaddr, PeerId) -> Box<Future<Item = Vec<protocol::Peer>, Error = IoError>> + 'a + Clone,
{
    query(query_params, searched_key, 20) // TODO: constant
}
/// Refreshes a specific bucket by performing an iterative `FIND_NODE` on a random ID of this
/// bucket.
///
/// Returns a dummy no-op future if `bucket_num` is out of range.
pub fn refresh<'a, FBuckets, FFindNode>(
query_params: QueryParams<FBuckets, FFindNode>,
bucket_num: usize,
) -> Box<Stream<Item = QueryEvent<()>, Error = IoError> + 'a>
where
FBuckets: Fn(PeerId) -> Vec<PeerId> + 'a + Clone,
FFindNode: Fn(Multiaddr, PeerId) -> Box<Future<Item = Vec<protocol::Peer>, Error = IoError>> + 'a + Clone,
{
let peer_id = match gen_random_id(&query_params.local_id, bucket_num) {
Ok(p) => p,
Err(()) => return Box::new(stream::once(Ok(QueryEvent::Finished(())))),
};
let stream = find_node(query_params, peer_id).map(|event| {
match event {
QueryEvent::NewKnownMultiaddrs(peers) => QueryEvent::NewKnownMultiaddrs(peers),
QueryEvent::Finished(_) => QueryEvent::Finished(()),
}
});
Box::new(stream) as Box<_>
}
// Generates a random `PeerId` that belongs to the given bucket.
//
// Returns an error if `bucket_num` is out of range.
//
// The generated ID shares its high-order bytes with `my_id` and is randomized from the
// bit position corresponding to `bucket_num` downwards.
fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result<PeerId, ()> {
    let my_id_len = my_id.as_bytes().len();
    // TODO: this 2 is magic here ; it is the length of the hash of the multihash
    let bits_diff = bucket_num + 1;
    if bits_diff > 8 * (my_id_len - 2) {
        return Err(());
    }
    // NOTE(review): assumes peer IDs never exceed 64 bytes — indexing below would panic
    // otherwise; confirm against the multihash formats actually in use.
    let mut random_id = [0; 64];
    for byte in 0..my_id_len {
        // Bytes before the boundary are copied, the boundary byte is partially randomized,
        // and bytes after it are fully random.
        match byte.cmp(&(my_id_len - bits_diff / 8 - 1)) {
            Ordering::Less => {
                random_id[byte] = my_id.as_bytes()[byte];
            }
            Ordering::Equal => {
                // NOTE(review): when `bits_diff` is a multiple of 8 the mask is 0, so the
                // boundary byte is copied unchanged while later bytes are random — verify
                // this is the intended bucket boundary behaviour.
                let mask: u8 = (1 << (bits_diff % 8)) - 1;
                random_id[byte] = (my_id.as_bytes()[byte] & !mask) | (rand::random::<u8>() & mask);
            }
            Ordering::Greater => {
                random_id[byte] = rand::random();
            }
        }
    }
    let peer_id = PeerId::from_bytes(random_id[..my_id_len].to_owned())
        .expect("randomly-generated peer ID should always be valid");
    Ok(peer_id)
}
// Generic query-performing function.
//
// Drives an iterative Kademlia lookup as a stream-based state machine: each round contacts
// up to `parallelism` of the closest known peers, merges their responses into a
// distance-ordered result list, and finishes once `num_results` peers have been gathered or
// no contactable peer remains. Multiaddress discoveries are surfaced as
// `QueryEvent::NewKnownMultiaddrs` events along the way.
fn query<'a, FBuckets, FFindNode>(
    query_params: QueryParams<FBuckets, FFindNode>,
    searched_key: PeerId,
    num_results: usize,
) -> Box<Stream<Item = QueryEvent<Vec<PeerId>>, Error = IoError> + 'a>
where
    FBuckets: Fn(PeerId) -> Vec<PeerId> + 'a + Clone,
    FFindNode: Fn(Multiaddr, PeerId) -> Box<Future<Item = Vec<protocol::Peer>, Error = IoError>> + 'a + Clone,
{
    debug!("Start query for {:?} ; num results = {}", searched_key, num_results);

    // State of the current iterative process.
    struct State<'a> {
        // At which stage we are.
        stage: Stage,
        // Final output of the iteration. Kept ordered by distance to the searched key and
        // capped at `num_results` elements.
        result: Vec<PeerId>,
        // For each open connection, a future with the response of the remote.
        // Note that don't use a `SmallVec` here because `select_all` produces a `Vec`.
        current_attempts_fut: Vec<Box<Future<Item = Vec<protocol::Peer>, Error = IoError> + 'a>>,
        // For each open connection, the peer ID that we are connected to.
        // Must always have the same length as `current_attempts_fut`.
        current_attempts_addrs: SmallVec<[PeerId; 32]>,
        // Nodes that need to be attempted.
        pending_nodes: Vec<PeerId>,
        // Peers that we tried to contact but failed.
        failed_to_contact: FnvHashSet<PeerId>,
    }

    // General stage of the state.
    #[derive(Copy, Clone, PartialEq, Eq)]
    enum Stage {
        // We are still in the first step of the algorithm where we try to find the closest node.
        FirstStep,
        // We are contacting the k closest nodes in order to fill the list with enough results.
        SecondStep,
        // The results are complete, and the next stream iteration will produce the outcome.
        FinishingNextIter,
        // We are finished and the stream shouldn't return anything anymore.
        Finished,
    }

    let initial_state = State {
        stage: Stage::FirstStep,
        result: Vec::with_capacity(num_results),
        current_attempts_fut: Vec::new(),
        current_attempts_addrs: SmallVec::new(),
        pending_nodes: {
            let kbuckets_find_closest = query_params.kbuckets_find_closest.clone();
            kbuckets_find_closest(searched_key.clone()) // TODO: suboptimal
        },
        failed_to_contact: Default::default(),
    };

    let parallelism = query_params.parallelism;

    // Start of the iterative process.
    // The whole algorithm runs inside `stream::unfold`: every invocation of the closure is one
    // round, returning `None` to end the stream or a future resolving to the next
    // (optional event, state) pair. `None` events are filtered out at the bottom.
    let stream = stream::unfold(initial_state, move |mut state| -> Option<_> {
        match state.stage {
            Stage::FinishingNextIter => {
                let result = mem::replace(&mut state.result, Vec::new());
                debug!("Query finished with {} results", result.len());
                state.stage = Stage::Finished;
                let future = future::ok((Some(QueryEvent::Finished(result)), state));
                return Some(future::Either::A(future));
            },
            Stage::Finished => {
                return None;
            },
            _ => ()
        };

        let searched_key = searched_key.clone();
        let find_node_rpc = query_params.find_node.clone();

        // Find out which nodes to contact at this iteration.
        let to_contact = {
            let wanted_len = if state.stage == Stage::FirstStep {
                parallelism.saturating_sub(state.current_attempts_fut.len())
            } else {
                num_results.saturating_sub(state.current_attempts_fut.len())
            };
            let mut to_contact = SmallVec::<[_; 16]>::new();
            while to_contact.len() < wanted_len && !state.pending_nodes.is_empty() {
                // Move the first element of `pending_nodes` to `to_contact`, but ignore nodes that
                // are already part of the results or of a current attempt or if we failed to
                // contact it before.
                let peer = state.pending_nodes.remove(0);
                if state.result.iter().any(|p| p == &peer) {
                    continue;
                }
                if state.current_attempts_addrs.iter().any(|p| p == &peer) {
                    continue;
                }
                if state.failed_to_contact.iter().any(|p| p == &peer) {
                    continue;
                }
                to_contact.push(peer);
            }
            to_contact
        };

        debug!("New query round ; {} queries in progress ; contacting {} new peers",
            state.current_attempts_fut.len(),
            to_contact.len());

        // For each node in `to_contact`, start an RPC query and a corresponding entry in the two
        // `state.current_attempts_*` fields.
        for peer in to_contact {
            let multiaddr: Multiaddr = AddrComponent::P2P(peer.clone().into_bytes()).into();
            let searched_key2 = searched_key.clone();
            let current_attempt = find_node_rpc(multiaddr.clone(), searched_key2); // TODO: suboptimal
            state.current_attempts_addrs.push(peer.clone());
            state
                .current_attempts_fut
                .push(Box::new(current_attempt) as Box<_>);
        }
        debug_assert_eq!(
            state.current_attempts_addrs.len(),
            state.current_attempts_fut.len()
        );

        // Extract `current_attempts_fut` so that we can pass it to `select_all`. We will push the
        // values back when inside the loop.
        let current_attempts_fut = mem::replace(&mut state.current_attempts_fut, Vec::new());
        if current_attempts_fut.is_empty() {
            // If `current_attempts_fut` is empty, then `select_all` would panic. It happens
            // when we have no additional node to query.
            debug!("Finishing query early because no additional node available");
            state.stage = Stage::FinishingNextIter;
            let future = future::ok((None, state));
            return Some(future::Either::A(future));
        }

        // This is the future that continues or breaks the `loop_fn`.
        let future = future::select_all(current_attempts_fut.into_iter()).then(move |result| {
            // Normalize Ok/Err into the same triple so both paths share the bookkeeping below.
            let (message, trigger_idx, other_current_attempts) = match result {
                Err((err, trigger_idx, other_current_attempts)) => {
                    (Err(err), trigger_idx, other_current_attempts)
                }
                Ok((message, trigger_idx, other_current_attempts)) => {
                    (Ok(message), trigger_idx, other_current_attempts)
                }
            };

            // Putting back the extracted elements in `state`.
            let remote_id = state.current_attempts_addrs.remove(trigger_idx);
            debug_assert!(state.current_attempts_fut.is_empty());
            state.current_attempts_fut = other_current_attempts;

            // `message` contains the reason why the current future was woken up.
            let closer_peers = match message {
                Ok(msg) => msg,
                Err(err) => {
                    trace!("RPC query failed for {:?}: {:?}", remote_id, err);
                    state.failed_to_contact.insert(remote_id);
                    return future::ok((None, state));
                }
            };

            // Inserting the node we received a response from into `state.result`.
            // The code is non-trivial because `state.result` is ordered by distance and is limited
            // by `num_results` elements.
            if let Some(insert_pos) = state.result.iter().position(|e| {
                e.distance_with(&searched_key) >= remote_id.distance_with(&searched_key)
            }) {
                if state.result[insert_pos] != remote_id {
                    if state.result.len() >= num_results {
                        state.result.pop();
                    }
                    state.result.insert(insert_pos, remote_id);
                }
            } else if state.result.len() < num_results {
                state.result.push(remote_id);
            }

            // The loop below will set this variable to `true` if we find a new element to put at
            // the top of the result. This would mean that we have to continue looping.
            let mut local_nearest_node_updated = false;

            // Update `state` with the actual content of the message.
            let mut new_known_multiaddrs = Vec::with_capacity(closer_peers.len());
            for mut peer in closer_peers {
                // Update the peerstore with the information sent by
                // the remote.
                {
                    let multiaddrs = mem::replace(&mut peer.multiaddrs, Vec::new());
                    trace!("Reporting multiaddresses for {:?}: {:?}", peer.node_id, multiaddrs);
                    new_known_multiaddrs.push((peer.node_id.clone(), multiaddrs));
                }

                // `result` is non-empty here: `remote_id` was inserted just above.
                if peer.node_id.distance_with(&searched_key)
                    <= state.result[0].distance_with(&searched_key)
                {
                    local_nearest_node_updated = true;
                }

                if state.result.iter().any(|ma| ma == &peer.node_id) {
                    continue;
                }

                // Insert the node into `pending_nodes` at the right position, or do not
                // insert it if it is already in there.
                if let Some(insert_pos) = state.pending_nodes.iter().position(|e| {
                    e.distance_with(&searched_key) >= peer.node_id.distance_with(&searched_key)
                }) {
                    if state.pending_nodes[insert_pos] != peer.node_id {
                        state.pending_nodes.insert(insert_pos, peer.node_id.clone());
                    }
                } else {
                    state.pending_nodes.push(peer.node_id.clone());
                }
            }

            if state.result.len() >= num_results
                || (state.stage != Stage::FirstStep && state.current_attempts_fut.is_empty())
            {
                state.stage = Stage::FinishingNextIter;
            } else {
                if !local_nearest_node_updated {
                    trace!("Loop didn't update closer node ; jumping to step 2");
                    state.stage = Stage::SecondStep;
                }
            }

            future::ok((Some(QueryEvent::NewKnownMultiaddrs(new_known_multiaddrs)), state))
        });

        Some(future::Either::B(future))
    }).filter_map(|val| val);

    Box::new(stream) as Box<_>
}