diff --git a/core/src/lib.rs b/core/src/lib.rs index f70f1522..5e24f7d0 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -223,6 +223,7 @@ mod connection_reuse; mod keys_proto; mod peer_id; mod public_key; +mod unique; pub mod either; pub mod muxing; @@ -237,4 +238,5 @@ pub use self::peer_id::PeerId; pub use self::public_key::PublicKey; pub use self::swarm::{swarm, SwarmController, SwarmFuture}; pub use self::transport::{MuxedTransport, Transport}; +pub use self::unique::{UniqueConnec, UniqueConnecFuture, UniqueConnecState}; pub use self::upgrade::{ConnectionUpgrade, Endpoint}; diff --git a/core/src/unique.rs b/core/src/unique.rs new file mode 100644 index 00000000..0a901573 --- /dev/null +++ b/core/src/unique.rs @@ -0,0 +1,333 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::{future, sync::oneshot, task, Async, Future, Poll, IntoFuture}; +use parking_lot::Mutex; +use {Multiaddr, MuxedTransport, SwarmController, Transport}; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use std::mem; +use std::sync::{Arc, Weak}; + +/// Storage for a unique connection with a remote. +pub struct UniqueConnec { + inner: Arc>>, +} + +enum UniqueConnecInner { + /// The `UniqueConnec` was created, but nothing is in it. + Empty, + /// We started dialing, but no response has been obtained so far. + Pending { + /// Tasks that need to be awakened when the content of this object is set. + tasks_waiting: Vec, + /// Future that represents when `set_until` should have been called. + // TODO: Send + Sync bound is meh + dial_fut: Box + Send + Sync>, + }, + /// The value of this unique connec has been set. + /// Can only transition to `Empty` when the future has expired. + Full { + /// Content of the object. + value: T, + /// Sender to trigger if the content gets cleared. + on_clear: oneshot::Sender<()>, + }, + /// The `dial_fut` has errored. + Errored(IoError), +} + +impl UniqueConnec { + /// Builds a new empty `UniqueConnec`. + #[inline] + pub fn empty() -> Self { + UniqueConnec { + inner: Arc::new(Mutex::new(UniqueConnecInner::Empty)), + } + } + + /// Builds a new `UniqueConnec` that contains a value. + #[inline] + pub fn with_value(value: T) -> Self { + let (on_clear, _) = oneshot::channel(); + UniqueConnec { + inner: Arc::new(Mutex::new(UniqueConnecInner::Full { value, on_clear })), + } + } + + /// Instantly returns the value from the object if there is any. 
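+    ///
+    /// A minimal sketch of the expected behaviour (using `u32` as a stand-in for
+    /// the connection type):
+    ///
+    /// ```ignore
+    /// let connec = UniqueConnec::<u32>::empty();
+    /// assert_eq!(connec.poll(), None);
+    ///
+    /// let connec = UniqueConnec::with_value(5u32);
+    /// assert_eq!(connec.poll(), Some(5));
+    /// ```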
+ pub fn poll(&self) -> Option + where T: Clone, + { + let inner = self.inner.lock(); + if let UniqueConnecInner::Full { ref value, .. } = &*inner { + Some(value.clone()) + } else { + None + } + } + + /// Loads the value from the object. + /// + /// If the object is empty, dials the given multiaddress with the given transport. + /// + /// The closure of the `swarm` is expected to call `set_until()` on the `UniqueConnec`. Failure + /// to do so will make the `UniqueConnecFuture` produce an error. + pub fn get_or_dial(&self, swarm: &SwarmController, multiaddr: &Multiaddr, + transport: Du) -> UniqueConnecFuture + where T: Clone, + Du: Transport + 'static, // TODO: 'static :-/ + Du::Output: Into, + S: Clone + MuxedTransport, + { + self.get(|| { + swarm.dial(multiaddr.clone(), transport) + .map_err(|_| IoError::new(IoErrorKind::Other, "multiaddress not supported")) + .into_future() + .flatten() + }) + } + + /// Loads the value from the object. + /// + /// If the object is empty, calls the closure. The closure should return a future that + /// should be signaled after `set_until` has been called. If the future produces an error, + /// then the object will empty itself again and the `UniqueConnecFuture` will return an error. + /// If the future is finished and `set_until` hasn't been called, then the `UniqueConnecFuture` + /// will return an error. + pub fn get(&self, or: F) -> UniqueConnecFuture + where F: FnOnce() -> Fut, + T: Clone, + Fut: IntoFuture, + Fut::Future: Send + Sync + 'static, // TODO: 'static :-/ + { + match &*self.inner.lock() { + UniqueConnecInner::Empty => (), + _ => return UniqueConnecFuture { inner: Arc::downgrade(&self.inner) }, + }; + + // The mutex is unlocked when we call `or`, in order to avoid potential deadlocks. + let dial_fut = or().into_future(); + + let mut inner = self.inner.lock(); + // Since we unlocked the mutex, it's possible that the object was filled in the meanwhile. + // Therefore we check again whether it's still `Empty`. + if let UniqueConnecInner::Empty = &mut *inner { + *inner = UniqueConnecInner::Pending { + tasks_waiting: Vec::new(), + dial_fut: Box::new(dial_fut), + }; + } + + UniqueConnecFuture { inner: Arc::downgrade(&self.inner) } + } + + /// Puts `value` inside the object. The second parameter is a future whose completion will + /// clear up the content. Returns an adjusted version of that same future. + /// + /// If `clear()` is called, the returned future will automatically complete with an error. + /// + /// Has no effect if the object already contains something. + pub fn set_until(&self, value: T, until: F) -> impl Future + where F: Future + { + let mut tasks_to_notify = Vec::new(); + + let mut inner = self.inner.lock(); + let (on_clear, on_clear_rx) = oneshot::channel(); + match mem::replace(&mut *inner, UniqueConnecInner::Full { value, on_clear }) { + UniqueConnecInner::Empty => {}, + UniqueConnecInner::Errored(_) => {}, + UniqueConnecInner::Pending { tasks_waiting, .. } => { + tasks_to_notify = tasks_waiting; + }, + old @ UniqueConnecInner::Full { .. } => { + // Keep the old value. + *inner = old; + return future::Either::B(until); + }, + }; + drop(inner); + + // The mutex is unlocked when we notify the pending tasks. 
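+        // Otherwise, a woken-up task that immediately polls its `UniqueConnecFuture`
+        // could end up blocking on the lock that this thread still holds.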
+ for task in tasks_to_notify { + task.notify(); + } + + let inner = self.inner.clone(); + let fut = until + .select(on_clear_rx.then(|_| Ok(()))) + .map(|((), _)| ()) + .map_err(|(err, _)| err) + .then(move |val| { + *inner.lock() = UniqueConnecInner::Empty; + val + }); + future::Either::A(fut) + } + + /// Clears the content of the object. + /// + /// Has no effect if the content is empty or pending. + /// If the node was full, calling `clear` will stop the future returned by `set_until`. + pub fn clear(&self) { + let mut inner = self.inner.lock(); + match mem::replace(&mut *inner, UniqueConnecInner::Empty) { + UniqueConnecInner::Empty => {}, + UniqueConnecInner::Errored(_) => {}, + pending @ UniqueConnecInner::Pending { .. } => { + *inner = pending; + }, + UniqueConnecInner::Full { on_clear, .. } => { + let _ = on_clear.send(()); + }, + }; + } + + /// Returns the state of the object. + /// + /// Note that this can be racy, as the object can be used at the same time. In other words, + /// the returned value may no longer reflect the actual state. + pub fn state(&self) -> UniqueConnecState { + match *self.inner.lock() { + UniqueConnecInner::Empty => UniqueConnecState::Empty, + UniqueConnecInner::Errored(_) => UniqueConnecState::Errored, + UniqueConnecInner::Pending { .. } => UniqueConnecState::Pending, + UniqueConnecInner::Full { .. } => UniqueConnecState::Full, + } + } +} + +impl Clone for UniqueConnec { + #[inline] + fn clone(&self) -> UniqueConnec { + UniqueConnec { + inner: self.inner.clone(), + } + } +} + +impl Default for UniqueConnec { + #[inline] + fn default() -> Self { + UniqueConnec::empty() + } +} + +/// Future returned by `UniqueConnec::get()`. +pub struct UniqueConnecFuture { + inner: Weak>>, +} + +impl Future for UniqueConnecFuture + where T: Clone +{ + type Item = T; + type Error = IoError; + + fn poll(&mut self) -> Poll { + let inner = match self.inner.upgrade() { + Some(inner) => inner, + // All the `UniqueConnec` have been destroyed. + None => return Err(IoErrorKind::ConnectionAborted.into()), + }; + + let mut inner = inner.lock(); + match mem::replace(&mut *inner, UniqueConnecInner::Empty) { + UniqueConnecInner::Empty => { + // This can happen if `set_until()` is called, and the future expires before the + // future returned by `get()` gets polled. This means that the connection has been + // closed. + Err(IoErrorKind::ConnectionAborted.into()) + }, + UniqueConnecInner::Pending { mut tasks_waiting, mut dial_fut } => { + match dial_fut.poll() { + Ok(Async::Ready(())) => { + // This happens if we successfully dialed a remote, but the callback + // doesn't call `set_until`. This can be a logic error by the user, + // but could also indicate that the user decided to filter out this + // connection for whatever reason. 
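+                        // In both cases the error is latched, so that any further poll
+                        // of a `UniqueConnecFuture` pointing here fails immediately.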
+ *inner = UniqueConnecInner::Errored(IoErrorKind::ConnectionAborted.into()); + Err(IoErrorKind::ConnectionAborted.into()) + }, + Ok(Async::NotReady) => { + tasks_waiting.push(task::current()); + *inner = UniqueConnecInner::Pending { tasks_waiting, dial_fut }; + Ok(Async::NotReady) + } + Err(err) => { + let tr = IoError::new(IoErrorKind::ConnectionAborted, err.to_string()); + *inner = UniqueConnecInner::Errored(err); + Err(tr) + }, + } + }, + UniqueConnecInner::Full { value, on_clear } => { + *inner = UniqueConnecInner::Full { + value: value.clone(), + on_clear + }; + Ok(Async::Ready(value)) + }, + UniqueConnecInner::Errored(err) => { + let tr = IoError::new(IoErrorKind::ConnectionAborted, err.to_string()); + *inner = UniqueConnecInner::Errored(err); + Err(tr) + }, + } + } +} + +/// State of a `UniqueConnec`. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum UniqueConnecState { + /// The object is empty. + Empty, + /// `get_*` has been called and we are waiting for `set_until` to be called. + Pending, + /// `set_until` has been called. + Full, + /// The future returned by the closure of `get_*` has errored or has finished before + /// `set_until` has been called. + Errored, +} + +#[cfg(test)] +mod tests { + use futures::{future, Future}; + use transport::DeniedTransport; + use {UniqueConnec, UniqueConnecState}; + use swarm; + + #[test] + fn invalid_multiaddr_produces_error() { + let unique = UniqueConnec::empty(); + assert_eq!(unique.state(), UniqueConnecState::Empty); + let unique2 = unique.clone(); + let (swarm_ctrl, _swarm_fut) = swarm(DeniedTransport, |_, _| { + unique2.set_until((), future::empty()) + }); + let fut = unique.get_or_dial(&swarm_ctrl, &"/ip4/1.2.3.4".parse().unwrap(), + DeniedTransport); + assert!(fut.wait().is_err()); + assert_eq!(unique.state(), UniqueConnecState::Errored); + } + + // TODO: more tests +} diff --git a/kad/Cargo.toml b/kad/Cargo.toml index 02875295..c7ef2223 100644 --- a/kad/Cargo.toml +++ b/kad/Cargo.toml @@ -22,7 +22,7 @@ rand = "0.4.2" smallvec = "0.5" tokio-codec = "0.1" tokio-io = "0.1" -tokio-timer = "0.1.2" +tokio-timer = "0.2" varint = { path = "../varint-rs" } [dev-dependencies] diff --git a/kad/src/high_level.rs b/kad/src/high_level.rs index dc5bd95c..c2887203 100644 --- a/kad/src/high_level.rs +++ b/kad/src/high_level.rs @@ -18,513 +18,447 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -//! High-level structs/traits of the crate. -//! -//! Lies on top of the `kad_server` module. 
- -use bytes::Bytes; -use fnv::FnvHashMap; -use futures::sync::oneshot; -use futures::{self, future, Future, Stream}; -use kad_server::{KademliaServerConfig, KademliaServerController, KademliaIncomingRequest, KademliaFindNodeRespond}; -use kbucket::{KBucketsPeerId, KBucketsTable, UpdateOutcome}; -use libp2p_core::{ConnectionUpgrade, Endpoint, MuxedTransport, PeerId, SwarmController, Transport}; -use multiaddr::{AddrComponent, Multiaddr}; -use parking_lot::Mutex; -use protocol::Peer; -use query; -use std::collections::hash_map::Entry; -use std::fmt; +use fnv::FnvHashSet; +use futures::{future, Future, IntoFuture, stream, Stream}; +use kad_server::KadConnecController; +use kbucket::{KBucketsTable, KBucketsPeerId}; +use libp2p_core::PeerId; +use multiaddr::Multiaddr; +use protocol; +use rand; +use smallvec::SmallVec; +use std::cmp::Ordering; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use std::iter; -use std::slice::Iter as SliceIter; -use std::sync::Arc; -use std::time::Duration; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_timer; +use std::mem; +use std::time::{Duration, Instant}; +use tokio_timer::Deadline; /// Prototype for a future Kademlia protocol running on a socket. #[derive(Debug, Clone)] -pub struct KademliaConfig { +pub struct KadSystemConfig { /// Degree of parallelism on the network. Often called `alpha` in technical papers. /// No more than this number of remotes will be used at a given time for any given operation. // TODO: ^ share this number between operations? or does each operation use `alpha` remotes? pub parallelism: u32, /// Id of the local peer. pub local_peer_id: PeerId, + /// List of peers initially known. + pub known_initial_peers: I, + /// Duration after which a node in the k-buckets needs to be pinged again. + pub kbuckets_timeout: Duration, /// When contacting a node, duration after which we consider it unresponsive. - pub timeout: Duration, + pub request_timeout: Duration, } -// Builds a `QueryParams` that fetches information from `$controller`. -// -// Because of lifetime issues and type naming issues, a macro is the most convenient solution. -macro_rules! gen_query_params { - ($controller:expr) => {{ - let controller = $controller; - query::QueryParams { - local_id: $controller.inner.kbuckets.my_id().clone(), - kbuckets_find_closest: { - let controller = controller.clone(); - move |addr| controller.inner.kbuckets.find_closest(&addr).collect() - }, - parallelism: $controller.inner.parallelism, - find_node: { - let controller = controller.clone(); - move |addr, searched| { - // TODO: rewrite to be more direct - Box::new(controller.send(addr, move |ctl| ctl.find_node(&searched)).flatten()) as Box<_> - } - }, - } - }}; +/// System that drives the whole Kademlia process. +pub struct KadSystem { + // The actual DHT. + kbuckets: KBucketsTable, + // Same as in the config. + parallelism: u32, + // Same as in the config. + request_timeout: Duration, } -/// Object that allows one to make queries on the Kademlia system. -#[derive(Debug)] -pub struct KademliaControllerPrototype { - inner: Arc, +/// Event that happens during a query. +#[derive(Debug, Clone)] +pub enum KadQueryEvent { + /// Learned about new mutiaddresses for the given peers. + NewKnownMultiaddrs(Vec<(PeerId, Vec)>), + /// Finished the processing of the query. Contains the result. + Finished(TOut), } -impl KademliaControllerPrototype { - /// Creates a new controller from that configuration. 
- pub fn new(config: KademliaConfig, initial_peers: I) -> KademliaControllerPrototype - where I: IntoIterator +impl KadSystem { + /// Starts a new Kademlia system. + /// + /// Also produces a `Future` that drives a Kademlia initialization process. + /// This future should be driven to completion by the caller. + pub fn start<'a, F, Fut>(config: KadSystemConfig>, access: F) + -> (KadSystem, impl Future + 'a) + where F: FnMut(&PeerId) -> Fut + Clone + 'a, + Fut: IntoFuture + 'a, { - let buckets = KBucketsTable::new(config.local_peer_id.clone(), config.timeout); - for peer_id in initial_peers { - let _ = buckets.update(peer_id, ()); - } - - let inner = Arc::new(Inner { - kbuckets: buckets, - timer: tokio_timer::wheel().build(), - connections: Default::default(), - timeout: config.timeout, - parallelism: config.parallelism as usize, - }); - - KademliaControllerPrototype { inner: inner } + let system = KadSystem::without_init(config); + let init_future = system.perform_initialization(access); + (system, init_future) } - /// Turns the prototype into an actual controller by feeding it a swarm controller. - /// - /// You must pass to this function the transport to use to dial and obtain - /// `KademliaPeerReqStream`, plus a mapping function that will turn the - /// `KademliaPeerReqStream` into whatever the swarm expects. - pub fn start( - self, - swarm: SwarmController, - kademlia_transport: K, - map: M, - ) -> ( - KademliaController, - Box>, - ) - where - T: Clone + MuxedTransport + 'static, // TODO: 'static :-/ - K: Transport + Clone + 'static, // TODO: 'static :-/ - M: FnOnce(KademliaPeerReqStream) -> T::Output + Clone + 'static, - { - // TODO: initialization + /// Same as `start`, but doesn't perform the initialization process. + pub fn without_init(config: KadSystemConfig>) -> KadSystem { + let kbuckets = KBucketsTable::new(config.local_peer_id.clone(), config.kbuckets_timeout); + for peer in config.known_initial_peers { + let _ = kbuckets.update(peer, ()); + } - let controller = KademliaController { - inner: self.inner.clone(), - swarm_controller: swarm, - kademlia_transport, - map, + let system = KadSystem { + kbuckets: kbuckets, + parallelism: config.parallelism, + request_timeout: config.request_timeout, }; - let init_future = { - let futures: Vec<_> = (0..256) - .map({ - let controller = controller.clone(); - move |n| query::refresh(gen_query_params!(controller.clone()), n) - }) - .map(|stream| { - stream.for_each(|_| Ok(())) - }) - .collect(); + system + } - future::loop_fn(futures, |futures| { - if futures.is_empty() { - let fut = future::ok(future::Loop::Break(())); - return future::Either::A(fut); - } - - let fut = future::select_all(futures) - .map_err(|(err, _, _)| err) - .map(|(_, _, rest)| future::Loop::Continue(rest)); - future::Either::B(fut) + /// Starts an initialization process. 
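+    ///
+    /// A rough usage sketch (`kad_ctl_for` is a hypothetical closure returning a
+    /// future that resolves to the `KadConnecController` of the given peer):
+    ///
+    /// ```ignore
+    /// let init = kad_system.perform_initialization(kad_ctl_for);
+    /// init.wait().expect("initialization failed"); // drive the future to completion
+    /// ```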
+ pub fn perform_initialization<'a, F, Fut>(&self, access: F) -> impl Future + 'a + where F: FnMut(&PeerId) -> Fut + Clone + 'a, + Fut: IntoFuture + 'a, + { + let futures: Vec<_> = (0..256) // TODO: 256 is arbitrary + .map(|n| { + refresh(n, access.clone(), &self.kbuckets, + self.parallelism as usize, self.request_timeout) }) + .map(|stream| stream.for_each(|_| Ok(()))) + .collect(); + + future::loop_fn(futures, |futures| { + if futures.is_empty() { + let fut = future::ok(future::Loop::Break(())); + return future::Either::A(fut); + } + + let fut = future::select_all(futures) + .map_err(|(err, _, _)| err) + .map(|(_, _, rest)| future::Loop::Continue(rest)); + future::Either::B(fut) + }) + } + + /// Updates the k-buckets with the specific peer. + /// + /// Should be called whenever we receive a message from a peer. + pub fn update_kbuckets(&self, peer: PeerId) { + // TODO: ping system + let _ = self.kbuckets.update(peer, ()); + } + + /// Returns the local peer ID, as passed in the configuration. + pub fn local_peer_id(&self) -> &PeerId { + self.kbuckets.my_id() + } + + /// Finds the known nodes closest to `id`, ordered by distance. + pub fn known_closest_peers(&self, id: &PeerId) -> impl Iterator { + self.kbuckets.find_closest_with_self(id) + } + + /// Starts a query for an iterative `FIND_NODE` request. + pub fn find_node<'a, F, Fut>(&self, searched_key: PeerId, access: F) + -> impl Stream>, Error = IoError> + 'a + where F: FnMut(&PeerId) -> Fut + 'a, + Fut: IntoFuture + 'a, + { + query(access, &self.kbuckets, searched_key, self.parallelism as usize, + 20, self.request_timeout) // TODO: arbitrary const + } +} + +// Refreshes a specific bucket by performing an iterative `FIND_NODE` on a random ID of this +// bucket. +// +// Returns a dummy no-op future if `bucket_num` is out of range. +fn refresh<'a, F, Fut>(bucket_num: usize, access: F, kbuckets: &KBucketsTable, + parallelism: usize, request_timeout: Duration) + -> impl Stream, Error = IoError> + 'a +where F: FnMut(&PeerId) -> Fut + 'a, + Fut: IntoFuture + 'a, +{ + let peer_id = match gen_random_id(kbuckets.my_id(), bucket_num) { + Ok(p) => p, + Err(()) => { + let stream = stream::once(Ok(KadQueryEvent::Finished(()))); + return Box::new(stream) as Box>; + }, + }; + + let stream = query(access, kbuckets, peer_id, parallelism, 20, request_timeout) // TODO: 20 is arbitrary + .map(|event| { + match event { + KadQueryEvent::NewKnownMultiaddrs(peers) => KadQueryEvent::NewKnownMultiaddrs(peers), + KadQueryEvent::Finished(_) => KadQueryEvent::Finished(()), + } + }); + Box::new(stream) as Box> +} + +// Generates a random `PeerId` that belongs to the given bucket. +// +// Returns an error if `bucket_num` is out of range. 
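+//
+// Only the `bucket_num + 1` lowest-order bits of the returned ID may differ from
+// `my_id`, i.e. the XOR distance between the two IDs is strictly below
+// 2^(bucket_num + 1).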
+fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result { + let my_id_len = my_id.as_bytes().len(); + + // TODO: this 2 is magic here ; it is the length of the hash of the multihash + let bits_diff = bucket_num + 1; + if bits_diff > 8 * (my_id_len - 2) { + return Err(()); + } + + let mut random_id = [0; 64]; + for byte in 0..my_id_len { + match byte.cmp(&(my_id_len - bits_diff / 8 - 1)) { + Ordering::Less => { + random_id[byte] = my_id.as_bytes()[byte]; + } + Ordering::Equal => { + let mask: u8 = (1 << (bits_diff % 8)) - 1; + random_id[byte] = (my_id.as_bytes()[byte] & !mask) | (rand::random::() & mask); + } + Ordering::Greater => { + random_id[byte] = rand::random(); + } + } + } + + let peer_id = PeerId::from_bytes(random_id[..my_id_len].to_owned()) + .expect("randomly-generated peer ID should always be valid"); + Ok(peer_id) +} + +// Generic query-performing function. +fn query<'a, F, Fut>( + access: F, + kbuckets: &KBucketsTable, + searched_key: PeerId, + parallelism: usize, + num_results: usize, + request_timeout: Duration, +) -> impl Stream>, Error = IoError> + 'a +where F: FnMut(&PeerId) -> Fut + 'a, + Fut: IntoFuture + 'a, +{ + debug!("Start query for {:?} ; num results = {}", searched_key, num_results); + + // State of the current iterative process. + struct State<'a, F> { + // At which stage we are. + stage: Stage, + // The `access` parameter. + access: F, + // Final output of the iteration. + result: Vec, + // For each open connection, a future with the response of the remote. + // Note that don't use a `SmallVec` here because `select_all` produces a `Vec`. + current_attempts_fut: Vec, Error = IoError> + 'a>>, + // For each open connection, the peer ID that we are connected to. + // Must always have the same length as `current_attempts_fut`. + current_attempts_addrs: SmallVec<[PeerId; 32]>, + // Nodes that need to be attempted. + pending_nodes: Vec, + // Peers that we tried to contact but failed. + failed_to_contact: FnvHashSet, + } + + // General stage of the state. + #[derive(Copy, Clone, PartialEq, Eq)] + enum Stage { + // We are still in the first step of the algorithm where we try to find the closest node. + FirstStep, + // We are contacting the k closest nodes in order to fill the list with enough results. + SecondStep, + // The results are complete, and the next stream iteration will produce the outcome. + FinishingNextIter, + // We are finished and the stream shouldn't return anything anymore. + Finished, + } + + let initial_state = State { + stage: Stage::FirstStep, + access: access, + result: Vec::with_capacity(num_results), + current_attempts_fut: Vec::new(), + current_attempts_addrs: SmallVec::new(), + pending_nodes: kbuckets.find_closest(&searched_key).collect(), + failed_to_contact: Default::default(), + }; + + // Start of the iterative process. + let stream = stream::unfold(initial_state, move |mut state| -> Option<_> { + match state.stage { + Stage::FinishingNextIter => { + let result = mem::replace(&mut state.result, Vec::new()); + debug!("Query finished with {} results", result.len()); + state.stage = Stage::Finished; + let future = future::ok((Some(KadQueryEvent::Finished(result)), state)); + return Some(future::Either::A(future)); + }, + Stage::Finished => { + return None; + }, + _ => () }; - (controller, Box::new(init_future)) - } -} + let searched_key = searched_key.clone(); -/// Object that allows one to make queries on the Kademlia system. 
-#[derive(Debug)] -pub struct KademliaController -where - T: MuxedTransport + 'static, // TODO: 'static :-/ -{ - inner: Arc, - swarm_controller: SwarmController, - kademlia_transport: K, - map: M, -} + // Find out which nodes to contact at this iteration. + let to_contact = { + let wanted_len = if state.stage == Stage::FirstStep { + parallelism.saturating_sub(state.current_attempts_fut.len()) + } else { + num_results.saturating_sub(state.current_attempts_fut.len()) + }; + let mut to_contact = SmallVec::<[_; 16]>::new(); + while to_contact.len() < wanted_len && !state.pending_nodes.is_empty() { + // Move the first element of `pending_nodes` to `to_contact`, but ignore nodes that + // are already part of the results or of a current attempt or if we failed to + // contact it before. + let peer = state.pending_nodes.remove(0); + if state.result.iter().any(|p| p == &peer) { + continue; + } + if state.current_attempts_addrs.iter().any(|p| p == &peer) { + continue; + } + if state.failed_to_contact.iter().any(|p| p == &peer) { + continue; + } + to_contact.push(peer); + } + to_contact + }; -impl Clone for KademliaController -where - T: Clone + MuxedTransport + 'static, // TODO: 'static :-/ - K: Clone, - M: Clone, -{ - #[inline] - fn clone(&self) -> Self { - KademliaController { - inner: self.inner.clone(), - swarm_controller: self.swarm_controller.clone(), - kademlia_transport: self.kademlia_transport.clone(), - map: self.map.clone(), - } - } -} + debug!("New query round ; {} queries in progress ; contacting {} new peers", + state.current_attempts_fut.len(), + to_contact.len()); -impl KademliaController -where - T: Clone + MuxedTransport + 'static, // TODO: 'static :-/ -{ - /// Performs an iterative find node query on the network. - /// - /// Will query the network for the peers that4 are the closest to `searched_key` and return - /// the results. - /// - /// The algorithm used is a standard Kademlia algorithm. The details are not documented, so - /// that the implementation is free to modify them. - #[inline] - pub fn find_node( - &self, - searched_key: PeerId, - ) -> Box>, Error = IoError>> - where - K: Transport + Clone + 'static, - M: FnOnce(KademliaPeerReqStream) -> T::Output + Clone + 'static, // TODO: 'static :-/ - { - let me = self.clone(); - query::find_node(gen_query_params!(me.clone()), searched_key) - } -} - -/// Connection upgrade to the Kademlia protocol. -#[derive(Clone)] -pub struct KademliaUpgrade { - inner: Arc, - upgrade: KademliaServerConfig, -} - -impl KademliaUpgrade { - /// Builds a connection upgrade from the controller. - #[inline] - pub fn from_prototype(proto: &KademliaControllerPrototype) -> Self { - KademliaUpgrade { - inner: proto.inner.clone(), - upgrade: KademliaServerConfig::new(), - } - } - - /// Builds a connection upgrade from the controller. 
- #[inline] - pub fn from_controller(ctl: &KademliaController) -> Self - where - T: MuxedTransport, - { - KademliaUpgrade { - inner: ctl.inner.clone(), - upgrade: KademliaServerConfig::new(), - } - } -} - -impl ConnectionUpgrade for KademliaUpgrade -where - C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/ - Maf: Future + 'static, // TODO: 'static :( -{ - type Output = KademliaPeerReqStream; - type MultiaddrFuture = future::FutureResult; - type Future = Box>; - type NamesIter = iter::Once<(Bytes, ())>; - type UpgradeIdentifier = (); - - #[inline] - fn protocol_names(&self) -> Self::NamesIter { - ConnectionUpgrade::::protocol_names(&self.upgrade) - } - - #[inline] - fn upgrade(self, incoming: C, id: (), endpoint: Endpoint, addr: Maf) -> Self::Future { - let future = addr.and_then(move |addr| { - let inner = self.inner; - let client_addr = addr.clone(); - - let peer_id = { - let mut iter = addr.iter(); - let protocol = iter.next(); - let after_proto = iter.next(); - match (protocol, after_proto) { - (Some(AddrComponent::P2P(key)), None) | (Some(AddrComponent::IPFS(key)), None) => { - match PeerId::from_bytes(key) { - Ok(id) => id, - Err(_) => { - let err = IoError::new( - IoErrorKind::InvalidData, - "invalid peer ID sent by remote identification", - ); - return Box::new(future::err(err)) as Box>; - } - } - } - _ => { - let err = - IoError::new(IoErrorKind::InvalidData, "couldn't identify connected node"); - return Box::new(future::err(err)); + // For each node in `to_contact`, start an RPC query and a corresponding entry in the two + // `state.current_attempts_*` fields. + for peer in to_contact { + let searched_key2 = searched_key.clone(); + let current_attempt = (state.access)(&peer) + .into_future() + .and_then(move |controller| { + controller.find_node(&searched_key2) + }); + let with_deadline = Deadline::new(current_attempt, Instant::now() + request_timeout) + .map_err(|err| { + if let Some(err) = err.into_inner() { + err + } else { + IoError::new(IoErrorKind::ConnectionAborted, "kademlia request timeout") } + }); + state.current_attempts_addrs.push(peer.clone()); + state + .current_attempts_fut + .push(Box::new(with_deadline) as Box<_>); + } + debug_assert_eq!( + state.current_attempts_addrs.len(), + state.current_attempts_fut.len() + ); + + // Extract `current_attempts_fut` so that we can pass it to `select_all`. We will push the + // values back when inside the loop. + let current_attempts_fut = mem::replace(&mut state.current_attempts_fut, Vec::new()); + if current_attempts_fut.is_empty() { + // If `current_attempts_fut` is empty, then `select_all` would panic. It happens + // when we have no additional node to query. + debug!("Finishing query early because no additional node available"); + state.stage = Stage::FinishingNextIter; + let future = future::ok((None, state)); + return Some(future::Either::A(future)); + } + + // This is the future that continues or breaks the `loop_fn`. 
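+        // `select_all` resolves with whichever attempt completes first (successfully
+        // or not), along with its index and the attempts that are still pending; the
+        // pending ones are put back into `state` below.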
+ let future = future::select_all(current_attempts_fut.into_iter()).then(move |result| { + let (message, trigger_idx, other_current_attempts) = match result { + Err((err, trigger_idx, other_current_attempts)) => { + (Err(err), trigger_idx, other_current_attempts) + } + Ok((message, trigger_idx, other_current_attempts)) => { + (Ok(message), trigger_idx, other_current_attempts) } }; - let future = self.upgrade.upgrade(incoming, id, endpoint, future::ok::<_, IoError>(addr)).map( - move |((controller, stream), _)| { - match inner.connections.lock().entry(client_addr.clone()) { - Entry::Occupied(mut entry) => { - match entry.insert(Connection::Active(controller)) { - // If there was already an active connection to this remote, it gets - // replaced by the new more recent one. - Connection::Active(_old_connection) => {} - Connection::Pending(closures) => { - let new_ctl = match entry.get_mut() { - &mut Connection::Active(ref mut ctl) => ctl, - _ => unreachable!( - "logic error: an Active enum variant was \ - inserted, but reading back didn't give \ - an Active" - ), - }; + // Putting back the extracted elements in `state`. + let remote_id = state.current_attempts_addrs.remove(trigger_idx); + debug_assert!(state.current_attempts_fut.is_empty()); + state.current_attempts_fut = other_current_attempts; - for mut closure in closures { - closure(new_ctl); - } - } - }; - } - Entry::Vacant(entry) => { - entry.insert(Connection::Active(controller)); - } - }; + // `message` contains the reason why the current future was woken up. + let closer_peers = match message { + Ok(msg) => msg, + Err(err) => { + trace!("RPC query failed for {:?}: {:?}", remote_id, err); + state.failed_to_contact.insert(remote_id); + return future::ok((None, state)); + } + }; - let stream = stream.map(move |query| { - match inner.kbuckets.update(peer_id.clone(), ()) { - UpdateOutcome::NeedPing(node_to_ping) => { - // TODO: do something with this info - println!("need to ping {:?}", node_to_ping); - } - _ => (), - } + // Inserting the node we received a response from into `state.result`. + // The code is non-trivial because `state.result` is ordered by distance and is limited + // by `num_results` elements. + if let Some(insert_pos) = state.result.iter().position(|e| { + e.distance_with(&searched_key) >= remote_id.distance_with(&searched_key) + }) { + if state.result[insert_pos] != remote_id { + if state.result.len() >= num_results { + state.result.pop(); + } + state.result.insert(insert_pos, remote_id); + } + } else if state.result.len() < num_results { + state.result.push(remote_id); + } - match query { - KademliaIncomingRequest::FindNode { searched, responder } => { - let mut intermediate: Vec<_> = inner.kbuckets.find_closest(&searched).collect(); - let my_id = inner.kbuckets.my_id().clone(); - if let Some(pos) = intermediate - .iter() - .position(|e| e.distance_with(&searched) >= my_id.distance_with(&searched)) - { - if intermediate[pos] != my_id { - intermediate.insert(pos, my_id); - } - } else { - intermediate.push(my_id); - } + // The loop below will set this variable to `true` if we find a new element to put at + // the top of the result. This would mean that we have to continue looping. + let mut local_nearest_node_updated = false; - Some(KademliaPeerReq { - requested_peers: intermediate, - inner: responder, - }) - }, - KademliaIncomingRequest::PingPong => { - // We updated the k-bucket above. - None - }, - } - }).filter_map(|val| val); + // Update `state` with the actual content of the message. 
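+            // Each `KadPeer` of the answer carries the multiaddresses known for it;
+            // these are reported to the caller through a `NewKnownMultiaddrs` event
+            // rather than being stored in the k-buckets directly.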
+ let mut new_known_multiaddrs = Vec::with_capacity(closer_peers.len()); + for mut peer in closer_peers { + // Update the peerstore with the information sent by + // the remote. + { + let multiaddrs = mem::replace(&mut peer.multiaddrs, Vec::new()); + trace!("Reporting multiaddresses for {:?}: {:?}", peer.node_id, multiaddrs); + new_known_multiaddrs.push((peer.node_id.clone(), multiaddrs)); + } - (KademliaPeerReqStream { inner: Box::new(stream) }, future::ok(client_addr)) - }, - ); + if peer.node_id.distance_with(&searched_key) + <= state.result[0].distance_with(&searched_key) + { + local_nearest_node_updated = true; + } - Box::new(future) as Box<_> + if state.result.iter().any(|ma| ma == &peer.node_id) { + continue; + } + + // Insert the node into `pending_nodes` at the right position, or do not + // insert it if it is already in there. + if let Some(insert_pos) = state.pending_nodes.iter().position(|e| { + e.distance_with(&searched_key) >= peer.node_id.distance_with(&searched_key) + }) { + if state.pending_nodes[insert_pos] != peer.node_id { + state.pending_nodes.insert(insert_pos, peer.node_id.clone()); + } + } else { + state.pending_nodes.push(peer.node_id.clone()); + } + } + + if state.result.len() >= num_results + || (state.stage != Stage::FirstStep && state.current_attempts_fut.is_empty()) + { + state.stage = Stage::FinishingNextIter; + + } else { + if !local_nearest_node_updated { + trace!("Loop didn't update closer node ; jumping to step 2"); + state.stage = Stage::SecondStep; + } + } + + future::ok((Some(KadQueryEvent::NewKnownMultiaddrs(new_known_multiaddrs)), state)) }); - Box::new(future) as Box<_> - } -} - -/// Stream that must be processed for the Kademlia system to work. -/// -/// Produces requests for peer information. These requests should be answered for the stream to -/// continue to progress. -pub struct KademliaPeerReqStream { - inner: Box>, -} - -impl Stream for KademliaPeerReqStream { - type Item = KademliaPeerReq; - type Error = IoError; - - #[inline] - fn poll(&mut self) -> futures::Poll, Self::Error> { - self.inner.poll() - } -} - -/// Request for information about some peers. -pub struct KademliaPeerReq { - inner: KademliaFindNodeRespond, - requested_peers: Vec, -} - -impl KademliaPeerReq { - /// Returns a list of the IDs of the peers that were requested. - #[inline] - pub fn requested_peers(&self) -> SliceIter { - self.requested_peers.iter() - } - - /// Responds to the request. - #[inline] - pub fn respond(self, peers: I) - where I: IntoIterator - { - self.inner.respond(peers); - } -} - -// Inner struct shared throughout the Kademlia system. -#[derive(Debug)] -struct Inner { - // The remotes are identified by their public keys. - kbuckets: KBucketsTable, - - // Timer used for building the timeouts. - timer: tokio_timer::Timer, - - // Same as in the config. - timeout: Duration, - - // Same as in the config. - parallelism: usize, - - // List of open connections with remotes. - // - // Since the keys are the nodes' multiaddress, it is expected that each node only has one - // multiaddress. This should be the case if the user uses the identify transport that - // automatically maps peer IDs to multiaddresses. - // TODO: is it correct to use FnvHashMap with a Multiaddr? needs benchmarks - connections: Mutex>, -} - -// Current state of a connection to a specific multiaddr. -// -// There is no `Inactive` entry, as an inactive connection corresponds to no entry in the -// `connections` hash map. 
-enum Connection { - // The connection has already been opened and is ready to be controlled through the given - // controller. - Active(KademliaServerController), - - // The connection is in the process of being opened. Any closure added to this `Vec` will be - // executed on the controller once it is available. - // Once the connection is open, it will be replaced with `Active`. - // TODO: should be FnOnce once Rust allows that - Pending(Vec>), -} - -impl fmt::Debug for Connection { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - Connection::Active(_) => write!(f, "Connection::Active"), - Connection::Pending(_) => write!(f, "Connection::Pending"), - } - } -} - -impl KademliaController -where - T: Clone + MuxedTransport + 'static, // TODO: 'static :-/ - K: Transport + Clone + 'static, // TODO: 'static - M: FnOnce(KademliaPeerReqStream) -> T::Output + Clone + 'static, // TODO: 'static :-/ -{ - #[inline] - fn send( - &self, - addr: Multiaddr, - and_then: F, - ) -> Box> - where - F: FnOnce(&KademliaServerController) -> FRet + 'static, - FRet: 'static, - { - let mut lock = self.inner.connections.lock(); - - let pending_list = match lock.entry(addr.clone()) { - Entry::Occupied(entry) => { - match entry.into_mut() { - &mut Connection::Pending(ref mut list) => list, - &mut Connection::Active(ref mut ctrl) => { - // If we have an active connection, entirely short-circuit the function. - let output = future::ok(and_then(ctrl)); - return Box::new(output) as Box<_>; - } - } - } - Entry::Vacant(entry) => { - // Need to open a connection. - let map = self.map.clone(); - match self.swarm_controller - .dial(addr, self.kademlia_transport.clone().map(move |out, _| map(out))) - { - Ok(_) => (), - Err(_addr) => { - let fut = future::err(IoError::new( - IoErrorKind::InvalidData, - "unsupported multiaddress", - )); - return Box::new(fut) as Box<_>; - } - } - match entry.insert(Connection::Pending(Vec::with_capacity(1))) { - &mut Connection::Pending(ref mut list) => list, - _ => unreachable!("we just inserted a Pending variant"), - } - } - }; - - let (tx, rx) = oneshot::channel(); - let mut tx = Some(tx); - let mut and_then = Some(and_then); - pending_list.push(Box::new(move |ctrl: &mut KademliaServerController| { - let and_then = and_then - .take() - .expect("Programmer error: 'pending' closure was called multiple times"); - let tx = tx.take() - .expect("Programmer error: 'pending' closure was called multiple times"); - let ret = and_then(ctrl); - let _ = tx.send(ret); - }) as Box<_>); - - let future = rx.map_err(|_| IoErrorKind::ConnectionAborted.into()); - let future = self.inner.timer.timeout(future, self.inner.timeout); - Box::new(future) as Box<_> - } + Some(future::Either::B(future)) + }).filter_map(|val| val); + + // Boxing the stream is not necessary, but we do it in order to improve compilation time. + Box::new(stream) as Box<_> } diff --git a/kad/src/kad_server.rs b/kad/src/kad_server.rs index 199142af..3f3343b4 100644 --- a/kad/src/kad_server.rs +++ b/kad/src/kad_server.rs @@ -23,21 +23,21 @@ //! //! # Usage //! -//! - Create a `KademliaServerConfig` object. This struct implements `ConnectionUpgrade`. +//! - Create a `KadConnecConfig` object. This struct implements `ConnectionUpgrade`. //! -//! - Update a connection through that `KademliaServerConfig`. The output yields you a -//! `KademliaServerController` and a stream that must be driven to completion. The controller +//! - Update a connection through that `KadConnecConfig`. The output yields you a +//! 
`KadConnecController` and a stream that must be driven to completion. The controller //! allows you to perform queries and receive responses. The stream produces incoming requests //! from the remote. //! -//! This `KademliaServerController` is usually extracted and stored in some sort of hash map in an +//! This `KadConnecController` is usually extracted and stored in some sort of hash map in an //! `Arc` in order to be available whenever we need to request something from a node. use bytes::Bytes; use futures::sync::{mpsc, oneshot}; use futures::{future, Future, Sink, stream, Stream}; use libp2p_core::{ConnectionUpgrade, Endpoint, PeerId}; -use protocol::{self, KadMsg, KademliaProtocolConfig, Peer}; +use protocol::{self, KadMsg, KademliaProtocolConfig, KadPeer}; use std::collections::VecDeque; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::iter; @@ -45,31 +45,31 @@ use tokio_io::{AsyncRead, AsyncWrite}; /// Configuration for a Kademlia server. /// -/// Implements `ConnectionUpgrade`. On a successful upgrade, produces a `KademliaServerController` +/// Implements `ConnectionUpgrade`. On a successful upgrade, produces a `KadConnecController` /// and a `Future`. The controller lets you send queries to the remote and receive answers, while /// the `Future` must be driven to completion in order for things to work. #[derive(Debug, Clone)] -pub struct KademliaServerConfig { +pub struct KadConnecConfig { raw_proto: KademliaProtocolConfig, } -impl KademliaServerConfig { +impl KadConnecConfig { /// Builds a configuration object for an upcoming Kademlia server. #[inline] pub fn new() -> Self { - KademliaServerConfig { + KadConnecConfig { raw_proto: KademliaProtocolConfig, } } } -impl ConnectionUpgrade for KademliaServerConfig +impl ConnectionUpgrade for KadConnecConfig where C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/ { type Output = ( - KademliaServerController, - Box>, + KadConnecController, + Box>, ); type MultiaddrFuture = Maf; type Future = future::Map<>::Future, fn((>::Output, Maf)) -> (Self::Output, Maf)>; @@ -93,7 +93,7 @@ where /// Allows sending Kademlia requests and receiving responses. #[derive(Debug, Clone)] -pub struct KademliaServerController { +pub struct KadConnecController { // In order to send a request, we use this sender to send a tuple. The first element of the // tuple is the message to send to the remote, and the second element is what is used to // receive the response. If the query doesn't expect a response (eg. `PUT_VALUE`), then the @@ -101,13 +101,13 @@ pub struct KademliaServerController { inner: mpsc::UnboundedSender<(KadMsg, oneshot::Sender)>, } -impl KademliaServerController { +impl KadConnecController { /// Sends a `FIND_NODE` query to the node and provides a future that will contain the response. // TODO: future item could be `impl Iterator` instead pub fn find_node( &self, searched_key: &PeerId, - ) -> impl Future, Error = IoError> { + ) -> impl Future, Error = IoError> { let message = protocol::KadMsg::FindNodeReq { key: searched_key.clone().into_bytes(), }; @@ -159,13 +159,13 @@ impl KademliaServerController { } /// Request received from the remote. -pub enum KademliaIncomingRequest { +pub enum KadIncomingRequest { /// Find the nodes closest to `searched`. FindNode { /// The value being searched. searched: PeerId, /// Object to use to respond to the request. 
- responder: KademliaFindNodeRespond, + responder: KadFindNodeRespond, }, // TODO: PutValue and FindValue @@ -175,14 +175,14 @@ pub enum KademliaIncomingRequest { } /// Object used to respond to `FindNode` queries from remotes. -pub struct KademliaFindNodeRespond { +pub struct KadFindNodeRespond { inner: oneshot::Sender, } -impl KademliaFindNodeRespond { +impl KadFindNodeRespond { /// Respond to the `FindNode` request. pub fn respond(self, peers: I) - where I: IntoIterator + where I: IntoIterator { let _ = self.inner.send(KadMsg::FindNodeRes { closer_peers: peers.into_iter().collect() @@ -191,12 +191,12 @@ impl KademliaFindNodeRespond { } // Builds a controller and stream from a stream/sink of raw messages. -fn build_from_sink_stream<'a, S>(connec: S) -> (KademliaServerController, Box + 'a>) +fn build_from_sink_stream<'a, S>(connec: S) -> (KadConnecController, Box + 'a>) where S: Sink + Stream + 'a { let (tx, rx) = mpsc::unbounded(); let future = kademlia_handler(connec, rx); - let controller = KademliaServerController { inner: tx }; + let controller = KadConnecController { inner: tx }; (controller, future) } @@ -211,7 +211,7 @@ where S: Sink + Stream( kad_bistream: S, rq_rx: mpsc::UnboundedReceiver<(KadMsg, oneshot::Sender)>, -) -> Box + 'a> +) -> Box + 'a> where S: Stream + Sink + 'a, { @@ -239,8 +239,7 @@ where .map_err(|_| unreachable!()); let rq_rx = rq_rx .map(|(m, o)| EventSource::LocalRequest(m, o)) - .map_err(|_| unreachable!()) - .chain(future::ok(EventSource::Finished).into_stream()); + .map_err(|_| unreachable!()); let kad_stream = kad_stream .map(|m| EventSource::Remote(m)) .chain(future::ok(EventSource::Finished).into_stream()); @@ -325,7 +324,7 @@ where // remote will see its PING answered only when it PONGs us. let future = future::ok({ let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished); - let rq = KademliaIncomingRequest::PingPong; + let rq = KadIncomingRequest::PingPong; (Some(rq), state) }); Box::new(future) as Box<_> @@ -334,7 +333,7 @@ where .send(KadMsg::Ping) .map(move |kad_sink| { let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished); - let rq = KademliaIncomingRequest::PingPong; + let rq = KadIncomingRequest::PingPong; (Some(rq), state) }); Box::new(future) as Box<_> @@ -371,9 +370,9 @@ where let _ = responders_tx.unbounded_send(rx); let future = future::ok({ let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished); - let rq = KademliaIncomingRequest::FindNode { + let rq = KadIncomingRequest::FindNode { searched: peer_id, - responder: KademliaFindNodeRespond { + responder: KadFindNodeRespond { inner: tx } }; @@ -407,9 +406,9 @@ mod tests { use std::iter; use futures::{Future, Poll, Sink, StartSend, Stream}; use futures::sync::mpsc; - use kad_server::{self, KademliaIncomingRequest, KademliaServerController}; + use kad_server::{self, KadIncomingRequest, KadConnecController}; use libp2p_core::PublicKey; - use protocol::{ConnectionType, Peer}; + use protocol::{KadConnectionType, KadPeer}; use rand; // This struct merges a stream and a sink and is quite useful for tests. 
@@ -441,7 +440,7 @@ mod tests { } } - fn build_test() -> (KademliaServerController, impl Stream, KademliaServerController, impl Stream) { + fn build_test() -> (KadConnecController, impl Stream, KadConnecController, impl Stream) { let (a_to_b, b_from_a) = mpsc::unbounded(); let (b_to_a, a_from_b) = mpsc::unbounded(); @@ -464,7 +463,7 @@ mod tests { let streams = stream_events_a.map(|ev| (ev, "a")) .select(stream_events_b.map(|ev| (ev, "b"))); match streams.into_future().map_err(|(err, _)| err).wait().unwrap() { - (Some((KademliaIncomingRequest::PingPong, "b")), _) => {}, + (Some((KadIncomingRequest::PingPong, "b")), _) => {}, _ => panic!() } } @@ -480,20 +479,20 @@ mod tests { let find_node_fut = controller_a.find_node(&random_peer_id); - let example_response = Peer { + let example_response = KadPeer { node_id: { let buf = (0 .. 1024).map(|_| -> u8 { rand::random() }).collect::>(); PublicKey::Rsa(buf).into_peer_id() }, multiaddrs: Vec::new(), - connection_ty: ConnectionType::Connected, + connection_ty: KadConnectionType::Connected, }; let streams = stream_events_a.map(|ev| (ev, "a")) .select(stream_events_b.map(|ev| (ev, "b"))); let streams = match streams.into_future().map_err(|(err, _)| err).wait().unwrap() { - (Some((KademliaIncomingRequest::FindNode { searched, responder }, "b")), streams) => { + (Some((KadIncomingRequest::FindNode { searched, responder }, "b")), streams) => { assert_eq!(searched, random_peer_id); responder.respond(iter::once(example_response.clone())); streams diff --git a/kad/src/kbucket.rs b/kad/src/kbucket.rs index 6408bd01..7b2f8aa5 100644 --- a/kad/src/kbucket.rs +++ b/kad/src/kbucket.rs @@ -208,6 +208,26 @@ where out.into_iter() } + /// Same as `find_closest`, but includes the local peer as well. + pub fn find_closest_with_self(&self, id: &Id) -> VecIntoIter + where + Id: Clone, + { + // TODO: optimize + let mut intermediate: Vec<_> = self.find_closest(&id).collect(); + if let Some(pos) = intermediate + .iter() + .position(|e| e.distance_with(&id) >= self.my_id.distance_with(&id)) + { + if intermediate[pos] != self.my_id { + intermediate.insert(pos, self.my_id.clone()); + } + } else { + intermediate.push(self.my_id.clone()); + } + intermediate.into_iter() + } + /// Marks the node as "most recent" in its bucket and modifies the value associated to it. /// This function should be called whenever we receive a communication from a node. pub fn update(&self, id: Id, value: Val) -> UpdateOutcome { diff --git a/kad/src/lib.rs b/kad/src/lib.rs index b94d69b1..03ac68d4 100644 --- a/kad/src/lib.rs +++ b/kad/src/lib.rs @@ -24,17 +24,15 @@ //! //! Usage is done in the following steps: //! -//! - Build a `KademliaConfig` that contains the way you want the Kademlia protocol to behave. +//! - Build a `KadSystemConfig` and a `KadConnecConfig` object that contain the way you want the +//! Kademlia protocol to behave. //! -//! - Build a `KademliaControllerPrototype` from that configuration object. +//! - Create a swarm that upgrades incoming connections with the `KadConnecConfig`. //! -//! - Build a `KademliaUpgrade` from that prototype. Then create a swarm (from the *swarm* crate) -//! and pass the `KademliaUpgrade` you built as part of the list of protocols. +//! - Build a `KadSystem` from the `KadSystemConfig`. This requires passing a closure that provides +//! the Kademlia controller of a peer. //! -//! - Then turn the controller prototype into an actual `KademliaController` by passing to it the -//! swarm controller you got. -//! -//! 
- You can now perform operations using that controller. +//! - You can perform queries using the `KadSystem`. //! // TODO: we allow dead_code for now because this library contains a lot of unused code that will @@ -54,11 +52,8 @@ // will automatically respond to Kad requests received by the remote. The controller lets you // send your own requests to this remote and obtain strongly-typed responses. // -// - The third level of abstraction is in `high_level`. This module also provides a -// `ConnectionUpgrade`, but all the upgraded connections share informations through a struct in -// an `Arc`. The user has a single clonable controller that operates on all the upgraded -// connections. This controller lets you perform peer discovery and record load operations over -// the whole network. +// - The third level of abstraction is in `high_level`. This module only provides the +// `KademliaSystem`. // extern crate arrayvec; @@ -83,14 +78,12 @@ extern crate tokio_io; extern crate tokio_timer; extern crate varint; -pub use self::high_level::{KademliaConfig, KademliaController, KademliaControllerPrototype}; -pub use self::high_level::{KademliaPeerReqStream, KademliaUpgrade, KademliaPeerReq}; -pub use self::protocol::{ConnectionType, Peer}; -pub use self::query::QueryEvent; +pub use self::high_level::{KadSystemConfig, KadSystem, KadQueryEvent}; +pub use self::kad_server::{KadConnecController, KadConnecConfig, KadIncomingRequest, KadFindNodeRespond}; +pub use self::protocol::{KadConnectionType, KadPeer}; mod high_level; mod kad_server; mod kbucket; mod protobuf_structs; mod protocol; -mod query; diff --git a/kad/src/protocol.rs b/kad/src/protocol.rs index 3f8ad791..bdd5c46a 100644 --- a/kad/src/protocol.rs +++ b/kad/src/protocol.rs @@ -37,7 +37,7 @@ use tokio_io::{AsyncRead, AsyncWrite}; use varint::VarintCodec; #[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)] -pub enum ConnectionType { +pub enum KadConnectionType { /// Sender hasn't tried to connect to peer. NotConnected = 0, /// Sender is currently connected to peer. @@ -48,45 +48,45 @@ pub enum ConnectionType { CannotConnect = 3, } -impl From for ConnectionType { +impl From for KadConnectionType { #[inline] - fn from(raw: protobuf_structs::dht::Message_ConnectionType) -> ConnectionType { + fn from(raw: protobuf_structs::dht::Message_ConnectionType) -> KadConnectionType { use protobuf_structs::dht::Message_ConnectionType::*; match raw { - NOT_CONNECTED => ConnectionType::NotConnected, - CONNECTED => ConnectionType::Connected, - CAN_CONNECT => ConnectionType::CanConnect, - CANNOT_CONNECT => ConnectionType::CannotConnect, + NOT_CONNECTED => KadConnectionType::NotConnected, + CONNECTED => KadConnectionType::Connected, + CAN_CONNECT => KadConnectionType::CanConnect, + CANNOT_CONNECT => KadConnectionType::CannotConnect, } } } -impl Into for ConnectionType { +impl Into for KadConnectionType { #[inline] fn into(self) -> protobuf_structs::dht::Message_ConnectionType { use protobuf_structs::dht::Message_ConnectionType::*; match self { - ConnectionType::NotConnected => NOT_CONNECTED, - ConnectionType::Connected => CONNECTED, - ConnectionType::CanConnect => CAN_CONNECT, - ConnectionType::CannotConnect => CANNOT_CONNECT, + KadConnectionType::NotConnected => NOT_CONNECTED, + KadConnectionType::Connected => CONNECTED, + KadConnectionType::CanConnect => CAN_CONNECT, + KadConnectionType::CannotConnect => CANNOT_CONNECT, } } } /// Information about a peer, as known by the sender. 
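+///
+/// For example, a `FIND_NODE` response carries a list of such peers; an entry
+/// might look like this (the values are purely illustrative):
+///
+/// ```ignore
+/// KadPeer {
+///     node_id: PeerId::from_public_key(PublicKey::Rsa(vec![93, 80, 12, 250])),
+///     multiaddrs: vec!["/ip4/100.101.102.103/tcp/20105".parse().unwrap()],
+///     connection_ty: KadConnectionType::Connected,
+/// }
+/// ```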
#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Peer { +pub struct KadPeer { pub node_id: PeerId, /// The multiaddresses that are known for that peer. pub multiaddrs: Vec, - pub connection_ty: ConnectionType, + pub connection_ty: KadConnectionType, } -impl Peer { - // Builds a `Peer` from its raw protobuf equivalent. +impl KadPeer { + // Builds a `KadPeer` from its raw protobuf equivalent. // TODO: use TryFrom once stable - fn from_peer(peer: &mut protobuf_structs::dht::Message_Peer) -> Result { + fn from_peer(peer: &mut protobuf_structs::dht::Message_Peer) -> Result { // TODO: this is in fact a CID ; not sure if this should be handled in `from_bytes` or // as a special case here let node_id = PeerId::from_bytes(peer.get_id().to_vec()) @@ -102,7 +102,7 @@ impl Peer { let connection_ty = peer.get_connection().into(); - Ok(Peer { + Ok(KadPeer { node_id: node_id, multiaddrs: addrs, connection_ty: connection_ty, @@ -110,7 +110,7 @@ impl Peer { } } -impl Into for Peer { +impl Into for KadPeer { fn into(self) -> protobuf_structs::dht::Message_Peer { let mut out = protobuf_structs::dht::Message_Peer::new(); out.set_id(self.node_id.into_bytes()); @@ -190,7 +190,7 @@ pub enum KadMsg { /// Identifier of the returned record. key: Vec, record: (), //record: Option, // TODO: no - closer_peers: Vec, + closer_peers: Vec, }, /// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes /// returned is not specified, but should be around 20. @@ -201,7 +201,7 @@ pub enum KadMsg { /// Response to a `FindNodeReq`. FindNodeRes { /// Results of the request. - closer_peers: Vec, + closer_peers: Vec, }, } @@ -237,6 +237,7 @@ fn msg_to_proto(kad_msg: KadMsg) -> protobuf_structs::dht::Message { } KadMsg::FindNodeRes { closer_peers } => { // TODO: if empty, the remote will think it's a request + // TODO: not good, possibly exposed in the API assert!(!closer_peers.is_empty()); let mut msg = protobuf_structs::dht::Message::new(); msg.set_field_type(protobuf_structs::dht::Message_MessageType::FIND_NODE); @@ -280,7 +281,7 @@ fn proto_to_msg(mut message: protobuf_structs::dht::Message) -> Result>(); Ok(KadMsg::FindNodeRes { @@ -309,7 +310,7 @@ mod tests { use self::libp2p_tcp_transport::TcpConfig; use futures::{Future, Sink, Stream}; use libp2p_core::{Transport, PeerId, PublicKey}; - use protocol::{ConnectionType, KadMsg, KademliaProtocolConfig, Peer}; + use protocol::{KadConnectionType, KadMsg, KademliaProtocolConfig, KadPeer}; use std::sync::mpsc; use std::thread; @@ -331,10 +332,10 @@ mod tests { }); test_one(KadMsg::FindNodeRes { closer_peers: vec![ - Peer { + KadPeer { node_id: PeerId::from_public_key(PublicKey::Rsa(vec![93, 80, 12, 250])), multiaddrs: vec!["/ip4/100.101.102.103/tcp/20105".parse().unwrap()], - connection_ty: ConnectionType::Connected, + connection_ty: KadConnectionType::Connected, }, ], }); diff --git a/libp2p/examples/kademlia.rs b/libp2p/examples/kademlia.rs index c8da2d37..0b79464e 100644 --- a/libp2p/examples/kademlia.rs +++ b/libp2p/examples/kademlia.rs @@ -30,12 +30,14 @@ use bigint::U512; use futures::{Future, Stream}; use libp2p::peerstore::{PeerAccess, PeerId, Peerstore}; use libp2p::Multiaddr; +use std::collections::HashMap; use std::env; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; -use libp2p::core::{Transport, PublicKey}; +use libp2p::core::{Transport, PublicKey, UniqueConnec}; use libp2p::core::{upgrade, either::EitherOutput}; -use libp2p::kad::{ConnectionType, Peer, QueryEvent}; +use libp2p::kad::{KadConnecConfig, 
KadConnectionType, KadPeer, KadQueryEvent, KadSystem}; +use libp2p::kad::{KadSystemConfig, KadIncomingRequest}; use libp2p::tcp::TcpConfig; fn main() { @@ -116,53 +118,69 @@ fn main() { let my_peer_id = PeerId::from_public_key(PublicKey::Rsa(include_bytes!("test-rsa-public-key.der").to_vec())); println!("Local peer id is: {:?}", my_peer_id); - // Let's put this `transport` into a Kademlia *swarm*. The swarm will handle all the incoming - // and outgoing connections for us. - let kad_config = libp2p::kad::KademliaConfig { + let kad_system = Arc::new(KadSystem::without_init(KadSystemConfig { parallelism: 3, local_peer_id: my_peer_id.clone(), - timeout: Duration::from_secs(2), - }; + kbuckets_timeout: Duration::from_secs(10), + request_timeout: Duration::from_secs(10), + known_initial_peers: peer_store.peers(), + })); - let kad_ctl_proto = libp2p::kad::KademliaControllerPrototype::new(kad_config, peer_store.peers()); - - let proto = libp2p::kad::KademliaUpgrade::from_prototype(&kad_ctl_proto); + let active_kad_connections = Arc::new(Mutex::new(HashMap::<_, UniqueConnec<_>>::new())); // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and // outgoing connections for us. let (swarm_controller, swarm_future) = libp2p::core::swarm( - transport.clone().with_upgrade(proto.clone()), + transport.clone().with_upgrade(KadConnecConfig::new()), { let peer_store = peer_store.clone(); - move |kademlia_stream, _| { + let kad_system = kad_system.clone(); + let active_kad_connections = active_kad_connections.clone(); + move |(kad_ctrl, kad_stream), node_addr| { let peer_store = peer_store.clone(); - kademlia_stream.for_each(move |req| { - let peer_store = peer_store.clone(); - let result = req - .requested_peers() - .map(move |peer_id| { - let addrs = peer_store - .peer(peer_id) - .into_iter() - .flat_map(|p| p.addrs()) - .collect::>(); - Peer { - node_id: peer_id.clone(), - multiaddrs: addrs, - connection_ty: ConnectionType::Connected, // meh :-/ + let kad_system = kad_system.clone(); + let active_kad_connections = active_kad_connections.clone(); + node_addr.and_then(move |node_addr| { + let node_id = p2p_multiaddr_to_node_id(node_addr); + let node_id2 = node_id.clone(); + let fut = kad_stream.for_each(move |req| { + let peer_store = peer_store.clone(); + kad_system.update_kbuckets(node_id2.clone()); + match req { + KadIncomingRequest::FindNode { searched, responder } => { + let result = kad_system + .known_closest_peers(&searched) + .map(move |peer_id| { + let addrs = peer_store + .peer(&peer_id) + .into_iter() + .flat_map(|p| p.addrs()) + .collect::>(); + KadPeer { + node_id: peer_id.clone(), + multiaddrs: addrs, + connection_ty: KadConnectionType::Connected, // meh :-/ + } + }) + .collect::>(); + responder.respond(result); + }, + KadIncomingRequest::PingPong => { } - }) - .collect::>(); - req.respond(result); - Ok(()) + }; + Ok(()) + }); + + let mut active_kad_connections = active_kad_connections.lock().unwrap(); + active_kad_connections + .entry(node_id) + .or_insert_with(Default::default) + .set_until(kad_ctrl, fut) }) } } ); - let (kad_controller, _kad_init) = - kad_ctl_proto.start(swarm_controller.clone(), transport.with_upgrade(proto), |out| out); - for listen_addr in listen_addrs { let addr = swarm_controller .listen_on(listen_addr.parse().expect("invalid multiaddr")) @@ -170,18 +188,23 @@ fn main() { println!("Now listening on {:?}", addr); } - let finish_enum = kad_controller - .find_node(my_peer_id.clone()) + let finish_enum = kad_system + 
.find_node(my_peer_id.clone(), |peer| { + let addr = Multiaddr::from(libp2p::multiaddr::AddrComponent::P2P(peer.clone().into_bytes())); + active_kad_connections.lock().unwrap().entry(peer.clone()) + .or_insert_with(Default::default) + .get_or_dial(&swarm_controller, &addr, transport.clone().with_upgrade(KadConnecConfig::new())) + }) .filter_map(move |event| { match event { - QueryEvent::NewKnownMultiaddrs(peers) => { + KadQueryEvent::NewKnownMultiaddrs(peers) => { for (peer, addrs) in peers { peer_store.peer_or_create(&peer) .add_addrs(addrs, Duration::from_secs(3600)); } None }, - QueryEvent::Finished(out) => Some(out), + KadQueryEvent::Finished(out) => Some(out), } }) .into_future() @@ -209,13 +232,29 @@ fn main() { ).unwrap(); } +/// Expects a multiaddr of the format `/p2p/` and returns the node ID. +/// Panics if the format is not correct. +fn p2p_multiaddr_to_node_id(client_addr: Multiaddr) -> PeerId { + let (first, second); + { + let mut iter = client_addr.iter(); + first = iter.next(); + second = iter.next(); + } + match (first, second) { + (Some(libp2p::multiaddr::AddrComponent::P2P(node_id)), None) => + PeerId::from_bytes(node_id).expect("libp2p always reports a valid node id"), + _ => panic!("Reported multiaddress is in the wrong format ; programmer error") + } +} + /// Stores initial addresses on the given peer store. Uses a very large timeout. pub fn ipfs_bootstrap
<P>(peer_store: P)
    where P: Peerstore + Clone,
{
    const ADDRESSES: &[&str] = &[
-        "/ip4/127.0.0.1/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
+        "/ip4/127.0.0.1/tcp/4001/ipfs/QmQRx32wQkw3hB45j4UDw8V9Ju4mGbxMyhs2m8mpFrFkur",
         // TODO: add some bootstrap nodes here
     ];