Lots of improvements to kademlia code (#243)

* No longer panic when updating self peer ID in kbuckets

* Minor code improvement in flush()

* Small improvement to handle_find_node_req

* expected_pongs no longer mut

* Clean up KadServerInterface trait

* find_node() returns an impl Future

* Rework kad_server's API to remove the interface

* Remove the error mapping in kad_bistream.split()

* Use a name type in protocol.rs

* respond() now takes an iter of Peers + add tests

* Use concrete Future type in kad_server upgrade

* Let the high level code decide the TTL of the addrs

* Replace QueryInterface::send with find_node_rpc

* Replace KademliaProcessingFuture with KademliaPeerReqStream

* requested_peers() now returns an iter

* gen_random_id() only requires &PeerId

* Remove QueryInterface and return stream of events

* Remove add_peer_addrs from query

* Remove the peer_store and record_store params

* Tweak multiaddresses reportin

* Remove dependency on peerstore

* Fix tests
This commit is contained in:
Pierre Krieger
2018-06-07 17:15:19 +02:00
committed by GitHub
parent c4a92e2493
commit 6897eca91f
8 changed files with 681 additions and 579 deletions

View File

@ -12,7 +12,6 @@ datastore = { path = "../datastore" }
fnv = "1.0"
futures = "0.1"
libp2p-identify = { path = "../identify" }
libp2p-peerstore = { path = "../peerstore" }
libp2p-ping = { path = "../ping" }
libp2p-core = { path = "../core" }
log = "0.4"

View File

@ -25,20 +25,19 @@
use bytes::Bytes;
use fnv::FnvHashMap;
use futures::sync::oneshot;
use futures::{self, future, Future};
use kad_server::{KadServerInterface, KademliaServerConfig, KademliaServerController};
use futures::{self, future, Future, Stream};
use kad_server::{KademliaServerConfig, KademliaServerController, KademliaIncomingRequest, KademliaFindNodeRespond};
use kbucket::{KBucketsPeerId, KBucketsTable, UpdateOutcome};
use libp2p_peerstore::{PeerAccess, PeerId, Peerstore};
use libp2p_core::{ConnectionUpgrade, Endpoint, MuxedTransport, SwarmController, Transport};
use multiaddr::Multiaddr;
use libp2p_core::{ConnectionUpgrade, Endpoint, MuxedTransport, PeerId, SwarmController, Transport};
use multiaddr::{AddrComponent, Multiaddr};
use parking_lot::Mutex;
use protocol::ConnectionType;
use protocol::Peer;
use query;
use std::collections::hash_map::Entry;
use std::fmt;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter;
use std::ops::Deref;
use std::slice::Iter as SliceIter;
use std::sync::Arc;
use std::time::Duration;
use tokio_io::{AsyncRead, AsyncWrite};
@ -46,45 +45,60 @@ use tokio_timer;
/// Prototype for a future Kademlia protocol running on a socket.
#[derive(Debug, Clone)]
pub struct KademliaConfig<P, R> {
pub struct KademliaConfig {
/// Degree of parallelism on the network. Often called `alpha` in technical papers.
/// No more than this number of remotes will be used at a given time for any given operation.
// TODO: ^ share this number between operations? or does each operation use `alpha` remotes?
pub parallelism: u32,
/// Used to load and store data requests of peers.
// TODO: say that must implement the `Recordstore` trait.
pub record_store: R,
/// Used to load and store information about peers.
pub peer_store: P,
/// Id of the local peer.
pub local_peer_id: PeerId,
/// When contacting a node, duration after which we consider it unresponsive.
pub timeout: Duration,
}
/// Object that allows one to make queries on the Kademlia system.
#[derive(Debug)]
pub struct KademliaControllerPrototype<P, R> {
inner: Arc<Inner<P, R>>,
// Builds a `QueryParams` that fetches information from `$controller`.
//
// Because of lifetime issues and type naming issues, a macro is the most convenient solution.
macro_rules! gen_query_params {
($controller:expr) => {{
let controller = $controller;
query::QueryParams {
local_id: $controller.inner.kbuckets.my_id().clone(),
kbuckets_find_closest: {
let controller = controller.clone();
move |addr| controller.inner.kbuckets.find_closest(&addr).collect()
},
parallelism: $controller.inner.parallelism,
find_node: {
let controller = controller.clone();
move |addr, searched| {
// TODO: rewrite to be more direct
Box::new(controller.send(addr, move |ctl| ctl.find_node(&searched)).flatten()) as Box<_>
}
},
}
}};
}
impl<P, Pc, R> KademliaControllerPrototype<P, R>
where
P: Deref<Target = Pc>,
for<'r> &'r Pc: Peerstore,
{
/// Object that allows one to make queries on the Kademlia system.
#[derive(Debug)]
pub struct KademliaControllerPrototype {
inner: Arc<Inner>,
}
impl KademliaControllerPrototype {
/// Creates a new controller from that configuration.
pub fn new(config: KademliaConfig<P, R>) -> KademliaControllerPrototype<P, R> {
pub fn new<I>(config: KademliaConfig, initial_peers: I) -> KademliaControllerPrototype
where I: IntoIterator<Item = PeerId>
{
let buckets = KBucketsTable::new(config.local_peer_id.clone(), config.timeout);
for peer_id in config.peer_store.deref().peers() {
for peer_id in initial_peers {
let _ = buckets.update(peer_id, ());
}
let inner = Arc::new(Inner {
kbuckets: buckets,
timer: tokio_timer::wheel().build(),
record_store: config.record_store,
peer_store: config.peer_store,
connections: Default::default(),
timeout: config.timeout,
parallelism: config.parallelism as usize,
@ -96,24 +110,21 @@ where
/// Turns the prototype into an actual controller by feeding it a swarm controller.
///
/// You must pass to this function the transport to use to dial and obtain
/// `KademliaProcessingFuture`, plus a mapping function that will turn the
/// `KademliaProcessingFuture` into whatever the swarm expects.
/// `KademliaPeerReqStream`, plus a mapping function that will turn the
/// `KademliaPeerReqStream` into whatever the swarm expects.
pub fn start<T, K, M>(
self,
swarm: SwarmController<T>,
kademlia_transport: K,
map: M,
) -> (
KademliaController<P, R, T, K, M>,
KademliaController<T, K, M>,
Box<Future<Item = (), Error = IoError>>,
)
where
P: Clone + Deref<Target = Pc> + 'static, // TODO: 'static :-/
for<'r> &'r Pc: Peerstore,
R: Clone + 'static, // TODO: 'static :-/
T: Clone + MuxedTransport + 'static, // TODO: 'static :-/
K: Transport<Output = KademliaProcessingFuture> + Clone + 'static, // TODO: 'static :-/
M: FnOnce(KademliaProcessingFuture) -> T::Output + Clone + 'static,
K: Transport<Output = KademliaPeerReqStream> + Clone + 'static, // TODO: 'static :-/
M: FnOnce(KademliaPeerReqStream) -> T::Output + Clone + 'static,
{
// TODO: initialization
@ -126,7 +137,13 @@ where
let init_future = {
let futures: Vec<_> = (0..256)
.map(|n| query::refresh(controller.clone(), n))
.map({
let controller = controller.clone();
move |n| query::refresh(gen_query_params!(controller.clone()), n)
})
.map(|stream| {
stream.for_each(|_| Ok(()))
})
.collect();
future::loop_fn(futures, |futures| {
@ -148,17 +165,17 @@ where
/// Object that allows one to make queries on the Kademlia system.
#[derive(Debug)]
pub struct KademliaController<P, R, T, K, M>
pub struct KademliaController<T, K, M>
where
T: MuxedTransport + 'static, // TODO: 'static :-/
{
inner: Arc<Inner<P, R>>,
inner: Arc<Inner>,
swarm_controller: SwarmController<T>,
kademlia_transport: K,
map: M,
}
impl<P, R, T, K, M> Clone for KademliaController<P, R, T, K, M>
impl<T, K, M> Clone for KademliaController<T, K, M>
where
T: Clone + MuxedTransport + 'static, // TODO: 'static :-/
K: Clone,
@ -175,11 +192,8 @@ where
}
}
impl<P, Pc, R, T, K, M> KademliaController<P, R, T, K, M>
impl<T, K, M> KademliaController<T, K, M>
where
P: Deref<Target = Pc>,
for<'r> &'r Pc: Peerstore,
R: Clone,
T: Clone + MuxedTransport + 'static, // TODO: 'static :-/
{
/// Performs an iterative find node query on the network.
@ -193,55 +207,51 @@ where
pub fn find_node(
&self,
searched_key: PeerId,
) -> Box<Future<Item = Vec<PeerId>, Error = IoError>>
) -> Box<Stream<Item = query::QueryEvent<Vec<PeerId>>, Error = IoError>>
where
P: Clone + 'static,
R: 'static,
K: Transport<Output = KademliaProcessingFuture> + Clone + 'static,
M: FnOnce(KademliaProcessingFuture) -> T::Output + Clone + 'static, // TODO: 'static :-/
K: Transport<Output = KademliaPeerReqStream> + Clone + 'static,
M: FnOnce(KademliaPeerReqStream) -> T::Output + Clone + 'static, // TODO: 'static :-/
{
query::find_node(self.clone(), searched_key)
let me = self.clone();
query::find_node(gen_query_params!(me.clone()), searched_key)
}
}
/// Connection upgrade to the Kademlia protocol.
#[derive(Clone)]
pub struct KademliaUpgrade<P, R> {
inner: Arc<Inner<P, R>>,
upgrade: KademliaServerConfig<Arc<Inner<P, R>>>,
pub struct KademliaUpgrade {
inner: Arc<Inner>,
upgrade: KademliaServerConfig,
}
impl<P, R> KademliaUpgrade<P, R> {
impl KademliaUpgrade {
/// Builds a connection upgrade from the controller.
#[inline]
pub fn from_prototype(proto: &KademliaControllerPrototype<P, R>) -> Self {
pub fn from_prototype(proto: &KademliaControllerPrototype) -> Self {
KademliaUpgrade {
inner: proto.inner.clone(),
upgrade: KademliaServerConfig::new(proto.inner.clone()),
upgrade: KademliaServerConfig::new(),
}
}
/// Builds a connection upgrade from the controller.
#[inline]
pub fn from_controller<T, K, M>(ctl: &KademliaController<P, R, T, K, M>) -> Self
pub fn from_controller<T, K, M>(ctl: &KademliaController<T, K, M>) -> Self
where
T: MuxedTransport,
{
KademliaUpgrade {
inner: ctl.inner.clone(),
upgrade: KademliaServerConfig::new(ctl.inner.clone()),
upgrade: KademliaServerConfig::new(),
}
}
}
impl<C, P, Pc, R> ConnectionUpgrade<C> for KademliaUpgrade<P, R>
impl<C> ConnectionUpgrade<C> for KademliaUpgrade
where
C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
P: Deref<Target = Pc> + Clone + 'static, // TODO: 'static :-/
for<'r> &'r Pc: Peerstore,
R: 'static, // TODO: 'static :-/
{
type Output = KademliaProcessingFuture;
type Output = KademliaPeerReqStream;
type Future = Box<Future<Item = Self::Output, Error = IoError>>;
type NamesIter = iter::Once<(Bytes, ())>;
type UpgradeIdentifier = ();
@ -256,8 +266,33 @@ where
let inner = self.inner;
let client_addr = addr.clone();
let peer_id = {
let mut iter = addr.iter();
let protocol = iter.next();
let after_proto = iter.next();
match (protocol, after_proto) {
(Some(AddrComponent::P2P(key)), None) | (Some(AddrComponent::IPFS(key)), None) => {
match PeerId::from_bytes(key) {
Ok(id) => id,
Err(_) => {
let err = IoError::new(
IoErrorKind::InvalidData,
"invalid peer ID sent by remote identification",
);
return Box::new(future::err(err));
}
}
}
_ => {
let err =
IoError::new(IoErrorKind::InvalidData, "couldn't identify connected node");
return Box::new(future::err(err));
}
}
};
let future = self.upgrade.upgrade(incoming, id, endpoint, addr).map(
move |(controller, future)| {
move |(controller, stream)| {
match inner.connections.lock().entry(client_addr) {
Entry::Occupied(mut entry) => {
match entry.insert(Connection::Active(controller)) {
@ -285,7 +320,43 @@ where
}
};
KademliaProcessingFuture { inner: future }
let stream = stream.map(move |query| {
match inner.kbuckets.update(peer_id.clone(), ()) {
UpdateOutcome::NeedPing(node_to_ping) => {
// TODO: do something with this info
println!("need to ping {:?}", node_to_ping);
}
_ => (),
}
match query {
KademliaIncomingRequest::FindNode { searched, responder } => {
let mut intermediate: Vec<_> = inner.kbuckets.find_closest(&searched).collect();
let my_id = inner.kbuckets.my_id().clone();
if let Some(pos) = intermediate
.iter()
.position(|e| e.distance_with(&searched) >= my_id.distance_with(&searched))
{
if intermediate[pos] != my_id {
intermediate.insert(pos, my_id);
}
} else {
intermediate.push(my_id);
}
Some(KademliaPeerReq {
requested_peers: intermediate,
inner: responder,
})
},
KademliaIncomingRequest::PingPong => {
// We updated the k-bucket above.
None
},
}
}).filter_map(|val| val);
KademliaPeerReqStream { inner: Box::new(stream) }
},
);
@ -293,24 +364,49 @@ where
}
}
/// Future that must be processed for the Kademlia system to work.
pub struct KademliaProcessingFuture {
inner: Box<Future<Item = (), Error = IoError>>,
/// Stream that must be processed for the Kademlia system to work.
///
/// Produces requests for peer information. These requests should be answered for the stream to
/// continue to progress.
pub struct KademliaPeerReqStream {
inner: Box<Stream<Item = KademliaPeerReq, Error = IoError>>,
}
impl Future for KademliaProcessingFuture {
type Item = ();
impl Stream for KademliaPeerReqStream {
type Item = KademliaPeerReq;
type Error = IoError;
#[inline]
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
self.inner.poll()
}
}
/// Request for information about some peers.
pub struct KademliaPeerReq {
inner: KademliaFindNodeRespond,
requested_peers: Vec<PeerId>,
}
impl KademliaPeerReq {
/// Returns a list of the IDs of the peers that were requested.
#[inline]
pub fn requested_peers(&self) -> SliceIter<PeerId> {
self.requested_peers.iter()
}
/// Responds to the request.
#[inline]
pub fn respond<I>(self, peers: I)
where I: IntoIterator<Item = Peer>
{
self.inner.respond(peers);
}
}
// Inner struct shared throughout the Kademlia system.
#[derive(Debug)]
struct Inner<P, R> {
struct Inner {
// The remotes are identified by their public keys.
kbuckets: KBucketsTable<PeerId, ()>,
@ -323,12 +419,6 @@ struct Inner<P, R> {
// Same as in the config.
parallelism: usize,
// Same as in the config.
record_store: R,
// Same as in the config.
peer_store: P,
// List of open connections with remotes.
//
// Since the keys are the nodes' multiaddress, it is expected that each node only has one
@ -363,94 +453,12 @@ impl fmt::Debug for Connection {
}
}
impl<P, Pc, R> KadServerInterface for Arc<Inner<P, R>>
impl<T, K, M> KademliaController<T, K, M>
where
P: Deref<Target = Pc>,
for<'r> &'r Pc: Peerstore,
{
#[inline]
fn local_id(&self) -> &PeerId {
self.kbuckets.my_id()
}
fn peer_info(&self, peer_id: &PeerId) -> (Vec<Multiaddr>, ConnectionType) {
let addrs = self.peer_store
.peer(peer_id)
.into_iter()
.flat_map(|p| p.addrs())
.collect::<Vec<_>>();
(addrs, ConnectionType::Connected) // ConnectionType meh :-/
}
#[inline]
fn kbuckets_update(&self, peer: &PeerId) {
// TODO: is this the right place for this check?
if peer == self.kbuckets.my_id() {
return;
}
match self.kbuckets.update(peer.clone(), ()) {
UpdateOutcome::NeedPing(node_to_ping) => {
// TODO: return this info somehow
println!("need to ping {:?}", node_to_ping);
}
_ => (),
}
}
#[inline]
fn kbuckets_find_closest(&self, addr: &PeerId) -> Vec<PeerId> {
let mut intermediate: Vec<_> = self.kbuckets.find_closest(addr).collect();
let my_id = self.kbuckets.my_id().clone();
if let Some(pos) = intermediate
.iter()
.position(|e| e.distance_with(&addr) >= my_id.distance_with(&addr))
{
if intermediate[pos] != my_id {
intermediate.insert(pos, my_id);
}
} else {
intermediate.push(my_id);
}
intermediate
}
}
impl<R, P, Pc, T, K, M> query::QueryInterface for KademliaController<P, R, T, K, M>
where
P: Clone + Deref<Target = Pc> + 'static, // TODO: 'static :-/
for<'r> &'r Pc: Peerstore,
R: Clone + 'static, // TODO: 'static :-/
T: Clone + MuxedTransport + 'static, // TODO: 'static :-/
K: Transport<Output = KademliaProcessingFuture> + Clone + 'static, // TODO: 'static
M: FnOnce(KademliaProcessingFuture) -> T::Output + Clone + 'static, // TODO: 'static :-/
K: Transport<Output = KademliaPeerReqStream> + Clone + 'static, // TODO: 'static
M: FnOnce(KademliaPeerReqStream) -> T::Output + Clone + 'static, // TODO: 'static :-/
{
#[inline]
fn local_id(&self) -> &PeerId {
self.inner.kbuckets.my_id()
}
#[inline]
fn kbuckets_find_closest(&self, addr: &PeerId) -> Vec<PeerId> {
self.inner.kbuckets.find_closest(addr).collect()
}
#[inline]
fn peer_add_addrs<I>(&self, peer: &PeerId, multiaddrs: I, ttl: Duration)
where
I: Iterator<Item = Multiaddr>,
{
self.inner
.peer_store
.peer_or_create(peer)
.add_addrs(multiaddrs, ttl);
}
#[inline]
fn parallelism(&self) -> usize {
self.inner.parallelism
}
#[inline]
fn send<F, FRet>(
&self,

View File

@ -23,79 +23,57 @@
//!
//! # Usage
//!
//! - Implement the `KadServerInterface` trait on something clonable (usually an `Arc`).
//!
//! - Create a `KademliaServerConfig` object from that interface. This struct implements
//! `ConnectionUpgrade`.
//! - Create a `KademliaServerConfig` object. This struct implements `ConnectionUpgrade`.
//!
//! - Update a connection through that `KademliaServerConfig`. The output yields you a
//! `KademliaServerController` and a future that must be driven to completion. The controller
//! allows you to perform queries and receive responses.
//! `KademliaServerController` and a stream that must be driven to completion. The controller
//! allows you to perform queries and receive responses. The stream produces incoming requests
//! from the remote.
//!
//! This `KademliaServerController` is usually extracted and stored in some sort of hash map in an
//! `Arc` in order to be available whenever we need to request something from a node.
use bytes::Bytes;
use futures::sync::{mpsc, oneshot};
use futures::{future, Future, Sink, Stream};
use libp2p_peerstore::PeerId;
use libp2p_core::ConnectionUpgrade;
use libp2p_core::Endpoint;
use multiaddr::{AddrComponent, Multiaddr};
use futures::{future, Future, Sink, stream, Stream};
use libp2p_core::{ConnectionUpgrade, Endpoint, PeerId};
use multiaddr::Multiaddr;
use protocol::{self, KadMsg, KademliaProtocolConfig, Peer};
use std::collections::VecDeque;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter;
use std::sync::{Arc, atomic};
use tokio_io::{AsyncRead, AsyncWrite};
/// Interface that this server system uses to communicate with the rest of the system.
pub trait KadServerInterface: Clone {
/// Returns the peer ID of the local node.
fn local_id(&self) -> &PeerId;
/// Returns known information about the peer. Not atomic/thread-safe in the sense that
/// information can change immediately after being returned and before they are processed.
fn peer_info(&self, _: &PeerId) -> (Vec<Multiaddr>, protocol::ConnectionType);
/// Updates an entry in the K-Buckets. Called whenever that peer sends us a message.
fn kbuckets_update(&self, peer: &PeerId);
/// Finds the nodes closest to a peer ID.
fn kbuckets_find_closest(&self, addr: &PeerId) -> Vec<PeerId>;
}
/// Configuration for a Kademlia server.
///
/// Implements `ConnectionUpgrade`. On a successful upgrade, produces a `KademliaServerController`
/// and a `Future`. The controller lets you send queries to the remote and receive answers, while
/// the `Future` must be driven to completion in order for things to work.
#[derive(Debug, Clone)]
pub struct KademliaServerConfig<I> {
pub struct KademliaServerConfig {
raw_proto: KademliaProtocolConfig,
interface: I,
}
impl<I> KademliaServerConfig<I> {
impl KademliaServerConfig {
/// Builds a configuration object for an upcoming Kademlia server.
#[inline]
pub fn new(interface: I) -> Self {
pub fn new() -> Self {
KademliaServerConfig {
raw_proto: KademliaProtocolConfig,
interface: interface,
}
}
}
impl<C, I> ConnectionUpgrade<C> for KademliaServerConfig<I>
impl<C> ConnectionUpgrade<C> for KademliaServerConfig
where
C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
I: KadServerInterface + 'static, // TODO: 'static :-/
{
type Output = (
KademliaServerController,
Box<Future<Item = (), Error = IoError>>,
Box<Stream<Item = KademliaIncomingRequest, Error = IoError>>,
);
type Future = Box<Future<Item = Self::Output, Error = IoError>>;
type Future = future::Map<<KademliaProtocolConfig as ConnectionUpgrade<C>>::Future, fn(<KademliaProtocolConfig as ConnectionUpgrade<C>>::Output) -> Self::Output>;
type NamesIter = iter::Once<(Bytes, ())>;
type UpgradeIdentifier = ();
@ -106,42 +84,9 @@ where
#[inline]
fn upgrade(self, incoming: C, id: (), endpoint: Endpoint, addr: &Multiaddr) -> Self::Future {
let peer_id = {
let mut iter = addr.iter();
let protocol = iter.next();
let after_proto = iter.next();
match (protocol, after_proto) {
(Some(AddrComponent::P2P(key)), None) | (Some(AddrComponent::IPFS(key)), None) => {
match PeerId::from_bytes(key) {
Ok(id) => id,
Err(_) => {
let err = IoError::new(
IoErrorKind::InvalidData,
"invalid peer ID sent by remote identification",
);
return Box::new(future::err(err));
}
}
}
_ => {
let err =
IoError::new(IoErrorKind::InvalidData, "couldn't identify connected node");
return Box::new(future::err(err));
}
}
};
let interface = self.interface;
let future = self.raw_proto
self.raw_proto
.upgrade(incoming, id, endpoint, addr)
.map(move |connec| {
let (tx, rx) = mpsc::unbounded();
let future = kademlia_handler(connec, peer_id, rx, interface);
let controller = KademliaServerController { inner: tx };
(controller, future)
});
Box::new(future) as Box<_>
.map(build_from_sink_stream)
}
}
@ -161,7 +106,7 @@ impl KademliaServerController {
pub fn find_node(
&self,
searched_key: &PeerId,
) -> Box<Future<Item = Vec<Peer>, Error = IoError>> {
) -> impl Future<Item = Vec<Peer>, Error = IoError> {
let message = protocol::KadMsg::FindNodeReq {
key: searched_key.clone().into_bytes(),
};
@ -175,7 +120,8 @@ impl KademliaServerController {
IoErrorKind::ConnectionAborted,
"connection to remote has aborted",
));
return Box::new(fut) as Box<_>;
return future::Either::B(fut);
}
};
@ -192,15 +138,14 @@ impl KademliaServerController {
)),
});
Box::new(future) as Box<_>
future::Either::A(future)
}
/// Sends a `PING` query to the node. Because of the way the protocol is designed, there is
/// no way to differentiate between a ping and a pong. Therefore this function doesn't return a
/// future, and the only way to be notified of the result is through the `kbuckets_update`
/// method in the `KadServerInterface` trait.
/// future, and the only way to be notified of the result is through the stream.
pub fn ping(&self) -> Result<(), IoError> {
// Dummy channel.
// Dummy channel, as the `tx` is going to be dropped anyway.
let (tx, _rx) = oneshot::channel();
match self.inner.unbounded_send((protocol::KadMsg::Ping, tx)) {
Ok(()) => Ok(()),
@ -212,248 +157,359 @@ impl KademliaServerController {
}
}
/// Request received from the remote.
pub enum KademliaIncomingRequest {
/// Find the nodes closest to `searched`.
FindNode {
/// The value being searched.
searched: PeerId,
/// Object to use to respond to the request.
responder: KademliaFindNodeRespond,
},
// TODO: PutValue and FindValue
/// Received either a ping or a pong.
PingPong,
}
/// Object used to respond to `FindNode` queries from remotes.
pub struct KademliaFindNodeRespond {
inner: oneshot::Sender<KadMsg>,
}
impl KademliaFindNodeRespond {
/// Respond to the `FindNode` request.
pub fn respond<I>(self, peers: I)
where I: IntoIterator<Item = protocol::Peer>
{
let _ = self.inner.send(KadMsg::FindNodeRes {
closer_peers: peers.into_iter().collect()
});
}
}
// Builds a controller and stream from a stream/sink of raw messages.
fn build_from_sink_stream<'a, S>(connec: S) -> (KademliaServerController, Box<Stream<Item = KademliaIncomingRequest, Error = IoError> + 'a>)
where S: Sink<SinkItem = KadMsg, SinkError = IoError> + Stream<Item = KadMsg, Error = IoError> + 'a
{
let (tx, rx) = mpsc::unbounded();
let future = kademlia_handler(connec, rx);
let controller = KademliaServerController { inner: tx };
(controller, future)
}
// Handles a newly-opened Kademlia stream with a remote peer.
//
// Takes a `Stream` and `Sink` of Kademlia messages representing the connection to the client,
// plus the ID of the peer that we are handling, plus a `Receiver` that will receive messages to
// transmit to that connection, plus the interface.
// plus a `Receiver` that will receive messages to transmit to that connection.
//
// Returns a `Future` that must be resolved in order for progress to work. It will never yield any
// item (unless both `rx` and `kad_bistream` are closed) but will propagate any I/O of protocol
// error that could happen. If the `Receiver` closes, no error is generated.
fn kademlia_handler<'a, S, I>(
// Returns a `Stream` that must be resolved in order for progress to work. The `Stream` will
// produce objects that represent the requests sent by the remote. These requests must be answered
// immediately before the stream continues to produce items.
fn kademlia_handler<'a, S>(
kad_bistream: S,
peer_id: PeerId,
rx: mpsc::UnboundedReceiver<(KadMsg, oneshot::Sender<KadMsg>)>,
interface: I,
) -> Box<Future<Item = (), Error = IoError> + 'a>
rq_rx: mpsc::UnboundedReceiver<(KadMsg, oneshot::Sender<KadMsg>)>,
) -> Box<Stream<Item = KademliaIncomingRequest, Error = IoError> + 'a>
where
S: Stream<Item = KadMsg, Error = IoError> + Sink<SinkItem = KadMsg, SinkError = IoError> + 'a,
I: KadServerInterface + Clone + 'a,
{
let (kad_sink, kad_stream) = kad_bistream
.sink_map_err(|err| IoError::new(IoErrorKind::InvalidData, err))
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err))
.split();
let (kad_sink, kad_stream) = kad_bistream.split();
// We combine `kad_stream` and `rx` into one so that the loop wakes up whenever either
// generates something.
let messages = rx.map(|(m, o)| (m, Some(o)))
.map_err(|_| unreachable!())
.select(kad_stream.map(|m| (m, None)));
// This is a stream of futures containing local responses.
// Every time we receive a request from the remote, we create a `oneshot::channel()` and send
// the receiving end to `responders_tx`.
// This way, if a future is available on `responders_rx`, we block until it produces the
// response.
let (responders_tx, responders_rx) = mpsc::unbounded();
// Loop forever.
let future = future::loop_fn(
(kad_sink, messages, VecDeque::new(), 0),
move |(kad_sink, messages, mut send_back_queue, mut expected_pongs)| {
let interface = interface.clone();
let peer_id = peer_id.clone();
// Will be set to true if either `kad_stream` or `rq_rx` is closed.
let finished = Arc::new(atomic::AtomicBool::new(false));
// The `send_back_queue` is a queue of `UnboundedSender`s in the correct order of
// expected answers.
// Whenever we send a message to the remote and this message expects a response, we
// push the sender to the end of `send_back_queue`. Whenever a remote sends us a
// response, we pop the first element of `send_back_queue`.
// We combine all the streams into one so that the loop wakes up whenever any generates
// something.
enum EventSource {
Remote(KadMsg),
LocalRequest(KadMsg, oneshot::Sender<KadMsg>),
LocalResponse(oneshot::Receiver<KadMsg>),
Finished,
}
// The value of `expected_pongs` is the number of PING requests that we sent and that
// haven't been answered by the remote yet. Because of the way the protocol is designed,
// there is no way to differentiate between a ping and a pong. Therefore whenever we
// send a ping request we suppose that the next ping we receive is an answer, even
// though that may not be the case in reality.
// Because of this behaviour, pings do not pop from the `send_back_queue`.
let events = {
let responders = responders_rx
.map(|m| EventSource::LocalResponse(m))
.map_err(|_| unreachable!());
let rq_rx = rq_rx
.map(|(m, o)| EventSource::LocalRequest(m, o))
.map_err(|_| unreachable!())
.chain({
let finished = finished.clone();
future::lazy(move || {
finished.store(true, atomic::Ordering::SeqCst);
Ok(EventSource::Finished)
}).into_stream()
});
let kad_stream = kad_stream
.map(|m| EventSource::Remote(m))
.chain({
let finished = finished.clone();
future::lazy(move || {
finished.store(true, atomic::Ordering::SeqCst);
Ok(EventSource::Finished)
}).into_stream()
});
responders.select(rq_rx).select(kad_stream)
};
messages
let stream = stream::unfold((events, kad_sink, responders_tx, VecDeque::new(), 0u32),
move |(events, kad_sink, responders_tx, mut send_back_queue, expected_pongs)| {
if finished.load(atomic::Ordering::SeqCst) {
return None;
}
Some(events
.into_future()
.map_err(|(err, _)| err)
.and_then(move |(message, rest)| {
if let Some((_, None)) = message {
// If we received a message from the remote (as opposed to a message from
// `rx`) then we update the k-buckets.
interface.kbuckets_update(&peer_id);
}
.and_then(move |(message, events)| -> Box<Future<Item = _, Error = _>> {
match message {
None => {
// Both the connection stream and `rx` are empty, so we break the loop.
let future = future::ok(future::Loop::Break(()));
Box::new(future) as Box<Future<Item = _, Error = _>>
}
Some((message @ KadMsg::PutValue { .. }, Some(_))) => {
// A `PutValue` message has been received on `rx`. Contrary to other
// types of messages, this one doesn't expect any answer and therefore
// we ignore the sender.
let future = kad_sink.send(message).map(move |kad_sink| {
future::Loop::Continue((
kad_sink,
rest,
send_back_queue,
expected_pongs,
))
});
Some(EventSource::Finished) | None => {
// `finished` should have been set to true earlier, causing this
// function to return `None`.
unreachable!()
},
Some(EventSource::LocalResponse(message)) => {
let future = message
.map_err(|_| {
// The user destroyed the responder without responding.
warn!("Kad responder object destroyed without responding");
panic!() // TODO: what to do here? we have to close the connection
})
.and_then(move |message| {
kad_sink
.send(message)
.map(move |kad_sink| {
let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
(None, state)
})
});
Box::new(future)
},
Some(EventSource::LocalRequest(message @ KadMsg::PutValue { .. }, _)) => {
// A `PutValue` request. Contrary to other types of messages, this one
// doesn't expect any answer and therefore we ignore the sender.
let future = kad_sink
.send(message)
.map(move |kad_sink| {
let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
(None, state)
});
Box::new(future) as Box<_>
}
Some((message @ KadMsg::Ping { .. }, Some(_))) => {
// A `Ping` message has been received on `rx`.
expected_pongs += 1;
let future = kad_sink.send(message).map(move |kad_sink| {
future::Loop::Continue((
kad_sink,
rest,
send_back_queue,
expected_pongs,
))
});
Some(EventSource::LocalRequest(message @ KadMsg::Ping { .. }, _)) => {
// A local `Ping` request.
let expected_pongs = expected_pongs.checked_add(1)
.expect("overflow in number of simultaneous pings");
let future = kad_sink
.send(message)
.map(move |kad_sink| {
let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
(None, state)
});
Box::new(future) as Box<_>
}
Some((message, Some(send_back))) => {
// Any message other than `PutValue` or `Ping` has been received on
// `rx`. Send it to the remote.
let future = kad_sink.send(message).map(move |kad_sink| {
send_back_queue.push_back(send_back);
future::Loop::Continue((
kad_sink,
rest,
send_back_queue,
expected_pongs,
))
});
Some(EventSource::LocalRequest(message, send_back)) => {
// Any local request other than `PutValue` or `Ping`.
send_back_queue.push_back(send_back);
let future = kad_sink
.send(message)
.map(move |kad_sink| {
let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
(None, state)
});
Box::new(future) as Box<_>
}
Some((KadMsg::Ping, None)) => {
// Note: The way the protocol was designed, there is no way to
// differentiate between a ping and a pong.
if expected_pongs == 0 {
let message = KadMsg::Ping;
let future = kad_sink.send(message).map(move |kad_sink| {
future::Loop::Continue((
kad_sink,
rest,
send_back_queue,
expected_pongs,
))
Some(EventSource::Remote(KadMsg::Ping)) => {
// The way the protocol was designed, there is no way to differentiate
// between a ping and a pong.
if let Some(expected_pongs) = expected_pongs.checked_sub(1) {
// Maybe we received a PONG, or maybe we received a PONG, no way
// to tell. If it was a PING and we expected a PONG, then the
// remote will see its PING answered only when it PONGs us.
let future = future::ok({
let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
let rq = KademliaIncomingRequest::PingPong;
(Some(rq), state)
});
Box::new(future) as Box<_>
} else {
expected_pongs -= 1;
let future = future::ok({
future::Loop::Continue((
kad_sink,
rest,
send_back_queue,
expected_pongs,
))
});
let future = kad_sink
.send(KadMsg::Ping)
.map(move |kad_sink| {
let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
let rq = KademliaIncomingRequest::PingPong;
(Some(rq), state)
});
Box::new(future) as Box<_>
}
}
Some((message @ KadMsg::FindNodeRes { .. }, None))
| Some((message @ KadMsg::GetValueRes { .. }, None)) => {
Some(EventSource::Remote(message @ KadMsg::FindNodeRes { .. }))
| Some(EventSource::Remote(message @ KadMsg::GetValueRes { .. })) => {
// `FindNodeRes` or `GetValueRes` received on the socket.
// Send it back through `send_back_queue`.
if let Some(send_back) = send_back_queue.pop_front() {
let _ = send_back.send(message);
let future = future::ok(future::Loop::Continue((
kad_sink,
rest,
send_back_queue,
expected_pongs,
)));
return Box::new(future) as Box<_>;
let future = future::ok({
let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
(None, state)
});
Box::new(future)
} else {
debug!("Remote sent a Kad response but we didn't request anything");
let future = future::err(IoErrorKind::InvalidData.into());
return Box::new(future) as Box<_>;
Box::new(future)
}
}
Some((KadMsg::FindNodeReq { key, .. }, None)) => {
// `FindNodeReq` received on the socket.
let message = handle_find_node_req(&interface, &key);
let future = kad_sink.send(message).map(move |kad_sink| {
future::Loop::Continue((
kad_sink,
rest,
send_back_queue,
expected_pongs,
))
});
Box::new(future) as Box<_>
}
Some((KadMsg::GetValueReq { key, .. }, None)) => {
// `GetValueReq` received on the socket.
let message = handle_get_value_req(&interface, &key);
let future = kad_sink.send(message).map(move |kad_sink| {
future::Loop::Continue((
kad_sink,
rest,
send_back_queue,
expected_pongs,
))
});
Box::new(future) as Box<_>
}
Some((KadMsg::PutValue { .. }, None)) => {
// `PutValue` received on the socket.
handle_put_value_req(&interface);
Some(EventSource::Remote(KadMsg::FindNodeReq { key, .. })) => {
let peer_id = match PeerId::from_bytes(key) {
Ok(id) => id,
Err(key) => {
debug!("Ignoring FIND_NODE request with invalid key: {:?}", key);
let future = future::err(IoError::new(IoErrorKind::InvalidData, "invalid key in FIND_NODE"));
return Box::new(future);
}
};
let (tx, rx) = oneshot::channel();
let _ = responders_tx.unbounded_send(rx);
let future = future::ok({
future::Loop::Continue((
kad_sink,
rest,
send_back_queue,
expected_pongs,
))
let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
let rq = KademliaIncomingRequest::FindNode {
searched: peer_id,
responder: KademliaFindNodeRespond {
inner: tx
}
};
(Some(rq), state)
});
Box::new(future) as Box<_>
Box::new(future)
}
Some(EventSource::Remote(KadMsg::GetValueReq { .. })) => {
unimplemented!() // FIXME:
}
Some(EventSource::Remote(KadMsg::PutValue { .. })) => {
unimplemented!() // FIXME:
}
}
})
},
);
}))
}).filter_map(|val| val);
Box::new(future) as Box<Future<Item = (), Error = IoError>>
Box::new(stream) as Box<Stream<Item = _, Error = IoError>>
}
// Builds a `KadMsg` that handles a `FIND_NODE` request received from the remote.
fn handle_find_node_req<I>(interface: &I, requested_key: &[u8]) -> KadMsg
where
I: ?Sized + KadServerInterface,
{
let peer_id = match PeerId::from_bytes(requested_key.to_vec()) {
// TODO: suboptimal
Ok(id) => id,
Err(_) => {
return KadMsg::FindNodeRes {
closer_peers: vec![],
}
#[cfg(test)]
mod tests {
use std::io::Error as IoError;
use std::iter;
use futures::{Future, Poll, Sink, StartSend, Stream};
use futures::sync::mpsc;
use kad_server::{self, KademliaIncomingRequest, KademliaServerController};
use libp2p_core::PublicKeyBytes;
use protocol::{ConnectionType, Peer};
use rand;
// This struct merges a stream and a sink and is quite useful for tests.
struct Wrapper<St, Si>(St, Si);
impl<St, Si> Stream for Wrapper<St, Si>
where
St: Stream,
{
type Item = St::Item;
type Error = St::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.0.poll()
}
};
}
impl<St, Si> Sink for Wrapper<St, Si>
where
Si: Sink,
{
type SinkItem = Si::SinkItem;
type SinkError = Si::SinkError;
fn start_send(
&mut self,
item: Self::SinkItem,
) -> StartSend<Self::SinkItem, Self::SinkError> {
self.1.start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
self.1.poll_complete()
}
}
let closer_peers = interface
.kbuckets_find_closest(&peer_id)
.into_iter()
.map(|peer| {
let (multiaddrs, connection_ty) = interface.peer_info(&peer);
protocol::Peer {
node_id: peer,
multiaddrs: multiaddrs,
connection_ty: connection_ty,
}
})
.collect();
fn build_test() -> (KademliaServerController, impl Stream<Item = KademliaIncomingRequest, Error = IoError>, KademliaServerController, impl Stream<Item = KademliaIncomingRequest, Error = IoError>) {
let (a_to_b, b_from_a) = mpsc::unbounded();
let (b_to_a, a_from_b) = mpsc::unbounded();
KadMsg::FindNodeRes { closer_peers }
}
// Builds a `KadMsg` that handles a `FIND_VALUE` request received from the remote.
fn handle_get_value_req<I>(_interface: &I, _requested_key: &[u8]) -> KadMsg
where
I: ?Sized + KadServerInterface,
{
unimplemented!()
}
// Handles a `STORE` request received from the remote.
fn handle_put_value_req<I>(_interface: &I)
where
I: ?Sized + KadServerInterface,
{
unimplemented!()
let sink_stream_a = Wrapper(a_from_b, a_to_b)
.map_err(|_| panic!()).sink_map_err(|_| panic!());
let sink_stream_b = Wrapper(b_from_a, b_to_a)
.map_err(|_| panic!()).sink_map_err(|_| panic!());
let (controller_a, stream_events_a) = kad_server::build_from_sink_stream(sink_stream_a);
let (controller_b, stream_events_b) = kad_server::build_from_sink_stream(sink_stream_b);
(controller_a, stream_events_a, controller_b, stream_events_b)
}
#[test]
fn ping_response() {
let (controller_a, stream_events_a, _controller_b, stream_events_b) = build_test();
controller_a.ping().unwrap();
let streams = stream_events_a.map(|ev| (ev, "a"))
.select(stream_events_b.map(|ev| (ev, "b")));
match streams.into_future().map_err(|(err, _)| err).wait().unwrap() {
(Some((KademliaIncomingRequest::PingPong, "b")), _) => {},
_ => panic!()
}
}
#[test]
fn find_node_response() {
let (controller_a, stream_events_a, _controller_b, stream_events_b) = build_test();
let random_peer_id = {
let buf = (0 .. 1024).map(|_| -> u8 { rand::random() }).collect::<Vec<_>>();
PublicKeyBytes(buf).to_peer_id()
};
let find_node_fut = controller_a.find_node(&random_peer_id);
let example_response = Peer {
node_id: {
let buf = (0 .. 1024).map(|_| -> u8 { rand::random() }).collect::<Vec<_>>();
PublicKeyBytes(buf).to_peer_id()
},
multiaddrs: Vec::new(),
connection_ty: ConnectionType::Connected,
};
let streams = stream_events_a.map(|ev| (ev, "a"))
.select(stream_events_b.map(|ev| (ev, "b")));
let streams = match streams.into_future().map_err(|(err, _)| err).wait().unwrap() {
(Some((KademliaIncomingRequest::FindNode { searched, responder }, "b")), streams) => {
assert_eq!(searched, random_peer_id);
responder.respond(iter::once(example_response.clone()));
streams
},
_ => panic!()
};
let resp = streams.into_future().map_err(|(err, _)| err).map(|_| unreachable!())
.select(find_node_fut)
.map_err(|_| -> IoError { panic!() });
assert_eq!(resp.wait().unwrap().0, vec![example_response]);
}
}

View File

@ -29,7 +29,7 @@
use arrayvec::ArrayVec;
use bigint::U512;
use libp2p_peerstore::PeerId;
use libp2p_core::PeerId;
use parking_lot::{Mutex, MutexGuard};
use std::mem;
use std::slice::Iter as SliceIter;
@ -89,11 +89,12 @@ impl<Id, Val> KBucket<Id, Val> {
// If a node is pending and the timeout has expired, removes the first element of `nodes`
// and pushes back the node in `pending_node`.
fn flush(&mut self, timeout: Duration) {
if let Some((_, instant)) = self.pending_node {
if let Some((pending_node, instant)) = self.pending_node.take() {
if instant.elapsed() >= timeout {
let (pending_node, _) = self.pending_node.take().unwrap();
let _ = self.nodes.remove(0);
self.nodes.push(pending_node);
} else {
self.pending_node = Some((pending_node, instant));
}
}
}
@ -209,15 +210,10 @@ where
/// Marks the node as "most recent" in its bucket and modifies the value associated to it.
/// This function should be called whenever we receive a communication from a node.
///
/// # Panic
///
/// Panics if `id` is equal to the local node ID.
///
pub fn update(&self, id: Id, value: Val) -> UpdateOutcome<Id, Val> {
let table = match self.bucket_num(&id) {
Some(n) => &self.tables[n],
None => panic!("tried to update our own node in the kbuckets table"),
None => return UpdateOutcome::FailSelfUpdate,
};
let mut table = table.lock();
@ -272,10 +268,13 @@ pub enum UpdateOutcome<Id, Val> {
Added,
/// The node was already in the bucket and has been refreshed.
Refreshed(Val),
/// The node wasn't added. Instead we need to ping the node passed as parameter.
/// The node wasn't added. Instead we need to ping the node passed as parameter, and call
/// `update` if it responds.
NeedPing(Id),
/// The node wasn't added at all because a node was already pending.
Discarded,
/// Tried to update the local peer ID. This is an invalid operation.
FailSelfUpdate,
}
/// Iterator giving access to a bucket.
@ -335,7 +334,7 @@ mod tests {
extern crate rand;
use self::rand::random;
use kbucket::{KBucketsTable, UpdateOutcome, MAX_NODES_PER_BUCKET};
use libp2p_peerstore::PeerId;
use libp2p_core::PeerId;
use std::thread;
use std::time::Duration;
@ -364,8 +363,7 @@ mod tests {
}
#[test]
#[should_panic(expected = "tried to update our own node in the kbuckets table")]
fn update_local_id_panic() {
fn update_local_id_fails() {
let my_id = {
let mut bytes = vec![random(); 34];
bytes[0] = 18;
@ -374,7 +372,10 @@ mod tests {
};
let table = KBucketsTable::new(my_id.clone(), Duration::from_secs(5));
let _ = table.update(my_id, ());
match table.update(my_id, ()) {
UpdateOutcome::FailSelfUpdate => (),
_ => panic!()
}
}
#[test]

View File

@ -69,7 +69,6 @@ extern crate datastore;
extern crate fnv;
extern crate futures;
extern crate libp2p_identify;
extern crate libp2p_peerstore;
extern crate libp2p_ping;
extern crate libp2p_core;
#[macro_use]
@ -84,7 +83,9 @@ extern crate tokio_timer;
extern crate varint;
pub use self::high_level::{KademliaConfig, KademliaController, KademliaControllerPrototype};
pub use self::high_level::{KademliaProcessingFuture, KademliaUpgrade};
pub use self::high_level::{KademliaPeerReqStream, KademliaUpgrade, KademliaPeerReq};
pub use self::protocol::{ConnectionType, Peer};
pub use self::query::QueryEvent;
mod high_level;
mod kad_server;

View File

@ -25,16 +25,14 @@
//! The `Stream` component is used to poll the underlying transport, and the `Sink` component is
//! used to send messages.
use bytes::Bytes;
use futures::future;
use futures::{Sink, Stream};
use libp2p_peerstore::PeerId;
use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr};
use bytes::{Bytes, BytesMut};
use futures::{future, sink, Sink, stream, Stream};
use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr, PeerId};
use protobuf::{self, Message};
use protobuf_structs;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::{AsyncRead, AsyncWrite, codec::Framed};
use varint::VarintCodec;
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
@ -122,8 +120,7 @@ impl<C> ConnectionUpgrade<C> for KademliaProtocolConfig
where
C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
{
type Output =
Box<KadStreamSink<Item = KadMsg, Error = IoError, SinkItem = KadMsg, SinkError = IoError>>;
type Output = KadStreamSink<C>;
type Future = future::FutureResult<Self::Output, IoError>;
type NamesIter = iter::Once<(Bytes, ())>;
type UpgradeIdentifier = ();
@ -139,37 +136,26 @@ where
}
}
type KadStreamSink<S> = stream::AndThen<sink::With<stream::FromErr<Framed<S, VarintCodec<Vec<u8>>>, IoError>, KadMsg, fn(KadMsg) -> Result<Vec<u8>, IoError>, Result<Vec<u8>, IoError>>, fn(BytesMut) -> Result<KadMsg, IoError>, Result<KadMsg, IoError>>;
// Upgrades a socket to use the Kademlia protocol.
fn kademlia_protocol<'a, S>(
fn kademlia_protocol<S>(
socket: S,
) -> Box<KadStreamSink<Item = KadMsg, Error = IoError, SinkItem = KadMsg, SinkError = IoError> + 'a>
) -> KadStreamSink<S>
where
S: AsyncRead + AsyncWrite + 'a,
S: AsyncRead + AsyncWrite,
{
let wrapped = socket
socket
.framed(VarintCodec::default())
.from_err::<IoError>()
.with(|request| -> Result<_, IoError> {
.with::<_, fn(_) -> _, _>(|request| -> Result<_, IoError> {
let proto_struct = msg_to_proto(request);
Ok(proto_struct.write_to_bytes().unwrap()) // TODO: error?
})
.and_then(|bytes| {
.and_then::<fn(_) -> _, _>(|bytes| {
let response = protobuf::parse_from_bytes(&bytes)?;
proto_to_msg(response)
});
Box::new(wrapped)
}
/// Custom trait that derives `Sink` and `Stream`, so that we can box it.
pub trait KadStreamSink:
Stream<Item = KadMsg, Error = IoError> + Sink<SinkItem = KadMsg, SinkError = IoError>
{
}
impl<T> KadStreamSink for T
where
T: Stream<Item = KadMsg, Error = IoError> + Sink<SinkItem = KadMsg, SinkError = IoError>,
{
})
}
/// Message that we can send to a peer or received from a peer.
@ -307,8 +293,7 @@ mod tests {
use self::libp2p_tcp_transport::TcpConfig;
use self::tokio_core::reactor::Core;
use futures::{Future, Sink, Stream};
use libp2p_peerstore::PeerId;
use libp2p_core::{Transport, PublicKeyBytesSlice};
use libp2p_core::{Transport, PeerId, PublicKeyBytesSlice};
use protocol::{ConnectionType, KadMsg, KademliaProtocolConfig, Peer};
use std::sync::mpsc;
use std::thread;

View File

@ -21,10 +21,9 @@
//! This module handles performing iterative queries about the network.
use fnv::FnvHashSet;
use futures::{future, Future};
use kad_server::KademliaServerController;
use futures::{future, Future, stream, Stream};
use kbucket::KBucketsPeerId;
use libp2p_peerstore::PeerId;
use libp2p_core::PeerId;
use multiaddr::{AddrComponent, Multiaddr};
use protocol;
use rand;
@ -32,78 +31,73 @@ use smallvec::SmallVec;
use std::cmp::Ordering;
use std::io::Error as IoError;
use std::mem;
use std::time::Duration;
/// Interface that the query uses to communicate with the rest of the system.
pub trait QueryInterface: Clone {
/// Returns the peer ID of the local node.
fn local_id(&self) -> &PeerId;
/// Parameters of a query. Allows plugging the query-related code with the rest of the
/// infrastructure.
pub struct QueryParams<FBuckets, FFindNode> {
/// Identifier of the local peer.
pub local_id: PeerId,
/// Called whenever we need to obtain the peers closest to a certain peer.
pub kbuckets_find_closest: FBuckets,
/// Level of parallelism for networking. If this is `N`, then we can dial `N` nodes at a time.
pub parallelism: usize,
/// Called whenever we want to send a `FIND_NODE` RPC query.
pub find_node: FFindNode,
}
/// Finds the nodes closest to a peer ID.
fn kbuckets_find_closest(&self, addr: &PeerId) -> Vec<PeerId>;
/// Adds new known multiaddrs for the given peer.
fn peer_add_addrs<I>(&self, peer: &PeerId, multiaddrs: I, ttl: Duration)
where
I: Iterator<Item = Multiaddr>;
/// Returns the level of parallelism wanted for this query.
fn parallelism(&self) -> usize;
/// Attempts to contact the given multiaddress, then calls `and_then` on success. Returns a
/// future that contains the output of `and_then`, or an error if we failed to contact the
/// remote.
// TODO: use HKTB once Rust supports that, to avoid boxing the future
fn send<F, FRet>(
&self,
addr: Multiaddr,
and_then: F,
) -> Box<Future<Item = FRet, Error = IoError>>
where
F: FnOnce(&KademliaServerController) -> FRet + 'static,
FRet: 'static;
/// Event that happens during a query.
#[derive(Debug, Clone)]
pub enum QueryEvent<TOut> {
/// Learned about new mutiaddresses for the given peers.
NewKnownMultiaddrs(Vec<(PeerId, Vec<Multiaddr>)>),
/// Finished the processing of the query. Contains the result.
Finished(TOut),
}
/// Starts a query for an iterative `FIND_NODE` request.
#[inline]
pub fn find_node<'a, I>(
query_interface: I,
pub fn find_node<'a, FBuckets, FFindNode>(
query_params: QueryParams<FBuckets, FFindNode>,
searched_key: PeerId,
) -> Box<Future<Item = Vec<PeerId>, Error = IoError> + 'a>
) -> Box<Stream<Item = QueryEvent<Vec<PeerId>>, Error = IoError> + 'a>
where
I: QueryInterface + 'a,
FBuckets: Fn(PeerId) -> Vec<PeerId> + 'a + Clone,
FFindNode: Fn(Multiaddr, PeerId) -> Box<Future<Item = Vec<protocol::Peer>, Error = IoError>> + 'a + Clone,
{
query(query_interface, searched_key, 20) // TODO: constant
query(query_params, searched_key, 20) // TODO: constant
}
/// Refreshes a specific bucket by performing an iterative `FIND_NODE` on a random ID of this
/// bucket.
///
/// Returns a dummy no-op future if `bucket_num` is out of range.
pub fn refresh<'a, I>(
query_interface: I,
pub fn refresh<'a, FBuckets, FFindNode>(
query_params: QueryParams<FBuckets, FFindNode>,
bucket_num: usize,
) -> Box<Future<Item = (), Error = IoError> + 'a>
) -> Box<Stream<Item = QueryEvent<()>, Error = IoError> + 'a>
where
I: QueryInterface + 'a,
FBuckets: Fn(PeerId) -> Vec<PeerId> + 'a + Clone,
FFindNode: Fn(Multiaddr, PeerId) -> Box<Future<Item = Vec<protocol::Peer>, Error = IoError>> + 'a + Clone,
{
let peer_id = match gen_random_id(&query_interface, bucket_num) {
let peer_id = match gen_random_id(&query_params.local_id, bucket_num) {
Ok(p) => p,
Err(()) => return Box::new(future::ok(())),
Err(()) => return Box::new(stream::once(Ok(QueryEvent::Finished(())))),
};
let future = find_node(query_interface, peer_id).map(|_| ());
Box::new(future) as Box<_>
let stream = find_node(query_params, peer_id).map(|event| {
match event {
QueryEvent::NewKnownMultiaddrs(peers) => QueryEvent::NewKnownMultiaddrs(peers),
QueryEvent::Finished(_) => QueryEvent::Finished(()),
}
});
Box::new(stream) as Box<_>
}
// Generates a random `PeerId` that belongs to the given bucket.
//
// Returns an error if `bucket_num` is out of range.
fn gen_random_id<I>(query_interface: &I, bucket_num: usize) -> Result<PeerId, ()>
where
I: ?Sized + QueryInterface,
{
let my_id = query_interface.local_id();
fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result<PeerId, ()> {
let my_id_len = my_id.as_bytes().len();
// TODO: this 2 is magic here ; it is the length of the hash of the multihash
@ -134,22 +128,21 @@ where
}
// Generic query-performing function.
fn query<'a, I>(
query_interface: I,
fn query<'a, FBuckets, FFindNode>(
query_params: QueryParams<FBuckets, FFindNode>,
searched_key: PeerId,
num_results: usize,
) -> Box<Future<Item = Vec<PeerId>, Error = IoError> + 'a>
) -> Box<Stream<Item = QueryEvent<Vec<PeerId>>, Error = IoError> + 'a>
where
I: QueryInterface + 'a,
FBuckets: Fn(PeerId) -> Vec<PeerId> + 'a + Clone,
FFindNode: Fn(Multiaddr, PeerId) -> Box<Future<Item = Vec<protocol::Peer>, Error = IoError>> + 'a + Clone,
{
debug!("Start query for {:?} ; num results = {}", searched_key, num_results);
// State of the current iterative process.
struct State<'a> {
// If true, we are still in the first step of the algorithm where we try to find the
// closest node. If false, then we are contacting the k closest nodes in order to fill the
// list with enough results.
looking_for_closer: bool,
// At which stage we are.
stage: Stage,
// Final output of the iteration.
result: Vec<PeerId>,
// For each open connection, a future with the response of the remote.
@ -164,26 +157,55 @@ where
failed_to_contact: FnvHashSet<PeerId>,
}
// General stage of the state.
#[derive(Copy, Clone, PartialEq, Eq)]
enum Stage {
// We are still in the first step of the algorithm where we try to find the closest node.
FirstStep,
// We are contacting the k closest nodes in order to fill the list with enough results.
SecondStep,
// The results are complete, and the next stream iteration will produce the outcome.
FinishingNextIter,
// We are finished and the stream shouldn't return anything anymore.
Finished,
}
let initial_state = State {
looking_for_closer: true,
stage: Stage::FirstStep,
result: Vec::with_capacity(num_results),
current_attempts_fut: Vec::new(),
current_attempts_addrs: SmallVec::new(),
pending_nodes: query_interface.kbuckets_find_closest(&searched_key),
pending_nodes: {
let kbuckets_find_closest = query_params.kbuckets_find_closest.clone();
kbuckets_find_closest(searched_key.clone()) // TODO: suboptimal
},
failed_to_contact: Default::default(),
};
let parallelism = query_interface.parallelism();
let parallelism = query_params.parallelism;
// Start of the iterative process.
let stream = future::loop_fn(initial_state, move |mut state| {
let stream = stream::unfold(initial_state, move |mut state| -> Option<_> {
match state.stage {
Stage::FinishingNextIter => {
let result = mem::replace(&mut state.result, Vec::new());
debug!("Query finished with {} results", result.len());
state.stage = Stage::Finished;
let future = future::ok((Some(QueryEvent::Finished(result)), state));
return Some(future::Either::A(future));
},
Stage::Finished => {
return None;
},
_ => ()
};
let searched_key = searched_key.clone();
let query_interface = query_interface.clone();
let query_interface2 = query_interface.clone();
let find_node_rpc = query_params.find_node.clone();
// Find out which nodes to contact at this iteration.
let to_contact = {
let wanted_len = if state.looking_for_closer {
let wanted_len = if state.stage == Stage::FirstStep {
parallelism.saturating_sub(state.current_attempts_fut.len())
} else {
num_results.saturating_sub(state.current_attempts_fut.len())
@ -218,10 +240,8 @@ where
let multiaddr: Multiaddr = AddrComponent::P2P(peer.clone().into_bytes()).into();
let searched_key2 = searched_key.clone();
let resp_rx =
query_interface.send(multiaddr.clone(), move |ctl| ctl.find_node(&searched_key2));
let current_attempt = find_node_rpc(multiaddr.clone(), searched_key2); // TODO: suboptimal
state.current_attempts_addrs.push(peer.clone());
let current_attempt = resp_rx.flatten();
state
.current_attempts_fut
.push(Box::new(current_attempt) as Box<_>);
@ -237,8 +257,10 @@ where
if current_attempts_fut.is_empty() {
// If `current_attempts_fut` is empty, then `select_all` would panic. It happens
// when we have no additional node to query.
let future = future::ok(future::Loop::Break(state));
return future::Either::A(future);
debug!("Finishing query early because no additional node available");
state.stage = Stage::FinishingNextIter;
let future = future::ok((None, state));
return Some(future::Either::A(future));
}
// This is the future that continues or breaks the `loop_fn`.
@ -263,7 +285,7 @@ where
Err(err) => {
trace!("RPC query failed for {:?}: {:?}", remote_id, err);
state.failed_to_contact.insert(remote_id);
return Ok(future::Loop::Continue(state));
return future::ok((None, state));
}
};
@ -288,17 +310,14 @@ where
let mut local_nearest_node_updated = false;
// Update `state` with the actual content of the message.
let mut new_known_multiaddrs = Vec::with_capacity(closer_peers.len());
for mut peer in closer_peers {
// Update the peerstore with the information sent by
// the remote.
{
let valid_multiaddrs = peer.multiaddrs.drain(..);
trace!("Adding multiaddresses to {:?}: {:?}", peer.node_id, valid_multiaddrs);
query_interface2.peer_add_addrs(
&peer.node_id,
valid_multiaddrs,
Duration::from_secs(3600),
); // TODO: which TTL?
let multiaddrs = mem::replace(&mut peer.multiaddrs, Vec::new());
trace!("Reporting multiaddresses for {:?}: {:?}", peer.node_id, multiaddrs);
new_known_multiaddrs.push((peer.node_id.clone(), multiaddrs));
}
if peer.node_id.distance_with(&searched_key)
@ -325,28 +344,22 @@ where
}
if state.result.len() >= num_results
|| (!state.looking_for_closer && state.current_attempts_fut.is_empty())
|| (state.stage != Stage::FirstStep && state.current_attempts_fut.is_empty())
{
// Check that our `Vec::with_capacity` is correct.
debug_assert_eq!(state.result.capacity(), num_results);
Ok(future::Loop::Break(state))
state.stage = Stage::FinishingNextIter;
} else {
if !local_nearest_node_updated {
trace!("Loop didn't update closer node ; jumping to step 2");
state.looking_for_closer = false;
state.stage = Stage::SecondStep;
}
Ok(future::Loop::Continue(state))
}
future::ok((Some(QueryEvent::NewKnownMultiaddrs(new_known_multiaddrs)), state))
});
future::Either::B(future)
});
let stream = stream.map(|state| {
debug!("Query finished with {} results", state.result.len());
state.result
});
Some(future::Either::B(future))
}).filter_map(|val| val);
Box::new(stream) as Box<_>
}

View File

@ -27,7 +27,7 @@ extern crate tokio_core;
extern crate tokio_io;
use bigint::U512;
use futures::future::Future;
use futures::{Future, Stream};
use libp2p::peerstore::{PeerAccess, PeerId, Peerstore};
use libp2p::Multiaddr;
use std::env;
@ -35,6 +35,7 @@ use std::sync::Arc;
use std::time::Duration;
use libp2p::core::{Transport, PublicKeyBytesSlice};
use libp2p::core::{upgrade, either::EitherOutput};
use libp2p::kad::{ConnectionType, Peer, QueryEvent};
use libp2p::tcp::TcpConfig;
use tokio_core::reactor::Core;
@ -103,13 +104,11 @@ fn main() {
// and outgoing connections for us.
let kad_config = libp2p::kad::KademliaConfig {
parallelism: 3,
record_store: (),
peer_store: peer_store,
local_peer_id: my_peer_id.clone(),
timeout: Duration::from_secs(2),
};
let kad_ctl_proto = libp2p::kad::KademliaControllerPrototype::new(kad_config);
let kad_ctl_proto = libp2p::kad::KademliaControllerPrototype::new(kad_config, peer_store.peers());
let proto = libp2p::kad::KademliaUpgrade::from_prototype(&kad_ctl_proto);
@ -117,7 +116,32 @@ fn main() {
// outgoing connections for us.
let (swarm_controller, swarm_future) = libp2p::core::swarm(
transport.clone().with_upgrade(proto.clone()),
|upgrade, _| upgrade,
{
let peer_store = peer_store.clone();
move |kademlia_stream, _| {
let peer_store = peer_store.clone();
kademlia_stream.for_each(move |req| {
let peer_store = peer_store.clone();
let result = req
.requested_peers()
.map(move |peer_id| {
let addrs = peer_store
.peer(peer_id)
.into_iter()
.flat_map(|p| p.addrs())
.collect::<Vec<_>>();
Peer {
node_id: peer_id.clone(),
multiaddrs: addrs,
connection_ty: ConnectionType::Connected, // meh :-/
}
})
.collect::<Vec<_>>();
req.respond(result);
Ok(())
})
}
}
);
let (kad_controller, _kad_init) =
@ -132,6 +156,21 @@ fn main() {
let finish_enum = kad_controller
.find_node(my_peer_id.clone())
.filter_map(move |event| {
match event {
QueryEvent::NewKnownMultiaddrs(peers) => {
for (peer, addrs) in peers {
peer_store.peer_or_create(&peer)
.add_addrs(addrs, Duration::from_secs(3600));
}
None
},
QueryEvent::Finished(out) => Some(out),
}
})
.into_future()
.map_err(|(err, _)| err)
.map(|(out, _)| out.unwrap())
.and_then(|out| {
let local_hash = U512::from(my_peer_id.hash());
println!("Results of peer discovery for {:?}:", my_peer_id);