From 6897eca91f76aca0f337ca38f837eb28c1d6e7ac Mon Sep 17 00:00:00 2001
From: Pierre Krieger
Date: Thu, 7 Jun 2018 17:15:19 +0200
Subject: [PATCH] Lots of improvements to kademlia code (#243)

* No longer panic when updating self peer ID in kbuckets
* Minor code improvement in flush()
* Small improvement to handle_find_node_req
* expected_pongs no longer mut
* Clean up KadServerInterface trait
* find_node() returns an impl Future
* Rework kad_server's API to remove the interface
* Remove the error mapping in kad_bistream.split()
* Use a named type in protocol.rs
* respond() now takes an iter of Peers + add tests
* Use concrete Future type in kad_server upgrade
* Let the high level code decide the TTL of the addrs
* Replace QueryInterface::send with find_node_rpc
* Replace KademliaProcessingFuture with KademliaPeerReqStream
* requested_peers() now returns an iter
* gen_random_id() only requires &PeerId
* Remove QueryInterface and return stream of events
* Remove add_peer_addrs from query
* Remove the peer_store and record_store params
* Tweak multiaddresses reporting
* Remove dependency on peerstore
* Fix tests
---
 kad/Cargo.toml              |   1 -
 kad/src/high_level.rs       | 330 ++++++++++----------
 kad/src/kad_server.rs       | 606 ++++++++++++++++++++----------------
 kad/src/kbucket.rs          |  29 +-
 kad/src/lib.rs              |   5 +-
 kad/src/protocol.rs         |  45 +--
 kad/src/query.rs            | 195 ++++++------
 libp2p/examples/kademlia.rs |  49 ++-
 8 files changed, 681 insertions(+), 579 deletions(-)

diff --git a/kad/Cargo.toml b/kad/Cargo.toml
index 0660f65f..d8ac4475 100644
--- a/kad/Cargo.toml
+++ b/kad/Cargo.toml
@@ -12,7 +12,6 @@ datastore = { path = "../datastore" }
 fnv = "1.0"
 futures = "0.1"
 libp2p-identify = { path = "../identify" }
-libp2p-peerstore = { path = "../peerstore" }
 libp2p-ping = { path = "../ping" }
 libp2p-core = { path = "../core" }
 log = "0.4"
diff --git a/kad/src/high_level.rs b/kad/src/high_level.rs
index 6266ff4e..a4e26628 100644
--- a/kad/src/high_level.rs
+++ b/kad/src/high_level.rs
@@ -25,20 +25,19 @@
 use bytes::Bytes;
 use fnv::FnvHashMap;
 use futures::sync::oneshot;
-use futures::{self, future, Future};
-use kad_server::{KadServerInterface, KademliaServerConfig, KademliaServerController};
+use futures::{self, future, Future, Stream};
+use kad_server::{KademliaServerConfig, KademliaServerController, KademliaIncomingRequest, KademliaFindNodeRespond};
 use kbucket::{KBucketsPeerId, KBucketsTable, UpdateOutcome};
-use libp2p_peerstore::{PeerAccess, PeerId, Peerstore};
-use libp2p_core::{ConnectionUpgrade, Endpoint, MuxedTransport, SwarmController, Transport};
-use multiaddr::Multiaddr;
+use libp2p_core::{ConnectionUpgrade, Endpoint, MuxedTransport, PeerId, SwarmController, Transport};
+use multiaddr::{AddrComponent, Multiaddr};
 use parking_lot::Mutex;
-use protocol::ConnectionType;
+use protocol::Peer;
 use query;
 use std::collections::hash_map::Entry;
 use std::fmt;
 use std::io::{Error as IoError, ErrorKind as IoErrorKind};
 use std::iter;
-use std::ops::Deref;
+use std::slice::Iter as SliceIter;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio_io::{AsyncRead, AsyncWrite};
@@ -46,45 +45,60 @@ use tokio_timer;
 
 /// Prototype for a future Kademlia protocol running on a socket.
 #[derive(Debug, Clone)]
-pub struct KademliaConfig<P, R> {
+pub struct KademliaConfig {
     /// Degree of parallelism on the network. Often called `alpha` in technical papers.
     /// No more than this number of remotes will be used at a given time for any given operation.
     // TODO: ^ share this number between operations?
or does each operation use `alpha` remotes? pub parallelism: u32, - /// Used to load and store data requests of peers. - // TODO: say that must implement the `Recordstore` trait. - pub record_store: R, - /// Used to load and store information about peers. - pub peer_store: P, /// Id of the local peer. pub local_peer_id: PeerId, /// When contacting a node, duration after which we consider it unresponsive. pub timeout: Duration, } -/// Object that allows one to make queries on the Kademlia system. -#[derive(Debug)] -pub struct KademliaControllerPrototype { - inner: Arc>, +// Builds a `QueryParams` that fetches information from `$controller`. +// +// Because of lifetime issues and type naming issues, a macro is the most convenient solution. +macro_rules! gen_query_params { + ($controller:expr) => {{ + let controller = $controller; + query::QueryParams { + local_id: $controller.inner.kbuckets.my_id().clone(), + kbuckets_find_closest: { + let controller = controller.clone(); + move |addr| controller.inner.kbuckets.find_closest(&addr).collect() + }, + parallelism: $controller.inner.parallelism, + find_node: { + let controller = controller.clone(); + move |addr, searched| { + // TODO: rewrite to be more direct + Box::new(controller.send(addr, move |ctl| ctl.find_node(&searched)).flatten()) as Box<_> + } + }, + } + }}; } -impl KademliaControllerPrototype -where - P: Deref, - for<'r> &'r Pc: Peerstore, -{ +/// Object that allows one to make queries on the Kademlia system. +#[derive(Debug)] +pub struct KademliaControllerPrototype { + inner: Arc, +} + +impl KademliaControllerPrototype { /// Creates a new controller from that configuration. - pub fn new(config: KademliaConfig) -> KademliaControllerPrototype { + pub fn new(config: KademliaConfig, initial_peers: I) -> KademliaControllerPrototype + where I: IntoIterator + { let buckets = KBucketsTable::new(config.local_peer_id.clone(), config.timeout); - for peer_id in config.peer_store.deref().peers() { + for peer_id in initial_peers { let _ = buckets.update(peer_id, ()); } let inner = Arc::new(Inner { kbuckets: buckets, timer: tokio_timer::wheel().build(), - record_store: config.record_store, - peer_store: config.peer_store, connections: Default::default(), timeout: config.timeout, parallelism: config.parallelism as usize, @@ -96,24 +110,21 @@ where /// Turns the prototype into an actual controller by feeding it a swarm controller. /// /// You must pass to this function the transport to use to dial and obtain - /// `KademliaProcessingFuture`, plus a mapping function that will turn the - /// `KademliaProcessingFuture` into whatever the swarm expects. + /// `KademliaPeerReqStream`, plus a mapping function that will turn the + /// `KademliaPeerReqStream` into whatever the swarm expects. 
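Aside: a minimal sketch of the wiring that `start()` expects, assuming `transport` is some `Clone + MuxedTransport` and that `kad_config`/`initial_peers` are built as above; the identity closure works as the `map` argument when the swarm transport uses the same upgrade. The real wiring lives in libp2p/examples/kademlia.rs at the end of this patch.

    let proto = KademliaControllerPrototype::new(kad_config, initial_peers);
    let upgrade = KademliaUpgrade::from_prototype(&proto);
    let (swarm_ctrl, swarm_future) = libp2p::core::swarm(
        transport.clone().with_upgrade(upgrade.clone()),
        // The swarm has to drive every `KademliaPeerReqStream`; answering each
        // request with an empty peer list is enough for this sketch.
        |kad_stream, _| kad_stream.for_each(|req| { req.respond(std::iter::empty()); Ok(()) }),
    );
    let (kad_ctl, kad_init) = proto.start(swarm_ctrl, transport.with_upgrade(upgrade), |s| s);
    // Both `swarm_future` and `kad_init` must be run on the event loop.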
pub fn start( self, swarm: SwarmController, kademlia_transport: K, map: M, ) -> ( - KademliaController, + KademliaController, Box>, ) where - P: Clone + Deref + 'static, // TODO: 'static :-/ - for<'r> &'r Pc: Peerstore, - R: Clone + 'static, // TODO: 'static :-/ T: Clone + MuxedTransport + 'static, // TODO: 'static :-/ - K: Transport + Clone + 'static, // TODO: 'static :-/ - M: FnOnce(KademliaProcessingFuture) -> T::Output + Clone + 'static, + K: Transport + Clone + 'static, // TODO: 'static :-/ + M: FnOnce(KademliaPeerReqStream) -> T::Output + Clone + 'static, { // TODO: initialization @@ -126,7 +137,13 @@ where let init_future = { let futures: Vec<_> = (0..256) - .map(|n| query::refresh(controller.clone(), n)) + .map({ + let controller = controller.clone(); + move |n| query::refresh(gen_query_params!(controller.clone()), n) + }) + .map(|stream| { + stream.for_each(|_| Ok(())) + }) .collect(); future::loop_fn(futures, |futures| { @@ -148,17 +165,17 @@ where /// Object that allows one to make queries on the Kademlia system. #[derive(Debug)] -pub struct KademliaController +pub struct KademliaController where T: MuxedTransport + 'static, // TODO: 'static :-/ { - inner: Arc>, + inner: Arc, swarm_controller: SwarmController, kademlia_transport: K, map: M, } -impl Clone for KademliaController +impl Clone for KademliaController where T: Clone + MuxedTransport + 'static, // TODO: 'static :-/ K: Clone, @@ -175,11 +192,8 @@ where } } -impl KademliaController +impl KademliaController where - P: Deref, - for<'r> &'r Pc: Peerstore, - R: Clone, T: Clone + MuxedTransport + 'static, // TODO: 'static :-/ { /// Performs an iterative find node query on the network. @@ -193,55 +207,51 @@ where pub fn find_node( &self, searched_key: PeerId, - ) -> Box, Error = IoError>> + ) -> Box>, Error = IoError>> where - P: Clone + 'static, - R: 'static, - K: Transport + Clone + 'static, - M: FnOnce(KademliaProcessingFuture) -> T::Output + Clone + 'static, // TODO: 'static :-/ + K: Transport + Clone + 'static, + M: FnOnce(KademliaPeerReqStream) -> T::Output + Clone + 'static, // TODO: 'static :-/ { - query::find_node(self.clone(), searched_key) + let me = self.clone(); + query::find_node(gen_query_params!(me.clone()), searched_key) } } /// Connection upgrade to the Kademlia protocol. #[derive(Clone)] -pub struct KademliaUpgrade { - inner: Arc>, - upgrade: KademliaServerConfig>>, +pub struct KademliaUpgrade { + inner: Arc, + upgrade: KademliaServerConfig, } -impl KademliaUpgrade { +impl KademliaUpgrade { /// Builds a connection upgrade from the controller. #[inline] - pub fn from_prototype(proto: &KademliaControllerPrototype) -> Self { + pub fn from_prototype(proto: &KademliaControllerPrototype) -> Self { KademliaUpgrade { inner: proto.inner.clone(), - upgrade: KademliaServerConfig::new(proto.inner.clone()), + upgrade: KademliaServerConfig::new(), } } /// Builds a connection upgrade from the controller. 
#[inline] - pub fn from_controller(ctl: &KademliaController) -> Self + pub fn from_controller(ctl: &KademliaController) -> Self where T: MuxedTransport, { KademliaUpgrade { inner: ctl.inner.clone(), - upgrade: KademliaServerConfig::new(ctl.inner.clone()), + upgrade: KademliaServerConfig::new(), } } } -impl ConnectionUpgrade for KademliaUpgrade +impl ConnectionUpgrade for KademliaUpgrade where C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/ - P: Deref + Clone + 'static, // TODO: 'static :-/ - for<'r> &'r Pc: Peerstore, - R: 'static, // TODO: 'static :-/ { - type Output = KademliaProcessingFuture; + type Output = KademliaPeerReqStream; type Future = Box>; type NamesIter = iter::Once<(Bytes, ())>; type UpgradeIdentifier = (); @@ -256,8 +266,33 @@ where let inner = self.inner; let client_addr = addr.clone(); + let peer_id = { + let mut iter = addr.iter(); + let protocol = iter.next(); + let after_proto = iter.next(); + match (protocol, after_proto) { + (Some(AddrComponent::P2P(key)), None) | (Some(AddrComponent::IPFS(key)), None) => { + match PeerId::from_bytes(key) { + Ok(id) => id, + Err(_) => { + let err = IoError::new( + IoErrorKind::InvalidData, + "invalid peer ID sent by remote identification", + ); + return Box::new(future::err(err)); + } + } + } + _ => { + let err = + IoError::new(IoErrorKind::InvalidData, "couldn't identify connected node"); + return Box::new(future::err(err)); + } + } + }; + let future = self.upgrade.upgrade(incoming, id, endpoint, addr).map( - move |(controller, future)| { + move |(controller, stream)| { match inner.connections.lock().entry(client_addr) { Entry::Occupied(mut entry) => { match entry.insert(Connection::Active(controller)) { @@ -285,7 +320,43 @@ where } }; - KademliaProcessingFuture { inner: future } + let stream = stream.map(move |query| { + match inner.kbuckets.update(peer_id.clone(), ()) { + UpdateOutcome::NeedPing(node_to_ping) => { + // TODO: do something with this info + println!("need to ping {:?}", node_to_ping); + } + _ => (), + } + + match query { + KademliaIncomingRequest::FindNode { searched, responder } => { + let mut intermediate: Vec<_> = inner.kbuckets.find_closest(&searched).collect(); + let my_id = inner.kbuckets.my_id().clone(); + if let Some(pos) = intermediate + .iter() + .position(|e| e.distance_with(&searched) >= my_id.distance_with(&searched)) + { + if intermediate[pos] != my_id { + intermediate.insert(pos, my_id); + } + } else { + intermediate.push(my_id); + } + + Some(KademliaPeerReq { + requested_peers: intermediate, + inner: responder, + }) + }, + KademliaIncomingRequest::PingPong => { + // We updated the k-bucket above. + None + }, + } + }).filter_map(|val| val); + + KademliaPeerReqStream { inner: Box::new(stream) } }, ); @@ -293,24 +364,49 @@ where } } -/// Future that must be processed for the Kademlia system to work. -pub struct KademliaProcessingFuture { - inner: Box>, +/// Stream that must be processed for the Kademlia system to work. +/// +/// Produces requests for peer information. These requests should be answered for the stream to +/// continue to progress. +pub struct KademliaPeerReqStream { + inner: Box>, } -impl Future for KademliaProcessingFuture { - type Item = (); +impl Stream for KademliaPeerReqStream { + type Item = KademliaPeerReq; type Error = IoError; #[inline] - fn poll(&mut self) -> futures::Poll { + fn poll(&mut self) -> futures::Poll, Self::Error> { self.inner.poll() } } +/// Request for information about some peers. 
+pub struct KademliaPeerReq { + inner: KademliaFindNodeRespond, + requested_peers: Vec, +} + +impl KademliaPeerReq { + /// Returns a list of the IDs of the peers that were requested. + #[inline] + pub fn requested_peers(&self) -> SliceIter { + self.requested_peers.iter() + } + + /// Responds to the request. + #[inline] + pub fn respond(self, peers: I) + where I: IntoIterator + { + self.inner.respond(peers); + } +} + // Inner struct shared throughout the Kademlia system. #[derive(Debug)] -struct Inner { +struct Inner { // The remotes are identified by their public keys. kbuckets: KBucketsTable, @@ -323,12 +419,6 @@ struct Inner { // Same as in the config. parallelism: usize, - // Same as in the config. - record_store: R, - - // Same as in the config. - peer_store: P, - // List of open connections with remotes. // // Since the keys are the nodes' multiaddress, it is expected that each node only has one @@ -363,94 +453,12 @@ impl fmt::Debug for Connection { } } -impl KadServerInterface for Arc> +impl KademliaController where - P: Deref, - for<'r> &'r Pc: Peerstore, -{ - #[inline] - fn local_id(&self) -> &PeerId { - self.kbuckets.my_id() - } - - fn peer_info(&self, peer_id: &PeerId) -> (Vec, ConnectionType) { - let addrs = self.peer_store - .peer(peer_id) - .into_iter() - .flat_map(|p| p.addrs()) - .collect::>(); - (addrs, ConnectionType::Connected) // ConnectionType meh :-/ - } - - #[inline] - fn kbuckets_update(&self, peer: &PeerId) { - // TODO: is this the right place for this check? - if peer == self.kbuckets.my_id() { - return; - } - - match self.kbuckets.update(peer.clone(), ()) { - UpdateOutcome::NeedPing(node_to_ping) => { - // TODO: return this info somehow - println!("need to ping {:?}", node_to_ping); - } - _ => (), - } - } - - #[inline] - fn kbuckets_find_closest(&self, addr: &PeerId) -> Vec { - let mut intermediate: Vec<_> = self.kbuckets.find_closest(addr).collect(); - let my_id = self.kbuckets.my_id().clone(); - if let Some(pos) = intermediate - .iter() - .position(|e| e.distance_with(&addr) >= my_id.distance_with(&addr)) - { - if intermediate[pos] != my_id { - intermediate.insert(pos, my_id); - } - } else { - intermediate.push(my_id); - } - intermediate - } -} - -impl query::QueryInterface for KademliaController -where - P: Clone + Deref + 'static, // TODO: 'static :-/ - for<'r> &'r Pc: Peerstore, - R: Clone + 'static, // TODO: 'static :-/ T: Clone + MuxedTransport + 'static, // TODO: 'static :-/ - K: Transport + Clone + 'static, // TODO: 'static - M: FnOnce(KademliaProcessingFuture) -> T::Output + Clone + 'static, // TODO: 'static :-/ + K: Transport + Clone + 'static, // TODO: 'static + M: FnOnce(KademliaPeerReqStream) -> T::Output + Clone + 'static, // TODO: 'static :-/ { - #[inline] - fn local_id(&self) -> &PeerId { - self.inner.kbuckets.my_id() - } - - #[inline] - fn kbuckets_find_closest(&self, addr: &PeerId) -> Vec { - self.inner.kbuckets.find_closest(addr).collect() - } - - #[inline] - fn peer_add_addrs(&self, peer: &PeerId, multiaddrs: I, ttl: Duration) - where - I: Iterator, - { - self.inner - .peer_store - .peer_or_create(peer) - .add_addrs(multiaddrs, ttl); - } - - #[inline] - fn parallelism(&self) -> usize { - self.inner.parallelism - } - #[inline] fn send( &self, diff --git a/kad/src/kad_server.rs b/kad/src/kad_server.rs index eb5b6f72..b37b45a4 100644 --- a/kad/src/kad_server.rs +++ b/kad/src/kad_server.rs @@ -23,79 +23,57 @@ //! //! # Usage //! -//! - Implement the `KadServerInterface` trait on something clonable (usually an `Arc`). -//! -//! 
- Create a `KademliaServerConfig` object from that interface. This struct implements -//! `ConnectionUpgrade`. +//! - Create a `KademliaServerConfig` object. This struct implements `ConnectionUpgrade`. //! //! - Update a connection through that `KademliaServerConfig`. The output yields you a -//! `KademliaServerController` and a future that must be driven to completion. The controller -//! allows you to perform queries and receive responses. +//! `KademliaServerController` and a stream that must be driven to completion. The controller +//! allows you to perform queries and receive responses. The stream produces incoming requests +//! from the remote. //! //! This `KademliaServerController` is usually extracted and stored in some sort of hash map in an //! `Arc` in order to be available whenever we need to request something from a node. use bytes::Bytes; use futures::sync::{mpsc, oneshot}; -use futures::{future, Future, Sink, Stream}; -use libp2p_peerstore::PeerId; -use libp2p_core::ConnectionUpgrade; -use libp2p_core::Endpoint; -use multiaddr::{AddrComponent, Multiaddr}; +use futures::{future, Future, Sink, stream, Stream}; +use libp2p_core::{ConnectionUpgrade, Endpoint, PeerId}; +use multiaddr::Multiaddr; use protocol::{self, KadMsg, KademliaProtocolConfig, Peer}; use std::collections::VecDeque; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::iter; +use std::sync::{Arc, atomic}; use tokio_io::{AsyncRead, AsyncWrite}; -/// Interface that this server system uses to communicate with the rest of the system. -pub trait KadServerInterface: Clone { - /// Returns the peer ID of the local node. - fn local_id(&self) -> &PeerId; - - /// Returns known information about the peer. Not atomic/thread-safe in the sense that - /// information can change immediately after being returned and before they are processed. - fn peer_info(&self, _: &PeerId) -> (Vec, protocol::ConnectionType); - - /// Updates an entry in the K-Buckets. Called whenever that peer sends us a message. - fn kbuckets_update(&self, peer: &PeerId); - - /// Finds the nodes closest to a peer ID. - fn kbuckets_find_closest(&self, addr: &PeerId) -> Vec; -} - /// Configuration for a Kademlia server. /// /// Implements `ConnectionUpgrade`. On a successful upgrade, produces a `KademliaServerController` /// and a `Future`. The controller lets you send queries to the remote and receive answers, while /// the `Future` must be driven to completion in order for things to work. #[derive(Debug, Clone)] -pub struct KademliaServerConfig { +pub struct KademliaServerConfig { raw_proto: KademliaProtocolConfig, - interface: I, } -impl KademliaServerConfig { +impl KademliaServerConfig { /// Builds a configuration object for an upcoming Kademlia server. 
#[inline] - pub fn new(interface: I) -> Self { + pub fn new() -> Self { KademliaServerConfig { raw_proto: KademliaProtocolConfig, - interface: interface, } } } -impl ConnectionUpgrade for KademliaServerConfig +impl ConnectionUpgrade for KademliaServerConfig where C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/ - I: KadServerInterface + 'static, // TODO: 'static :-/ { type Output = ( KademliaServerController, - Box>, + Box>, ); - type Future = Box>; + type Future = future::Map<>::Future, fn(>::Output) -> Self::Output>; type NamesIter = iter::Once<(Bytes, ())>; type UpgradeIdentifier = (); @@ -106,42 +84,9 @@ where #[inline] fn upgrade(self, incoming: C, id: (), endpoint: Endpoint, addr: &Multiaddr) -> Self::Future { - let peer_id = { - let mut iter = addr.iter(); - let protocol = iter.next(); - let after_proto = iter.next(); - match (protocol, after_proto) { - (Some(AddrComponent::P2P(key)), None) | (Some(AddrComponent::IPFS(key)), None) => { - match PeerId::from_bytes(key) { - Ok(id) => id, - Err(_) => { - let err = IoError::new( - IoErrorKind::InvalidData, - "invalid peer ID sent by remote identification", - ); - return Box::new(future::err(err)); - } - } - } - _ => { - let err = - IoError::new(IoErrorKind::InvalidData, "couldn't identify connected node"); - return Box::new(future::err(err)); - } - } - }; - - let interface = self.interface; - let future = self.raw_proto + self.raw_proto .upgrade(incoming, id, endpoint, addr) - .map(move |connec| { - let (tx, rx) = mpsc::unbounded(); - let future = kademlia_handler(connec, peer_id, rx, interface); - let controller = KademliaServerController { inner: tx }; - (controller, future) - }); - - Box::new(future) as Box<_> + .map(build_from_sink_stream) } } @@ -161,7 +106,7 @@ impl KademliaServerController { pub fn find_node( &self, searched_key: &PeerId, - ) -> Box, Error = IoError>> { + ) -> impl Future, Error = IoError> { let message = protocol::KadMsg::FindNodeReq { key: searched_key.clone().into_bytes(), }; @@ -175,7 +120,8 @@ impl KademliaServerController { IoErrorKind::ConnectionAborted, "connection to remote has aborted", )); - return Box::new(fut) as Box<_>; + + return future::Either::B(fut); } }; @@ -192,15 +138,14 @@ impl KademliaServerController { )), }); - Box::new(future) as Box<_> + future::Either::A(future) } /// Sends a `PING` query to the node. Because of the way the protocol is designed, there is /// no way to differentiate between a ping and a pong. Therefore this function doesn't return a - /// future, and the only way to be notified of the result is through the `kbuckets_update` - /// method in the `KadServerInterface` trait. + /// future, and the only way to be notified of the result is through the stream. pub fn ping(&self) -> Result<(), IoError> { - // Dummy channel. + // Dummy channel, as the `tx` is going to be dropped anyway. let (tx, _rx) = oneshot::channel(); match self.inner.unbounded_send((protocol::KadMsg::Ping, tx)) { Ok(()) => Ok(()), @@ -212,248 +157,359 @@ impl KademliaServerController { } } +/// Request received from the remote. +pub enum KademliaIncomingRequest { + /// Find the nodes closest to `searched`. + FindNode { + /// The value being searched. + searched: PeerId, + /// Object to use to respond to the request. + responder: KademliaFindNodeRespond, + }, + + // TODO: PutValue and FindValue + + /// Received either a ping or a pong. + PingPong, +} + +/// Object used to respond to `FindNode` queries from remotes. 
+pub struct KademliaFindNodeRespond { + inner: oneshot::Sender, +} + +impl KademliaFindNodeRespond { + /// Respond to the `FindNode` request. + pub fn respond(self, peers: I) + where I: IntoIterator + { + let _ = self.inner.send(KadMsg::FindNodeRes { + closer_peers: peers.into_iter().collect() + }); + } +} + +// Builds a controller and stream from a stream/sink of raw messages. +fn build_from_sink_stream<'a, S>(connec: S) -> (KademliaServerController, Box + 'a>) +where S: Sink + Stream + 'a +{ + let (tx, rx) = mpsc::unbounded(); + let future = kademlia_handler(connec, rx); + let controller = KademliaServerController { inner: tx }; + (controller, future) +} + // Handles a newly-opened Kademlia stream with a remote peer. // // Takes a `Stream` and `Sink` of Kademlia messages representing the connection to the client, -// plus the ID of the peer that we are handling, plus a `Receiver` that will receive messages to -// transmit to that connection, plus the interface. +// plus a `Receiver` that will receive messages to transmit to that connection. // -// Returns a `Future` that must be resolved in order for progress to work. It will never yield any -// item (unless both `rx` and `kad_bistream` are closed) but will propagate any I/O of protocol -// error that could happen. If the `Receiver` closes, no error is generated. -fn kademlia_handler<'a, S, I>( +// Returns a `Stream` that must be resolved in order for progress to work. The `Stream` will +// produce objects that represent the requests sent by the remote. These requests must be answered +// immediately before the stream continues to produce items. +fn kademlia_handler<'a, S>( kad_bistream: S, - peer_id: PeerId, - rx: mpsc::UnboundedReceiver<(KadMsg, oneshot::Sender)>, - interface: I, -) -> Box + 'a> + rq_rx: mpsc::UnboundedReceiver<(KadMsg, oneshot::Sender)>, +) -> Box + 'a> where S: Stream + Sink + 'a, - I: KadServerInterface + Clone + 'a, { - let (kad_sink, kad_stream) = kad_bistream - .sink_map_err(|err| IoError::new(IoErrorKind::InvalidData, err)) - .map_err(|err| IoError::new(IoErrorKind::InvalidData, err)) - .split(); + let (kad_sink, kad_stream) = kad_bistream.split(); - // We combine `kad_stream` and `rx` into one so that the loop wakes up whenever either - // generates something. - let messages = rx.map(|(m, o)| (m, Some(o))) - .map_err(|_| unreachable!()) - .select(kad_stream.map(|m| (m, None))); + // This is a stream of futures containing local responses. + // Every time we receive a request from the remote, we create a `oneshot::channel()` and send + // the receiving end to `responders_tx`. + // This way, if a future is available on `responders_rx`, we block until it produces the + // response. + let (responders_tx, responders_rx) = mpsc::unbounded(); - // Loop forever. - let future = future::loop_fn( - (kad_sink, messages, VecDeque::new(), 0), - move |(kad_sink, messages, mut send_back_queue, mut expected_pongs)| { - let interface = interface.clone(); - let peer_id = peer_id.clone(); + // Will be set to true if either `kad_stream` or `rq_rx` is closed. + let finished = Arc::new(atomic::AtomicBool::new(false)); - // The `send_back_queue` is a queue of `UnboundedSender`s in the correct order of - // expected answers. - // Whenever we send a message to the remote and this message expects a response, we - // push the sender to the end of `send_back_queue`. Whenever a remote sends us a - // response, we pop the first element of `send_back_queue`. 
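Aside: the responder mechanism set up above, reduced to a self-contained sketch against futures 0.1. One `oneshot` pair is created per incoming request: the sender ends up inside a `KademliaFindNodeRespond` handed to the user, while the receiver is queued on an unbounded channel so the event loop can forward the answer to the socket once `respond()` is called. All names here are illustrative.

    extern crate futures;
    use futures::sync::{mpsc, oneshot};
    use futures::{Future, Stream};

    fn main() {
        // One oneshot channel per incoming request; receivers are queued in
        // the order the requests arrived.
        let (responders_tx, responders_rx) = mpsc::unbounded::<oneshot::Receiver<&'static str>>();

        let (tx, rx) = oneshot::channel();
        responders_tx.unbounded_send(rx).unwrap();
        drop(responders_tx); // allows `responders_rx` to terminate

        // User side: answer whenever ready (here, immediately).
        tx.send("FindNodeRes { .. }").unwrap();

        // Event-loop side: resolve each queued receiver in order and write
        // the answer out (stdout stands in for `kad_sink`).
        responders_rx
            .for_each(|resp| resp.map(|msg| println!("-> {}", msg)).map_err(|_| ()))
            .wait()
            .unwrap();
    }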
+ // We combine all the streams into one so that the loop wakes up whenever any generates + // something. + enum EventSource { + Remote(KadMsg), + LocalRequest(KadMsg, oneshot::Sender), + LocalResponse(oneshot::Receiver), + Finished, + } - // The value of `expected_pongs` is the number of PING requests that we sent and that - // haven't been answered by the remote yet. Because of the way the protocol is designed, - // there is no way to differentiate between a ping and a pong. Therefore whenever we - // send a ping request we suppose that the next ping we receive is an answer, even - // though that may not be the case in reality. - // Because of this behaviour, pings do not pop from the `send_back_queue`. + let events = { + let responders = responders_rx + .map(|m| EventSource::LocalResponse(m)) + .map_err(|_| unreachable!()); + let rq_rx = rq_rx + .map(|(m, o)| EventSource::LocalRequest(m, o)) + .map_err(|_| unreachable!()) + .chain({ + let finished = finished.clone(); + future::lazy(move || { + finished.store(true, atomic::Ordering::SeqCst); + Ok(EventSource::Finished) + }).into_stream() + }); + let kad_stream = kad_stream + .map(|m| EventSource::Remote(m)) + .chain({ + let finished = finished.clone(); + future::lazy(move || { + finished.store(true, atomic::Ordering::SeqCst); + Ok(EventSource::Finished) + }).into_stream() + }); + responders.select(rq_rx).select(kad_stream) + }; - messages + let stream = stream::unfold((events, kad_sink, responders_tx, VecDeque::new(), 0u32), + move |(events, kad_sink, responders_tx, mut send_back_queue, expected_pongs)| { + if finished.load(atomic::Ordering::SeqCst) { + return None; + } + + Some(events .into_future() .map_err(|(err, _)| err) - .and_then(move |(message, rest)| { - if let Some((_, None)) = message { - // If we received a message from the remote (as opposed to a message from - // `rx`) then we update the k-buckets. - interface.kbuckets_update(&peer_id); - } - + .and_then(move |(message, events)| -> Box> { match message { - None => { - // Both the connection stream and `rx` are empty, so we break the loop. - let future = future::ok(future::Loop::Break(())); - Box::new(future) as Box> - } - Some((message @ KadMsg::PutValue { .. }, Some(_))) => { - // A `PutValue` message has been received on `rx`. Contrary to other - // types of messages, this one doesn't expect any answer and therefore - // we ignore the sender. - let future = kad_sink.send(message).map(move |kad_sink| { - future::Loop::Continue(( - kad_sink, - rest, - send_back_queue, - expected_pongs, - )) - }); + Some(EventSource::Finished) | None => { + // `finished` should have been set to true earlier, causing this + // function to return `None`. + unreachable!() + }, + Some(EventSource::LocalResponse(message)) => { + let future = message + .map_err(|_| { + // The user destroyed the responder without responding. + warn!("Kad responder object destroyed without responding"); + panic!() // TODO: what to do here? we have to close the connection + }) + .and_then(move |message| { + kad_sink + .send(message) + .map(move |kad_sink| { + let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs); + (None, state) + }) + }); + Box::new(future) + }, + Some(EventSource::LocalRequest(message @ KadMsg::PutValue { .. }, _)) => { + // A `PutValue` request. Contrary to other types of messages, this one + // doesn't expect any answer and therefore we ignore the sender. 
+                        let future = kad_sink
+                            .send(message)
+                            .map(move |kad_sink| {
+                                let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
+                                (None, state)
+                            });
                         Box::new(future) as Box<_>
                     }
-                    Some((message @ KadMsg::Ping { .. }, Some(_))) => {
-                        // A `Ping` message has been received on `rx`.
-                        expected_pongs += 1;
-                        let future = kad_sink.send(message).map(move |kad_sink| {
-                            future::Loop::Continue((
-                                kad_sink,
-                                rest,
-                                send_back_queue,
-                                expected_pongs,
-                            ))
-                        });
+                    Some(EventSource::LocalRequest(message @ KadMsg::Ping { .. }, _)) => {
+                        // A local `Ping` request.
+                        let expected_pongs = expected_pongs.checked_add(1)
+                            .expect("overflow in number of simultaneous pings");
+                        let future = kad_sink
+                            .send(message)
+                            .map(move |kad_sink| {
+                                let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
+                                (None, state)
+                            });
                         Box::new(future) as Box<_>
                     }
-                    Some((message, Some(send_back))) => {
-                        // Any message other than `PutValue` or `Ping` has been received on
-                        // `rx`. Send it to the remote.
-                        let future = kad_sink.send(message).map(move |kad_sink| {
-                            send_back_queue.push_back(send_back);
-                            future::Loop::Continue((
-                                kad_sink,
-                                rest,
-                                send_back_queue,
-                                expected_pongs,
-                            ))
-                        });
+                    Some(EventSource::LocalRequest(message, send_back)) => {
+                        // Any local request other than `PutValue` or `Ping`.
+                        send_back_queue.push_back(send_back);
+                        let future = kad_sink
+                            .send(message)
+                            .map(move |kad_sink| {
+                                let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
+                                (None, state)
+                            });
                         Box::new(future) as Box<_>
                     }
-                    Some((KadMsg::Ping, None)) => {
-                        // Note: The way the protocol was designed, there is no way to
-                        // differentiate between a ping and a pong.
-                        if expected_pongs == 0 {
-                            let message = KadMsg::Ping;
-                            let future = kad_sink.send(message).map(move |kad_sink| {
-                                future::Loop::Continue((
-                                    kad_sink,
-                                    rest,
-                                    send_back_queue,
-                                    expected_pongs,
-                                ))
+                    Some(EventSource::Remote(KadMsg::Ping)) => {
+                        // The way the protocol was designed, there is no way to differentiate
+                        // between a ping and a pong.
+                        if let Some(expected_pongs) = expected_pongs.checked_sub(1) {
+                            // Maybe we received a PING, or maybe we received a PONG; there is
+                            // no way to tell. Since we expected a PONG, we treat this as the
+                            // answer to one of our PINGs. If it was in fact a PING, the remote
+                            // will only see it answered the next time we send it a PING.
+                            let future = future::ok({
+                                let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
+                                let rq = KademliaIncomingRequest::PingPong;
+                                (Some(rq), state)
                             });
                             Box::new(future) as Box<_>
                         } else {
-                            expected_pongs -= 1;
-                            let future = future::ok({
-                                future::Loop::Continue((
-                                    kad_sink,
-                                    rest,
-                                    send_back_queue,
-                                    expected_pongs,
-                                ))
-                            });
+                            let future = kad_sink
+                                .send(KadMsg::Ping)
+                                .map(move |kad_sink| {
+                                    let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs);
+                                    let rq = KademliaIncomingRequest::PingPong;
+                                    (Some(rq), state)
+                                });
                             Box::new(future) as Box<_>
                         }
                     }
-                    Some((message @ KadMsg::FindNodeRes { .. }, None))
-                    | Some((message @ KadMsg::GetValueRes { .. }, None)) => {
+                    Some(EventSource::Remote(message @ KadMsg::FindNodeRes { .. }))
+                    | Some(EventSource::Remote(message @ KadMsg::GetValueRes { .. })) => {
                         // `FindNodeRes` or `GetValueRes` received on the socket.
                         // Send it back through `send_back_queue`.
if let Some(send_back) = send_back_queue.pop_front() { let _ = send_back.send(message); - let future = future::ok(future::Loop::Continue(( - kad_sink, - rest, - send_back_queue, - expected_pongs, - ))); - return Box::new(future) as Box<_>; + let future = future::ok({ + let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs); + (None, state) + }); + Box::new(future) } else { + debug!("Remote sent a Kad response but we didn't request anything"); let future = future::err(IoErrorKind::InvalidData.into()); - return Box::new(future) as Box<_>; + Box::new(future) } } - Some((KadMsg::FindNodeReq { key, .. }, None)) => { - // `FindNodeReq` received on the socket. - let message = handle_find_node_req(&interface, &key); - let future = kad_sink.send(message).map(move |kad_sink| { - future::Loop::Continue(( - kad_sink, - rest, - send_back_queue, - expected_pongs, - )) - }); - Box::new(future) as Box<_> - } - Some((KadMsg::GetValueReq { key, .. }, None)) => { - // `GetValueReq` received on the socket. - let message = handle_get_value_req(&interface, &key); - let future = kad_sink.send(message).map(move |kad_sink| { - future::Loop::Continue(( - kad_sink, - rest, - send_back_queue, - expected_pongs, - )) - }); - Box::new(future) as Box<_> - } - Some((KadMsg::PutValue { .. }, None)) => { - // `PutValue` received on the socket. - handle_put_value_req(&interface); + Some(EventSource::Remote(KadMsg::FindNodeReq { key, .. })) => { + let peer_id = match PeerId::from_bytes(key) { + Ok(id) => id, + Err(key) => { + debug!("Ignoring FIND_NODE request with invalid key: {:?}", key); + let future = future::err(IoError::new(IoErrorKind::InvalidData, "invalid key in FIND_NODE")); + return Box::new(future); + } + }; + + let (tx, rx) = oneshot::channel(); + let _ = responders_tx.unbounded_send(rx); let future = future::ok({ - future::Loop::Continue(( - kad_sink, - rest, - send_back_queue, - expected_pongs, - )) + let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs); + let rq = KademliaIncomingRequest::FindNode { + searched: peer_id, + responder: KademliaFindNodeRespond { + inner: tx + } + }; + (Some(rq), state) }); - Box::new(future) as Box<_> + + Box::new(future) + } + Some(EventSource::Remote(KadMsg::GetValueReq { .. })) => { + unimplemented!() // FIXME: + } + Some(EventSource::Remote(KadMsg::PutValue { .. })) => { + unimplemented!() // FIXME: } } - }) - }, - ); + })) + }).filter_map(|val| val); - Box::new(future) as Box> + Box::new(stream) as Box> } -// Builds a `KadMsg` that handles a `FIND_NODE` request received from the remote. -fn handle_find_node_req(interface: &I, requested_key: &[u8]) -> KadMsg -where - I: ?Sized + KadServerInterface, -{ - let peer_id = match PeerId::from_bytes(requested_key.to_vec()) { - // TODO: suboptimal - Ok(id) => id, - Err(_) => { - return KadMsg::FindNodeRes { - closer_peers: vec![], - } +#[cfg(test)] +mod tests { + use std::io::Error as IoError; + use std::iter; + use futures::{Future, Poll, Sink, StartSend, Stream}; + use futures::sync::mpsc; + use kad_server::{self, KademliaIncomingRequest, KademliaServerController}; + use libp2p_core::PublicKeyBytes; + use protocol::{ConnectionType, Peer}; + use rand; + + // This struct merges a stream and a sink and is quite useful for tests. 
+ struct Wrapper(St, Si); + impl Stream for Wrapper + where + St: Stream, + { + type Item = St::Item; + type Error = St::Error; + fn poll(&mut self) -> Poll, Self::Error> { + self.0.poll() } - }; + } + impl Sink for Wrapper + where + Si: Sink, + { + type SinkItem = Si::SinkItem; + type SinkError = Si::SinkError; + fn start_send( + &mut self, + item: Self::SinkItem, + ) -> StartSend { + self.1.start_send(item) + } + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.1.poll_complete() + } + } - let closer_peers = interface - .kbuckets_find_closest(&peer_id) - .into_iter() - .map(|peer| { - let (multiaddrs, connection_ty) = interface.peer_info(&peer); - protocol::Peer { - node_id: peer, - multiaddrs: multiaddrs, - connection_ty: connection_ty, - } - }) - .collect(); + fn build_test() -> (KademliaServerController, impl Stream, KademliaServerController, impl Stream) { + let (a_to_b, b_from_a) = mpsc::unbounded(); + let (b_to_a, a_from_b) = mpsc::unbounded(); - KadMsg::FindNodeRes { closer_peers } -} - -// Builds a `KadMsg` that handles a `FIND_VALUE` request received from the remote. -fn handle_get_value_req(_interface: &I, _requested_key: &[u8]) -> KadMsg -where - I: ?Sized + KadServerInterface, -{ - unimplemented!() -} - -// Handles a `STORE` request received from the remote. -fn handle_put_value_req(_interface: &I) -where - I: ?Sized + KadServerInterface, -{ - unimplemented!() + let sink_stream_a = Wrapper(a_from_b, a_to_b) + .map_err(|_| panic!()).sink_map_err(|_| panic!()); + let sink_stream_b = Wrapper(b_from_a, b_to_a) + .map_err(|_| panic!()).sink_map_err(|_| panic!()); + + let (controller_a, stream_events_a) = kad_server::build_from_sink_stream(sink_stream_a); + let (controller_b, stream_events_b) = kad_server::build_from_sink_stream(sink_stream_b); + (controller_a, stream_events_a, controller_b, stream_events_b) + } + + #[test] + fn ping_response() { + let (controller_a, stream_events_a, _controller_b, stream_events_b) = build_test(); + + controller_a.ping().unwrap(); + + let streams = stream_events_a.map(|ev| (ev, "a")) + .select(stream_events_b.map(|ev| (ev, "b"))); + match streams.into_future().map_err(|(err, _)| err).wait().unwrap() { + (Some((KademliaIncomingRequest::PingPong, "b")), _) => {}, + _ => panic!() + } + } + + #[test] + fn find_node_response() { + let (controller_a, stream_events_a, _controller_b, stream_events_b) = build_test(); + + let random_peer_id = { + let buf = (0 .. 1024).map(|_| -> u8 { rand::random() }).collect::>(); + PublicKeyBytes(buf).to_peer_id() + }; + + let find_node_fut = controller_a.find_node(&random_peer_id); + + let example_response = Peer { + node_id: { + let buf = (0 .. 
1024).map(|_| -> u8 { rand::random() }).collect::>(); + PublicKeyBytes(buf).to_peer_id() + }, + multiaddrs: Vec::new(), + connection_ty: ConnectionType::Connected, + }; + + let streams = stream_events_a.map(|ev| (ev, "a")) + .select(stream_events_b.map(|ev| (ev, "b"))); + + let streams = match streams.into_future().map_err(|(err, _)| err).wait().unwrap() { + (Some((KademliaIncomingRequest::FindNode { searched, responder }, "b")), streams) => { + assert_eq!(searched, random_peer_id); + responder.respond(iter::once(example_response.clone())); + streams + }, + _ => panic!() + }; + + let resp = streams.into_future().map_err(|(err, _)| err).map(|_| unreachable!()) + .select(find_node_fut) + .map_err(|_| -> IoError { panic!() }); + assert_eq!(resp.wait().unwrap().0, vec![example_response]); + } } diff --git a/kad/src/kbucket.rs b/kad/src/kbucket.rs index 3478f897..6408bd01 100644 --- a/kad/src/kbucket.rs +++ b/kad/src/kbucket.rs @@ -29,7 +29,7 @@ use arrayvec::ArrayVec; use bigint::U512; -use libp2p_peerstore::PeerId; +use libp2p_core::PeerId; use parking_lot::{Mutex, MutexGuard}; use std::mem; use std::slice::Iter as SliceIter; @@ -89,11 +89,12 @@ impl KBucket { // If a node is pending and the timeout has expired, removes the first element of `nodes` // and pushes back the node in `pending_node`. fn flush(&mut self, timeout: Duration) { - if let Some((_, instant)) = self.pending_node { + if let Some((pending_node, instant)) = self.pending_node.take() { if instant.elapsed() >= timeout { - let (pending_node, _) = self.pending_node.take().unwrap(); let _ = self.nodes.remove(0); self.nodes.push(pending_node); + } else { + self.pending_node = Some((pending_node, instant)); } } } @@ -209,15 +210,10 @@ where /// Marks the node as "most recent" in its bucket and modifies the value associated to it. /// This function should be called whenever we receive a communication from a node. - /// - /// # Panic - /// - /// Panics if `id` is equal to the local node ID. - /// pub fn update(&self, id: Id, value: Val) -> UpdateOutcome { let table = match self.bucket_num(&id) { Some(n) => &self.tables[n], - None => panic!("tried to update our own node in the kbuckets table"), + None => return UpdateOutcome::FailSelfUpdate, }; let mut table = table.lock(); @@ -272,10 +268,13 @@ pub enum UpdateOutcome { Added, /// The node was already in the bucket and has been refreshed. Refreshed(Val), - /// The node wasn't added. Instead we need to ping the node passed as parameter. + /// The node wasn't added. Instead we need to ping the node passed as parameter, and call + /// `update` if it responds. NeedPing(Id), /// The node wasn't added at all because a node was already pending. Discarded, + /// Tried to update the local peer ID. This is an invalid operation. + FailSelfUpdate, } /// Iterator giving access to a bucket. 
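Aside: with the panic replaced by a dedicated variant, callers are expected to match on every `UpdateOutcome`; a hedged sketch, with `table` and `peer` assumed in scope and `()` as the value type:

    match table.update(peer, ()) {
        UpdateOutcome::Added | UpdateOutcome::Refreshed(()) => {}
        UpdateOutcome::NeedPing(stale) => {
            // Ping `stale`, and call `update` again if it responds.
        }
        UpdateOutcome::Discarded => {}
        // Updating our own ID is now reported instead of panicking.
        UpdateOutcome::FailSelfUpdate => {}
    }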
@@ -335,7 +334,7 @@ mod tests { extern crate rand; use self::rand::random; use kbucket::{KBucketsTable, UpdateOutcome, MAX_NODES_PER_BUCKET}; - use libp2p_peerstore::PeerId; + use libp2p_core::PeerId; use std::thread; use std::time::Duration; @@ -364,8 +363,7 @@ mod tests { } #[test] - #[should_panic(expected = "tried to update our own node in the kbuckets table")] - fn update_local_id_panic() { + fn update_local_id_fails() { let my_id = { let mut bytes = vec![random(); 34]; bytes[0] = 18; @@ -374,7 +372,10 @@ mod tests { }; let table = KBucketsTable::new(my_id.clone(), Duration::from_secs(5)); - let _ = table.update(my_id, ()); + match table.update(my_id, ()) { + UpdateOutcome::FailSelfUpdate => (), + _ => panic!() + } } #[test] diff --git a/kad/src/lib.rs b/kad/src/lib.rs index 75e65f63..8b48c3db 100644 --- a/kad/src/lib.rs +++ b/kad/src/lib.rs @@ -69,7 +69,6 @@ extern crate datastore; extern crate fnv; extern crate futures; extern crate libp2p_identify; -extern crate libp2p_peerstore; extern crate libp2p_ping; extern crate libp2p_core; #[macro_use] @@ -84,7 +83,9 @@ extern crate tokio_timer; extern crate varint; pub use self::high_level::{KademliaConfig, KademliaController, KademliaControllerPrototype}; -pub use self::high_level::{KademliaProcessingFuture, KademliaUpgrade}; +pub use self::high_level::{KademliaPeerReqStream, KademliaUpgrade, KademliaPeerReq}; +pub use self::protocol::{ConnectionType, Peer}; +pub use self::query::QueryEvent; mod high_level; mod kad_server; diff --git a/kad/src/protocol.rs b/kad/src/protocol.rs index 6dabb7d6..f10ceabe 100644 --- a/kad/src/protocol.rs +++ b/kad/src/protocol.rs @@ -25,16 +25,14 @@ //! The `Stream` component is used to poll the underlying transport, and the `Sink` component is //! used to send messages. -use bytes::Bytes; -use futures::future; -use futures::{Sink, Stream}; -use libp2p_peerstore::PeerId; -use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr}; +use bytes::{Bytes, BytesMut}; +use futures::{future, sink, Sink, stream, Stream}; +use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr, PeerId}; use protobuf::{self, Message}; use protobuf_structs; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::iter; -use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_io::{AsyncRead, AsyncWrite, codec::Framed}; use varint::VarintCodec; #[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)] @@ -122,8 +120,7 @@ impl ConnectionUpgrade for KademliaProtocolConfig where C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/ { - type Output = - Box>; + type Output = KadStreamSink; type Future = future::FutureResult; type NamesIter = iter::Once<(Bytes, ())>; type UpgradeIdentifier = (); @@ -139,37 +136,26 @@ where } } +type KadStreamSink = stream::AndThen>>, IoError>, KadMsg, fn(KadMsg) -> Result, IoError>, Result, IoError>>, fn(BytesMut) -> Result, Result>; + // Upgrades a socket to use the Kademlia protocol. -fn kademlia_protocol<'a, S>( +fn kademlia_protocol( socket: S, -) -> Box + 'a> +) -> KadStreamSink where - S: AsyncRead + AsyncWrite + 'a, + S: AsyncRead + AsyncWrite, { - let wrapped = socket + socket .framed(VarintCodec::default()) .from_err::() - .with(|request| -> Result<_, IoError> { + .with::<_, fn(_) -> _, _>(|request| -> Result<_, IoError> { let proto_struct = msg_to_proto(request); Ok(proto_struct.write_to_bytes().unwrap()) // TODO: error? 
}) - .and_then(|bytes| { + .and_then:: _, _>(|bytes| { let response = protobuf::parse_from_bytes(&bytes)?; proto_to_msg(response) - }); - - Box::new(wrapped) -} - -/// Custom trait that derives `Sink` and `Stream`, so that we can box it. -pub trait KadStreamSink: - Stream + Sink -{ -} -impl KadStreamSink for T -where - T: Stream + Sink, -{ + }) } /// Message that we can send to a peer or received from a peer. @@ -307,8 +293,7 @@ mod tests { use self::libp2p_tcp_transport::TcpConfig; use self::tokio_core::reactor::Core; use futures::{Future, Sink, Stream}; - use libp2p_peerstore::PeerId; - use libp2p_core::{Transport, PublicKeyBytesSlice}; + use libp2p_core::{Transport, PeerId, PublicKeyBytesSlice}; use protocol::{ConnectionType, KadMsg, KademliaProtocolConfig, Peer}; use std::sync::mpsc; use std::thread; diff --git a/kad/src/query.rs b/kad/src/query.rs index 9d97d6b0..6098a63c 100644 --- a/kad/src/query.rs +++ b/kad/src/query.rs @@ -21,10 +21,9 @@ //! This module handles performing iterative queries about the network. use fnv::FnvHashSet; -use futures::{future, Future}; -use kad_server::KademliaServerController; +use futures::{future, Future, stream, Stream}; use kbucket::KBucketsPeerId; -use libp2p_peerstore::PeerId; +use libp2p_core::PeerId; use multiaddr::{AddrComponent, Multiaddr}; use protocol; use rand; @@ -32,78 +31,73 @@ use smallvec::SmallVec; use std::cmp::Ordering; use std::io::Error as IoError; use std::mem; -use std::time::Duration; -/// Interface that the query uses to communicate with the rest of the system. -pub trait QueryInterface: Clone { - /// Returns the peer ID of the local node. - fn local_id(&self) -> &PeerId; +/// Parameters of a query. Allows plugging the query-related code with the rest of the +/// infrastructure. +pub struct QueryParams { + /// Identifier of the local peer. + pub local_id: PeerId, + /// Called whenever we need to obtain the peers closest to a certain peer. + pub kbuckets_find_closest: FBuckets, + /// Level of parallelism for networking. If this is `N`, then we can dial `N` nodes at a time. + pub parallelism: usize, + /// Called whenever we want to send a `FIND_NODE` RPC query. + pub find_node: FFindNode, +} - /// Finds the nodes closest to a peer ID. - fn kbuckets_find_closest(&self, addr: &PeerId) -> Vec; - - /// Adds new known multiaddrs for the given peer. - fn peer_add_addrs(&self, peer: &PeerId, multiaddrs: I, ttl: Duration) - where - I: Iterator; - - /// Returns the level of parallelism wanted for this query. - fn parallelism(&self) -> usize; - - /// Attempts to contact the given multiaddress, then calls `and_then` on success. Returns a - /// future that contains the output of `and_then`, or an error if we failed to contact the - /// remote. - // TODO: use HKTB once Rust supports that, to avoid boxing the future - fn send( - &self, - addr: Multiaddr, - and_then: F, - ) -> Box> - where - F: FnOnce(&KademliaServerController) -> FRet + 'static, - FRet: 'static; +/// Event that happens during a query. +#[derive(Debug, Clone)] +pub enum QueryEvent { + /// Learned about new mutiaddresses for the given peers. + NewKnownMultiaddrs(Vec<(PeerId, Vec)>), + /// Finished the processing of the query. Contains the result. + Finished(TOut), } /// Starts a query for an iterative `FIND_NODE` request. 
#[inline] -pub fn find_node<'a, I>( - query_interface: I, +pub fn find_node<'a, FBuckets, FFindNode>( + query_params: QueryParams, searched_key: PeerId, -) -> Box, Error = IoError> + 'a> +) -> Box>, Error = IoError> + 'a> where - I: QueryInterface + 'a, + FBuckets: Fn(PeerId) -> Vec + 'a + Clone, + FFindNode: Fn(Multiaddr, PeerId) -> Box, Error = IoError>> + 'a + Clone, { - query(query_interface, searched_key, 20) // TODO: constant + query(query_params, searched_key, 20) // TODO: constant } /// Refreshes a specific bucket by performing an iterative `FIND_NODE` on a random ID of this /// bucket. /// /// Returns a dummy no-op future if `bucket_num` is out of range. -pub fn refresh<'a, I>( - query_interface: I, +pub fn refresh<'a, FBuckets, FFindNode>( + query_params: QueryParams, bucket_num: usize, -) -> Box + 'a> +) -> Box, Error = IoError> + 'a> where - I: QueryInterface + 'a, + FBuckets: Fn(PeerId) -> Vec + 'a + Clone, + FFindNode: Fn(Multiaddr, PeerId) -> Box, Error = IoError>> + 'a + Clone, { - let peer_id = match gen_random_id(&query_interface, bucket_num) { + let peer_id = match gen_random_id(&query_params.local_id, bucket_num) { Ok(p) => p, - Err(()) => return Box::new(future::ok(())), + Err(()) => return Box::new(stream::once(Ok(QueryEvent::Finished(())))), }; - let future = find_node(query_interface, peer_id).map(|_| ()); - Box::new(future) as Box<_> + let stream = find_node(query_params, peer_id).map(|event| { + match event { + QueryEvent::NewKnownMultiaddrs(peers) => QueryEvent::NewKnownMultiaddrs(peers), + QueryEvent::Finished(_) => QueryEvent::Finished(()), + } + }); + + Box::new(stream) as Box<_> } // Generates a random `PeerId` that belongs to the given bucket. // // Returns an error if `bucket_num` is out of range. -fn gen_random_id(query_interface: &I, bucket_num: usize) -> Result -where - I: ?Sized + QueryInterface, -{ - let my_id = query_interface.local_id(); +fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result { let my_id_len = my_id.as_bytes().len(); // TODO: this 2 is magic here ; it is the length of the hash of the multihash @@ -134,22 +128,21 @@ where } // Generic query-performing function. -fn query<'a, I>( - query_interface: I, +fn query<'a, FBuckets, FFindNode>( + query_params: QueryParams, searched_key: PeerId, num_results: usize, -) -> Box, Error = IoError> + 'a> +) -> Box>, Error = IoError> + 'a> where - I: QueryInterface + 'a, + FBuckets: Fn(PeerId) -> Vec + 'a + Clone, + FFindNode: Fn(Multiaddr, PeerId) -> Box, Error = IoError>> + 'a + Clone, { debug!("Start query for {:?} ; num results = {}", searched_key, num_results); // State of the current iterative process. struct State<'a> { - // If true, we are still in the first step of the algorithm where we try to find the - // closest node. If false, then we are contacting the k closest nodes in order to fill the - // list with enough results. - looking_for_closer: bool, + // At which stage we are. + stage: Stage, // Final output of the iteration. result: Vec, // For each open connection, a future with the response of the remote. @@ -164,26 +157,55 @@ where failed_to_contact: FnvHashSet, } + // General stage of the state. + #[derive(Copy, Clone, PartialEq, Eq)] + enum Stage { + // We are still in the first step of the algorithm where we try to find the closest node. + FirstStep, + // We are contacting the k closest nodes in order to fill the list with enough results. + SecondStep, + // The results are complete, and the next stream iteration will produce the outcome. 
+ FinishingNextIter, + // We are finished and the stream shouldn't return anything anymore. + Finished, + } + let initial_state = State { - looking_for_closer: true, + stage: Stage::FirstStep, result: Vec::with_capacity(num_results), current_attempts_fut: Vec::new(), current_attempts_addrs: SmallVec::new(), - pending_nodes: query_interface.kbuckets_find_closest(&searched_key), + pending_nodes: { + let kbuckets_find_closest = query_params.kbuckets_find_closest.clone(); + kbuckets_find_closest(searched_key.clone()) // TODO: suboptimal + }, failed_to_contact: Default::default(), }; - let parallelism = query_interface.parallelism(); + let parallelism = query_params.parallelism; // Start of the iterative process. - let stream = future::loop_fn(initial_state, move |mut state| { + let stream = stream::unfold(initial_state, move |mut state| -> Option<_> { + match state.stage { + Stage::FinishingNextIter => { + let result = mem::replace(&mut state.result, Vec::new()); + debug!("Query finished with {} results", result.len()); + state.stage = Stage::Finished; + let future = future::ok((Some(QueryEvent::Finished(result)), state)); + return Some(future::Either::A(future)); + }, + Stage::Finished => { + return None; + }, + _ => () + }; + let searched_key = searched_key.clone(); - let query_interface = query_interface.clone(); - let query_interface2 = query_interface.clone(); + let find_node_rpc = query_params.find_node.clone(); // Find out which nodes to contact at this iteration. let to_contact = { - let wanted_len = if state.looking_for_closer { + let wanted_len = if state.stage == Stage::FirstStep { parallelism.saturating_sub(state.current_attempts_fut.len()) } else { num_results.saturating_sub(state.current_attempts_fut.len()) @@ -218,10 +240,8 @@ where let multiaddr: Multiaddr = AddrComponent::P2P(peer.clone().into_bytes()).into(); let searched_key2 = searched_key.clone(); - let resp_rx = - query_interface.send(multiaddr.clone(), move |ctl| ctl.find_node(&searched_key2)); + let current_attempt = find_node_rpc(multiaddr.clone(), searched_key2); // TODO: suboptimal state.current_attempts_addrs.push(peer.clone()); - let current_attempt = resp_rx.flatten(); state .current_attempts_fut .push(Box::new(current_attempt) as Box<_>); @@ -237,8 +257,10 @@ where if current_attempts_fut.is_empty() { // If `current_attempts_fut` is empty, then `select_all` would panic. It happens // when we have no additional node to query. - let future = future::ok(future::Loop::Break(state)); - return future::Either::A(future); + debug!("Finishing query early because no additional node available"); + state.stage = Stage::FinishingNextIter; + let future = future::ok((None, state)); + return Some(future::Either::A(future)); } // This is the future that continues or breaks the `loop_fn`. @@ -263,7 +285,7 @@ where Err(err) => { trace!("RPC query failed for {:?}: {:?}", remote_id, err); state.failed_to_contact.insert(remote_id); - return Ok(future::Loop::Continue(state)); + return future::ok((None, state)); } }; @@ -288,17 +310,14 @@ where let mut local_nearest_node_updated = false; // Update `state` with the actual content of the message. + let mut new_known_multiaddrs = Vec::with_capacity(closer_peers.len()); for mut peer in closer_peers { // Update the peerstore with the information sent by // the remote. 
{ - let valid_multiaddrs = peer.multiaddrs.drain(..); - trace!("Adding multiaddresses to {:?}: {:?}", peer.node_id, valid_multiaddrs); - query_interface2.peer_add_addrs( - &peer.node_id, - valid_multiaddrs, - Duration::from_secs(3600), - ); // TODO: which TTL? + let multiaddrs = mem::replace(&mut peer.multiaddrs, Vec::new()); + trace!("Reporting multiaddresses for {:?}: {:?}", peer.node_id, multiaddrs); + new_known_multiaddrs.push((peer.node_id.clone(), multiaddrs)); } if peer.node_id.distance_with(&searched_key) @@ -325,28 +344,22 @@ where } if state.result.len() >= num_results - || (!state.looking_for_closer && state.current_attempts_fut.is_empty()) + || (state.stage != Stage::FirstStep && state.current_attempts_fut.is_empty()) { - // Check that our `Vec::with_capacity` is correct. - debug_assert_eq!(state.result.capacity(), num_results); - Ok(future::Loop::Break(state)) + state.stage = Stage::FinishingNextIter; + } else { if !local_nearest_node_updated { trace!("Loop didn't update closer node ; jumping to step 2"); - state.looking_for_closer = false; + state.stage = Stage::SecondStep; } - - Ok(future::Loop::Continue(state)) } + + future::ok((Some(QueryEvent::NewKnownMultiaddrs(new_known_multiaddrs)), state)) }); - future::Either::B(future) - }); - - let stream = stream.map(|state| { - debug!("Query finished with {} results", state.result.len()); - state.result - }); + Some(future::Either::B(future)) + }).filter_map(|val| val); Box::new(stream) as Box<_> } diff --git a/libp2p/examples/kademlia.rs b/libp2p/examples/kademlia.rs index 069f88a1..d10a3d9b 100644 --- a/libp2p/examples/kademlia.rs +++ b/libp2p/examples/kademlia.rs @@ -27,7 +27,7 @@ extern crate tokio_core; extern crate tokio_io; use bigint::U512; -use futures::future::Future; +use futures::{Future, Stream}; use libp2p::peerstore::{PeerAccess, PeerId, Peerstore}; use libp2p::Multiaddr; use std::env; @@ -35,6 +35,7 @@ use std::sync::Arc; use std::time::Duration; use libp2p::core::{Transport, PublicKeyBytesSlice}; use libp2p::core::{upgrade, either::EitherOutput}; +use libp2p::kad::{ConnectionType, Peer, QueryEvent}; use libp2p::tcp::TcpConfig; use tokio_core::reactor::Core; @@ -103,13 +104,11 @@ fn main() { // and outgoing connections for us. let kad_config = libp2p::kad::KademliaConfig { parallelism: 3, - record_store: (), - peer_store: peer_store, local_peer_id: my_peer_id.clone(), timeout: Duration::from_secs(2), }; - let kad_ctl_proto = libp2p::kad::KademliaControllerPrototype::new(kad_config); + let kad_ctl_proto = libp2p::kad::KademliaControllerPrototype::new(kad_config, peer_store.peers()); let proto = libp2p::kad::KademliaUpgrade::from_prototype(&kad_ctl_proto); @@ -117,7 +116,32 @@ fn main() { // outgoing connections for us. 
let (swarm_controller, swarm_future) = libp2p::core::swarm( transport.clone().with_upgrade(proto.clone()), - |upgrade, _| upgrade, + { + let peer_store = peer_store.clone(); + move |kademlia_stream, _| { + let peer_store = peer_store.clone(); + kademlia_stream.for_each(move |req| { + let peer_store = peer_store.clone(); + let result = req + .requested_peers() + .map(move |peer_id| { + let addrs = peer_store + .peer(peer_id) + .into_iter() + .flat_map(|p| p.addrs()) + .collect::>(); + Peer { + node_id: peer_id.clone(), + multiaddrs: addrs, + connection_ty: ConnectionType::Connected, // meh :-/ + } + }) + .collect::>(); + req.respond(result); + Ok(()) + }) + } + } ); let (kad_controller, _kad_init) = @@ -132,6 +156,21 @@ fn main() { let finish_enum = kad_controller .find_node(my_peer_id.clone()) + .filter_map(move |event| { + match event { + QueryEvent::NewKnownMultiaddrs(peers) => { + for (peer, addrs) in peers { + peer_store.peer_or_create(&peer) + .add_addrs(addrs, Duration::from_secs(3600)); + } + None + }, + QueryEvent::Finished(out) => Some(out), + } + }) + .into_future() + .map_err(|(err, _)| err) + .map(|(out, _)| out.unwrap()) .and_then(|out| { let local_hash = U512::from(my_peer_id.hash()); println!("Results of peer discovery for {:?}:", my_peer_id);
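Aside: the example ranks its results by XOR distance, the metric Kademlia uses throughout. A hedged sketch of such a ranking, assuming `out: Vec<PeerId>` is the query result and reusing the `hash()` accessor and `U512` conversion already shown above:

    let local_hash = U512::from(my_peer_id.hash());
    let mut ranked: Vec<_> = out
        .into_iter()
        .map(|peer| (local_hash ^ U512::from(peer.hash()), peer))
        .collect();
    // Smaller XOR distance (more shared leading bits) means a closer peer.
    ranked.sort_by(|a, b| a.0.cmp(&b.0));
    for (distance, peer) in ranked {
        println!("{:?} ({} leading zero bits in distance)", peer, distance.leading_zeros());
    }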