Rework the Kademlia high-level system (#282)

* Kademlia high-level rework

* Some changes in the rework

* Some additional tweaks to kad rework

* Add update_kbuckets

* Rename a bunch of Kademlia stuff

* Add KadSystem::local_peer_id

* Some documentation update

* Concern

* Make the example compile

* Make things nicer

* Fix bug in UniqueConnec

* Add clear() to UniqueConnec

* Add UniqueConnec::poll

* Fix potential deadlock in UniqueConnec

* Add UniqueConnec::state()

* The future of get now contains a Weak

* Fix concerns
Pierre Krieger
2018-07-17 15:51:11 +02:00
committed by GitHub
parent 6bda589389
commit 053197bd1c
9 changed files with 907 additions and 586 deletions


@ -223,6 +223,7 @@ mod connection_reuse;
mod keys_proto;
mod peer_id;
mod public_key;
mod unique;
pub mod either;
pub mod muxing;
@ -237,4 +238,5 @@ pub use self::peer_id::PeerId;
pub use self::public_key::PublicKey;
pub use self::swarm::{swarm, SwarmController, SwarmFuture};
pub use self::transport::{MuxedTransport, Transport};
pub use self::unique::{UniqueConnec, UniqueConnecFuture, UniqueConnecState};
pub use self::upgrade::{ConnectionUpgrade, Endpoint};

core/src/unique.rs (new file, 333 lines)

@ -0,0 +1,333 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::{future, sync::oneshot, task, Async, Future, Poll, IntoFuture};
use parking_lot::Mutex;
use {Multiaddr, MuxedTransport, SwarmController, Transport};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::mem;
use std::sync::{Arc, Weak};
/// Storage for a unique connection with a remote.
pub struct UniqueConnec<T> {
inner: Arc<Mutex<UniqueConnecInner<T>>>,
}
enum UniqueConnecInner<T> {
/// The `UniqueConnec` was created, but nothing is in it.
Empty,
/// We started dialing, but no response has been obtained so far.
Pending {
/// Tasks that need to be awakened when the content of this object is set.
tasks_waiting: Vec<task::Task>,
/// Future that represents when `set_until` should have been called.
// TODO: Send + Sync bound is meh
dial_fut: Box<Future<Item = (), Error = IoError> + Send + Sync>,
},
/// The value of this unique connec has been set.
/// Can only transition to `Empty` when the future has expired.
Full {
/// Content of the object.
value: T,
/// Sender to trigger if the content gets cleared.
on_clear: oneshot::Sender<()>,
},
/// The `dial_fut` has errored.
Errored(IoError),
}
impl<T> UniqueConnec<T> {
/// Builds a new empty `UniqueConnec`.
#[inline]
pub fn empty() -> Self {
UniqueConnec {
inner: Arc::new(Mutex::new(UniqueConnecInner::Empty)),
}
}
/// Builds a new `UniqueConnec` that contains a value.
#[inline]
pub fn with_value(value: T) -> Self {
let (on_clear, _) = oneshot::channel();
UniqueConnec {
inner: Arc::new(Mutex::new(UniqueConnecInner::Full { value, on_clear })),
}
}
/// Instantly returns the value from the object if there is any.
pub fn poll(&self) -> Option<T>
where T: Clone,
{
let inner = self.inner.lock();
if let UniqueConnecInner::Full { ref value, .. } = &*inner {
Some(value.clone())
} else {
None
}
}
/// Loads the value from the object.
///
/// If the object is empty, dials the given multiaddress with the given transport.
///
/// The closure of the `swarm` is expected to call `set_until()` on the `UniqueConnec`. Failure
/// to do so will make the `UniqueConnecFuture` produce an error.
pub fn get_or_dial<S, Du>(&self, swarm: &SwarmController<S>, multiaddr: &Multiaddr,
transport: Du) -> UniqueConnecFuture<T>
where T: Clone,
Du: Transport + 'static, // TODO: 'static :-/
Du::Output: Into<S::Output>,
S: Clone + MuxedTransport,
{
self.get(|| {
swarm.dial(multiaddr.clone(), transport)
.map_err(|_| IoError::new(IoErrorKind::Other, "multiaddress not supported"))
.into_future()
.flatten()
})
}
/// Loads the value from the object.
///
/// If the object is empty, calls the closure. The closure should return a future that
/// should be signaled after `set_until` has been called. If the future produces an error,
/// then the object will empty itself again and the `UniqueConnecFuture` will return an error.
/// If the future is finished and `set_until` hasn't been called, then the `UniqueConnecFuture`
/// will return an error.
pub fn get<F, Fut>(&self, or: F) -> UniqueConnecFuture<T>
where F: FnOnce() -> Fut,
T: Clone,
Fut: IntoFuture<Item = (), Error = IoError>,
Fut::Future: Send + Sync + 'static, // TODO: 'static :-/
{
match &*self.inner.lock() {
UniqueConnecInner::Empty => (),
_ => return UniqueConnecFuture { inner: Arc::downgrade(&self.inner) },
};
// The mutex is unlocked when we call `or`, in order to avoid potential deadlocks.
let dial_fut = or().into_future();
let mut inner = self.inner.lock();
// Since we unlocked the mutex, it's possible that the object was filled in the meanwhile.
// Therefore we check again whether it's still `Empty`.
if let UniqueConnecInner::Empty = &mut *inner {
*inner = UniqueConnecInner::Pending {
tasks_waiting: Vec::new(),
dial_fut: Box::new(dial_fut),
};
}
UniqueConnecFuture { inner: Arc::downgrade(&self.inner) }
}
/// Puts `value` inside the object. The second parameter is a future whose completion will
/// clear up the content. Returns an adjusted version of that same future.
///
/// If `clear()` is called, the returned future will automatically complete with an error.
///
/// Has no effect if the object already contains something.
pub fn set_until<F>(&self, value: T, until: F) -> impl Future<Item = (), Error = F::Error>
where F: Future<Item = ()>
{
let mut tasks_to_notify = Vec::new();
let mut inner = self.inner.lock();
let (on_clear, on_clear_rx) = oneshot::channel();
match mem::replace(&mut *inner, UniqueConnecInner::Full { value, on_clear }) {
UniqueConnecInner::Empty => {},
UniqueConnecInner::Errored(_) => {},
UniqueConnecInner::Pending { tasks_waiting, .. } => {
tasks_to_notify = tasks_waiting;
},
old @ UniqueConnecInner::Full { .. } => {
// Keep the old value.
*inner = old;
return future::Either::B(until);
},
};
drop(inner);
// The mutex is unlocked when we notify the pending tasks.
for task in tasks_to_notify {
task.notify();
}
let inner = self.inner.clone();
let fut = until
.select(on_clear_rx.then(|_| Ok(())))
.map(|((), _)| ())
.map_err(|(err, _)| err)
.then(move |val| {
*inner.lock() = UniqueConnecInner::Empty;
val
});
future::Either::A(fut)
}
/// Clears the content of the object.
///
/// Has no effect if the content is empty or pending.
/// If the node was full, calling `clear` will stop the future returned by `set_until`.
pub fn clear(&self) {
let mut inner = self.inner.lock();
match mem::replace(&mut *inner, UniqueConnecInner::Empty) {
UniqueConnecInner::Empty => {},
UniqueConnecInner::Errored(_) => {},
pending @ UniqueConnecInner::Pending { .. } => {
*inner = pending;
},
UniqueConnecInner::Full { on_clear, .. } => {
let _ = on_clear.send(());
},
};
}
/// Returns the state of the object.
///
/// Note that this can be racy, as the object can be used at the same time. In other words,
/// the returned value may no longer reflect the actual state.
pub fn state(&self) -> UniqueConnecState {
match *self.inner.lock() {
UniqueConnecInner::Empty => UniqueConnecState::Empty,
UniqueConnecInner::Errored(_) => UniqueConnecState::Errored,
UniqueConnecInner::Pending { .. } => UniqueConnecState::Pending,
UniqueConnecInner::Full { .. } => UniqueConnecState::Full,
}
}
}
impl<T> Clone for UniqueConnec<T> {
#[inline]
fn clone(&self) -> UniqueConnec<T> {
UniqueConnec {
inner: self.inner.clone(),
}
}
}
impl<T> Default for UniqueConnec<T> {
#[inline]
fn default() -> Self {
UniqueConnec::empty()
}
}
/// Future returned by `UniqueConnec::get()`.
pub struct UniqueConnecFuture<T> {
inner: Weak<Mutex<UniqueConnecInner<T>>>,
}
impl<T> Future for UniqueConnecFuture<T>
where T: Clone
{
type Item = T;
type Error = IoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let inner = match self.inner.upgrade() {
Some(inner) => inner,
// All the `UniqueConnec` have been destroyed.
None => return Err(IoErrorKind::ConnectionAborted.into()),
};
let mut inner = inner.lock();
match mem::replace(&mut *inner, UniqueConnecInner::Empty) {
UniqueConnecInner::Empty => {
// This can happen if `set_until()` is called, and the future expires before the
// future returned by `get()` gets polled. This means that the connection has been
// closed.
Err(IoErrorKind::ConnectionAborted.into())
},
UniqueConnecInner::Pending { mut tasks_waiting, mut dial_fut } => {
match dial_fut.poll() {
Ok(Async::Ready(())) => {
// This happens if we successfully dialed a remote, but the callback
// doesn't call `set_until`. This can be a logic error by the user,
// but could also indicate that the user decided to filter out this
// connection for whatever reason.
*inner = UniqueConnecInner::Errored(IoErrorKind::ConnectionAborted.into());
Err(IoErrorKind::ConnectionAborted.into())
},
Ok(Async::NotReady) => {
tasks_waiting.push(task::current());
*inner = UniqueConnecInner::Pending { tasks_waiting, dial_fut };
Ok(Async::NotReady)
}
Err(err) => {
let tr = IoError::new(IoErrorKind::ConnectionAborted, err.to_string());
*inner = UniqueConnecInner::Errored(err);
Err(tr)
},
}
},
UniqueConnecInner::Full { value, on_clear } => {
*inner = UniqueConnecInner::Full {
value: value.clone(),
on_clear
};
Ok(Async::Ready(value))
},
UniqueConnecInner::Errored(err) => {
let tr = IoError::new(IoErrorKind::ConnectionAborted, err.to_string());
*inner = UniqueConnecInner::Errored(err);
Err(tr)
},
}
}
}
/// State of a `UniqueConnec`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum UniqueConnecState {
/// The object is empty.
Empty,
/// `get_*` has been called and we are waiting for `set_until` to be called.
Pending,
/// `set_until` has been called.
Full,
/// The future returned by the closure of `get_*` has errored or has finished before
/// `set_until` has been called.
Errored,
}
#[cfg(test)]
mod tests {
use futures::{future, Future};
use transport::DeniedTransport;
use {UniqueConnec, UniqueConnecState};
use swarm;
#[test]
fn invalid_multiaddr_produces_error() {
let unique = UniqueConnec::empty();
assert_eq!(unique.state(), UniqueConnecState::Empty);
let unique2 = unique.clone();
let (swarm_ctrl, _swarm_fut) = swarm(DeniedTransport, |_, _| {
unique2.set_until((), future::empty())
});
let fut = unique.get_or_dial(&swarm_ctrl, &"/ip4/1.2.3.4".parse().unwrap(),
DeniedTransport);
assert!(fut.wait().is_err());
assert_eq!(unique.state(), UniqueConnecState::Errored);
}
// TODO: more tests
}
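
A minimal sketch of the `UniqueConnec` life cycle outside of a swarm, using only the API defined above; the `u32` value and the `future::empty()` placeholder are illustrative, and the paths assume the re-exports added to `core/src/lib.rs`:

use futures::{future, Future};
use libp2p_core::{UniqueConnec, UniqueConnecState};

fn unique_connec_lifecycle() {
    let connec = UniqueConnec::<u32>::empty();
    assert_eq!(connec.state(), UniqueConnecState::Empty);
    assert_eq!(connec.poll(), None);

    // Filling the object. The future returned by `set_until` resolves once the
    // `until` future finishes or once `clear()` is called.
    let _until = connec.set_until(42, future::empty::<(), ()>());
    assert_eq!(connec.state(), UniqueConnecState::Full);
    assert_eq!(connec.poll(), Some(42));

    // Clearing empties the object and signals the future returned by `set_until`.
    connec.clear();
    assert_eq!(connec.state(), UniqueConnecState::Empty);

    // `get` expects its closure to arrange for `set_until` to be called; if the
    // closure's future finishes without that, the `UniqueConnecFuture` errors.
    let connec2 = UniqueConnec::<u32>::empty();
    assert!(connec2.get(|| Ok(())).wait().is_err());
    assert_eq!(connec2.state(), UniqueConnecState::Errored);
}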


@ -22,7 +22,7 @@ rand = "0.4.2"
smallvec = "0.5"
tokio-codec = "0.1"
tokio-io = "0.1"
tokio-timer = "0.1.2"
tokio-timer = "0.2"
varint = { path = "../varint-rs" }
[dev-dependencies]


@ -18,132 +18,99 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! High-level structs/traits of the crate.
//!
//! Lies on top of the `kad_server` module.
use bytes::Bytes;
use fnv::FnvHashMap;
use futures::sync::oneshot;
use futures::{self, future, Future, Stream};
use kad_server::{KademliaServerConfig, KademliaServerController, KademliaIncomingRequest, KademliaFindNodeRespond};
use kbucket::{KBucketsPeerId, KBucketsTable, UpdateOutcome};
use libp2p_core::{ConnectionUpgrade, Endpoint, MuxedTransport, PeerId, SwarmController, Transport};
use multiaddr::{AddrComponent, Multiaddr};
use parking_lot::Mutex;
use protocol::Peer;
use query;
use std::collections::hash_map::Entry;
use std::fmt;
use fnv::FnvHashSet;
use futures::{future, Future, IntoFuture, stream, Stream};
use kad_server::KadConnecController;
use kbucket::{KBucketsTable, KBucketsPeerId};
use libp2p_core::PeerId;
use multiaddr::Multiaddr;
use protocol;
use rand;
use smallvec::SmallVec;
use std::cmp::Ordering;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter;
use std::slice::Iter as SliceIter;
use std::sync::Arc;
use std::time::Duration;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer;
use std::mem;
use std::time::{Duration, Instant};
use tokio_timer::Deadline;
/// Prototype for a future Kademlia protocol running on a socket.
#[derive(Debug, Clone)]
pub struct KademliaConfig {
pub struct KadSystemConfig<I> {
/// Degree of parallelism on the network. Often called `alpha` in technical papers.
/// No more than this number of remotes will be used at a given time for any given operation.
// TODO: ^ share this number between operations? or does each operation use `alpha` remotes?
pub parallelism: u32,
/// Id of the local peer.
pub local_peer_id: PeerId,
/// List of peers initially known.
pub known_initial_peers: I,
/// Duration after which a node in the k-buckets needs to be pinged again.
pub kbuckets_timeout: Duration,
/// When contacting a node, duration after which we consider it unresponsive.
pub timeout: Duration,
pub request_timeout: Duration,
}
// Builds a `QueryParams` that fetches information from `$controller`.
//
// Because of lifetime issues and type naming issues, a macro is the most convenient solution.
macro_rules! gen_query_params {
($controller:expr) => {{
let controller = $controller;
query::QueryParams {
local_id: $controller.inner.kbuckets.my_id().clone(),
kbuckets_find_closest: {
let controller = controller.clone();
move |addr| controller.inner.kbuckets.find_closest(&addr).collect()
},
parallelism: $controller.inner.parallelism,
find_node: {
let controller = controller.clone();
move |addr, searched| {
// TODO: rewrite to be more direct
Box::new(controller.send(addr, move |ctl| ctl.find_node(&searched)).flatten()) as Box<_>
}
},
}
}};
/// System that drives the whole Kademlia process.
pub struct KadSystem {
// The actual DHT.
kbuckets: KBucketsTable<PeerId, ()>,
// Same as in the config.
parallelism: u32,
// Same as in the config.
request_timeout: Duration,
}
/// Object that allows one to make queries on the Kademlia system.
#[derive(Debug)]
pub struct KademliaControllerPrototype {
inner: Arc<Inner>,
/// Event that happens during a query.
#[derive(Debug, Clone)]
pub enum KadQueryEvent<TOut> {
/// Learned about new multiaddresses for the given peers.
NewKnownMultiaddrs(Vec<(PeerId, Vec<Multiaddr>)>),
/// Finished the processing of the query. Contains the result.
Finished(TOut),
}
impl KademliaControllerPrototype {
/// Creates a new controller from that configuration.
pub fn new<I>(config: KademliaConfig, initial_peers: I) -> KademliaControllerPrototype
where I: IntoIterator<Item = PeerId>
{
let buckets = KBucketsTable::new(config.local_peer_id.clone(), config.timeout);
for peer_id in initial_peers {
let _ = buckets.update(peer_id, ());
}
let inner = Arc::new(Inner {
kbuckets: buckets,
timer: tokio_timer::wheel().build(),
connections: Default::default(),
timeout: config.timeout,
parallelism: config.parallelism as usize,
});
KademliaControllerPrototype { inner: inner }
}
/// Turns the prototype into an actual controller by feeding it a swarm controller.
impl KadSystem {
/// Starts a new Kademlia system.
///
/// You must pass to this function the transport to use to dial and obtain
/// `KademliaPeerReqStream`, plus a mapping function that will turn the
/// `KademliaPeerReqStream` into whatever the swarm expects.
pub fn start<T, K, M>(
self,
swarm: SwarmController<T>,
kademlia_transport: K,
map: M,
) -> (
KademliaController<T, K, M>,
Box<Future<Item = (), Error = IoError>>,
)
where
T: Clone + MuxedTransport + 'static, // TODO: 'static :-/
K: Transport<Output = KademliaPeerReqStream> + Clone + 'static, // TODO: 'static :-/
M: FnOnce(KademliaPeerReqStream) -> T::Output + Clone + 'static,
/// Also produces a `Future` that drives a Kademlia initialization process.
/// This future should be driven to completion by the caller.
pub fn start<'a, F, Fut>(config: KadSystemConfig<impl Iterator<Item = PeerId>>, access: F)
-> (KadSystem, impl Future<Item = (), Error = IoError> + 'a)
where F: FnMut(&PeerId) -> Fut + Clone + 'a,
Fut: IntoFuture<Item = KadConnecController, Error = IoError> + 'a,
{
// TODO: initialization
let system = KadSystem::without_init(config);
let init_future = system.perform_initialization(access);
(system, init_future)
}
let controller = KademliaController {
inner: self.inner.clone(),
swarm_controller: swarm,
kademlia_transport,
map,
/// Same as `start`, but doesn't perform the initialization process.
pub fn without_init(config: KadSystemConfig<impl Iterator<Item = PeerId>>) -> KadSystem {
let kbuckets = KBucketsTable::new(config.local_peer_id.clone(), config.kbuckets_timeout);
for peer in config.known_initial_peers {
let _ = kbuckets.update(peer, ());
}
let system = KadSystem {
kbuckets: kbuckets,
parallelism: config.parallelism,
request_timeout: config.request_timeout,
};
let init_future = {
let futures: Vec<_> = (0..256)
.map({
let controller = controller.clone();
move |n| query::refresh(gen_query_params!(controller.clone()), n)
})
.map(|stream| {
stream.for_each(|_| Ok(()))
system
}
/// Starts an initialization process.
pub fn perform_initialization<'a, F, Fut>(&self, access: F) -> impl Future<Item = (), Error = IoError> + 'a
where F: FnMut(&PeerId) -> Fut + Clone + 'a,
Fut: IntoFuture<Item = KadConnecController, Error = IoError> + 'a,
{
let futures: Vec<_> = (0..256) // TODO: 256 is arbitrary
.map(|n| {
refresh(n, access.clone(), &self.kbuckets,
self.parallelism as usize, self.request_timeout)
})
.map(|stream| stream.for_each(|_| Ok(())))
.collect();
future::loop_fn(futures, |futures| {
@ -157,374 +124,341 @@ impl KademliaControllerPrototype {
.map(|(_, _, rest)| future::Loop::Continue(rest));
future::Either::B(fut)
})
}
/// Updates the k-buckets with the specific peer.
///
/// Should be called whenever we receive a message from a peer.
pub fn update_kbuckets(&self, peer: PeerId) {
// TODO: ping system
let _ = self.kbuckets.update(peer, ());
}
/// Returns the local peer ID, as passed in the configuration.
pub fn local_peer_id(&self) -> &PeerId {
self.kbuckets.my_id()
}
/// Finds the known nodes closest to `id`, ordered by distance.
pub fn known_closest_peers(&self, id: &PeerId) -> impl Iterator<Item = PeerId> {
self.kbuckets.find_closest_with_self(id)
}
/// Starts a query for an iterative `FIND_NODE` request.
pub fn find_node<'a, F, Fut>(&self, searched_key: PeerId, access: F)
-> impl Stream<Item = KadQueryEvent<Vec<PeerId>>, Error = IoError> + 'a
where F: FnMut(&PeerId) -> Fut + 'a,
Fut: IntoFuture<Item = KadConnecController, Error = IoError> + 'a,
{
query(access, &self.kbuckets, searched_key, self.parallelism as usize,
20, self.request_timeout) // TODO: arbitrary const
}
}
// Refreshes a specific bucket by performing an iterative `FIND_NODE` on a random ID of this
// bucket.
//
// Returns a stream that immediately produces a single `Finished` event if `bucket_num` is out of range.
fn refresh<'a, F, Fut>(bucket_num: usize, access: F, kbuckets: &KBucketsTable<PeerId, ()>,
parallelism: usize, request_timeout: Duration)
-> impl Stream<Item = KadQueryEvent<()>, Error = IoError> + 'a
where F: FnMut(&PeerId) -> Fut + 'a,
Fut: IntoFuture<Item = KadConnecController, Error = IoError> + 'a,
{
let peer_id = match gen_random_id(kbuckets.my_id(), bucket_num) {
Ok(p) => p,
Err(()) => {
let stream = stream::once(Ok(KadQueryEvent::Finished(())));
return Box::new(stream) as Box<Stream<Item = _, Error = _>>;
},
};
(controller, Box::new(init_future))
let stream = query(access, kbuckets, peer_id, parallelism, 20, request_timeout) // TODO: 20 is arbitrary
.map(|event| {
match event {
KadQueryEvent::NewKnownMultiaddrs(peers) => KadQueryEvent::NewKnownMultiaddrs(peers),
KadQueryEvent::Finished(_) => KadQueryEvent::Finished(()),
}
});
Box::new(stream) as Box<Stream<Item = _, Error = _>>
}
/// Object that allows one to make queries on the Kademlia system.
#[derive(Debug)]
pub struct KademliaController<T, K, M>
where
T: MuxedTransport + 'static, // TODO: 'static :-/
{
inner: Arc<Inner>,
swarm_controller: SwarmController<T>,
kademlia_transport: K,
map: M,
}
// Generates a random `PeerId` that belongs to the given bucket.
//
// Returns an error if `bucket_num` is out of range.
fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result<PeerId, ()> {
let my_id_len = my_id.as_bytes().len();
impl<T, K, M> Clone for KademliaController<T, K, M>
where
T: Clone + MuxedTransport + 'static, // TODO: 'static :-/
K: Clone,
M: Clone,
{
#[inline]
fn clone(&self) -> Self {
KademliaController {
inner: self.inner.clone(),
swarm_controller: self.swarm_controller.clone(),
kademlia_transport: self.kademlia_transport.clone(),
map: self.map.clone(),
// TODO: this 2 is magic here ; it is the length of the hash of the multihash
let bits_diff = bucket_num + 1;
if bits_diff > 8 * (my_id_len - 2) {
return Err(());
}
let mut random_id = [0; 64];
for byte in 0..my_id_len {
match byte.cmp(&(my_id_len - bits_diff / 8 - 1)) {
Ordering::Less => {
random_id[byte] = my_id.as_bytes()[byte];
}
Ordering::Equal => {
let mask: u8 = (1 << (bits_diff % 8)) - 1;
random_id[byte] = (my_id.as_bytes()[byte] & !mask) | (rand::random::<u8>() & mask);
}
Ordering::Greater => {
random_id[byte] = rand::random();
}
}
}
let peer_id = PeerId::from_bytes(random_id[..my_id_len].to_owned())
.expect("randomly-generated peer ID should always be valid");
Ok(peer_id)
}
impl<T, K, M> KademliaController<T, K, M>
where
T: Clone + MuxedTransport + 'static, // TODO: 'static :-/
{
/// Performs an iterative find node query on the network.
///
/// Will query the network for the peers that are the closest to `searched_key` and return
/// the results.
///
/// The algorithm used is a standard Kademlia algorithm. The details are not documented, so
/// that the implementation is free to modify them.
#[inline]
pub fn find_node(
&self,
// Generic query-performing function.
fn query<'a, F, Fut>(
access: F,
kbuckets: &KBucketsTable<PeerId, ()>,
searched_key: PeerId,
) -> Box<Stream<Item = query::QueryEvent<Vec<PeerId>>, Error = IoError>>
where
K: Transport<Output = KademliaPeerReqStream> + Clone + 'static,
M: FnOnce(KademliaPeerReqStream) -> T::Output + Clone + 'static, // TODO: 'static :-/
{
let me = self.clone();
query::find_node(gen_query_params!(me.clone()), searched_key)
}
}
/// Connection upgrade to the Kademlia protocol.
#[derive(Clone)]
pub struct KademliaUpgrade {
inner: Arc<Inner>,
upgrade: KademliaServerConfig,
}
impl KademliaUpgrade {
/// Builds a connection upgrade from the controller.
#[inline]
pub fn from_prototype(proto: &KademliaControllerPrototype) -> Self {
KademliaUpgrade {
inner: proto.inner.clone(),
upgrade: KademliaServerConfig::new(),
}
}
/// Builds a connection upgrade from the controller.
#[inline]
pub fn from_controller<T, K, M>(ctl: &KademliaController<T, K, M>) -> Self
where
T: MuxedTransport,
{
KademliaUpgrade {
inner: ctl.inner.clone(),
upgrade: KademliaServerConfig::new(),
}
}
}
impl<C, Maf> ConnectionUpgrade<C, Maf> for KademliaUpgrade
where
C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
Maf: Future<Item = Multiaddr, Error = IoError> + 'static, // TODO: 'static :(
parallelism: usize,
num_results: usize,
request_timeout: Duration,
) -> impl Stream<Item = KadQueryEvent<Vec<PeerId>>, Error = IoError> + 'a
where F: FnMut(&PeerId) -> Fut + 'a,
Fut: IntoFuture<Item = KadConnecController, Error = IoError> + 'a,
{
type Output = KademliaPeerReqStream;
type MultiaddrFuture = future::FutureResult<Multiaddr, IoError>;
type Future = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type NamesIter = iter::Once<(Bytes, ())>;
type UpgradeIdentifier = ();
debug!("Start query for {:?} ; num results = {}", searched_key, num_results);
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
ConnectionUpgrade::<C, Maf>::protocol_names(&self.upgrade)
// State of the current iterative process.
struct State<'a, F> {
// At which stage we are.
stage: Stage,
// The `access` parameter.
access: F,
// Final output of the iteration.
result: Vec<PeerId>,
// For each open connection, a future with the response of the remote.
// Note that we don't use a `SmallVec` here because `select_all` produces a `Vec`.
current_attempts_fut: Vec<Box<Future<Item = Vec<protocol::KadPeer>, Error = IoError> + 'a>>,
// For each open connection, the peer ID that we are connected to.
// Must always have the same length as `current_attempts_fut`.
current_attempts_addrs: SmallVec<[PeerId; 32]>,
// Nodes that need to be attempted.
pending_nodes: Vec<PeerId>,
// Peers that we tried to contact but failed.
failed_to_contact: FnvHashSet<PeerId>,
}
#[inline]
fn upgrade(self, incoming: C, id: (), endpoint: Endpoint, addr: Maf) -> Self::Future {
let future = addr.and_then(move |addr| {
let inner = self.inner;
let client_addr = addr.clone();
// General stage of the state.
#[derive(Copy, Clone, PartialEq, Eq)]
enum Stage {
// We are still in the first step of the algorithm where we try to find the closest node.
FirstStep,
// We are contacting the k closest nodes in order to fill the list with enough results.
SecondStep,
// The results are complete, and the next stream iteration will produce the outcome.
FinishingNextIter,
// We are finished and the stream shouldn't return anything anymore.
Finished,
}
let peer_id = {
let mut iter = addr.iter();
let protocol = iter.next();
let after_proto = iter.next();
match (protocol, after_proto) {
(Some(AddrComponent::P2P(key)), None) | (Some(AddrComponent::IPFS(key)), None) => {
match PeerId::from_bytes(key) {
Ok(id) => id,
Err(_) => {
let err = IoError::new(
IoErrorKind::InvalidData,
"invalid peer ID sent by remote identification",
let initial_state = State {
stage: Stage::FirstStep,
access: access,
result: Vec::with_capacity(num_results),
current_attempts_fut: Vec::new(),
current_attempts_addrs: SmallVec::new(),
pending_nodes: kbuckets.find_closest(&searched_key).collect(),
failed_to_contact: Default::default(),
};
// Start of the iterative process.
let stream = stream::unfold(initial_state, move |mut state| -> Option<_> {
match state.stage {
Stage::FinishingNextIter => {
let result = mem::replace(&mut state.result, Vec::new());
debug!("Query finished with {} results", result.len());
state.stage = Stage::Finished;
let future = future::ok((Some(KadQueryEvent::Finished(result)), state));
return Some(future::Either::A(future));
},
Stage::Finished => {
return None;
},
_ => ()
};
let searched_key = searched_key.clone();
// Find out which nodes to contact at this iteration.
let to_contact = {
let wanted_len = if state.stage == Stage::FirstStep {
parallelism.saturating_sub(state.current_attempts_fut.len())
} else {
num_results.saturating_sub(state.current_attempts_fut.len())
};
let mut to_contact = SmallVec::<[_; 16]>::new();
while to_contact.len() < wanted_len && !state.pending_nodes.is_empty() {
// Move the first element of `pending_nodes` to `to_contact`, but ignore nodes that
// are already part of the results or of a current attempt or if we failed to
// contact it before.
let peer = state.pending_nodes.remove(0);
if state.result.iter().any(|p| p == &peer) {
continue;
}
if state.current_attempts_addrs.iter().any(|p| p == &peer) {
continue;
}
if state.failed_to_contact.iter().any(|p| p == &peer) {
continue;
}
to_contact.push(peer);
}
to_contact
};
debug!("New query round ; {} queries in progress ; contacting {} new peers",
state.current_attempts_fut.len(),
to_contact.len());
// For each node in `to_contact`, start an RPC query and a corresponding entry in the two
// `state.current_attempts_*` fields.
for peer in to_contact {
let searched_key2 = searched_key.clone();
let current_attempt = (state.access)(&peer)
.into_future()
.and_then(move |controller| {
controller.find_node(&searched_key2)
});
let with_deadline = Deadline::new(current_attempt, Instant::now() + request_timeout)
.map_err(|err| {
if let Some(err) = err.into_inner() {
err
} else {
IoError::new(IoErrorKind::ConnectionAborted, "kademlia request timeout")
}
});
state.current_attempts_addrs.push(peer.clone());
state
.current_attempts_fut
.push(Box::new(with_deadline) as Box<_>);
}
debug_assert_eq!(
state.current_attempts_addrs.len(),
state.current_attempts_fut.len()
);
return Box::new(future::err(err)) as Box<Future<Item = _, Error = _>>;
// Extract `current_attempts_fut` so that we can pass it to `select_all`. We will push the
// values back when inside the loop.
let current_attempts_fut = mem::replace(&mut state.current_attempts_fut, Vec::new());
if current_attempts_fut.is_empty() {
// If `current_attempts_fut` is empty, then `select_all` would panic. It happens
// when we have no additional node to query.
debug!("Finishing query early because no additional node available");
state.stage = Stage::FinishingNextIter;
let future = future::ok((None, state));
return Some(future::Either::A(future));
}
// This is the future that continues or breaks the `loop_fn`.
let future = future::select_all(current_attempts_fut.into_iter()).then(move |result| {
let (message, trigger_idx, other_current_attempts) = match result {
Err((err, trigger_idx, other_current_attempts)) => {
(Err(err), trigger_idx, other_current_attempts)
}
}
_ => {
let err =
IoError::new(IoErrorKind::InvalidData, "couldn't identify connected node");
return Box::new(future::err(err));
}
Ok((message, trigger_idx, other_current_attempts)) => {
(Ok(message), trigger_idx, other_current_attempts)
}
};
let future = self.upgrade.upgrade(incoming, id, endpoint, future::ok::<_, IoError>(addr)).map(
move |((controller, stream), _)| {
match inner.connections.lock().entry(client_addr.clone()) {
Entry::Occupied(mut entry) => {
match entry.insert(Connection::Active(controller)) {
// If there was already an active connection to this remote, it gets
// replaced by the new more recent one.
Connection::Active(_old_connection) => {}
Connection::Pending(closures) => {
let new_ctl = match entry.get_mut() {
&mut Connection::Active(ref mut ctl) => ctl,
_ => unreachable!(
"logic error: an Active enum variant was \
inserted, but reading back didn't give \
an Active"
),
};
// Putting back the extracted elements in `state`.
let remote_id = state.current_attempts_addrs.remove(trigger_idx);
debug_assert!(state.current_attempts_fut.is_empty());
state.current_attempts_fut = other_current_attempts;
for mut closure in closures {
closure(new_ctl);
}
}
};
}
Entry::Vacant(entry) => {
entry.insert(Connection::Active(controller));
// `message` contains the reason why the current future was woken up.
let closer_peers = match message {
Ok(msg) => msg,
Err(err) => {
trace!("RPC query failed for {:?}: {:?}", remote_id, err);
state.failed_to_contact.insert(remote_id);
return future::ok((None, state));
}
};
let stream = stream.map(move |query| {
match inner.kbuckets.update(peer_id.clone(), ()) {
UpdateOutcome::NeedPing(node_to_ping) => {
// TODO: do something with this info
println!("need to ping {:?}", node_to_ping);
// Inserting the node we received a response from into `state.result`.
// The code is non-trivial because `state.result` is ordered by distance and is limited
// to `num_results` elements.
if let Some(insert_pos) = state.result.iter().position(|e| {
e.distance_with(&searched_key) >= remote_id.distance_with(&searched_key)
}) {
if state.result[insert_pos] != remote_id {
if state.result.len() >= num_results {
state.result.pop();
}
_ => (),
state.result.insert(insert_pos, remote_id);
}
} else if state.result.len() < num_results {
state.result.push(remote_id);
}
match query {
KademliaIncomingRequest::FindNode { searched, responder } => {
let mut intermediate: Vec<_> = inner.kbuckets.find_closest(&searched).collect();
let my_id = inner.kbuckets.my_id().clone();
if let Some(pos) = intermediate
.iter()
.position(|e| e.distance_with(&searched) >= my_id.distance_with(&searched))
// The loop below will set this variable to `true` if we find a new element to put at
// the top of the result. This would mean that we have to continue looping.
let mut local_nearest_node_updated = false;
// Update `state` with the actual content of the message.
let mut new_known_multiaddrs = Vec::with_capacity(closer_peers.len());
for mut peer in closer_peers {
// Update the peerstore with the information sent by
// the remote.
{
if intermediate[pos] != my_id {
intermediate.insert(pos, my_id);
let multiaddrs = mem::replace(&mut peer.multiaddrs, Vec::new());
trace!("Reporting multiaddresses for {:?}: {:?}", peer.node_id, multiaddrs);
new_known_multiaddrs.push((peer.node_id.clone(), multiaddrs));
}
if peer.node_id.distance_with(&searched_key)
<= state.result[0].distance_with(&searched_key)
{
local_nearest_node_updated = true;
}
if state.result.iter().any(|ma| ma == &peer.node_id) {
continue;
}
// Insert the node into `pending_nodes` at the right position, or do not
// insert it if it is already in there.
if let Some(insert_pos) = state.pending_nodes.iter().position(|e| {
e.distance_with(&searched_key) >= peer.node_id.distance_with(&searched_key)
}) {
if state.pending_nodes[insert_pos] != peer.node_id {
state.pending_nodes.insert(insert_pos, peer.node_id.clone());
}
} else {
intermediate.push(my_id);
state.pending_nodes.push(peer.node_id.clone());
}
}
Some(KademliaPeerReq {
requested_peers: intermediate,
inner: responder,
})
},
KademliaIncomingRequest::PingPong => {
// We updated the k-bucket above.
None
},
if state.result.len() >= num_results
|| (state.stage != Stage::FirstStep && state.current_attempts_fut.is_empty())
{
state.stage = Stage::FinishingNextIter;
} else {
if !local_nearest_node_updated {
trace!("Loop didn't update closer node ; jumping to step 2");
state.stage = Stage::SecondStep;
}
}
}).filter_map(|val| val);
(KademliaPeerReqStream { inner: Box::new(stream) }, future::ok(client_addr))
},
);
Box::new(future) as Box<_>
future::ok((Some(KadQueryEvent::NewKnownMultiaddrs(new_known_multiaddrs)), state))
});
Box::new(future) as Box<_>
}
}
/// Stream that must be processed for the Kademlia system to work.
///
/// Produces requests for peer information. These requests should be answered for the stream to
/// continue to progress.
pub struct KademliaPeerReqStream {
inner: Box<Stream<Item = KademliaPeerReq, Error = IoError>>,
}
impl Stream for KademliaPeerReqStream {
type Item = KademliaPeerReq;
type Error = IoError;
#[inline]
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
self.inner.poll()
}
}
/// Request for information about some peers.
pub struct KademliaPeerReq {
inner: KademliaFindNodeRespond,
requested_peers: Vec<PeerId>,
}
impl KademliaPeerReq {
/// Returns a list of the IDs of the peers that were requested.
#[inline]
pub fn requested_peers(&self) -> SliceIter<PeerId> {
self.requested_peers.iter()
}
/// Responds to the request.
#[inline]
pub fn respond<I>(self, peers: I)
where I: IntoIterator<Item = Peer>
{
self.inner.respond(peers);
}
}
// Inner struct shared throughout the Kademlia system.
#[derive(Debug)]
struct Inner {
// The remotes are identified by their public keys.
kbuckets: KBucketsTable<PeerId, ()>,
// Timer used for building the timeouts.
timer: tokio_timer::Timer,
// Same as in the config.
timeout: Duration,
// Same as in the config.
parallelism: usize,
// List of open connections with remotes.
//
// Since the keys are the nodes' multiaddress, it is expected that each node only has one
// multiaddress. This should be the case if the user uses the identify transport that
// automatically maps peer IDs to multiaddresses.
// TODO: is it correct to use FnvHashMap with a Multiaddr? needs benchmarks
connections: Mutex<FnvHashMap<Multiaddr, Connection>>,
}
// Current state of a connection to a specific multiaddr.
//
// There is no `Inactive` entry, as an inactive connection corresponds to no entry in the
// `connections` hash map.
enum Connection {
// The connection has already been opened and is ready to be controlled through the given
// controller.
Active(KademliaServerController),
// The connection is in the process of being opened. Any closure added to this `Vec` will be
// executed on the controller once it is available.
// Once the connection is open, it will be replaced with `Active`.
// TODO: should be FnOnce once Rust allows that
Pending(Vec<Box<FnMut(&mut KademliaServerController)>>),
}
impl fmt::Debug for Connection {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Connection::Active(_) => write!(f, "Connection::Active"),
Connection::Pending(_) => write!(f, "Connection::Pending"),
}
}
}
impl<T, K, M> KademliaController<T, K, M>
where
T: Clone + MuxedTransport + 'static, // TODO: 'static :-/
K: Transport<Output = KademliaPeerReqStream> + Clone + 'static, // TODO: 'static
M: FnOnce(KademliaPeerReqStream) -> T::Output + Clone + 'static, // TODO: 'static :-/
{
#[inline]
fn send<F, FRet>(
&self,
addr: Multiaddr,
and_then: F,
) -> Box<Future<Item = FRet, Error = IoError>>
where
F: FnOnce(&KademliaServerController) -> FRet + 'static,
FRet: 'static,
{
let mut lock = self.inner.connections.lock();
let pending_list = match lock.entry(addr.clone()) {
Entry::Occupied(entry) => {
match entry.into_mut() {
&mut Connection::Pending(ref mut list) => list,
&mut Connection::Active(ref mut ctrl) => {
// If we have an active connection, entirely short-circuit the function.
let output = future::ok(and_then(ctrl));
return Box::new(output) as Box<_>;
}
}
}
Entry::Vacant(entry) => {
// Need to open a connection.
let map = self.map.clone();
match self.swarm_controller
.dial(addr, self.kademlia_transport.clone().map(move |out, _| map(out)))
{
Ok(_) => (),
Err(_addr) => {
let fut = future::err(IoError::new(
IoErrorKind::InvalidData,
"unsupported multiaddress",
));
return Box::new(fut) as Box<_>;
}
}
match entry.insert(Connection::Pending(Vec::with_capacity(1))) {
&mut Connection::Pending(ref mut list) => list,
_ => unreachable!("we just inserted a Pending variant"),
}
}
};
let (tx, rx) = oneshot::channel();
let mut tx = Some(tx);
let mut and_then = Some(and_then);
pending_list.push(Box::new(move |ctrl: &mut KademliaServerController| {
let and_then = and_then
.take()
.expect("Programmer error: 'pending' closure was called multiple times");
let tx = tx.take()
.expect("Programmer error: 'pending' closure was called multiple times");
let ret = and_then(ctrl);
let _ = tx.send(ret);
}) as Box<_>);
let future = rx.map_err(|_| IoErrorKind::ConnectionAborted.into());
let future = self.inner.timer.timeout(future, self.inner.timeout);
Box::new(future) as Box<_>
}
Some(future::Either::B(future))
}).filter_map(|val| val);
// Boxing the stream is not necessary, but we do it in order to improve compilation time.
Box::new(stream) as Box<_>
}
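
A minimal sketch of constructing a `KadSystem` and using its local, non-networked operations; the durations are arbitrary, and the `libp2p::` facade paths follow the example program further below:

use std::time::Duration;

use libp2p::kad::{KadSystem, KadSystemConfig};
use libp2p::peerstore::PeerId;

fn build_kad_system(local_id: PeerId, known: Vec<PeerId>, heard_from: PeerId) -> KadSystem {
    // `without_init` fills the k-buckets with the known peers but skips the
    // 256 bucket-refresh queries that `start` returns a future for.
    let system = KadSystem::without_init(KadSystemConfig {
        parallelism: 3,
        local_peer_id: local_id,
        known_initial_peers: known.into_iter(),
        kbuckets_timeout: Duration::from_secs(600),
        request_timeout: Duration::from_secs(10),
    });

    // Report every peer we receive a message from, so that the k-buckets stay fresh.
    system.update_kbuckets(heard_from);

    // Local view of the closest known peers to our own ID (includes ourselves).
    let _closest: Vec<PeerId> = system
        .known_closest_peers(system.local_peer_id())
        .collect();

    system
}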


@ -23,21 +23,21 @@
//!
//! # Usage
//!
//! - Create a `KademliaServerConfig` object. This struct implements `ConnectionUpgrade`.
//! - Create a `KadConnecConfig` object. This struct implements `ConnectionUpgrade`.
//!
//! - Update a connection through that `KademliaServerConfig`. The output yields you a
//! `KademliaServerController` and a stream that must be driven to completion. The controller
//! - Update a connection through that `KadConnecConfig`. The output yields you a
//! `KadConnecController` and a stream that must be driven to completion. The controller
//! allows you to perform queries and receive responses. The stream produces incoming requests
//! from the remote.
//!
//! This `KademliaServerController` is usually extracted and stored in some sort of hash map in an
//! This `KadConnecController` is usually extracted and stored in some sort of hash map in an
//! `Arc` in order to be available whenever we need to request something from a node.
use bytes::Bytes;
use futures::sync::{mpsc, oneshot};
use futures::{future, Future, Sink, stream, Stream};
use libp2p_core::{ConnectionUpgrade, Endpoint, PeerId};
use protocol::{self, KadMsg, KademliaProtocolConfig, Peer};
use protocol::{self, KadMsg, KademliaProtocolConfig, KadPeer};
use std::collections::VecDeque;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter;
@ -45,31 +45,31 @@ use tokio_io::{AsyncRead, AsyncWrite};
/// Configuration for a Kademlia server.
///
/// Implements `ConnectionUpgrade`. On a successful upgrade, produces a `KademliaServerController`
/// Implements `ConnectionUpgrade`. On a successful upgrade, produces a `KadConnecController`
/// and a `Future`. The controller lets you send queries to the remote and receive answers, while
/// the `Future` must be driven to completion in order for things to work.
#[derive(Debug, Clone)]
pub struct KademliaServerConfig {
pub struct KadConnecConfig {
raw_proto: KademliaProtocolConfig,
}
impl KademliaServerConfig {
impl KadConnecConfig {
/// Builds a configuration object for an upcoming Kademlia server.
#[inline]
pub fn new() -> Self {
KademliaServerConfig {
KadConnecConfig {
raw_proto: KademliaProtocolConfig,
}
}
}
impl<C, Maf> ConnectionUpgrade<C, Maf> for KademliaServerConfig
impl<C, Maf> ConnectionUpgrade<C, Maf> for KadConnecConfig
where
C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
{
type Output = (
KademliaServerController,
Box<Stream<Item = KademliaIncomingRequest, Error = IoError>>,
KadConnecController,
Box<Stream<Item = KadIncomingRequest, Error = IoError>>,
);
type MultiaddrFuture = Maf;
type Future = future::Map<<KademliaProtocolConfig as ConnectionUpgrade<C, Maf>>::Future, fn((<KademliaProtocolConfig as ConnectionUpgrade<C, Maf>>::Output, Maf)) -> (Self::Output, Maf)>;
@ -93,7 +93,7 @@ where
/// Allows sending Kademlia requests and receiving responses.
#[derive(Debug, Clone)]
pub struct KademliaServerController {
pub struct KadConnecController {
// In order to send a request, we use this sender to send a tuple. The first element of the
// tuple is the message to send to the remote, and the second element is what is used to
// receive the response. If the query doesn't expect a response (eg. `PUT_VALUE`), then the
@ -101,13 +101,13 @@ pub struct KademliaServerController {
inner: mpsc::UnboundedSender<(KadMsg, oneshot::Sender<KadMsg>)>,
}
impl KademliaServerController {
impl KadConnecController {
/// Sends a `FIND_NODE` query to the node and provides a future that will contain the response.
// TODO: future item could be `impl Iterator` instead
pub fn find_node(
&self,
searched_key: &PeerId,
) -> impl Future<Item = Vec<Peer>, Error = IoError> {
) -> impl Future<Item = Vec<KadPeer>, Error = IoError> {
let message = protocol::KadMsg::FindNodeReq {
key: searched_key.clone().into_bytes(),
};
@ -159,13 +159,13 @@ impl KademliaServerController {
}
/// Request received from the remote.
pub enum KademliaIncomingRequest {
pub enum KadIncomingRequest {
/// Find the nodes closest to `searched`.
FindNode {
/// The value being searched.
searched: PeerId,
/// Object to use to respond to the request.
responder: KademliaFindNodeRespond,
responder: KadFindNodeRespond,
},
// TODO: PutValue and FindValue
@ -175,14 +175,14 @@ pub enum KademliaIncomingRequest {
}
/// Object used to respond to `FindNode` queries from remotes.
pub struct KademliaFindNodeRespond {
pub struct KadFindNodeRespond {
inner: oneshot::Sender<KadMsg>,
}
impl KademliaFindNodeRespond {
impl KadFindNodeRespond {
/// Respond to the `FindNode` request.
pub fn respond<I>(self, peers: I)
where I: IntoIterator<Item = protocol::Peer>
where I: IntoIterator<Item = protocol::KadPeer>
{
let _ = self.inner.send(KadMsg::FindNodeRes {
closer_peers: peers.into_iter().collect()
@ -191,12 +191,12 @@ impl KademliaFindNodeRespond {
}
// Builds a controller and stream from a stream/sink of raw messages.
fn build_from_sink_stream<'a, S>(connec: S) -> (KademliaServerController, Box<Stream<Item = KademliaIncomingRequest, Error = IoError> + 'a>)
fn build_from_sink_stream<'a, S>(connec: S) -> (KadConnecController, Box<Stream<Item = KadIncomingRequest, Error = IoError> + 'a>)
where S: Sink<SinkItem = KadMsg, SinkError = IoError> + Stream<Item = KadMsg, Error = IoError> + 'a
{
let (tx, rx) = mpsc::unbounded();
let future = kademlia_handler(connec, rx);
let controller = KademliaServerController { inner: tx };
let controller = KadConnecController { inner: tx };
(controller, future)
}
@ -211,7 +211,7 @@ where S: Sink<SinkItem = KadMsg, SinkError = IoError> + Stream<Item = KadMsg, Er
fn kademlia_handler<'a, S>(
kad_bistream: S,
rq_rx: mpsc::UnboundedReceiver<(KadMsg, oneshot::Sender<KadMsg>)>,
) -> Box<Stream<Item = KademliaIncomingRequest, Error = IoError> + 'a>
) -> Box<Stream<Item = KadIncomingRequest, Error = IoError> + 'a>
where
S: Stream<Item = KadMsg, Error = IoError> + Sink<SinkItem = KadMsg, SinkError = IoError> + 'a,
{
@ -239,8 +239,7 @@ where
.map_err(|_| unreachable!());
let rq_rx = rq_rx
.map(|(m, o)| EventSource::LocalRequest(m, o))
.map_err(|_| unreachable!())
.chain(future::ok(EventSource::Finished).into_stream());
.map_err(|_| unreachable!());
let kad_stream = kad_stream
.map(|m| EventSource::Remote(m))
.chain(future::ok(EventSource::Finished).into_stream());
@ -325,7 +324,7 @@ where
// remote will see its PING answered only when it PONGs us.
let future = future::ok({
let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished);
let rq = KademliaIncomingRequest::PingPong;
let rq = KadIncomingRequest::PingPong;
(Some(rq), state)
});
Box::new(future) as Box<_>
@ -334,7 +333,7 @@ where
.send(KadMsg::Ping)
.map(move |kad_sink| {
let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished);
let rq = KademliaIncomingRequest::PingPong;
let rq = KadIncomingRequest::PingPong;
(Some(rq), state)
});
Box::new(future) as Box<_>
@ -371,9 +370,9 @@ where
let _ = responders_tx.unbounded_send(rx);
let future = future::ok({
let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished);
let rq = KademliaIncomingRequest::FindNode {
let rq = KadIncomingRequest::FindNode {
searched: peer_id,
responder: KademliaFindNodeRespond {
responder: KadFindNodeRespond {
inner: tx
}
};
@ -407,9 +406,9 @@ mod tests {
use std::iter;
use futures::{Future, Poll, Sink, StartSend, Stream};
use futures::sync::mpsc;
use kad_server::{self, KademliaIncomingRequest, KademliaServerController};
use kad_server::{self, KadIncomingRequest, KadConnecController};
use libp2p_core::PublicKey;
use protocol::{ConnectionType, Peer};
use protocol::{KadConnectionType, KadPeer};
use rand;
// This struct merges a stream and a sink and is quite useful for tests.
@ -441,7 +440,7 @@ mod tests {
}
}
fn build_test() -> (KademliaServerController, impl Stream<Item = KademliaIncomingRequest, Error = IoError>, KademliaServerController, impl Stream<Item = KademliaIncomingRequest, Error = IoError>) {
fn build_test() -> (KadConnecController, impl Stream<Item = KadIncomingRequest, Error = IoError>, KadConnecController, impl Stream<Item = KadIncomingRequest, Error = IoError>) {
let (a_to_b, b_from_a) = mpsc::unbounded();
let (b_to_a, a_from_b) = mpsc::unbounded();
@ -464,7 +463,7 @@ mod tests {
let streams = stream_events_a.map(|ev| (ev, "a"))
.select(stream_events_b.map(|ev| (ev, "b")));
match streams.into_future().map_err(|(err, _)| err).wait().unwrap() {
(Some((KademliaIncomingRequest::PingPong, "b")), _) => {},
(Some((KadIncomingRequest::PingPong, "b")), _) => {},
_ => panic!()
}
}
@ -480,20 +479,20 @@ mod tests {
let find_node_fut = controller_a.find_node(&random_peer_id);
let example_response = Peer {
let example_response = KadPeer {
node_id: {
let buf = (0 .. 1024).map(|_| -> u8 { rand::random() }).collect::<Vec<_>>();
PublicKey::Rsa(buf).into_peer_id()
},
multiaddrs: Vec::new(),
connection_ty: ConnectionType::Connected,
connection_ty: KadConnectionType::Connected,
};
let streams = stream_events_a.map(|ev| (ev, "a"))
.select(stream_events_b.map(|ev| (ev, "b")));
let streams = match streams.into_future().map_err(|(err, _)| err).wait().unwrap() {
(Some((KademliaIncomingRequest::FindNode { searched, responder }, "b")), streams) => {
(Some((KadIncomingRequest::FindNode { searched, responder }, "b")), streams) => {
assert_eq!(searched, random_peer_id);
responder.respond(iter::once(example_response.clone()));
streams
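
A minimal sketch of the server side of one upgraded connection: the request stream obtained from the `KadConnecConfig` upgrade is answered request by request. `lookup_closest` is a hypothetical helper producing the `KadPeer`s for a `FIND_NODE` answer, and the `libp2p::` paths follow the example program at the end of this change:

use std::io::Error as IoError;

use futures::{Future, Stream};
use libp2p::kad::{KadIncomingRequest, KadPeer};
use libp2p::peerstore::PeerId;

fn serve_connection<S, F>(requests: S, lookup_closest: F) -> impl Future<Item = (), Error = IoError>
where
    S: Stream<Item = KadIncomingRequest, Error = IoError>,
    F: Fn(&PeerId) -> Vec<KadPeer>,
{
    requests.for_each(move |request| {
        match request {
            KadIncomingRequest::FindNode { searched, responder } => {
                // Note: the protocol encoder currently asserts that the answer is
                // non-empty (see the TODO next to `msg_to_proto` in `protocol.rs`).
                responder.respond(lookup_closest(&searched));
            }
            // Nothing to answer for pings in this sketch.
            KadIncomingRequest::PingPong => {}
        }
        Ok(())
    })
}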


@ -208,6 +208,26 @@ where
out.into_iter()
}
/// Same as `find_closest`, but includes the local peer as well.
pub fn find_closest_with_self(&self, id: &Id) -> VecIntoIter<Id>
where
Id: Clone,
{
// TODO: optimize
let mut intermediate: Vec<_> = self.find_closest(&id).collect();
if let Some(pos) = intermediate
.iter()
.position(|e| e.distance_with(&id) >= self.my_id.distance_with(&id))
{
if intermediate[pos] != self.my_id {
intermediate.insert(pos, self.my_id.clone());
}
} else {
intermediate.push(self.my_id.clone());
}
intermediate.into_iter()
}
/// Marks the node as "most recent" in its bucket and modifies the value associated to it.
/// This function should be called whenever we receive a communication from a node.
pub fn update(&self, id: Id, value: Val) -> UpdateOutcome<Id, Val> {


@ -24,17 +24,15 @@
//!
//! Usage is done in the following steps:
//!
//! - Build a `KademliaConfig` that contains the way you want the Kademlia protocol to behave.
//! - Build a `KadSystemConfig` and a `KadConnecConfig` object that contain the way you want the
//! Kademlia protocol to behave.
//!
//! - Build a `KademliaControllerPrototype` from that configuration object.
//! - Create a swarm that upgrades incoming connections with the `KadConnecConfig`.
//!
//! - Build a `KademliaUpgrade` from that prototype. Then create a swarm (from the *swarm* crate)
//! and pass the `KademliaUpgrade` you built as part of the list of protocols.
//! - Build a `KadSystem` from the `KadSystemConfig`. This requires passing a closure that provides
//! the Kademlia controller of a peer.
//!
//! - Then turn the controller prototype into an actual `KademliaController` by passing to it the
//! swarm controller you got.
//!
//! - You can now perform operations using that controller.
//! - You can perform queries using the `KadSystem`.
//!
// TODO: we allow dead_code for now because this library contains a lot of unused code that will
@ -54,11 +52,8 @@
// will automatically respond to Kad requests received by the remote. The controller lets you
// send your own requests to this remote and obtain strongly-typed responses.
//
// - The third level of abstraction is in `high_level`. This module also provides a
// `ConnectionUpgrade`, but all the upgraded connections share informations through a struct in
// an `Arc`. The user has a single clonable controller that operates on all the upgraded
// connections. This controller lets you perform peer discovery and record load operations over
// the whole network.
// - The third level of abstraction is in `high_level`. This module only provides the
// `KadSystem`.
//
extern crate arrayvec;
@ -83,14 +78,12 @@ extern crate tokio_io;
extern crate tokio_timer;
extern crate varint;
pub use self::high_level::{KademliaConfig, KademliaController, KademliaControllerPrototype};
pub use self::high_level::{KademliaPeerReqStream, KademliaUpgrade, KademliaPeerReq};
pub use self::protocol::{ConnectionType, Peer};
pub use self::query::QueryEvent;
pub use self::high_level::{KadSystemConfig, KadSystem, KadQueryEvent};
pub use self::kad_server::{KadConnecController, KadConnecConfig, KadIncomingRequest, KadFindNodeRespond};
pub use self::protocol::{KadConnectionType, KadPeer};
mod high_level;
mod kad_server;
mod kbucket;
mod protobuf_structs;
mod protocol;
mod query;
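
A minimal sketch of the last step listed above (performing queries): `KadSystem::find_node` yields a stream of `KadQueryEvent`s, with multiaddress discoveries reported first and the final result last. The `access` closure stands in for whatever dials the given peer and upgrades the connection with `KadConnecConfig` (the example program below does this through a swarm and a `UniqueConnec`); paths again use the `libp2p` facade:

use std::io::Error as IoError;

use futures::{Future, IntoFuture, Stream};
use libp2p::kad::{KadConnecController, KadQueryEvent, KadSystem};
use libp2p::peerstore::PeerId;

fn closest_peers<'a, F, Fut>(system: &KadSystem, target: PeerId, access: F)
    -> impl Future<Item = Vec<PeerId>, Error = IoError> + 'a
where
    F: FnMut(&PeerId) -> Fut + 'a,
    Fut: IntoFuture<Item = KadConnecController, Error = IoError> + 'a,
{
    system
        .find_node(target, access)
        .filter_map(|event| match event {
            // Multiaddresses learned along the way; a real node would feed
            // them into its peer store, as the example program does.
            KadQueryEvent::NewKnownMultiaddrs(_) => None,
            // Final, distance-ordered list of the closest peers found.
            KadQueryEvent::Finished(closest) => Some(closest),
        })
        .into_future()
        .map(|(result, _rest)| result.unwrap_or_default())
        .map_err(|(err, _rest)| err)
}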


@ -37,7 +37,7 @@ use tokio_io::{AsyncRead, AsyncWrite};
use varint::VarintCodec;
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
pub enum ConnectionType {
pub enum KadConnectionType {
/// Sender hasn't tried to connect to peer.
NotConnected = 0,
/// Sender is currently connected to peer.
@ -48,45 +48,45 @@ pub enum ConnectionType {
CannotConnect = 3,
}
impl From<protobuf_structs::dht::Message_ConnectionType> for ConnectionType {
impl From<protobuf_structs::dht::Message_ConnectionType> for KadConnectionType {
#[inline]
fn from(raw: protobuf_structs::dht::Message_ConnectionType) -> ConnectionType {
fn from(raw: protobuf_structs::dht::Message_ConnectionType) -> KadConnectionType {
use protobuf_structs::dht::Message_ConnectionType::*;
match raw {
NOT_CONNECTED => ConnectionType::NotConnected,
CONNECTED => ConnectionType::Connected,
CAN_CONNECT => ConnectionType::CanConnect,
CANNOT_CONNECT => ConnectionType::CannotConnect,
NOT_CONNECTED => KadConnectionType::NotConnected,
CONNECTED => KadConnectionType::Connected,
CAN_CONNECT => KadConnectionType::CanConnect,
CANNOT_CONNECT => KadConnectionType::CannotConnect,
}
}
}
impl Into<protobuf_structs::dht::Message_ConnectionType> for ConnectionType {
impl Into<protobuf_structs::dht::Message_ConnectionType> for KadConnectionType {
#[inline]
fn into(self) -> protobuf_structs::dht::Message_ConnectionType {
use protobuf_structs::dht::Message_ConnectionType::*;
match self {
ConnectionType::NotConnected => NOT_CONNECTED,
ConnectionType::Connected => CONNECTED,
ConnectionType::CanConnect => CAN_CONNECT,
ConnectionType::CannotConnect => CANNOT_CONNECT,
KadConnectionType::NotConnected => NOT_CONNECTED,
KadConnectionType::Connected => CONNECTED,
KadConnectionType::CanConnect => CAN_CONNECT,
KadConnectionType::CannotConnect => CANNOT_CONNECT,
}
}
}
/// Information about a peer, as known by the sender.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Peer {
pub struct KadPeer {
pub node_id: PeerId,
/// The multiaddresses that are known for that peer.
pub multiaddrs: Vec<Multiaddr>,
pub connection_ty: ConnectionType,
pub connection_ty: KadConnectionType,
}
impl Peer {
// Builds a `Peer` from its raw protobuf equivalent.
impl KadPeer {
// Builds a `KadPeer` from its raw protobuf equivalent.
// TODO: use TryFrom once stable
fn from_peer(peer: &mut protobuf_structs::dht::Message_Peer) -> Result<Peer, IoError> {
fn from_peer(peer: &mut protobuf_structs::dht::Message_Peer) -> Result<KadPeer, IoError> {
// TODO: this is in fact a CID ; not sure if this should be handled in `from_bytes` or
// as a special case here
let node_id = PeerId::from_bytes(peer.get_id().to_vec())
@ -102,7 +102,7 @@ impl Peer {
let connection_ty = peer.get_connection().into();
Ok(Peer {
Ok(KadPeer {
node_id: node_id,
multiaddrs: addrs,
connection_ty: connection_ty,
@ -110,7 +110,7 @@ impl Peer {
}
}
impl Into<protobuf_structs::dht::Message_Peer> for Peer {
impl Into<protobuf_structs::dht::Message_Peer> for KadPeer {
fn into(self) -> protobuf_structs::dht::Message_Peer {
let mut out = protobuf_structs::dht::Message_Peer::new();
out.set_id(self.node_id.into_bytes());
@ -190,7 +190,7 @@ pub enum KadMsg {
/// Identifier of the returned record.
key: Vec<u8>,
record: (), //record: Option<protobuf_structs::record::Record>, // TODO: no
closer_peers: Vec<Peer>,
closer_peers: Vec<KadPeer>,
},
/// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes
/// returned is not specified, but should be around 20.
@ -201,7 +201,7 @@ pub enum KadMsg {
/// Response to a `FindNodeReq`.
FindNodeRes {
/// Results of the request.
closer_peers: Vec<Peer>,
closer_peers: Vec<KadPeer>,
},
}
@ -237,6 +237,7 @@ fn msg_to_proto(kad_msg: KadMsg) -> protobuf_structs::dht::Message {
}
KadMsg::FindNodeRes { closer_peers } => {
// TODO: if empty, the remote will think it's a request
// TODO: not good, possibly exposed in the API
assert!(!closer_peers.is_empty());
let mut msg = protobuf_structs::dht::Message::new();
msg.set_field_type(protobuf_structs::dht::Message_MessageType::FIND_NODE);
@ -280,7 +281,7 @@ fn proto_to_msg(mut message: protobuf_structs::dht::Message) -> Result<KadMsg, I
// errors for now, but ultimately we should just error altogether
let closer_peers = message.mut_closerPeers()
.iter_mut()
.filter_map(|peer| Peer::from_peer(peer).ok())
.filter_map(|peer| KadPeer::from_peer(peer).ok())
.collect::<Vec<_>>();
Ok(KadMsg::FindNodeRes {
@ -309,7 +310,7 @@ mod tests {
use self::libp2p_tcp_transport::TcpConfig;
use futures::{Future, Sink, Stream};
use libp2p_core::{Transport, PeerId, PublicKey};
use protocol::{ConnectionType, KadMsg, KademliaProtocolConfig, Peer};
use protocol::{KadConnectionType, KadMsg, KademliaProtocolConfig, KadPeer};
use std::sync::mpsc;
use std::thread;
@ -331,10 +332,10 @@ mod tests {
});
test_one(KadMsg::FindNodeRes {
closer_peers: vec![
Peer {
KadPeer {
node_id: PeerId::from_public_key(PublicKey::Rsa(vec![93, 80, 12, 250])),
multiaddrs: vec!["/ip4/100.101.102.103/tcp/20105".parse().unwrap()],
connection_ty: ConnectionType::Connected,
connection_ty: KadConnectionType::Connected,
},
],
});


@ -30,12 +30,14 @@ use bigint::U512;
use futures::{Future, Stream};
use libp2p::peerstore::{PeerAccess, PeerId, Peerstore};
use libp2p::Multiaddr;
use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use libp2p::core::{Transport, PublicKey};
use libp2p::core::{Transport, PublicKey, UniqueConnec};
use libp2p::core::{upgrade, either::EitherOutput};
use libp2p::kad::{ConnectionType, Peer, QueryEvent};
use libp2p::kad::{KadConnecConfig, KadConnectionType, KadPeer, KadQueryEvent, KadSystem};
use libp2p::kad::{KadSystemConfig, KadIncomingRequest};
use libp2p::tcp::TcpConfig;
fn main() {
@ -116,53 +118,69 @@ fn main() {
let my_peer_id = PeerId::from_public_key(PublicKey::Rsa(include_bytes!("test-rsa-public-key.der").to_vec()));
println!("Local peer id is: {:?}", my_peer_id);
// Let's put this `transport` into a Kademlia *swarm*. The swarm will handle all the incoming
// and outgoing connections for us.
let kad_config = libp2p::kad::KademliaConfig {
let kad_system = Arc::new(KadSystem::without_init(KadSystemConfig {
parallelism: 3,
local_peer_id: my_peer_id.clone(),
timeout: Duration::from_secs(2),
};
kbuckets_timeout: Duration::from_secs(10),
request_timeout: Duration::from_secs(10),
known_initial_peers: peer_store.peers(),
}));
let kad_ctl_proto = libp2p::kad::KademliaControllerPrototype::new(kad_config, peer_store.peers());
let proto = libp2p::kad::KademliaUpgrade::from_prototype(&kad_ctl_proto);
let active_kad_connections = Arc::new(Mutex::new(HashMap::<_, UniqueConnec<_>>::new()));
// Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and
// outgoing connections for us.
let (swarm_controller, swarm_future) = libp2p::core::swarm(
transport.clone().with_upgrade(proto.clone()),
transport.clone().with_upgrade(KadConnecConfig::new()),
{
let peer_store = peer_store.clone();
move |kademlia_stream, _| {
let kad_system = kad_system.clone();
let active_kad_connections = active_kad_connections.clone();
move |(kad_ctrl, kad_stream), node_addr| {
let peer_store = peer_store.clone();
kademlia_stream.for_each(move |req| {
let kad_system = kad_system.clone();
let active_kad_connections = active_kad_connections.clone();
node_addr.and_then(move |node_addr| {
let node_id = p2p_multiaddr_to_node_id(node_addr);
let node_id2 = node_id.clone();
let fut = kad_stream.for_each(move |req| {
let peer_store = peer_store.clone();
let result = req
.requested_peers()
kad_system.update_kbuckets(node_id2.clone());
match req {
KadIncomingRequest::FindNode { searched, responder } => {
let result = kad_system
.known_closest_peers(&searched)
.map(move |peer_id| {
let addrs = peer_store
.peer(peer_id)
.peer(&peer_id)
.into_iter()
.flat_map(|p| p.addrs())
.collect::<Vec<_>>();
Peer {
KadPeer {
node_id: peer_id.clone(),
multiaddrs: addrs,
connection_ty: ConnectionType::Connected, // meh :-/
connection_ty: KadConnectionType::Connected, // meh :-/
}
})
.collect::<Vec<_>>();
req.respond(result);
responder.respond(result);
},
KadIncomingRequest::PingPong => {
}
};
Ok(())
});
let mut active_kad_connections = active_kad_connections.lock().unwrap();
active_kad_connections
.entry(node_id)
.or_insert_with(Default::default)
.set_until(kad_ctrl, fut)
})
}
}
);
let (kad_controller, _kad_init) =
kad_ctl_proto.start(swarm_controller.clone(), transport.with_upgrade(proto), |out| out);
for listen_addr in listen_addrs {
let addr = swarm_controller
.listen_on(listen_addr.parse().expect("invalid multiaddr"))
@ -170,18 +188,23 @@ fn main() {
println!("Now listening on {:?}", addr);
}
let finish_enum = kad_controller
.find_node(my_peer_id.clone())
let finish_enum = kad_system
.find_node(my_peer_id.clone(), |peer| {
let addr = Multiaddr::from(libp2p::multiaddr::AddrComponent::P2P(peer.clone().into_bytes()));
active_kad_connections.lock().unwrap().entry(peer.clone())
.or_insert_with(Default::default)
.get_or_dial(&swarm_controller, &addr, transport.clone().with_upgrade(KadConnecConfig::new()))
})
.filter_map(move |event| {
match event {
QueryEvent::NewKnownMultiaddrs(peers) => {
KadQueryEvent::NewKnownMultiaddrs(peers) => {
for (peer, addrs) in peers {
peer_store.peer_or_create(&peer)
.add_addrs(addrs, Duration::from_secs(3600));
}
None
},
QueryEvent::Finished(out) => Some(out),
KadQueryEvent::Finished(out) => Some(out),
}
})
.into_future()
@ -209,13 +232,29 @@ fn main() {
).unwrap();
}
/// Expects a multiaddr of the format `/p2p/<node_id>` and returns the node ID.
/// Panics if the format is not correct.
fn p2p_multiaddr_to_node_id(client_addr: Multiaddr) -> PeerId {
let (first, second);
{
let mut iter = client_addr.iter();
first = iter.next();
second = iter.next();
}
match (first, second) {
(Some(libp2p::multiaddr::AddrComponent::P2P(node_id)), None) =>
PeerId::from_bytes(node_id).expect("libp2p always reports a valid node id"),
_ => panic!("Reported multiaddress is in the wrong format ; programmer error")
}
}
/// Stores initial addresses on the given peer store. Uses a very large timeout.
pub fn ipfs_bootstrap<P>(peer_store: P)
where
P: Peerstore + Clone,
{
const ADDRESSES: &[&str] = &[
"/ip4/127.0.0.1/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
"/ip4/127.0.0.1/tcp/4001/ipfs/QmQRx32wQkw3hB45j4UDw8V9Ju4mGbxMyhs2m8mpFrFkur",
// TODO: add some bootstrap nodes here
];