diff --git a/kad/Cargo.toml b/kad/Cargo.toml
index 0660f65f..d8ac4475 100644
--- a/kad/Cargo.toml
+++ b/kad/Cargo.toml
@@ -12,7 +12,6 @@ datastore = { path = "../datastore" }
fnv = "1.0"
futures = "0.1"
libp2p-identify = { path = "../identify" }
-libp2p-peerstore = { path = "../peerstore" }
libp2p-ping = { path = "../ping" }
libp2p-core = { path = "../core" }
log = "0.4"
diff --git a/kad/src/high_level.rs b/kad/src/high_level.rs
index 6266ff4e..a4e26628 100644
--- a/kad/src/high_level.rs
+++ b/kad/src/high_level.rs
@@ -25,20 +25,19 @@
use bytes::Bytes;
use fnv::FnvHashMap;
use futures::sync::oneshot;
-use futures::{self, future, Future};
-use kad_server::{KadServerInterface, KademliaServerConfig, KademliaServerController};
+use futures::{self, future, Future, Stream};
+use kad_server::{KademliaServerConfig, KademliaServerController, KademliaIncomingRequest, KademliaFindNodeRespond};
use kbucket::{KBucketsPeerId, KBucketsTable, UpdateOutcome};
-use libp2p_peerstore::{PeerAccess, PeerId, Peerstore};
-use libp2p_core::{ConnectionUpgrade, Endpoint, MuxedTransport, SwarmController, Transport};
-use multiaddr::Multiaddr;
+use libp2p_core::{ConnectionUpgrade, Endpoint, MuxedTransport, PeerId, SwarmController, Transport};
+use multiaddr::{AddrComponent, Multiaddr};
use parking_lot::Mutex;
-use protocol::ConnectionType;
+use protocol::Peer;
use query;
use std::collections::hash_map::Entry;
use std::fmt;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter;
-use std::ops::Deref;
+use std::slice::Iter as SliceIter;
use std::sync::Arc;
use std::time::Duration;
use tokio_io::{AsyncRead, AsyncWrite};
@@ -46,45 +45,60 @@ use tokio_timer;
-/// Prototype for a future Kademlia protocol running on a socket.
+/// Configuration for the Kademlia protocol.
#[derive(Debug, Clone)]
-pub struct KademliaConfig<P, R> {
+pub struct KademliaConfig {
/// Degree of parallelism on the network. Often called `alpha` in technical papers.
/// No more than this number of remotes will be used at a given time for any given operation.
// TODO: ^ share this number between operations? or does each operation use `alpha` remotes?
pub parallelism: u32,
- /// Used to load and store data requests of peers.
- // TODO: say that must implement the `Recordstore` trait.
- pub record_store: R,
- /// Used to load and store information about peers.
- pub peer_store: P,
/// Id of the local peer.
pub local_peer_id: PeerId,
/// When contacting a node, duration after which we consider it unresponsive.
pub timeout: Duration,
}
-/// Object that allows one to make queries on the Kademlia system.
-#[derive(Debug)]
-pub struct KademliaControllerPrototype<P, R> {
- inner: Arc<Inner<P, R>>,
+// Builds a `QueryParams` that fetches information from `$controller`.
+//
+// Because of lifetime issues and type naming issues, a macro is the most convenient solution.
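+// It is expanded below in `start` (for the initial bucket refreshes) and in `find_node`.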
+macro_rules! gen_query_params {
+ ($controller:expr) => {{
+ let controller = $controller;
+ query::QueryParams {
+ local_id: $controller.inner.kbuckets.my_id().clone(),
+ kbuckets_find_closest: {
+ let controller = controller.clone();
+ move |addr| controller.inner.kbuckets.find_closest(&addr).collect()
+ },
+ parallelism: $controller.inner.parallelism,
+ find_node: {
+ let controller = controller.clone();
+ move |addr, searched| {
+ // TODO: rewrite to be more direct
+ Box::new(controller.send(addr, move |ctl| ctl.find_node(&searched)).flatten()) as Box<_>
+ }
+ },
+ }
+ }};
}
-impl<P, Pc, R> KademliaControllerPrototype<P, R>
-where
- P: Deref<Target = Pc>,
- for<'r> &'r Pc: Peerstore,
-{
+/// Object that allows one to make queries on the Kademlia system.
+#[derive(Debug)]
+pub struct KademliaControllerPrototype {
+ inner: Arc<Inner>,
+}
+
+impl KademliaControllerPrototype {
/// Creates a new controller from that configuration.
- pub fn new(config: KademliaConfig<P, R>) -> KademliaControllerPrototype<P, R> {
+ pub fn new<I>(config: KademliaConfig, initial_peers: I) -> KademliaControllerPrototype
+ where I: IntoIterator<Item = PeerId>
+ {
let buckets = KBucketsTable::new(config.local_peer_id.clone(), config.timeout);
- for peer_id in config.peer_store.deref().peers() {
+ for peer_id in initial_peers {
let _ = buckets.update(peer_id, ());
}
let inner = Arc::new(Inner {
kbuckets: buckets,
timer: tokio_timer::wheel().build(),
- record_store: config.record_store,
- peer_store: config.peer_store,
connections: Default::default(),
timeout: config.timeout,
parallelism: config.parallelism as usize,
@@ -96,24 +110,21 @@ where
/// Turns the prototype into an actual controller by feeding it a swarm controller.
///
/// You must pass to this function the transport to use to dial and obtain
- /// `KademliaProcessingFuture`, plus a mapping function that will turn the
- /// `KademliaProcessingFuture` into whatever the swarm expects.
+ /// `KademliaPeerReqStream`, plus a mapping function that will turn the
+ /// `KademliaPeerReqStream` into whatever the swarm expects.
pub fn start<T, K, M>(
self,
swarm: SwarmController<T>,
kademlia_transport: K,
map: M,
) -> (
- KademliaController<P, R, T, K, M>,
+ KademliaController<T, K, M>,
Box<Future<Item = (), Error = IoError>>,
)
where
- P: Clone + Deref<Target = Pc> + 'static, // TODO: 'static :-/
- for<'r> &'r Pc: Peerstore,
- R: Clone + 'static, // TODO: 'static :-/
T: Clone + MuxedTransport + 'static, // TODO: 'static :-/
- K: Transport<Output = KademliaProcessingFuture> + Clone + 'static, // TODO: 'static :-/
- M: FnOnce(KademliaProcessingFuture) -> T::Output + Clone + 'static,
+ K: Transport<Output = KademliaPeerReqStream> + Clone + 'static, // TODO: 'static :-/
+ M: FnOnce(KademliaPeerReqStream) -> T::Output + Clone + 'static,
{
// TODO: initialization
@@ -126,7 +137,13 @@ where
let init_future = {
let futures: Vec<_> = (0..256)
- .map(|n| query::refresh(controller.clone(), n))
+ .map({
+ let controller = controller.clone();
+ move |n| query::refresh(gen_query_params!(controller.clone()), n)
+ })
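+ // Each refresh query produces a stream of events; drain each stream so that the
+ // initialization future completes only once every refresh has finished.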
+ .map(|stream| {
+ stream.for_each(|_| Ok(()))
+ })
.collect();
future::loop_fn(futures, |futures| {
@@ -148,17 +165,17 @@ where
/// Object that allows one to make queries on the Kademlia system.
#[derive(Debug)]
-pub struct KademliaController<P, R, T, K, M>
+pub struct KademliaController<T, K, M>
where
T: MuxedTransport + 'static, // TODO: 'static :-/
{
- inner: Arc<Inner<P, R>>,
+ inner: Arc<Inner>,
swarm_controller: SwarmController<T>,
kademlia_transport: K,
map: M,
}
-impl<P, R, T, K, M> Clone for KademliaController<P, R, T, K, M>
+impl<T, K, M> Clone for KademliaController<T, K, M>
where
T: Clone + MuxedTransport + 'static, // TODO: 'static :-/
K: Clone,
@@ -175,11 +192,8 @@ where
}
}
-impl<P, Pc, R, T, K, M> KademliaController<P, R, T, K, M>
+impl<T, K, M> KademliaController<T, K, M>
where
- P: Deref<Target = Pc>,
- for<'r> &'r Pc: Peerstore,
- R: Clone,
T: Clone + MuxedTransport + 'static, // TODO: 'static :-/
{
/// Performs an iterative find node query on the network.
@@ -193,55 +207,51 @@ where
pub fn find_node(
&self,
searched_key: PeerId,
- ) -> Box<Future<Item = Vec<PeerId>, Error = IoError>>
+ ) -> Box<Stream<Item = QueryEvent<Vec<PeerId>>, Error = IoError>>
where
- P: Clone + 'static,
- R: 'static,
- K: Transport<Output = KademliaProcessingFuture> + Clone + 'static,
- M: FnOnce(KademliaProcessingFuture) -> T::Output + Clone + 'static, // TODO: 'static :-/
+ K: Transport<Output = KademliaPeerReqStream> + Clone + 'static,
+ M: FnOnce(KademliaPeerReqStream) -> T::Output + Clone + 'static, // TODO: 'static :-/
{
- query::find_node(self.clone(), searched_key)
+ let me = self.clone();
+ query::find_node(gen_query_params!(me.clone()), searched_key)
}
}
/// Connection upgrade to the Kademlia protocol.
#[derive(Clone)]
-pub struct KademliaUpgrade<P, R> {
- inner: Arc<Inner<P, R>>,
- upgrade: KademliaServerConfig<Arc<Inner<P, R>>>,
+pub struct KademliaUpgrade {
+ inner: Arc<Inner>,
+ upgrade: KademliaServerConfig,
}
-impl<P, R> KademliaUpgrade<P, R> {
+impl KademliaUpgrade {
/// Builds a connection upgrade from the controller.
#[inline]
- pub fn from_prototype(proto: &KademliaControllerPrototype<P, R>) -> Self {
+ pub fn from_prototype(proto: &KademliaControllerPrototype) -> Self {
KademliaUpgrade {
inner: proto.inner.clone(),
- upgrade: KademliaServerConfig::new(proto.inner.clone()),
+ upgrade: KademliaServerConfig::new(),
}
}
/// Builds a connection upgrade from the controller.
#[inline]
- pub fn from_controller<T, K, M>(ctl: &KademliaController<P, R, T, K, M>) -> Self
+ pub fn from_controller<T, K, M>(ctl: &KademliaController<T, K, M>) -> Self
where
T: MuxedTransport,
{
KademliaUpgrade {
inner: ctl.inner.clone(),
- upgrade: KademliaServerConfig::new(ctl.inner.clone()),
+ upgrade: KademliaServerConfig::new(),
}
}
}
-impl<C, P, Pc, R> ConnectionUpgrade<C> for KademliaUpgrade<P, R>
+impl<C> ConnectionUpgrade<C> for KademliaUpgrade
where
C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
- P: Deref<Target = Pc> + Clone + 'static, // TODO: 'static :-/
- for<'r> &'r Pc: Peerstore,
- R: 'static, // TODO: 'static :-/
{
- type Output = KademliaProcessingFuture;
+ type Output = KademliaPeerReqStream;
type Future = Box<Future<Item = Self::Output, Error = IoError>>;
type NamesIter = iter::Once<(Bytes, ())>;
type UpgradeIdentifier = ();
@@ -256,8 +266,33 @@ where
let inner = self.inner;
let client_addr = addr.clone();
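+ // The remote's peer ID is extracted from its `/p2p/...` (or legacy `/ipfs/...`) multiaddress.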
+ let peer_id = {
+ let mut iter = addr.iter();
+ let protocol = iter.next();
+ let after_proto = iter.next();
+ match (protocol, after_proto) {
+ (Some(AddrComponent::P2P(key)), None) | (Some(AddrComponent::IPFS(key)), None) => {
+ match PeerId::from_bytes(key) {
+ Ok(id) => id,
+ Err(_) => {
+ let err = IoError::new(
+ IoErrorKind::InvalidData,
+ "invalid peer ID sent by remote identification",
+ );
+ return Box::new(future::err(err));
+ }
+ }
+ }
+ _ => {
+ let err =
+ IoError::new(IoErrorKind::InvalidData, "couldn't identify connected node");
+ return Box::new(future::err(err));
+ }
+ }
+ };
+
let future = self.upgrade.upgrade(incoming, id, endpoint, addr).map(
- move |(controller, future)| {
+ move |(controller, stream)| {
match inner.connections.lock().entry(client_addr) {
Entry::Occupied(mut entry) => {
match entry.insert(Connection::Active(controller)) {
@@ -285,7 +320,43 @@ where
}
};
- KademliaProcessingFuture { inner: future }
+ let stream = stream.map(move |query| {
+ match inner.kbuckets.update(peer_id.clone(), ()) {
+ UpdateOutcome::NeedPing(node_to_ping) => {
+ // TODO: do something with this info
+ println!("need to ping {:?}", node_to_ping);
+ }
+ _ => (),
+ }
+
+ match query {
+ KademliaIncomingRequest::FindNode { searched, responder } => {
+ let mut intermediate: Vec<_> = inner.kbuckets.find_closest(&searched).collect();
+ let my_id = inner.kbuckets.my_id().clone();
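+ // Insert the local peer ID into the results, keeping the list ordered by
+ // distance to `searched`.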
+ if let Some(pos) = intermediate
+ .iter()
+ .position(|e| e.distance_with(&searched) >= my_id.distance_with(&searched))
+ {
+ if intermediate[pos] != my_id {
+ intermediate.insert(pos, my_id);
+ }
+ } else {
+ intermediate.push(my_id);
+ }
+
+ Some(KademliaPeerReq {
+ requested_peers: intermediate,
+ inner: responder,
+ })
+ },
+ KademliaIncomingRequest::PingPong => {
+ // We updated the k-bucket above.
+ None
+ },
+ }
+ }).filter_map(|val| val);
+
+ KademliaPeerReqStream { inner: Box::new(stream) }
},
);
@@ -293,24 +364,49 @@ where
}
}
-/// Future that must be processed for the Kademlia system to work.
-pub struct KademliaProcessingFuture {
- inner: Box<Future<Item = (), Error = IoError>>,
+/// Stream that must be processed for the Kademlia system to work.
+///
+/// Produces requests for peer information. These requests should be answered for the stream to
+/// continue to progress.
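+///
+/// Rough usage sketch (mirroring the `kademlia` example; `peer_info_for` is a placeholder
+/// for whatever lookup the caller uses to build a `Peer`):
+///
+/// ```ignore
+/// stream.for_each(|req| {
+///     let peers = req.requested_peers().map(|id| peer_info_for(id)).collect::<Vec<_>>();
+///     req.respond(peers);
+///     Ok(())
+/// })
+/// ```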
+pub struct KademliaPeerReqStream {
+ inner: Box<Stream<Item = KademliaPeerReq, Error = IoError>>,
}
-impl Future for KademliaProcessingFuture {
- type Item = ();
+impl Stream for KademliaPeerReqStream {
+ type Item = KademliaPeerReq;
type Error = IoError;
#[inline]
- fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
+ fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
self.inner.poll()
}
}
+/// Request for information about some peers.
+pub struct KademliaPeerReq {
+ inner: KademliaFindNodeRespond,
+ requested_peers: Vec<PeerId>,
+}
+
+impl KademliaPeerReq {
+ /// Returns a list of the IDs of the peers that were requested.
+ #[inline]
+ pub fn requested_peers(&self) -> SliceIter<PeerId> {
+ self.requested_peers.iter()
+ }
+
+ /// Responds to the request.
+ #[inline]
+ pub fn respond<I>(self, peers: I)
+ where I: IntoIterator<Item = Peer>
+ {
+ self.inner.respond(peers);
+ }
+}
+
// Inner struct shared throughout the Kademlia system.
#[derive(Debug)]
-struct Inner<P, R> {
+struct Inner {
// The remotes are identified by their public keys.
kbuckets: KBucketsTable<PeerId, ()>,
@@ -323,12 +419,6 @@ struct Inner {
// Same as in the config.
parallelism: usize,
- // Same as in the config.
- record_store: R,
-
- // Same as in the config.
- peer_store: P,
-
// List of open connections with remotes.
//
// Since the keys are the nodes' multiaddress, it is expected that each node only has one
@@ -363,94 +453,12 @@ impl fmt::Debug for Connection {
}
}
-impl<P, Pc, R> KadServerInterface for Arc<Inner<P, R>>
+impl<T, K, M> KademliaController<T, K, M>
where
- P: Deref<Target = Pc>,
- for<'r> &'r Pc: Peerstore,
-{
- #[inline]
- fn local_id(&self) -> &PeerId {
- self.kbuckets.my_id()
- }
-
- fn peer_info(&self, peer_id: &PeerId) -> (Vec<Multiaddr>, ConnectionType) {
- let addrs = self.peer_store
- .peer(peer_id)
- .into_iter()
- .flat_map(|p| p.addrs())
- .collect::<Vec<_>>();
- (addrs, ConnectionType::Connected) // ConnectionType meh :-/
- }
-
- #[inline]
- fn kbuckets_update(&self, peer: &PeerId) {
- // TODO: is this the right place for this check?
- if peer == self.kbuckets.my_id() {
- return;
- }
-
- match self.kbuckets.update(peer.clone(), ()) {
- UpdateOutcome::NeedPing(node_to_ping) => {
- // TODO: return this info somehow
- println!("need to ping {:?}", node_to_ping);
- }
- _ => (),
- }
- }
-
- #[inline]
- fn kbuckets_find_closest(&self, addr: &PeerId) -> Vec<PeerId> {
- let mut intermediate: Vec<_> = self.kbuckets.find_closest(addr).collect();
- let my_id = self.kbuckets.my_id().clone();
- if let Some(pos) = intermediate
- .iter()
- .position(|e| e.distance_with(&addr) >= my_id.distance_with(&addr))
- {
- if intermediate[pos] != my_id {
- intermediate.insert(pos, my_id);
- }
- } else {
- intermediate.push(my_id);
- }
- intermediate
- }
-}
-
-impl<P, Pc, R, T, K, M> query::QueryInterface for KademliaController<P, R, T, K, M>
-where
- P: Clone + Deref<Target = Pc> + 'static, // TODO: 'static :-/
- for<'r> &'r Pc: Peerstore,
- R: Clone + 'static, // TODO: 'static :-/
T: Clone + MuxedTransport + 'static, // TODO: 'static :-/
- K: Transport<Output = KademliaProcessingFuture> + Clone + 'static, // TODO: 'static
- M: FnOnce(KademliaProcessingFuture) -> T::Output + Clone + 'static, // TODO: 'static :-/
+ K: Transport<Output = KademliaPeerReqStream> + Clone + 'static, // TODO: 'static
+ M: FnOnce(KademliaPeerReqStream) -> T::Output + Clone + 'static, // TODO: 'static :-/
{
- #[inline]
- fn local_id(&self) -> &PeerId {
- self.inner.kbuckets.my_id()
- }
-
- #[inline]
- fn kbuckets_find_closest(&self, addr: &PeerId) -> Vec<PeerId> {
- self.inner.kbuckets.find_closest(addr).collect()
- }
-
- #[inline]
- fn peer_add_addrs<I>(&self, peer: &PeerId, multiaddrs: I, ttl: Duration)
- where
- I: Iterator<Item = Multiaddr>,
- {
- self.inner
- .peer_store
- .peer_or_create(peer)
- .add_addrs(multiaddrs, ttl);
- }
-
- #[inline]
- fn parallelism(&self) -> usize {
- self.inner.parallelism
- }
-
#[inline]
fn send<F, FRet>(
(
&self,
diff --git a/kad/src/kad_server.rs b/kad/src/kad_server.rs
index eb5b6f72..b37b45a4 100644
--- a/kad/src/kad_server.rs
+++ b/kad/src/kad_server.rs
@@ -23,79 +23,57 @@
//!
//! # Usage
//!
-//! - Implement the `KadServerInterface` trait on something clonable (usually an `Arc`).
-//!
-//! - Create a `KademliaServerConfig` object from that interface. This struct implements
-//! `ConnectionUpgrade`.
+//! - Create a `KademliaServerConfig` object. This struct implements `ConnectionUpgrade`.
//!
-//! - Update a connection through that `KademliaServerConfig`. The output yields you a
-//! `KademliaServerController` and a future that must be driven to completion. The controller
-//! allows you to perform queries and receive responses.
+//! - Upgrade a connection through that `KademliaServerConfig`. The output yields you a
+//! `KademliaServerController` and a stream that must be driven to completion. The controller
+//! allows you to perform queries and receive responses. The stream produces incoming requests
+//! from the remote.
//!
//! This `KademliaServerController` is usually extracted and stored in some sort of hash map in an
//! `Arc` in order to be available whenever we need to request something from a node.
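+//!
+//! A rough sketch of the flow, assuming `socket` and `addr` were obtained from a transport:
+//!
+//! ```ignore
+//! let config = KademliaServerConfig::new();
+//! let (controller, stream) = config.upgrade(socket, (), Endpoint::Dialer, &addr).wait()?;
+//! // `controller` sends queries; `stream` yields the remote's `KademliaIncomingRequest`s,
+//! // which must be answered for the stream to keep making progress.
+//! ```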
use bytes::Bytes;
use futures::sync::{mpsc, oneshot};
-use futures::{future, Future, Sink, Stream};
-use libp2p_peerstore::PeerId;
-use libp2p_core::ConnectionUpgrade;
-use libp2p_core::Endpoint;
-use multiaddr::{AddrComponent, Multiaddr};
+use futures::{future, Future, Sink, stream, Stream};
+use libp2p_core::{ConnectionUpgrade, Endpoint, PeerId};
+use multiaddr::Multiaddr;
use protocol::{self, KadMsg, KademliaProtocolConfig, Peer};
use std::collections::VecDeque;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter;
+use std::sync::{Arc, atomic};
use tokio_io::{AsyncRead, AsyncWrite};
-/// Interface that this server system uses to communicate with the rest of the system.
-pub trait KadServerInterface: Clone {
- /// Returns the peer ID of the local node.
- fn local_id(&self) -> &PeerId;
-
- /// Returns known information about the peer. Not atomic/thread-safe in the sense that
- /// information can change immediately after being returned and before they are processed.
- fn peer_info(&self, _: &PeerId) -> (Vec<Multiaddr>, protocol::ConnectionType);
-
- /// Updates an entry in the K-Buckets. Called whenever that peer sends us a message.
- fn kbuckets_update(&self, peer: &PeerId);
-
- /// Finds the nodes closest to a peer ID.
- fn kbuckets_find_closest(&self, addr: &PeerId) -> Vec<PeerId>;
-}
-
/// Configuration for a Kademlia server.
///
/// Implements `ConnectionUpgrade`. On a successful upgrade, produces a `KademliaServerController`
-/// and a `Future`. The controller lets you send queries to the remote and receive answers, while
-/// the `Future` must be driven to completion in order for things to work.
+/// and a `Stream`. The controller lets you send queries to the remote and receive answers, while
+/// the `Stream` must be driven to completion in order for things to work.
#[derive(Debug, Clone)]
-pub struct KademliaServerConfig<I> {
+pub struct KademliaServerConfig {
raw_proto: KademliaProtocolConfig,
- interface: I,
}
-impl<I> KademliaServerConfig<I> {
+impl KademliaServerConfig {
/// Builds a configuration object for an upcoming Kademlia server.
#[inline]
- pub fn new(interface: I) -> Self {
+ pub fn new() -> Self {
KademliaServerConfig {
raw_proto: KademliaProtocolConfig,
- interface: interface,
}
}
}
-impl<C, I> ConnectionUpgrade<C> for KademliaServerConfig<I>
+impl<C> ConnectionUpgrade<C> for KademliaServerConfig
where
C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
- I: KadServerInterface + 'static, // TODO: 'static :-/
{
type Output = (
KademliaServerController,
- Box<Future<Item = (), Error = IoError>>,
+ Box<Stream<Item = KademliaIncomingRequest, Error = IoError>>,
);
- type Future = Box<Future<Item = Self::Output, Error = IoError>>;
+ type Future = future::Map<<KademliaProtocolConfig as ConnectionUpgrade<C>>::Future, fn(<KademliaProtocolConfig as ConnectionUpgrade<C>>::Output) -> Self::Output>;
type NamesIter = iter::Once<(Bytes, ())>;
type UpgradeIdentifier = ();
@@ -106,42 +84,9 @@ where
#[inline]
fn upgrade(self, incoming: C, id: (), endpoint: Endpoint, addr: &Multiaddr) -> Self::Future {
- let peer_id = {
- let mut iter = addr.iter();
- let protocol = iter.next();
- let after_proto = iter.next();
- match (protocol, after_proto) {
- (Some(AddrComponent::P2P(key)), None) | (Some(AddrComponent::IPFS(key)), None) => {
- match PeerId::from_bytes(key) {
- Ok(id) => id,
- Err(_) => {
- let err = IoError::new(
- IoErrorKind::InvalidData,
- "invalid peer ID sent by remote identification",
- );
- return Box::new(future::err(err));
- }
- }
- }
- _ => {
- let err =
- IoError::new(IoErrorKind::InvalidData, "couldn't identify connected node");
- return Box::new(future::err(err));
- }
- }
- };
-
- let interface = self.interface;
- let future = self.raw_proto
+ self.raw_proto
.upgrade(incoming, id, endpoint, addr)
- .map(move |connec| {
- let (tx, rx) = mpsc::unbounded();
- let future = kademlia_handler(connec, peer_id, rx, interface);
- let controller = KademliaServerController { inner: tx };
- (controller, future)
- });
-
- Box::new(future) as Box<_>
+ .map(build_from_sink_stream)
}
}
@@ -161,7 +106,7 @@ impl KademliaServerController {
pub fn find_node(
&self,
searched_key: &PeerId,
- ) -> Box<Future<Item = Vec<Peer>, Error = IoError>> {
+ ) -> impl Future<Item = Vec<Peer>, Error = IoError> {
let message = protocol::KadMsg::FindNodeReq {
key: searched_key.clone().into_bytes(),
};
@@ -175,7 +120,8 @@ impl KademliaServerController {
IoErrorKind::ConnectionAborted,
"connection to remote has aborted",
));
- return Box::new(fut) as Box<_>;
+
+ return future::Either::B(fut);
}
};
@@ -192,15 +138,14 @@ impl KademliaServerController {
)),
});
- Box::new(future) as Box<_>
+ future::Either::A(future)
}
/// Sends a `PING` query to the node. Because of the way the protocol is designed, there is
/// no way to differentiate between a ping and a pong. Therefore this function doesn't return a
- /// future, and the only way to be notified of the result is through the `kbuckets_update`
- /// method in the `KadServerInterface` trait.
+ /// future, and the only way to be notified of the result is through the stream.
pub fn ping(&self) -> Result<(), IoError> {
- // Dummy channel.
+ // Dummy channel, as the `tx` is going to be dropped anyway.
let (tx, _rx) = oneshot::channel();
match self.inner.unbounded_send((protocol::KadMsg::Ping, tx)) {
Ok(()) => Ok(()),
@@ -212,248 +157,359 @@ impl KademliaServerController {
}
}
+/// Request received from the remote.
+pub enum KademliaIncomingRequest {
+ /// Find the nodes closest to `searched`.
+ FindNode {
+ /// The value being searched.
+ searched: PeerId,
+ /// Object to use to respond to the request.
+ responder: KademliaFindNodeRespond,
+ },
+
+ // TODO: PutValue and FindValue
+
+ /// Received either a ping or a pong.
+ PingPong,
+}
+
+/// Object used to respond to `FindNode` queries from remotes.
+pub struct KademliaFindNodeRespond {
+ inner: oneshot::Sender<KadMsg>,
+}
+
+impl KademliaFindNodeRespond {
+ /// Respond to the `FindNode` request.
+ pub fn respond<I>(self, peers: I)
+ where I: IntoIterator<Item = Peer>
+ {
+ let _ = self.inner.send(KadMsg::FindNodeRes {
+ closer_peers: peers.into_iter().collect()
+ });
+ }
+}
+
+// Builds a controller and stream from a stream/sink of raw messages.
+fn build_from_sink_stream<'a, S>(connec: S) -> (KademliaServerController, Box<Stream<Item = KademliaIncomingRequest, Error = IoError> + 'a>)
+where S: Sink<SinkItem = KadMsg, SinkError = IoError> + Stream<Item = KadMsg, Error = IoError> + 'a
+{
+ let (tx, rx) = mpsc::unbounded();
+ let future = kademlia_handler(connec, rx);
+ let controller = KademliaServerController { inner: tx };
+ (controller, future)
+}
+
// Handles a newly-opened Kademlia stream with a remote peer.
//
// Takes a `Stream` and `Sink` of Kademlia messages representing the connection to the client,
-// plus the ID of the peer that we are handling, plus a `Receiver` that will receive messages to
-// transmit to that connection, plus the interface.
+// plus a `Receiver` that will receive messages to transmit to that connection.
//
-// Returns a `Future` that must be resolved in order for progress to work. It will never yield any
-// item (unless both `rx` and `kad_bistream` are closed) but will propagate any I/O of protocol
-// error that could happen. If the `Receiver` closes, no error is generated.
-fn kademlia_handler<'a, S, I>(
+// Returns a `Stream` that must be driven for progress to be made. The `Stream` produces
+// objects that represent the requests sent by the remote. Each request must be answered
+// before the stream yields further items.
+fn kademlia_handler<'a, S>(
kad_bistream: S,
- peer_id: PeerId,
- rx: mpsc::UnboundedReceiver<(KadMsg, oneshot::Sender<KadMsg>)>,
- interface: I,
-) -> Box<Future<Item = (), Error = IoError> + 'a>
+ rq_rx: mpsc::UnboundedReceiver<(KadMsg, oneshot::Sender<KadMsg>)>,
+) -> Box<Stream<Item = KademliaIncomingRequest, Error = IoError> + 'a>
where
S: Stream<Item = KadMsg, Error = IoError> + Sink<SinkItem = KadMsg, SinkError = IoError> + 'a,
- I: KadServerInterface + Clone + 'a,
{
- let (kad_sink, kad_stream) = kad_bistream
- .sink_map_err(|err| IoError::new(IoErrorKind::InvalidData, err))
- .map_err(|err| IoError::new(IoErrorKind::InvalidData, err))
- .split();
+ let (kad_sink, kad_stream) = kad_bistream.split();
- // We combine `kad_stream` and `rx` into one so that the loop wakes up whenever either
- // generates something.
- let messages = rx.map(|(m, o)| (m, Some(o)))
- .map_err(|_| unreachable!())
- .select(kad_stream.map(|m| (m, None)));
+ // This is a stream of futures containing local responses.
+ // Every time we receive a request from the remote, we create a `oneshot::channel()` and send
+ // the receiving end to `responders_tx`.
+ // This way, if a future is available on `responders_rx`, we block until it produces the
+ // response.
+ let (responders_tx, responders_rx) = mpsc::unbounded();
- // Loop forever.
- let future = future::loop_fn(
- (kad_sink, messages, VecDeque::new(), 0),
- move |(kad_sink, messages, mut send_back_queue, mut expected_pongs)| {
- let interface = interface.clone();
- let peer_id = peer_id.clone();
+ // Will be set to true if either `kad_stream` or `rq_rx` is closed.
+ let finished = Arc::new(atomic::AtomicBool::new(false));
- // The `send_back_queue` is a queue of `UnboundedSender`s in the correct order of
- // expected answers.
- // Whenever we send a message to the remote and this message expects a response, we
- // push the sender to the end of `send_back_queue`. Whenever a remote sends us a
- // response, we pop the first element of `send_back_queue`.
+ // We combine all the streams into one so that the loop wakes up whenever any generates
+ // something.
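+ // `Remote` is a message read from the network; `LocalRequest` comes from the
+ // controller; `LocalResponse` is an answer being produced by the consumer of the
+ // request stream; `Finished` signals that one of the sources has closed.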
+ enum EventSource {
+ Remote(KadMsg),
+ LocalRequest(KadMsg, oneshot::Sender<KadMsg>),
+ LocalResponse(oneshot::Receiver<KadMsg>),
+ Finished,
+ }
- // The value of `expected_pongs` is the number of PING requests that we sent and that
- // haven't been answered by the remote yet. Because of the way the protocol is designed,
- // there is no way to differentiate between a ping and a pong. Therefore whenever we
- // send a ping request we suppose that the next ping we receive is an answer, even
- // though that may not be the case in reality.
- // Because of this behaviour, pings do not pop from the `send_back_queue`.
+ let events = {
+ let responders = responders_rx
+ .map(|m| EventSource::LocalResponse(m))
+ .map_err(|_| unreachable!());
+ let rq_rx = rq_rx
+ .map(|(m, o)| EventSource::LocalRequest(m, o))
+ .map_err(|_| unreachable!())
+ .chain({
+ let finished = finished.clone();
+ future::lazy(move || {
+ finished.store(true, atomic::Ordering::SeqCst);
+ Ok(EventSource::Finished)
+ }).into_stream()
+ });
+ let kad_stream = kad_stream
+ .map(|m| EventSource::Remote(m))
+ .chain({
+ let finished = finished.clone();
+ future::lazy(move || {
+ finished.store(true, atomic::Ordering::SeqCst);
+ Ok(EventSource::Finished)
+ }).into_stream()
+ });
+ responders.select(rq_rx).select(kad_stream)
+ };
- messages
+ let stream = stream::unfold((events, kad_sink, responders_tx, VecDeque::new(), 0u32),
+ move |(events, kad_sink, responders_tx, mut send_back_queue, expected_pongs)| {
+ if finished.load(atomic::Ordering::SeqCst) {
+ return None;
+ }
+
+ Some(events
.into_future()
.map_err(|(err, _)| err)
- .and_then(move |(message, rest)| {
- if let Some((_, None)) = message {
- // If we received a message from the remote (as opposed to a message from
- // `rx`) then we update the k-buckets.
- interface.kbuckets_update(&peer_id);
- }
-
+ .and_then(move |(message, events)| -> Box<Future<Item = _, Error = _>> {
match message {
- None => {
- // Both the connection stream and `rx` are empty, so we break the loop.
- let future = future::ok(future::Loop::Break(()));
- Box::new(future) as Box>
- }
- Some((message @ KadMsg::PutValue { .. }, Some(_))) => {
- // A `PutValue` message has been received on `rx`. Contrary to other
- // types of messages, this one doesn't expect any answer and therefore
- // we ignore the sender.
- let future = kad_sink.send(message).map(move |kad_sink| {
- future::Loop::Continue((
- kad_sink,
- rest,
- send_back_queue,
- expected_pongs,
- ))
- });
+ Some(EventSource::Finished) | None => {
+ // One of the sources has closed; `finished` is now true, so the next
+ // iteration of `unfold` will return `None` and end the stream.
+ let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
+ Box::new(future::ok((None, state)))
+ },
+ Some(EventSource::LocalResponse(message)) => {
+ let future = message
+ .map_err(|_| {
+ // The user destroyed the responder without responding.
+ warn!("Kad responder object destroyed without responding");
+ panic!() // TODO: what to do here? we have to close the connection
+ })
+ .and_then(move |message| {
+ kad_sink
+ .send(message)
+ .map(move |kad_sink| {
+ let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
+ (None, state)
+ })
+ });
+ Box::new(future)
+ },
+ Some(EventSource::LocalRequest(message @ KadMsg::PutValue { .. }, _)) => {
+ // A `PutValue` request. Contrary to other types of messages, this one
+ // doesn't expect any answer and therefore we ignore the sender.
+ let future = kad_sink
+ .send(message)
+ .map(move |kad_sink| {
+ let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
+ (None, state)
+ });
Box::new(future) as Box<_>
}
- Some((message @ KadMsg::Ping { .. }, Some(_))) => {
- // A `Ping` message has been received on `rx`.
- expected_pongs += 1;
- let future = kad_sink.send(message).map(move |kad_sink| {
- future::Loop::Continue((
- kad_sink,
- rest,
- send_back_queue,
- expected_pongs,
- ))
- });
+ Some(EventSource::LocalRequest(message @ KadMsg::Ping { .. }, _)) => {
+ // A local `Ping` request.
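+ // Track one more expected PONG; the `EventSource::Remote(KadMsg::Ping)` arm
+ // below decrements this counter when treating a received ping as a pong.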
+ let expected_pongs = expected_pongs.checked_add(1)
+ .expect("overflow in number of simultaneous pings");
+ let future = kad_sink
+ .send(message)
+ .map(move |kad_sink| {
+ let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
+ (None, state)
+ });
Box::new(future) as Box<_>
}
- Some((message, Some(send_back))) => {
- // Any message other than `PutValue` or `Ping` has been received on
- // `rx`. Send it to the remote.
- let future = kad_sink.send(message).map(move |kad_sink| {
- send_back_queue.push_back(send_back);
- future::Loop::Continue((
- kad_sink,
- rest,
- send_back_queue,
- expected_pongs,
- ))
- });
+ Some(EventSource::LocalRequest(message, send_back)) => {
+ // Any local request other than `PutValue` or `Ping`.
+ send_back_queue.push_back(send_back);
+ let future = kad_sink
+ .send(message)
+ .map(move |kad_sink| {
+ let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
+ (None, state)
+ });
Box::new(future) as Box<_>
}
- Some((KadMsg::Ping, None)) => {
- // Note: The way the protocol was designed, there is no way to
- // differentiate between a ping and a pong.
- if expected_pongs == 0 {
- let message = KadMsg::Ping;
- let future = kad_sink.send(message).map(move |kad_sink| {
- future::Loop::Continue((
- kad_sink,
- rest,
- send_back_queue,
- expected_pongs,
- ))
+ Some(EventSource::Remote(KadMsg::Ping)) => {
+ // The way the protocol was designed, there is no way to differentiate
+ // between a ping and a pong.
+ if let Some(expected_pongs) = expected_pongs.checked_sub(1) {
+ // Maybe we received a PONG, or maybe we received a PING, no way
+ // to tell. If it was a PING and we expected a PONG, then the
+ // remote will see its PING answered only when it PONGs us.
+ let future = future::ok({
+ let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
+ let rq = KademliaIncomingRequest::PingPong;
+ (Some(rq), state)
});
Box::new(future) as Box<_>
} else {
- expected_pongs -= 1;
- let future = future::ok({
- future::Loop::Continue((
- kad_sink,
- rest,
- send_back_queue,
- expected_pongs,
- ))
- });
+ let future = kad_sink
+ .send(KadMsg::Ping)
+ .map(move |kad_sink| {
+ let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
+ let rq = KademliaIncomingRequest::PingPong;
+ (Some(rq), state)
+ });
Box::new(future) as Box<_>
}
}
- Some((message @ KadMsg::FindNodeRes { .. }, None))
- | Some((message @ KadMsg::GetValueRes { .. }, None)) => {
+ Some(EventSource::Remote(message @ KadMsg::FindNodeRes { .. }))
+ | Some(EventSource::Remote(message @ KadMsg::GetValueRes { .. })) => {
// `FindNodeRes` or `GetValueRes` received on the socket.
// Send it back through `send_back_queue`.
if let Some(send_back) = send_back_queue.pop_front() {
let _ = send_back.send(message);
- let future = future::ok(future::Loop::Continue((
- kad_sink,
- rest,
- send_back_queue,
- expected_pongs,
- )));
- return Box::new(future) as Box<_>;
+ let future = future::ok({
+ let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
+ (None, state)
+ });
+ Box::new(future)
} else {
+ debug!("Remote sent a Kad response but we didn't request anything");
let future = future::err(IoErrorKind::InvalidData.into());
- return Box::new(future) as Box<_>;
+ Box::new(future)
}
}
- Some((KadMsg::FindNodeReq { key, .. }, None)) => {
- // `FindNodeReq` received on the socket.
- let message = handle_find_node_req(&interface, &key);
- let future = kad_sink.send(message).map(move |kad_sink| {
- future::Loop::Continue((
- kad_sink,
- rest,
- send_back_queue,
- expected_pongs,
- ))
- });
- Box::new(future) as Box<_>
- }
- Some((KadMsg::GetValueReq { key, .. }, None)) => {
- // `GetValueReq` received on the socket.
- let message = handle_get_value_req(&interface, &key);
- let future = kad_sink.send(message).map(move |kad_sink| {
- future::Loop::Continue((
- kad_sink,
- rest,
- send_back_queue,
- expected_pongs,
- ))
- });
- Box::new(future) as Box<_>
- }
- Some((KadMsg::PutValue { .. }, None)) => {
- // `PutValue` received on the socket.
- handle_put_value_req(&interface);
+ Some(EventSource::Remote(KadMsg::FindNodeReq { key, .. })) => {
+ let peer_id = match PeerId::from_bytes(key) {
+ Ok(id) => id,
+ Err(key) => {
+ debug!("Ignoring FIND_NODE request with invalid key: {:?}", key);
+ let future = future::err(IoError::new(IoErrorKind::InvalidData, "invalid key in FIND_NODE"));
+ return Box::new(future);
+ }
+ };
+
+ let (tx, rx) = oneshot::channel();
+ let _ = responders_tx.unbounded_send(rx);
let future = future::ok({
- future::Loop::Continue((
- kad_sink,
- rest,
- send_back_queue,
- expected_pongs,
- ))
+ let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
+ let rq = KademliaIncomingRequest::FindNode {
+ searched: peer_id,
+ responder: KademliaFindNodeRespond {
+ inner: tx
+ }
+ };
+ (Some(rq), state)
});
- Box::new(future) as Box<_>
+
+ Box::new(future)
+ }
+ Some(EventSource::Remote(KadMsg::GetValueReq { .. })) => {
+ unimplemented!() // FIXME:
+ }
+ Some(EventSource::Remote(KadMsg::PutValue { .. })) => {
+ unimplemented!() // FIXME:
}
}
- })
- },
- );
+ }))
+ }).filter_map(|val| val);
- Box::new(future) as Box>
+ Box::new(stream) as Box>
}
-// Builds a `KadMsg` that handles a `FIND_NODE` request received from the remote.
-fn handle_find_node_req<I>(interface: &I, requested_key: &[u8]) -> KadMsg
-where
- I: ?Sized + KadServerInterface,
-{
- let peer_id = match PeerId::from_bytes(requested_key.to_vec()) {
- // TODO: suboptimal
- Ok(id) => id,
- Err(_) => {
- return KadMsg::FindNodeRes {
- closer_peers: vec![],
- }
+#[cfg(test)]
+mod tests {
+ use std::io::Error as IoError;
+ use std::iter;
+ use futures::{Future, Poll, Sink, StartSend, Stream};
+ use futures::sync::mpsc;
+ use kad_server::{self, KademliaIncomingRequest, KademliaServerController};
+ use libp2p_core::PublicKeyBytes;
+ use protocol::{ConnectionType, Peer};
+ use rand;
+
+ // This struct merges a stream and a sink and is quite useful for tests.
+ struct Wrapper<St, Si>(St, Si);
+ impl<St, Si> Stream for Wrapper<St, Si>
+ where
+ St: Stream,
+ {
+ type Item = St::Item;
+ type Error = St::Error;
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ self.0.poll()
}
- };
+ }
+ impl<St, Si> Sink for Wrapper<St, Si>
+ where
+ Si: Sink,
+ {
+ type SinkItem = Si::SinkItem;
+ type SinkError = Si::SinkError;
+ fn start_send(
+ &mut self,
+ item: Self::SinkItem,
+ ) -> StartSend<Self::SinkItem, Self::SinkError> {
+ self.1.start_send(item)
+ }
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ self.1.poll_complete()
+ }
+ }
- let closer_peers = interface
- .kbuckets_find_closest(&peer_id)
- .into_iter()
- .map(|peer| {
- let (multiaddrs, connection_ty) = interface.peer_info(&peer);
- protocol::Peer {
- node_id: peer,
- multiaddrs: multiaddrs,
- connection_ty: connection_ty,
- }
- })
- .collect();
+ fn build_test() -> (KademliaServerController, impl Stream<Item = KademliaIncomingRequest, Error = IoError>, KademliaServerController, impl Stream<Item = KademliaIncomingRequest, Error = IoError>) {
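+ // Wire two in-memory channels back-to-back so that each side's sink feeds the
+ // other side's stream, simulating a network connection.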
+ let (a_to_b, b_from_a) = mpsc::unbounded();
+ let (b_to_a, a_from_b) = mpsc::unbounded();
- KadMsg::FindNodeRes { closer_peers }
-}
-
-// Builds a `KadMsg` that handles a `FIND_VALUE` request received from the remote.
-fn handle_get_value_req<I>(_interface: &I, _requested_key: &[u8]) -> KadMsg
-where
- I: ?Sized + KadServerInterface,
-{
- unimplemented!()
-}
-
-// Handles a `STORE` request received from the remote.
-fn handle_put_value_req<I>(_interface: &I)
-where
- I: ?Sized + KadServerInterface,
-{
- unimplemented!()
+ let sink_stream_a = Wrapper(a_from_b, a_to_b)
+ .map_err(|_| panic!()).sink_map_err(|_| panic!());
+ let sink_stream_b = Wrapper(b_from_a, b_to_a)
+ .map_err(|_| panic!()).sink_map_err(|_| panic!());
+
+ let (controller_a, stream_events_a) = kad_server::build_from_sink_stream(sink_stream_a);
+ let (controller_b, stream_events_b) = kad_server::build_from_sink_stream(sink_stream_b);
+ (controller_a, stream_events_a, controller_b, stream_events_b)
+ }
+
+ #[test]
+ fn ping_response() {
+ let (controller_a, stream_events_a, _controller_b, stream_events_b) = build_test();
+
+ controller_a.ping().unwrap();
+
+ let streams = stream_events_a.map(|ev| (ev, "a"))
+ .select(stream_events_b.map(|ev| (ev, "b")));
+ match streams.into_future().map_err(|(err, _)| err).wait().unwrap() {
+ (Some((KademliaIncomingRequest::PingPong, "b")), _) => {},
+ _ => panic!()
+ }
+ }
+
+ #[test]
+ fn find_node_response() {
+ let (controller_a, stream_events_a, _controller_b, stream_events_b) = build_test();
+
+ let random_peer_id = {
+ let buf = (0 .. 1024).map(|_| -> u8 { rand::random() }).collect::<Vec<u8>>();
+ PublicKeyBytes(buf).to_peer_id()
+ };
+
+ let find_node_fut = controller_a.find_node(&random_peer_id);
+
+ let example_response = Peer {
+ node_id: {
+ let buf = (0 .. 1024).map(|_| -> u8 { rand::random() }).collect::<Vec<u8>>();
+ PublicKeyBytes(buf).to_peer_id()
+ },
+ multiaddrs: Vec::new(),
+ connection_ty: ConnectionType::Connected,
+ };
+
+ let streams = stream_events_a.map(|ev| (ev, "a"))
+ .select(stream_events_b.map(|ev| (ev, "b")));
+
+ let streams = match streams.into_future().map_err(|(err, _)| err).wait().unwrap() {
+ (Some((KademliaIncomingRequest::FindNode { searched, responder }, "b")), streams) => {
+ assert_eq!(searched, random_peer_id);
+ responder.respond(iter::once(example_response.clone()));
+ streams
+ },
+ _ => panic!()
+ };
+
+ let resp = streams.into_future().map_err(|(err, _)| err).map(|_| unreachable!())
+ .select(find_node_fut)
+ .map_err(|_| -> IoError { panic!() });
+ assert_eq!(resp.wait().unwrap().0, vec![example_response]);
+ }
}
diff --git a/kad/src/kbucket.rs b/kad/src/kbucket.rs
index 3478f897..6408bd01 100644
--- a/kad/src/kbucket.rs
+++ b/kad/src/kbucket.rs
@@ -29,7 +29,7 @@
use arrayvec::ArrayVec;
use bigint::U512;
-use libp2p_peerstore::PeerId;
+use libp2p_core::PeerId;
use parking_lot::{Mutex, MutexGuard};
use std::mem;
use std::slice::Iter as SliceIter;
@@ -89,11 +89,12 @@ impl KBucket {
// If a node is pending and the timeout has expired, removes the first element of `nodes`
// and pushes back the node in `pending_node`.
fn flush(&mut self, timeout: Duration) {
- if let Some((_, instant)) = self.pending_node {
+ if let Some((pending_node, instant)) = self.pending_node.take() {
if instant.elapsed() >= timeout {
- let (pending_node, _) = self.pending_node.take().unwrap();
let _ = self.nodes.remove(0);
self.nodes.push(pending_node);
+ } else {
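+ // The timeout has not expired yet; put the pending node back.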
+ self.pending_node = Some((pending_node, instant));
}
}
}
@@ -209,15 +210,10 @@ where
/// Marks the node as "most recent" in its bucket and modifies the value associated to it.
/// This function should be called whenever we receive a communication from a node.
- ///
- /// # Panic
- ///
- /// Panics if `id` is equal to the local node ID.
- ///
pub fn update(&self, id: Id, value: Val) -> UpdateOutcome<Id, Val> {
let table = match self.bucket_num(&id) {
Some(n) => &self.tables[n],
- None => panic!("tried to update our own node in the kbuckets table"),
+ None => return UpdateOutcome::FailSelfUpdate,
};
let mut table = table.lock();
@@ -272,10 +268,13 @@ pub enum UpdateOutcome {
Added,
/// The node was already in the bucket and has been refreshed.
Refreshed(Val),
- /// The node wasn't added. Instead we need to ping the node passed as parameter.
+ /// The node wasn't added. Instead we need to ping the node passed as parameter, and call
+ /// `update` if it responds.
NeedPing(Id),
/// The node wasn't added at all because a node was already pending.
Discarded,
+ /// Tried to update the local peer ID. This is an invalid operation.
+ FailSelfUpdate,
}
/// Iterator giving access to a bucket.
@@ -335,7 +334,7 @@ mod tests {
extern crate rand;
use self::rand::random;
use kbucket::{KBucketsTable, UpdateOutcome, MAX_NODES_PER_BUCKET};
- use libp2p_peerstore::PeerId;
+ use libp2p_core::PeerId;
use std::thread;
use std::time::Duration;
@@ -364,8 +363,7 @@ mod tests {
}
#[test]
- #[should_panic(expected = "tried to update our own node in the kbuckets table")]
- fn update_local_id_panic() {
+ fn update_local_id_fails() {
let my_id = {
let mut bytes = vec![random(); 34];
bytes[0] = 18;
@@ -374,7 +372,10 @@ mod tests {
};
let table = KBucketsTable::new(my_id.clone(), Duration::from_secs(5));
- let _ = table.update(my_id, ());
+ match table.update(my_id, ()) {
+ UpdateOutcome::FailSelfUpdate => (),
+ _ => panic!()
+ }
}
#[test]
diff --git a/kad/src/lib.rs b/kad/src/lib.rs
index 75e65f63..8b48c3db 100644
--- a/kad/src/lib.rs
+++ b/kad/src/lib.rs
@@ -69,7 +69,6 @@ extern crate datastore;
extern crate fnv;
extern crate futures;
extern crate libp2p_identify;
-extern crate libp2p_peerstore;
extern crate libp2p_ping;
extern crate libp2p_core;
#[macro_use]
@@ -84,7 +83,9 @@ extern crate tokio_timer;
extern crate varint;
pub use self::high_level::{KademliaConfig, KademliaController, KademliaControllerPrototype};
-pub use self::high_level::{KademliaProcessingFuture, KademliaUpgrade};
+pub use self::high_level::{KademliaPeerReqStream, KademliaUpgrade, KademliaPeerReq};
+pub use self::protocol::{ConnectionType, Peer};
+pub use self::query::QueryEvent;
mod high_level;
mod kad_server;
diff --git a/kad/src/protocol.rs b/kad/src/protocol.rs
index 6dabb7d6..f10ceabe 100644
--- a/kad/src/protocol.rs
+++ b/kad/src/protocol.rs
@@ -25,16 +25,14 @@
//! The `Stream` component is used to poll the underlying transport, and the `Sink` component is
//! used to send messages.
-use bytes::Bytes;
-use futures::future;
-use futures::{Sink, Stream};
-use libp2p_peerstore::PeerId;
-use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr};
+use bytes::{Bytes, BytesMut};
+use futures::{future, sink, Sink, stream, Stream};
+use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr, PeerId};
use protobuf::{self, Message};
use protobuf_structs;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter;
-use tokio_io::{AsyncRead, AsyncWrite};
+use tokio_io::{AsyncRead, AsyncWrite, codec::Framed};
use varint::VarintCodec;
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
@@ -122,8 +120,7 @@ impl ConnectionUpgrade for KademliaProtocolConfig
where
C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
{
- type Output =
- Box<KadStreamSink>;
+ type Output = KadStreamSink<C>;
type Future = future::FutureResult<Self::Output, IoError>;
type NamesIter = iter::Once<(Bytes, ())>;
type UpgradeIdentifier = ();
@@ -139,37 +136,26 @@ where
}
}
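+// Concrete type of the stream/sink returned by `kademlia_protocol`: a varint-framed socket
+// whose frames are encoded to and decoded from protobuf `KadMsg`s.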
+type KadStreamSink<S> = stream::AndThen<sink::With<stream::FromErr<Framed<S, VarintCodec<Vec<u8>>>, IoError>, KadMsg, fn(KadMsg) -> Result<Vec<u8>, IoError>, Result<Vec<u8>, IoError>>, fn(BytesMut) -> Result<KadMsg, IoError>, Result<KadMsg, IoError>>;
+
// Upgrades a socket to use the Kademlia protocol.
-fn kademlia_protocol<'a, S>(
+fn kademlia_protocol<S>(
socket: S,
-) -> Box<KadStreamSink + 'a>
+) -> KadStreamSink<S>
where
- S: AsyncRead + AsyncWrite + 'a,
+ S: AsyncRead + AsyncWrite,
{
- let wrapped = socket
+ socket
.framed(VarintCodec::default())
.from_err::()
- .with(|request| -> Result<_, IoError> {
+ .with::<_, fn(_) -> _, _>(|request| -> Result<_, IoError> {
let proto_struct = msg_to_proto(request);
Ok(proto_struct.write_to_bytes().unwrap()) // TODO: error?
})
- .and_then(|bytes| {
+ .and_then::<fn(_) -> _, _>(|bytes| {
let response = protobuf::parse_from_bytes(&bytes)?;
proto_to_msg(response)
- });
-
- Box::new(wrapped)
-}
-
-/// Custom trait that derives `Sink` and `Stream`, so that we can box it.
-pub trait KadStreamSink:
- Stream<Item = KadMsg, Error = IoError> + Sink<SinkItem = KadMsg, SinkError = IoError>
-{
-}
-impl<T> KadStreamSink for T
-where
- T: Stream<Item = KadMsg, Error = IoError> + Sink<SinkItem = KadMsg, SinkError = IoError>,
,
-{
+ })
}
/// Message that we can send to a peer or received from a peer.
@@ -307,8 +293,7 @@ mod tests {
use self::libp2p_tcp_transport::TcpConfig;
use self::tokio_core::reactor::Core;
use futures::{Future, Sink, Stream};
- use libp2p_peerstore::PeerId;
- use libp2p_core::{Transport, PublicKeyBytesSlice};
+ use libp2p_core::{Transport, PeerId, PublicKeyBytesSlice};
use protocol::{ConnectionType, KadMsg, KademliaProtocolConfig, Peer};
use std::sync::mpsc;
use std::thread;
diff --git a/kad/src/query.rs b/kad/src/query.rs
index 9d97d6b0..6098a63c 100644
--- a/kad/src/query.rs
+++ b/kad/src/query.rs
@@ -21,10 +21,9 @@
//! This module handles performing iterative queries about the network.
use fnv::FnvHashSet;
-use futures::{future, Future};
-use kad_server::KademliaServerController;
+use futures::{future, Future, stream, Stream};
use kbucket::KBucketsPeerId;
-use libp2p_peerstore::PeerId;
+use libp2p_core::PeerId;
use multiaddr::{AddrComponent, Multiaddr};
use protocol;
use rand;
@@ -32,78 +31,73 @@ use smallvec::SmallVec;
use std::cmp::Ordering;
use std::io::Error as IoError;
use std::mem;
-use std::time::Duration;
-/// Interface that the query uses to communicate with the rest of the system.
-pub trait QueryInterface: Clone {
- /// Returns the peer ID of the local node.
- fn local_id(&self) -> &PeerId;
+/// Parameters of a query. Allows plugging the query-related code with the rest of the
+/// infrastructure.
+pub struct QueryParams<FBuckets, FFindNode> {
+ /// Identifier of the local peer.
+ pub local_id: PeerId,
+ /// Called whenever we need to obtain the peers closest to a certain peer.
+ pub kbuckets_find_closest: FBuckets,
+ /// Level of parallelism for networking. If this is `N`, then we can dial `N` nodes at a time.
+ pub parallelism: usize,
+ /// Called whenever we want to send a `FIND_NODE` RPC query.
+ pub find_node: FFindNode,
+}
- /// Finds the nodes closest to a peer ID.
- fn kbuckets_find_closest(&self, addr: &PeerId) -> Vec<PeerId>;
-
- /// Adds new known multiaddrs for the given peer.
- fn peer_add_addrs<I>(&self, peer: &PeerId, multiaddrs: I, ttl: Duration)
- where
- I: Iterator<Item = Multiaddr>;
-
- /// Returns the level of parallelism wanted for this query.
- fn parallelism(&self) -> usize;
-
- /// Attempts to contact the given multiaddress, then calls `and_then` on success. Returns a
- /// future that contains the output of `and_then`, or an error if we failed to contact the
- /// remote.
- // TODO: use HKTB once Rust supports that, to avoid boxing the future
- fn send<F, FRet>(
(
- &self,
- addr: Multiaddr,
- and_then: F,
- ) -> Box>
- where
- F: FnOnce(&KademliaServerController) -> FRet + 'static,
- FRet: 'static;
+/// Event that happens during a query.
+#[derive(Debug, Clone)]
+pub enum QueryEvent<TOut> {
+ /// Learned about new multiaddresses for the given peers.
+ NewKnownMultiaddrs(Vec<(PeerId, Vec<Multiaddr>)>),
+ /// Finished the processing of the query. Contains the result.
+ Finished(TOut),
}
/// Starts a query for an iterative `FIND_NODE` request.
#[inline]
-pub fn find_node<'a, I>(
- query_interface: I,
+pub fn find_node<'a, FBuckets, FFindNode>(
+ query_params: QueryParams<FBuckets, FFindNode>,
searched_key: PeerId,
-) -> Box<Future<Item = Vec<PeerId>, Error = IoError> + 'a>
+) -> Box<Stream<Item = QueryEvent<Vec<PeerId>>, Error = IoError> + 'a>
where
- I: QueryInterface + 'a,
+ FBuckets: Fn(PeerId) -> Vec<PeerId> + 'a + Clone,
+ FFindNode: Fn(Multiaddr, PeerId) -> Box<Future<Item = Vec<protocol::Peer>, Error = IoError>> + 'a + Clone,
{
- query(query_interface, searched_key, 20) // TODO: constant
+ query(query_params, searched_key, 20) // TODO: constant
}
/// Refreshes a specific bucket by performing an iterative `FIND_NODE` on a random ID of this
/// bucket.
///
/// Returns a dummy no-op future if `bucket_num` is out of range.
-pub fn refresh<'a, I>(
- query_interface: I,
+pub fn refresh<'a, FBuckets, FFindNode>(
+ query_params: QueryParams<FBuckets, FFindNode>,
bucket_num: usize,
-) -> Box<Future<Item = (), Error = IoError> + 'a>
+) -> Box<Stream<Item = QueryEvent<()>, Error = IoError> + 'a>
where
- I: QueryInterface + 'a,
+ FBuckets: Fn(PeerId) -> Vec<PeerId> + 'a + Clone,
+ FFindNode: Fn(Multiaddr, PeerId) -> Box<Future<Item = Vec<protocol::Peer>, Error = IoError>> + 'a + Clone,
{
- let peer_id = match gen_random_id(&query_interface, bucket_num) {
+ let peer_id = match gen_random_id(&query_params.local_id, bucket_num) {
Ok(p) => p,
- Err(()) => return Box::new(future::ok(())),
+ Err(()) => return Box::new(stream::once(Ok(QueryEvent::Finished(())))),
};
- let future = find_node(query_interface, peer_id).map(|_| ());
- Box::new(future) as Box<_>
+ let stream = find_node(query_params, peer_id).map(|event| {
+ match event {
+ QueryEvent::NewKnownMultiaddrs(peers) => QueryEvent::NewKnownMultiaddrs(peers),
+ QueryEvent::Finished(_) => QueryEvent::Finished(()),
+ }
+ });
+
+ Box::new(stream) as Box<_>
}
// Generates a random `PeerId` that belongs to the given bucket.
//
// Returns an error if `bucket_num` is out of range.
-fn gen_random_id<I>(query_interface: &I, bucket_num: usize) -> Result<PeerId, ()>
-where
- I: ?Sized + QueryInterface,
-{
- let my_id = query_interface.local_id();
+fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result<PeerId, ()> {
let my_id_len = my_id.as_bytes().len();
// TODO: this 2 is magic here ; it is the length of the hash of the multihash
@@ -134,22 +128,21 @@ where
}
// Generic query-performing function.
-fn query<'a, I>(
- query_interface: I,
+fn query<'a, FBuckets, FFindNode>(
+ query_params: QueryParams<FBuckets, FFindNode>,
searched_key: PeerId,
num_results: usize,
-) -> Box<Future<Item = Vec<PeerId>, Error = IoError> + 'a>
+) -> Box<Stream<Item = QueryEvent<Vec<PeerId>>, Error = IoError> + 'a>
where
- I: QueryInterface + 'a,
+ FBuckets: Fn(PeerId) -> Vec<PeerId> + 'a + Clone,
+ FFindNode: Fn(Multiaddr, PeerId) -> Box<Future<Item = Vec<protocol::Peer>, Error = IoError>> + 'a + Clone,
{
debug!("Start query for {:?} ; num results = {}", searched_key, num_results);
// State of the current iterative process.
struct State<'a> {
- // If true, we are still in the first step of the algorithm where we try to find the
- // closest node. If false, then we are contacting the k closest nodes in order to fill the
- // list with enough results.
- looking_for_closer: bool,
+ // At which stage we are.
+ stage: Stage,
// Final output of the iteration.
result: Vec<PeerId>,
// For each open connection, a future with the response of the remote.
@@ -164,26 +157,55 @@ where
failed_to_contact: FnvHashSet<PeerId>,
}
+ // General stage of the state.
+ #[derive(Copy, Clone, PartialEq, Eq)]
+ enum Stage {
+ // We are still in the first step of the algorithm where we try to find the closest node.
+ FirstStep,
+ // We are contacting the k closest nodes in order to fill the list with enough results.
+ SecondStep,
+ // The results are complete, and the next stream iteration will produce the outcome.
+ FinishingNextIter,
+ // We are finished and the stream shouldn't return anything anymore.
+ Finished,
+ }
+
let initial_state = State {
- looking_for_closer: true,
+ stage: Stage::FirstStep,
result: Vec::with_capacity(num_results),
current_attempts_fut: Vec::new(),
current_attempts_addrs: SmallVec::new(),
- pending_nodes: query_interface.kbuckets_find_closest(&searched_key),
+ pending_nodes: {
+ let kbuckets_find_closest = query_params.kbuckets_find_closest.clone();
+ kbuckets_find_closest(searched_key.clone()) // TODO: suboptimal
+ },
failed_to_contact: Default::default(),
};
- let parallelism = query_interface.parallelism();
+ let parallelism = query_params.parallelism;
// Start of the iterative process.
- let stream = future::loop_fn(initial_state, move |mut state| {
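+ // `stream::unfold` threads the whole `State` through each iteration; returning `None`
+ // from the closure ends the stream, and each produced item becomes a `QueryEvent`.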
+ let stream = stream::unfold(initial_state, move |mut state| -> Option<_> {
+ match state.stage {
+ Stage::FinishingNextIter => {
+ let result = mem::replace(&mut state.result, Vec::new());
+ debug!("Query finished with {} results", result.len());
+ state.stage = Stage::Finished;
+ let future = future::ok((Some(QueryEvent::Finished(result)), state));
+ return Some(future::Either::A(future));
+ },
+ Stage::Finished => {
+ return None;
+ },
+ _ => ()
+ };
+
let searched_key = searched_key.clone();
- let query_interface = query_interface.clone();
- let query_interface2 = query_interface.clone();
+ let find_node_rpc = query_params.find_node.clone();
// Find out which nodes to contact at this iteration.
let to_contact = {
- let wanted_len = if state.looking_for_closer {
+ let wanted_len = if state.stage == Stage::FirstStep {
parallelism.saturating_sub(state.current_attempts_fut.len())
} else {
num_results.saturating_sub(state.current_attempts_fut.len())
@@ -218,10 +240,8 @@ where
let multiaddr: Multiaddr = AddrComponent::P2P(peer.clone().into_bytes()).into();
let searched_key2 = searched_key.clone();
- let resp_rx =
- query_interface.send(multiaddr.clone(), move |ctl| ctl.find_node(&searched_key2));
+ let current_attempt = find_node_rpc(multiaddr.clone(), searched_key2); // TODO: suboptimal
state.current_attempts_addrs.push(peer.clone());
- let current_attempt = resp_rx.flatten();
state
.current_attempts_fut
.push(Box::new(current_attempt) as Box<_>);
@@ -237,8 +257,10 @@ where
if current_attempts_fut.is_empty() {
// If `current_attempts_fut` is empty, then `select_all` would panic. It happens
// when we have no additional node to query.
- let future = future::ok(future::Loop::Break(state));
- return future::Either::A(future);
+ debug!("Finishing query early because no additional node available");
+ state.stage = Stage::FinishingNextIter;
+ let future = future::ok((None, state));
+ return Some(future::Either::A(future));
}
- // This is the future that continues or breaks the `loop_fn`.
+ // This is the future that drives one iteration of the `unfold`.
@@ -263,7 +285,7 @@ where
Err(err) => {
trace!("RPC query failed for {:?}: {:?}", remote_id, err);
state.failed_to_contact.insert(remote_id);
- return Ok(future::Loop::Continue(state));
+ return future::ok((None, state));
}
};
@@ -288,17 +310,14 @@ where
let mut local_nearest_node_updated = false;
// Update `state` with the actual content of the message.
+ let mut new_known_multiaddrs = Vec::with_capacity(closer_peers.len());
for mut peer in closer_peers {
// Update the peerstore with the information sent by
// the remote.
{
- let valid_multiaddrs = peer.multiaddrs.drain(..);
- trace!("Adding multiaddresses to {:?}: {:?}", peer.node_id, valid_multiaddrs);
- query_interface2.peer_add_addrs(
- &peer.node_id,
- valid_multiaddrs,
- Duration::from_secs(3600),
- ); // TODO: which TTL?
+ let multiaddrs = mem::replace(&mut peer.multiaddrs, Vec::new());
+ trace!("Reporting multiaddresses for {:?}: {:?}", peer.node_id, multiaddrs);
+ new_known_multiaddrs.push((peer.node_id.clone(), multiaddrs));
}
if peer.node_id.distance_with(&searched_key)
@@ -325,28 +344,22 @@ where
}
if state.result.len() >= num_results
- || (!state.looking_for_closer && state.current_attempts_fut.is_empty())
+ || (state.stage != Stage::FirstStep && state.current_attempts_fut.is_empty())
{
- // Check that our `Vec::with_capacity` is correct.
- debug_assert_eq!(state.result.capacity(), num_results);
- Ok(future::Loop::Break(state))
+ state.stage = Stage::FinishingNextIter;
+
} else {
if !local_nearest_node_updated {
trace!("Loop didn't update closer node ; jumping to step 2");
- state.looking_for_closer = false;
+ state.stage = Stage::SecondStep;
}
-
- Ok(future::Loop::Continue(state))
}
+
+ future::ok((Some(QueryEvent::NewKnownMultiaddrs(new_known_multiaddrs)), state))
});
- future::Either::B(future)
- });
-
- let stream = stream.map(|state| {
- debug!("Query finished with {} results", state.result.len());
- state.result
- });
+ Some(future::Either::B(future))
+ }).filter_map(|val| val);
Box::new(stream) as Box<_>
}
diff --git a/libp2p/examples/kademlia.rs b/libp2p/examples/kademlia.rs
index 069f88a1..d10a3d9b 100644
--- a/libp2p/examples/kademlia.rs
+++ b/libp2p/examples/kademlia.rs
@@ -27,7 +27,7 @@ extern crate tokio_core;
extern crate tokio_io;
use bigint::U512;
-use futures::future::Future;
+use futures::{Future, Stream};
use libp2p::peerstore::{PeerAccess, PeerId, Peerstore};
use libp2p::Multiaddr;
use std::env;
@@ -35,6 +35,7 @@ use std::sync::Arc;
use std::time::Duration;
use libp2p::core::{Transport, PublicKeyBytesSlice};
use libp2p::core::{upgrade, either::EitherOutput};
+use libp2p::kad::{ConnectionType, Peer, QueryEvent};
use libp2p::tcp::TcpConfig;
use tokio_core::reactor::Core;
@@ -103,13 +104,11 @@ fn main() {
// and outgoing connections for us.
let kad_config = libp2p::kad::KademliaConfig {
parallelism: 3,
- record_store: (),
- peer_store: peer_store,
local_peer_id: my_peer_id.clone(),
timeout: Duration::from_secs(2),
};
- let kad_ctl_proto = libp2p::kad::KademliaControllerPrototype::new(kad_config);
+ let kad_ctl_proto = libp2p::kad::KademliaControllerPrototype::new(kad_config, peer_store.peers());
let proto = libp2p::kad::KademliaUpgrade::from_prototype(&kad_ctl_proto);
@@ -117,7 +116,32 @@ fn main() {
// outgoing connections for us.
let (swarm_controller, swarm_future) = libp2p::core::swarm(
transport.clone().with_upgrade(proto.clone()),
- |upgrade, _| upgrade,
+ {
+ let peer_store = peer_store.clone();
+ move |kademlia_stream, _| {
+ let peer_store = peer_store.clone();
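+ // Answer each request for peer information with the addresses the peerstore knows.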
+ kademlia_stream.for_each(move |req| {
+ let peer_store = peer_store.clone();
+ let result = req
+ .requested_peers()
+ .map(move |peer_id| {
+ let addrs = peer_store
+ .peer(peer_id)
+ .into_iter()
+ .flat_map(|p| p.addrs())
+ .collect::<Vec<_>>();
+ Peer {
+ node_id: peer_id.clone(),
+ multiaddrs: addrs,
+ connection_ty: ConnectionType::Connected, // meh :-/
+ }
+ })
+ .collect::<Vec<_>>();
+ req.respond(result);
+ Ok(())
+ })
+ }
+ }
);
let (kad_controller, _kad_init) =
@@ -132,6 +156,21 @@ fn main() {
let finish_enum = kad_controller
.find_node(my_peer_id.clone())
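+ // Learned multiaddresses are fed back into the peerstore; only the final result is kept.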
+ .filter_map(move |event| {
+ match event {
+ QueryEvent::NewKnownMultiaddrs(peers) => {
+ for (peer, addrs) in peers {
+ peer_store.peer_or_create(&peer)
+ .add_addrs(addrs, Duration::from_secs(3600));
+ }
+ None
+ },
+ QueryEvent::Finished(out) => Some(out),
+ }
+ })
+ .into_future()
+ .map_err(|(err, _)| err)
+ .map(|(out, _)| out.unwrap())
.and_then(|out| {
let local_hash = U512::from(my_peer_id.hash());
println!("Results of peer discovery for {:?}:", my_peer_id);