From 68299c40a583d92f4dc5f055313871565f135ee0 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 15 Mar 2018 15:18:21 +0100 Subject: [PATCH] Implement Kademlia peer discovery (#120) * Impl Clone for SwarmController and remove 'static * Implement Kademlia * Implement ConnectionReuse correctly * Implement ConnectionReuse correctly * Add some tests and fixes * Remove useless boolean in active_connections * Correctly run tests * Optimize the processing * Rustfmt on libp2p-kad * Improve Kademlia example * Next incoming is now in two steps * Some work * Remove log * Fix dialing a node even if we already have a connection * Add a proper PeerId to Peerstore * Turn identify into a transport layer * Expose the dialed multiaddress * Add identified nodes to the peerstore * Allow configuring the TTL of the addresses * Split identify in two modules * Some comments and tweaks * Run rustfmt * More work * Add test and bugfix * Fix everything * Start transition to new identify system * More work * Minor style * Start implementation of Kademlia server upgrade * Continue implementing the Kademlia server * Start reimplementing high-level kademlia code * Continue reimplementing high-level code * More work * More work * More work * Fix wrong address reported when dialing * Make it work * Remove cluster_level field everywhere * Fix bug in varint-rs when encoding * More work * More work * More work * More work * More work * Bugfix * More work * Implement ping * Style in kademlia_handler * More work * Better error handling in query.rs * More work * More work * More work * Debug impls * Some cleanup in swarm * More work * Clean up changes in swarm * Unpublish the kbucket module * Fix examples and some warnings * Fix websocket browser code * Rustfmt on libp2p-kad * Kad initialization process * Add logging to the example * Fix concerns * Fix style --- Cargo.toml | 1 + example/Cargo.toml | 3 + example/examples/kademlia.rs | 151 ++++ example/src/lib.rs | 19 +- libp2p-kad/Cargo.toml | 30 + libp2p-kad/dht.proto | 63 ++ libp2p-kad/record.proto | 21 + libp2p-kad/regen_dht_proto.sh | 13 + libp2p-kad/src/high_level.rs | 488 +++++++++++ libp2p-kad/src/kad_server.rs | 440 ++++++++++ libp2p-kad/src/kbucket.rs | 474 +++++++++++ libp2p-kad/src/lib.rs | 92 ++ libp2p-kad/src/protobuf_structs/dht.rs | 990 ++++++++++++++++++++++ libp2p-kad/src/protobuf_structs/mod.rs | 22 + libp2p-kad/src/protobuf_structs/record.rs | 498 +++++++++++ libp2p-kad/src/protocol.rs | 382 +++++++++ libp2p-kad/src/query.rs | 336 ++++++++ libp2p-swarm/src/connection_reuse.rs | 1 + libp2p-swarm/src/swarm.rs | 13 + varint-rs/src/lib.rs | 1 + 20 files changed, 4026 insertions(+), 12 deletions(-) create mode 100644 example/examples/kademlia.rs create mode 100644 libp2p-kad/Cargo.toml create mode 100644 libp2p-kad/dht.proto create mode 100644 libp2p-kad/record.proto create mode 100755 libp2p-kad/regen_dht_proto.sh create mode 100644 libp2p-kad/src/high_level.rs create mode 100644 libp2p-kad/src/kad_server.rs create mode 100644 libp2p-kad/src/kbucket.rs create mode 100644 libp2p-kad/src/lib.rs create mode 100644 libp2p-kad/src/protobuf_structs/dht.rs create mode 100644 libp2p-kad/src/protobuf_structs/mod.rs create mode 100644 libp2p-kad/src/protobuf_structs/record.rs create mode 100644 libp2p-kad/src/protocol.rs create mode 100644 libp2p-kad/src/query.rs diff --git a/Cargo.toml b/Cargo.toml index 21420794..b1a8ce88 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "libp2p-dns", "libp2p-floodsub", "libp2p-identify", + "libp2p-kad", "libp2p-peerstore", "libp2p-ping", "libp2p-secio", diff --git a/example/Cargo.toml b/example/Cargo.toml index 43d33b58..49bc71f1 100644 --- a/example/Cargo.toml +++ b/example/Cargo.toml @@ -4,11 +4,14 @@ version = "0.1.0" authors = ["pierre "] [dependencies] +bigint = "4.2" bytes = "0.4" env_logger = "0.5.3" futures = "0.1" multiaddr = "0.2" multiplex = { path = "../multiplex-rs" } +libp2p-identify = { path = "../libp2p-identify" } +libp2p-kad = { path = "../libp2p-kad" } libp2p-floodsub = { path = "../libp2p-floodsub" } libp2p-peerstore = { path = "../libp2p-peerstore" } libp2p-ping = { path = "../libp2p-ping" } diff --git a/example/examples/kademlia.rs b/example/examples/kademlia.rs new file mode 100644 index 00000000..f30b9a52 --- /dev/null +++ b/example/examples/kademlia.rs @@ -0,0 +1,151 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +extern crate bigint; +extern crate bytes; +extern crate env_logger; +extern crate example; +extern crate futures; +extern crate libp2p_identify as identify; +extern crate libp2p_kad as kad; +extern crate libp2p_peerstore as peerstore; +extern crate libp2p_secio as secio; +extern crate libp2p_swarm as swarm; +extern crate libp2p_tcp_transport as tcp; +extern crate multiplex; +extern crate tokio_core; +extern crate tokio_io; + +use bigint::U512; +use futures::future::Future; +use peerstore::PeerId; +use std::env; +use std::sync::Arc; +use std::time::Duration; +use swarm::{Transport, UpgradeExt}; +use tcp::TcpConfig; +use tokio_core::reactor::Core; + +fn main() { + env_logger::init(); + + // Determine which addresses to listen to. + let listen_addrs = { + let mut args = env::args().skip(1).collect::>(); + if args.is_empty() { + args.push("/ip4/0.0.0.0/tcp/0".to_owned()); + } + args + }; + + // We start by building the tokio engine that will run all the sockets. + let mut core = Core::new().unwrap(); + + let peer_store = Arc::new(peerstore::memory_peerstore::MemoryPeerstore::empty()); + example::ipfs_bootstrap(&*peer_store); + + // Now let's build the transport stack. + // We create a `TcpConfig` that indicates that we want TCP/IP. + let transport = identify::IdentifyTransport::new( + TcpConfig::new(core.handle()) + + // On top of TCP/IP, we will use either the plaintext protocol or the secio protocol, + // depending on which one the remote supports. + .with_upgrade({ + let plain_text = swarm::PlainTextConfig; + + let secio = { + let private_key = include_bytes!("test-private-key.pk8"); + let public_key = include_bytes!("test-public-key.der").to_vec(); + secio::SecioConfig { + key: secio::SecioKeyPair::rsa_from_pkcs8(private_key, public_key).unwrap(), + } + }; + + plain_text.or_upgrade(secio) + }) + + // On top of plaintext or secio, we will use the multiplex protocol. + .with_upgrade(multiplex::MultiplexConfig) + // The object returned by the call to `with_upgrade(MultiplexConfig)` can't be used as a + // `Transport` because the output of the upgrade is not a stream but a controller for + // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into + // a `Transport`. + .into_connection_reuse(), + peer_store.clone(), + ); + + // We now have a `transport` variable that can be used either to dial nodes or listen to + // incoming connections, and that will automatically apply secio and multiplex on top + // of any opened stream. + + let my_peer_id = PeerId::from_public_key(include_bytes!("test-public-key.der")); + println!("Local peer id is: {:?}", my_peer_id); + + // Let's put this `transport` into a Kademlia *swarm*. The swarm will handle all the incoming + // and outgoing connections for us. + let kad_config = kad::KademliaConfig { + parallelism: 3, + record_store: (), + peer_store: peer_store, + local_peer_id: my_peer_id.clone(), + timeout: Duration::from_secs(2), + }; + + let kad_ctl_proto = kad::KademliaControllerPrototype::new(kad_config); + + let proto = kad::KademliaUpgrade::from_prototype(&kad_ctl_proto); + + // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and + // outgoing connections for us. + let (swarm_controller, swarm_future) = swarm::swarm(transport, proto, |upgrade, _| upgrade); + + let (kad_controller, _kad_init) = kad_ctl_proto.start(swarm_controller.clone()); + + for listen_addr in listen_addrs { + let addr = swarm_controller + .listen_on(listen_addr.parse().expect("invalid multiaddr")) + .expect("unsupported multiaddr"); + println!("Now listening on {:?}", addr); + } + + let finish_enum = kad_controller + .find_node(my_peer_id.clone()) + .and_then(|out| { + let local_hash = U512::from(my_peer_id.hash()); + println!("Results of peer discovery for {:?}:", my_peer_id); + for n in out { + let other_hash = U512::from(n.hash()); + let dist = 512 - (local_hash ^ other_hash).leading_zeros(); + println!("* {:?} (distance bits = {:?} (lower is better))", n, dist); + } + Ok(()) + }); + + // `swarm_future` is a future that contains all the behaviour that we want, but nothing has + // actually started yet. Because we created the `TcpConfig` with tokio, we need to run the + // future through the tokio core. + core.run( + finish_enum + .select(swarm_future) + .map(|(n, _)| n) + .map_err(|(err, _)| err), + ).unwrap(); +} diff --git a/example/src/lib.rs b/example/src/lib.rs index 0190897a..cddc4c25 100644 --- a/example/src/lib.rs +++ b/example/src/lib.rs @@ -32,15 +32,8 @@ where P: Peerstore + Clone, { const ADDRESSES: &[&str] = &[ - "/ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", - "/ip4/104.236.179.241/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM", - "/ip4/162.243.248.213/tcp/4001/ipfs/QmSoLueR4xBeUbY9WZ9xGUUxunbKWcrNFTDAadQJmocnWm", - "/ip4/128.199.219.111/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu", - "/ip4/104.236.76.40/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64", - "/ip4/178.62.158.247/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd", - "/ip4/178.62.61.185/tcp/4001/ipfs/QmSoLMeWqB7YGVLJN3pNLQpmmEk35v6wYtsMGLzSr5QBU3", - "/dns4/wss0.bootstrap.libp2p.io/tcp/443/wss/ipfs/QmZMxNdpMkewiVZLMRxaNxUeZpDUb34pWjZ1kZvsd16Zic", - "/dns4/wss1.bootstrap.libp2p.io/tcp/443/wss/ipfs/Qmbut9Ywz9YEDrz8ySBSgWyJk41Uvm2QJPhwDJzJyGFsD6" + "/ip4/127.0.0.1/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", + // TODO: add some bootstrap nodes here ]; let ttl = Duration::from_secs(100 * 365 * 24 * 3600); @@ -51,14 +44,16 @@ where .expect("failed to parse hard-coded multiaddr"); let ipfs_component = multiaddr.pop().expect("hard-coded multiaddr is empty"); - let public_key = match ipfs_component { - multiaddr::AddrComponent::IPFS(key) => key, + let peer = match ipfs_component { + multiaddr::AddrComponent::IPFS(key) => { + PeerId::from_bytes(key).expect("invalid peer id") + } _ => panic!("hard-coded multiaddr didn't end with /ipfs/"), }; peer_store .clone() - .peer_or_create(&PeerId::from_bytes(public_key).unwrap()) + .peer_or_create(&peer) .add_addr(multiaddr, ttl.clone()); } } diff --git a/libp2p-kad/Cargo.toml b/libp2p-kad/Cargo.toml new file mode 100644 index 00000000..3b0b83d4 --- /dev/null +++ b/libp2p-kad/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "libp2p-kad" +version = "0.1.0" +authors = ["Parity Technologies "] + +[dependencies] +arrayvec = "0.4.7" +base58 = "0.1.0" +bigint = "4.2" +bytes = "0.4" +datastore = { path = "../datastore" } +fnv = "1.0" +futures = "0.1" +libp2p-identify = { path = "../libp2p-identify" } +libp2p-peerstore = { path = "../libp2p-peerstore" } +libp2p-ping = { path = "../libp2p-ping" } +libp2p-swarm = { path = "../libp2p-swarm" } +multiaddr = "0.2" +parking_lot = "0.5.1" +protobuf = "1.4.2" +rand = "0.4.2" +smallvec = "0.5" +tokio-io = "0.1" +tokio-timer = "0.1.2" +varint = { path = "../varint-rs" } + +[dev-dependencies] +libp2p-tcp-transport = { path = "../libp2p-tcp-transport" } +rand = "0.4.2" +tokio-core = "0.1" diff --git a/libp2p-kad/dht.proto b/libp2p-kad/dht.proto new file mode 100644 index 00000000..48640639 --- /dev/null +++ b/libp2p-kad/dht.proto @@ -0,0 +1,63 @@ +syntax = "proto2"; +package dht.pb; + +import "record.proto"; + +message Message { + enum MessageType { + PUT_VALUE = 0; + GET_VALUE = 1; + ADD_PROVIDER = 2; + GET_PROVIDERS = 3; + FIND_NODE = 4; + PING = 5; + } + + enum ConnectionType { + // sender does not have a connection to peer, and no extra information (default) + NOT_CONNECTED = 0; + + // sender has a live connection to peer + CONNECTED = 1; + + // sender recently connected to peer + CAN_CONNECT = 2; + + // sender recently tried to connect to peer repeatedly but failed to connect + // ("try" here is loose, but this should signal "made strong effort, failed") + CANNOT_CONNECT = 3; + } + + message Peer { + // ID of a given peer. + optional bytes id = 1; + + // multiaddrs for a given peer + repeated bytes addrs = 2; + + // used to signal the sender's connection capabilities to the peer + optional ConnectionType connection = 3; + } + + // defines what type of message it is. + optional MessageType type = 1; + + // defines what coral cluster level this query/response belongs to. + optional int32 clusterLevelRaw = 10; + + // Used to specify the key associated with this message. + // PUT_VALUE, GET_VALUE, ADD_PROVIDER, GET_PROVIDERS + optional bytes key = 2; + + // Used to return a value + // PUT_VALUE, GET_VALUE + optional record.pb.Record record = 3; + + // Used to return peers closer to a key in a query + // GET_VALUE, GET_PROVIDERS, FIND_NODE + repeated Peer closerPeers = 8; + + // Used to return Providers + // GET_VALUE, ADD_PROVIDER, GET_PROVIDERS + repeated Peer providerPeers = 9; +} diff --git a/libp2p-kad/record.proto b/libp2p-kad/record.proto new file mode 100644 index 00000000..d31fc914 --- /dev/null +++ b/libp2p-kad/record.proto @@ -0,0 +1,21 @@ +syntax = "proto2"; +package record.pb; + +// Record represents a dht record that contains a value +// for a key value pair +message Record { + // The key that references this record + optional string key = 1; + + // The actual value this record is storing + optional bytes value = 2; + + // hash of the authors public key + optional string author = 3; + + // A PKI signature for the key+value+author + optional bytes signature = 4; + + // Time the record was received, set by receiver + optional string timeReceived = 5; +} diff --git a/libp2p-kad/regen_dht_proto.sh b/libp2p-kad/regen_dht_proto.sh new file mode 100755 index 00000000..f8629bb7 --- /dev/null +++ b/libp2p-kad/regen_dht_proto.sh @@ -0,0 +1,13 @@ +#!/bin/sh + +# This script regenerates the `src/dht_proto.rs` file from `dht.proto`. + +docker run --rm -v `pwd`:/usr/code:z -w /usr/code rust /bin/bash -c " \ + apt-get update; \ + apt-get install -y protobuf-compiler; \ + cargo install protobuf; \ + protoc --rust_out . dht.proto;\ + protoc --rust_out . record.proto" + +mv -f dht.rs ./src/protobuf_structs/dht.rs +mv -f record.rs ./src/protobuf_structs/record.rs diff --git a/libp2p-kad/src/high_level.rs b/libp2p-kad/src/high_level.rs new file mode 100644 index 00000000..13069e9b --- /dev/null +++ b/libp2p-kad/src/high_level.rs @@ -0,0 +1,488 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! High-level structs/traits of the crate. +//! +//! Lies on top of the `kad_server` module. + +use bytes::Bytes; +use fnv::FnvHashMap; +use futures::{self, future, Future}; +use futures::sync::oneshot; +use kad_server::{KadServerInterface, KademliaServerConfig, KademliaServerController}; +use kbucket::{KBucketsTable, UpdateOutcome}; +use libp2p_peerstore::{PeerAccess, PeerId, Peerstore}; +use libp2p_swarm::{Endpoint, MuxedTransport, SwarmController}; +use libp2p_swarm::ConnectionUpgrade; +use multiaddr::Multiaddr; +use parking_lot::Mutex; +use query; +use std::collections::hash_map::Entry; +use std::fmt; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use std::iter; +use std::ops::Deref; +use std::sync::Arc; +use std::time::Duration; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_timer; + +/// Prototype for a future Kademlia protocol running on a socket. +#[derive(Debug, Clone)] +pub struct KademliaConfig { + /// Degree of parallelism on the network. Often called `alpha` in technical papers. + /// No more than this number of remotes will be used at a given time for any given operation. + // TODO: ^ share this number between operations? or does each operation use `alpha` remotes? + pub parallelism: u32, + /// Used to load and store data requests of peers. + // TODO: say that must implement the `Recordstore` trait. + pub record_store: R, + /// Used to load and store information about peers. + pub peer_store: P, + /// Id of the local peer. + pub local_peer_id: PeerId, + /// When contacting a node, duration after which we consider it unresponsive. + pub timeout: Duration, +} + +/// Object that allows one to make queries on the Kademlia system. +#[derive(Debug)] +pub struct KademliaControllerPrototype { + inner: Arc>, +} + +impl KademliaControllerPrototype +where + P: Deref, + for<'r> &'r Pc: Peerstore, +{ + /// Creates a new controller from that configuration. + pub fn new(config: KademliaConfig) -> KademliaControllerPrototype { + let buckets = KBucketsTable::new(config.local_peer_id.clone(), config.timeout); + for peer_id in config.peer_store.deref().peers() { + let _ = buckets.update(peer_id, ()); + } + + let inner = Arc::new(Inner { + kbuckets: buckets, + timer: tokio_timer::wheel().build(), + record_store: config.record_store, + peer_store: config.peer_store, + connections: Default::default(), + timeout: config.timeout, + parallelism: config.parallelism as usize, + }); + + KademliaControllerPrototype { inner: inner } + } + + /// Turns the prototype into an actual controller by feeding it a swarm. + pub fn start( + self, + swarm: SwarmController, + ) -> ( + KademliaController, + Box>, + ) + where + P: Clone + Deref + 'static, // TODO: 'static :-/ + for<'r> &'r Pc: Peerstore, + R: Clone + 'static, // TODO: 'static :-/ + T: Clone + MuxedTransport + 'static, // TODO: 'static :-/ + C: Clone + ConnectionUpgrade + 'static, // TODO: 'static :-/ + C::NamesIter: Clone, + C::Output: From, + { + // TODO: initialization + + let controller = KademliaController { + inner: self.inner.clone(), + swarm_controller: swarm, + }; + + let init_future = { + let futures: Vec<_> = (0..256) + .map(|n| query::refresh(controller.clone(), n)) + .collect(); + + future::loop_fn(futures, |futures| { + if futures.is_empty() { + let fut = future::ok(future::Loop::Break(())); + return future::Either::A(fut); + } + + let fut = future::select_all(futures) + .map_err(|(err, _, _)| err) + .map(|(_, _, rest)| future::Loop::Continue(rest)); + future::Either::B(fut) + }) + }; + + (controller, Box::new(init_future)) + } +} + +/// Object that allows one to make queries on the Kademlia system. +#[derive(Debug)] +pub struct KademliaController +where + T: MuxedTransport + 'static, // TODO: 'static :-/ + C: ConnectionUpgrade + 'static, // TODO: 'static :-/ +{ + inner: Arc>, + swarm_controller: SwarmController, +} + +impl Clone for KademliaController +where + T: Clone + MuxedTransport + 'static, // TODO: 'static :-/ + C: Clone + ConnectionUpgrade + 'static, // TODO: 'static :-/ +{ + #[inline] + fn clone(&self) -> Self { + KademliaController { + inner: self.inner.clone(), + swarm_controller: self.swarm_controller.clone(), + } + } +} + +impl KademliaController +where + P: Deref, + for<'r> &'r Pc: Peerstore, + R: Clone, + T: Clone + MuxedTransport + 'static, // TODO: 'static :-/ + C: Clone + ConnectionUpgrade + 'static, // TODO: 'static :-/ +{ + /// Performs an iterative find node query on the network. + /// + /// Will query the network for the peers that are the closest to `searched_key` and return + /// the results. + /// + /// The algorithm used is a standard Kademlia algorithm. The details are not documented, so + /// that the implementation is free to modify them. + #[inline] + pub fn find_node( + &self, + searched_key: PeerId, + ) -> Box, Error = IoError>> + where + P: Clone + 'static, + R: 'static, + C::NamesIter: Clone, + C::Output: From, + { + query::find_node(self.clone(), searched_key) + } +} + +/// Connection upgrade to the Kademlia protocol. +#[derive(Clone)] +pub struct KademliaUpgrade { + inner: Arc>, + upgrade: KademliaServerConfig>>, +} + +impl KademliaUpgrade { + /// Builds a connection upgrade from the controller. + #[inline] + pub fn from_prototype(proto: &KademliaControllerPrototype) -> Self { + KademliaUpgrade { + inner: proto.inner.clone(), + upgrade: KademliaServerConfig::new(proto.inner.clone()), + } + } + + /// Builds a connection upgrade from the controller. + #[inline] + pub fn from_controller(ctl: &KademliaController) -> Self + where + T: MuxedTransport, + C: ConnectionUpgrade, + { + KademliaUpgrade { + inner: ctl.inner.clone(), + upgrade: KademliaServerConfig::new(ctl.inner.clone()), + } + } +} + +impl ConnectionUpgrade for KademliaUpgrade +where + C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/ + P: Deref + Clone + 'static, // TODO: 'static :-/ + for<'r> &'r Pc: Peerstore, + R: 'static, // TODO: 'static :-/ +{ + type Output = KademliaProcessingFuture; + type Future = Box>; + type NamesIter = iter::Once<(Bytes, ())>; + type UpgradeIdentifier = (); + + #[inline] + fn protocol_names(&self) -> Self::NamesIter { + ConnectionUpgrade::::protocol_names(&self.upgrade) + } + + #[inline] + fn upgrade(self, incoming: C, id: (), endpoint: Endpoint, addr: &Multiaddr) -> Self::Future { + let inner = self.inner; + let client_addr = addr.clone(); + + let future = self.upgrade.upgrade(incoming, id, endpoint, addr).map( + move |(controller, future)| { + match inner.connections.lock().entry(client_addr) { + Entry::Occupied(mut entry) => { + match entry.insert(Connection::Active(controller)) { + // If there was already an active connection to this remote, it gets + // replaced by the new more recent one. + Connection::Active(_old_connection) => {} + Connection::Pending(closures) => { + let new_ctl = match entry.get_mut() { + &mut Connection::Active(ref mut ctl) => ctl, + _ => unreachable!( + "logic error: an Active enum variant was \ + inserted, but reading back didn't give \ + an Active" + ), + }; + + for mut closure in closures { + closure(new_ctl); + } + } + }; + } + Entry::Vacant(entry) => { + entry.insert(Connection::Active(controller)); + } + }; + + KademliaProcessingFuture { inner: future } + }, + ); + + Box::new(future) as Box<_> + } +} + +/// Future that must be processed for the Kademlia system to work. +pub struct KademliaProcessingFuture { + inner: Box>, +} + +impl Future for KademliaProcessingFuture { + type Item = (); + type Error = IoError; + + #[inline] + fn poll(&mut self) -> futures::Poll { + self.inner.poll() + } +} + +// Inner struct shared throughout the Kademlia system. +#[derive(Debug)] +struct Inner { + // The remotes are identified by their public keys. + kbuckets: KBucketsTable, + + // Timer used for building the timeouts. + timer: tokio_timer::Timer, + + // Same as in the config. + timeout: Duration, + + // Same as in the config. + parallelism: usize, + + // Same as in the config. + record_store: R, + + // Same as in the config. + peer_store: P, + + // List of open connections with remotes. + // + // Since the keys are the nodes' multiaddress, it is expected that each node only has one + // multiaddress. This should be the case if the user uses the identify transport that + // automatically maps peer IDs to multiaddresses. + // TODO: is it correct to use FnvHashMap with a Multiaddr? needs benchmarks + connections: Mutex>, +} + +// Current state of a connection to a specific multiaddr. +// +// There is no `Inactive` entry, as an inactive connection corresponds to no entry in the +// `connections` hash map. +enum Connection { + // The connection has already been opened and is ready to be controlled through the given + // controller. + Active(KademliaServerController), + + // The connection is in the process of being opened. Any closure added to this `Vec` will be + // executed on the controller once it is available. + // Once the connection is open, it will be replaced with `Active`. + // TODO: should be FnOnce once Rust allows that + Pending(Vec>), +} + +impl fmt::Debug for Connection { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Connection::Active(_) => write!(f, "Connection::Active"), + Connection::Pending(_) => write!(f, "Connection::Pending"), + } + } +} + +impl KadServerInterface for Arc> +where + P: Deref, + for<'r> &'r Pc: Peerstore, +{ + #[inline] + fn local_id(&self) -> &PeerId { + self.kbuckets.my_id() + } + + #[inline] + fn kbuckets_update(&self, peer: &PeerId) { + // TODO: is this the right place for this check? + if peer == self.kbuckets.my_id() { + return; + } + + match self.kbuckets.update(peer.clone(), ()) { + UpdateOutcome::NeedPing(node_to_ping) => { + // TODO: return this info somehow + println!("need to ping {:?}", node_to_ping); + } + _ => (), + } + } + + #[inline] + fn kbuckets_find_closest(&self, addr: &PeerId) -> Vec { + self.kbuckets.find_closest(addr).collect() + } +} + +impl query::QueryInterface for KademliaController +where + P: Clone + Deref + 'static, // TODO: 'static :-/ + for<'r> &'r Pc: Peerstore, + R: Clone + 'static, // TODO: 'static :-/ + T: Clone + MuxedTransport + 'static, // TODO: 'static :-/ + C: Clone + ConnectionUpgrade + 'static, // TODO: 'static :-/ + C::NamesIter: Clone, + C::Output: From, +{ + #[inline] + fn local_id(&self) -> &PeerId { + self.inner.kbuckets.my_id() + } + + #[inline] + fn kbuckets_find_closest(&self, addr: &PeerId) -> Vec { + self.inner.kbuckets.find_closest(addr).collect() + } + + #[inline] + fn peer_add_addrs(&self, peer: &PeerId, multiaddrs: I, ttl: Duration) + where + I: Iterator, + { + self.inner + .peer_store + .peer_or_create(peer) + .add_addrs(multiaddrs, ttl); + } + + #[inline] + fn parallelism(&self) -> usize { + self.inner.parallelism + } + + #[inline] + fn send( + &self, + addr: Multiaddr, + and_then: F, + ) -> Box> + where + F: FnOnce(&KademliaServerController) -> FRet + 'static, + FRet: 'static, + { + let mut lock = self.inner.connections.lock(); + + let pending_list = match lock.entry(addr.clone()) { + Entry::Occupied(entry) => { + match entry.into_mut() { + &mut Connection::Pending(ref mut list) => list, + &mut Connection::Active(ref mut ctrl) => { + // If we have an active connection, entirely short-circuit the function. + let output = future::ok(and_then(ctrl)); + return Box::new(output) as Box<_>; + } + } + } + Entry::Vacant(entry) => { + // Need to open a connection. + let proto = KademliaUpgrade { + inner: self.inner.clone(), + upgrade: KademliaServerConfig::new(self.inner.clone()), + }; + match self.swarm_controller.dial_to_handler(addr, proto) { + Ok(()) => (), + Err(_addr) => { + let fut = future::err(IoError::new( + IoErrorKind::InvalidData, + "unsupported multiaddress", + )); + return Box::new(fut) as Box<_>; + } + } + match entry.insert(Connection::Pending(Vec::with_capacity(1))) { + &mut Connection::Pending(ref mut list) => list, + _ => unreachable!("we just inserted a Pending variant"), + } + } + }; + + let (tx, rx) = oneshot::channel(); + let mut tx = Some(tx); + let mut and_then = Some(and_then); + pending_list.push(Box::new(move |ctrl: &mut KademliaServerController| { + let and_then = and_then + .take() + .expect("Programmer error: 'pending' closure was called multiple times"); + let tx = tx.take() + .expect("Programmer error: 'pending' closure was called multiple times"); + let ret = and_then(ctrl); + let _ = tx.send(ret); + }) as Box<_>); + + let future = rx.map_err(|_| IoErrorKind::ConnectionAborted.into()); + let future = self.inner.timer.timeout(future, self.inner.timeout); + Box::new(future) as Box<_> + } +} diff --git a/libp2p-kad/src/kad_server.rs b/libp2p-kad/src/kad_server.rs new file mode 100644 index 00000000..d2ecfae3 --- /dev/null +++ b/libp2p-kad/src/kad_server.rs @@ -0,0 +1,440 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Contains a `ConnectionUpgrade` that makes it possible to send requests and receive responses +//! from nodes after the upgrade. +//! +//! # Usage +//! +//! - Implement the `KadServerInterface` trait on something clonable (usually an `Arc`). +//! +//! - Create a `KademliaServerConfig` object from that interface. This struct implements +//! `ConnectionUpgrade`. +//! +//! - Update a connection through that `KademliaServerConfig`. The output yields you a +//! `KademliaServerController` and a future that must be driven to completion. The controller +//! allows you to perform queries and receive responses. +//! +//! This `KademliaServerController` is usually extracted and stored in some sort of hash map in an +//! `Arc` in order to be available whenever we need to request something from a node. + +use bytes::Bytes; +use futures::{future, Future, Sink, Stream}; +use futures::sync::{mpsc, oneshot}; +use libp2p_peerstore::PeerId; +use libp2p_swarm::ConnectionUpgrade; +use libp2p_swarm::Endpoint; +use multiaddr::{AddrComponent, Multiaddr}; +use protocol::{self, KadMsg, KademliaProtocolConfig, Peer}; +use std::collections::VecDeque; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use std::iter; +use tokio_io::{AsyncRead, AsyncWrite}; + +/// Interface that this server system uses to communicate with the rest of the system. +pub trait KadServerInterface: Clone { + /// Returns the peer ID of the local node. + fn local_id(&self) -> &PeerId; + + /// Updates an entry in the K-Buckets. Called whenever that peer sends us a message. + fn kbuckets_update(&self, peer: &PeerId); + + /// Finds the nodes closest to a peer ID. + fn kbuckets_find_closest(&self, addr: &PeerId) -> Vec; +} + +/// Configuration for a Kademlia server. +/// +/// Implements `ConnectionUpgrade`. On a successful upgrade, produces a `KademliaServerController` +/// and a `Future`. The controller lets you send queries to the remote and receive answers, while +/// the `Future` must be driven to completion in order for things to work. +#[derive(Debug, Clone)] +pub struct KademliaServerConfig { + raw_proto: KademliaProtocolConfig, + interface: I, +} + +impl KademliaServerConfig { + /// Builds a configuration object for an upcoming Kademlia server. + #[inline] + pub fn new(interface: I) -> Self { + KademliaServerConfig { + raw_proto: KademliaProtocolConfig, + interface: interface, + } + } +} + +impl ConnectionUpgrade for KademliaServerConfig +where + C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/ + I: KadServerInterface + 'static, // TODO: 'static :-/ +{ + type Output = ( + KademliaServerController, + Box>, + ); + type Future = Box>; + type NamesIter = iter::Once<(Bytes, ())>; + type UpgradeIdentifier = (); + + #[inline] + fn protocol_names(&self) -> Self::NamesIter { + ConnectionUpgrade::::protocol_names(&self.raw_proto) + } + + #[inline] + fn upgrade(self, incoming: C, id: (), endpoint: Endpoint, addr: &Multiaddr) -> Self::Future { + let peer_id = { + let mut iter = addr.iter(); + let protocol = iter.next(); + let after_proto = iter.next(); + match (protocol, after_proto) { + (Some(AddrComponent::P2P(key)), None) | (Some(AddrComponent::IPFS(key)), None) => { + match PeerId::from_bytes(key) { + Ok(id) => id, + Err(_) => { + let err = IoError::new( + IoErrorKind::InvalidData, + "invalid peer ID sent by remote identification", + ); + return Box::new(future::err(err)); + } + } + } + _ => { + let err = + IoError::new(IoErrorKind::InvalidData, "couldn't identify connected node"); + return Box::new(future::err(err)); + } + } + }; + + let interface = self.interface; + let future = self.raw_proto + .upgrade(incoming, id, endpoint, addr) + .map(move |connec| { + let (tx, rx) = mpsc::unbounded(); + let future = kademlia_handler(connec, peer_id, rx, interface); + let controller = KademliaServerController { inner: tx }; + (controller, future) + }); + + Box::new(future) as Box<_> + } +} + +/// Allows sending Kademlia requests and receiving responses. +#[derive(Debug, Clone)] +pub struct KademliaServerController { + // In order to send a request, we use this sender to send a tuple. The first element of the + // tuple is the message to send to the remote, and the second element is what is used to + // receive the response. If the query doesn't expect a response (eg. `PUT_VALUE`), then the + // one-shot sender will be dropped without being used. + inner: mpsc::UnboundedSender<(KadMsg, oneshot::Sender)>, +} + +impl KademliaServerController { + /// Sends a `FIND_NODE` query to the node and provides a future that will contain the response. + // TODO: future item could be `impl Iterator` instead + pub fn find_node( + &self, + searched_key: &PeerId, + ) -> Box, Error = IoError>> { + let message = protocol::KadMsg::FindNodeReq { + key: searched_key.clone().into_bytes(), + }; + + let (tx, rx) = oneshot::channel(); + + match self.inner.unbounded_send((message, tx)) { + Ok(()) => (), + Err(_) => { + let fut = future::err(IoError::new( + IoErrorKind::ConnectionAborted, + "connection to remote has aborted", + )); + return Box::new(fut) as Box<_>; + } + }; + + let future = rx.map_err(|_| { + IoError::new( + IoErrorKind::ConnectionAborted, + "connection to remote has aborted", + ) + }).and_then(|msg| match msg { + KadMsg::FindNodeRes { closer_peers, .. } => Ok(closer_peers), + _ => Err(IoError::new( + IoErrorKind::InvalidData, + "invalid response type received from the remote", + )), + }); + + Box::new(future) as Box<_> + } + + /// Sends a `PING` query to the node. Because of the way the protocol is designed, there is + /// no way to differentiate between a ping and a pong. Therefore this function doesn't return a + /// future, and the only way to be notified of the result is through the `kbuckets_update` + /// method in the `KadServerInterface` trait. + pub fn ping(&self) -> Result<(), IoError> { + // Dummy channel. + let (tx, _rx) = oneshot::channel(); + match self.inner.unbounded_send((protocol::KadMsg::Ping, tx)) { + Ok(()) => Ok(()), + Err(_) => Err(IoError::new( + IoErrorKind::ConnectionAborted, + "connection to remote has aborted", + )), + } + } +} + +// Handles a newly-opened Kademlia stream with a remote peer. +// +// Takes a `Stream` and `Sink` of Kademlia messages representing the connection to the client, +// plus the ID of the peer that we are handling, plus a `Receiver` that will receive messages to +// transmit to that connection, plus the interface. +// +// Returns a `Future` that must be resolved in order for progress to work. It will never yield any +// item (unless both `rx` and `kad_bistream` are closed) but will propagate any I/O of protocol +// error that could happen. If the `Receiver` closes, no error is generated. +fn kademlia_handler<'a, S, I>( + kad_bistream: S, + peer_id: PeerId, + rx: mpsc::UnboundedReceiver<(KadMsg, oneshot::Sender)>, + interface: I, +) -> Box + 'a> +where + S: Stream + Sink + 'a, + I: KadServerInterface + Clone + 'a, +{ + let (kad_sink, kad_stream) = kad_bistream + .sink_map_err(|err| IoError::new(IoErrorKind::InvalidData, err)) + .map_err(|err| IoError::new(IoErrorKind::InvalidData, err)) + .split(); + + // We combine `kad_stream` and `rx` into one so that the loop wakes up whenever either + // generates something. + let messages = rx.map(|(m, o)| (m, Some(o))) + .map_err(|_| unreachable!()) + .select(kad_stream.map(|m| (m, None))); + + // Loop forever. + let future = future::loop_fn( + (kad_sink, messages, VecDeque::new(), 0), + move |(kad_sink, messages, mut send_back_queue, mut expected_pongs)| { + let interface = interface.clone(); + let peer_id = peer_id.clone(); + + // The `send_back_queue` is a queue of `UnboundedSender`s in the correct order of + // expected answers. + // Whenever we send a message to the remote and this message expects a response, we + // push the sender to the end of `send_back_queue`. Whenever a remote sends us a + // response, we pop the first element of `send_back_queue`. + + // The value of `expected_pongs` is the number of PING requests that we sent and that + // haven't been answered by the remote yet. Because of the way the protocol is designed, + // there is no way to differentiate between a ping and a pong. Therefore whenever we + // send a ping request we suppose that the next ping we receive is an answer, even + // though that may not be the case in reality. + // Because of this behaviour, pings do not pop from the `send_back_queue`. + + messages + .into_future() + .map_err(|(err, _)| err) + .and_then(move |(message, rest)| { + if let Some((_, None)) = message { + // If we received a message from the remote (as opposed to a message from + // `rx`) then we update the k-buckets. + interface.kbuckets_update(&peer_id); + } + + match message { + None => { + // Both the connection stream and `rx` are empty, so we break the loop. + let future = future::ok(future::Loop::Break(())); + Box::new(future) as Box> + } + Some((message @ KadMsg::PutValue { .. }, Some(_))) => { + // A `PutValue` message has been received on `rx`. Contrary to other + // types of messages, this one doesn't expect any answer and therefore + // we ignore the sender. + let future = kad_sink.send(message).map(move |kad_sink| { + future::Loop::Continue(( + kad_sink, + rest, + send_back_queue, + expected_pongs, + )) + }); + Box::new(future) as Box<_> + } + Some((message @ KadMsg::Ping { .. }, Some(_))) => { + // A `Ping` message has been received on `rx`. + expected_pongs += 1; + let future = kad_sink.send(message).map(move |kad_sink| { + future::Loop::Continue(( + kad_sink, + rest, + send_back_queue, + expected_pongs, + )) + }); + Box::new(future) as Box<_> + } + Some((message, Some(send_back))) => { + // Any message other than `PutValue` or `Ping` has been received on + // `rx`. Send it to the remote. + let future = kad_sink.send(message).map(move |kad_sink| { + send_back_queue.push_back(send_back); + future::Loop::Continue(( + kad_sink, + rest, + send_back_queue, + expected_pongs, + )) + }); + Box::new(future) as Box<_> + } + Some((KadMsg::Ping, None)) => { + // Note: The way the protocol was designed, there is no way to + // differentiate between a ping and a pong. + if expected_pongs == 0 { + let message = KadMsg::Ping; + let future = kad_sink.send(message).map(move |kad_sink| { + future::Loop::Continue(( + kad_sink, + rest, + send_back_queue, + expected_pongs, + )) + }); + Box::new(future) as Box<_> + } else { + expected_pongs -= 1; + let future = future::ok({ + future::Loop::Continue(( + kad_sink, + rest, + send_back_queue, + expected_pongs, + )) + }); + Box::new(future) as Box<_> + } + } + Some((message @ KadMsg::FindNodeRes { .. }, None)) + | Some((message @ KadMsg::GetValueRes { .. }, None)) => { + // `FindNodeRes` or `GetValueRes` received on the socket. + // Send it back through `send_back_queue`. + if let Some(send_back) = send_back_queue.pop_front() { + let _ = send_back.send(message); + let future = future::ok(future::Loop::Continue(( + kad_sink, + rest, + send_back_queue, + expected_pongs, + ))); + return Box::new(future) as Box<_>; + } else { + let future = future::err(IoErrorKind::InvalidData.into()); + return Box::new(future) as Box<_>; + } + } + Some((KadMsg::FindNodeReq { key, .. }, None)) => { + // `FindNodeReq` received on the socket. + let message = handle_find_node_req(&interface, &key); + let future = kad_sink.send(message).map(move |kad_sink| { + future::Loop::Continue(( + kad_sink, + rest, + send_back_queue, + expected_pongs, + )) + }); + Box::new(future) as Box<_> + } + Some((KadMsg::GetValueReq { key, .. }, None)) => { + // `GetValueReq` received on the socket. + let message = handle_get_value_req(&interface, &key); + let future = kad_sink.send(message).map(move |kad_sink| { + future::Loop::Continue(( + kad_sink, + rest, + send_back_queue, + expected_pongs, + )) + }); + Box::new(future) as Box<_> + } + Some((KadMsg::PutValue { .. }, None)) => { + // `PutValue` received on the socket. + handle_put_value_req(&interface); + let future = future::ok({ + future::Loop::Continue(( + kad_sink, + rest, + send_back_queue, + expected_pongs, + )) + }); + Box::new(future) as Box<_> + } + } + }) + }, + ); + + Box::new(future) as Box> +} + +// Builds a `KadMsg` that handles a `FIND_NODE` request received from the remote. +fn handle_find_node_req(interface: &I, _requested_key: &[u8]) -> KadMsg +where + I: ?Sized + KadServerInterface, +{ + KadMsg::FindNodeRes { + closer_peers: vec![ + protocol::Peer { + node_id: interface.local_id().clone(), + multiaddrs: vec![], + connection_ty: protocol::ConnectionType::Connected, + }, + ], // TODO: fill the multiaddresses from the peer store + } +} + +// Builds a `KadMsg` that handles a `FIND_VALUE` request received from the remote. +fn handle_get_value_req(_interface: &I, _requested_key: &[u8]) -> KadMsg +where + I: ?Sized + KadServerInterface, +{ + unimplemented!() +} + +// Handles a `STORE` request received from the remote. +fn handle_put_value_req(_interface: &I) +where + I: ?Sized + KadServerInterface, +{ + unimplemented!() +} diff --git a/libp2p-kad/src/kbucket.rs b/libp2p-kad/src/kbucket.rs new file mode 100644 index 00000000..3478f897 --- /dev/null +++ b/libp2p-kad/src/kbucket.rs @@ -0,0 +1,474 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Key-value storage, with a refresh and a time-to-live system. +//! +//! A k-buckets table allows one to store a value identified by keys, ordered by their distance +//! to a reference key passed to the constructor. +//! +//! If the local ID has `N` bits, then the k-buckets table contains `N` *buckets* each containing +//! a constant number of entries. Storing a key in the k-buckets table adds it to the bucket +//! corresponding to its distance with the reference key. + +use arrayvec::ArrayVec; +use bigint::U512; +use libp2p_peerstore::PeerId; +use parking_lot::{Mutex, MutexGuard}; +use std::mem; +use std::slice::Iter as SliceIter; +use std::time::{Duration, Instant}; +use std::vec::IntoIter as VecIntoIter; + +/// Maximum number of nodes in a bucket. +pub const MAX_NODES_PER_BUCKET: usize = 20; + +/// Table of k-buckets with interior mutability. +#[derive(Debug)] +pub struct KBucketsTable { + my_id: Id, + tables: Vec>>, + // The timeout when pinging the first node after which we consider that it no longer responds. + ping_timeout: Duration, +} + +impl Clone for KBucketsTable +where + Id: Clone, + Val: Clone, +{ + #[inline] + fn clone(&self) -> Self { + KBucketsTable { + my_id: self.my_id.clone(), + tables: self.tables + .iter() + .map(|t| t.lock().clone()) + .map(Mutex::new) + .collect(), + ping_timeout: self.ping_timeout.clone(), + } + } +} + +#[derive(Debug, Clone)] +struct KBucket { + // Nodes are always ordered from oldest to newest. + // Note that we will very often move elements to the end of this. No benchmarking has been + // performed, but it is very likely that a `ArrayVec` is the most performant data structure. + nodes: ArrayVec<[Node; MAX_NODES_PER_BUCKET]>, + + // Node received when the bucket was full. Will be added to the list if the first node doesn't + // respond in time to our ping. The second element is the time when the pending node was added. + // If it is too much in the past, then we drop the first node and add the pending node to the + // end of the list. + pending_node: Option<(Node, Instant)>, + + // Last time this bucket was updated. + last_update: Instant, +} + +impl KBucket { + // Puts the kbucket into a coherent state. + // If a node is pending and the timeout has expired, removes the first element of `nodes` + // and pushes back the node in `pending_node`. + fn flush(&mut self, timeout: Duration) { + if let Some((_, instant)) = self.pending_node { + if instant.elapsed() >= timeout { + let (pending_node, _) = self.pending_node.take().unwrap(); + let _ = self.nodes.remove(0); + self.nodes.push(pending_node); + } + } + } +} + +#[derive(Debug, Clone)] +struct Node { + id: Id, + value: Val, +} + +/// Trait that must be implemented on types that can be used as an identifier in a k-bucket. +pub trait KBucketsPeerId: Eq + Clone { + /// Distance between two peer IDs. + type Distance: Ord; + + /// Computes the XOR of this value and another one. + fn distance_with(&self, other: &Self) -> Self::Distance; + + /// Returns then number of bits that are necessary to store the distance between peer IDs. + /// Used for pre-allocations. + /// + /// > **Note**: Returning 0 would lead to a panic. + fn num_bits() -> usize; + + /// Returns the number of leading zeroes of the distance between peer IDs. + fn leading_zeros(Self::Distance) -> u32; +} + +impl KBucketsPeerId for PeerId { + type Distance = U512; + + #[inline] + fn num_bits() -> usize { + 512 + } + + #[inline] + fn distance_with(&self, other: &Self) -> Self::Distance { + // Note that we don't compare the hash functions because there's no chance of collision + // of the same value hashed with two different hash functions. + let my_hash = U512::from(self.hash()); + let other_hash = U512::from(other.hash()); + my_hash ^ other_hash + } + + #[inline] + fn leading_zeros(distance: Self::Distance) -> u32 { + distance.leading_zeros() + } +} + +impl KBucketsTable +where + Id: KBucketsPeerId, +{ + /// Builds a new routing table. + pub fn new(my_id: Id, ping_timeout: Duration) -> Self { + KBucketsTable { + my_id: my_id, + tables: (0..Id::num_bits()) + .map(|_| KBucket { + nodes: ArrayVec::new(), + pending_node: None, + last_update: Instant::now(), + }) + .map(Mutex::new) + .collect(), + ping_timeout: ping_timeout, + } + } + + // Returns the id of the bucket that should contain the peer with the given ID. + // + // Returns `None` if out of range, which happens if `id` is the same as the local peer id. + #[inline] + fn bucket_num(&self, id: &Id) -> Option { + (Id::num_bits() - 1).checked_sub(Id::leading_zeros(self.my_id.distance_with(id)) as usize) + } + + /// Returns an iterator to all the buckets of this table. + /// + /// Ordered by proximity to the local node. Closest bucket (with max. one node in it) comes + /// first. + #[inline] + pub fn buckets(&self) -> BucketsIter { + BucketsIter(self.tables.iter(), self.ping_timeout) + } + + /// Returns the ID of the local node. + #[inline] + pub fn my_id(&self) -> &Id { + &self.my_id + } + + /// Finds the `num` nodes closest to `id`, ordered by distance. + pub fn find_closest(&self, id: &Id) -> VecIntoIter + where + Id: Clone, + { + // TODO: optimize + let mut out = Vec::new(); + for table in self.tables.iter() { + let mut table = table.lock(); + table.flush(self.ping_timeout); + for node in table.nodes.iter() { + out.push(node.id.clone()); + } + } + out.sort_by(|a, b| b.distance_with(id).cmp(&a.distance_with(id))); + out.into_iter() + } + + /// Marks the node as "most recent" in its bucket and modifies the value associated to it. + /// This function should be called whenever we receive a communication from a node. + /// + /// # Panic + /// + /// Panics if `id` is equal to the local node ID. + /// + pub fn update(&self, id: Id, value: Val) -> UpdateOutcome { + let table = match self.bucket_num(&id) { + Some(n) => &self.tables[n], + None => panic!("tried to update our own node in the kbuckets table"), + }; + + let mut table = table.lock(); + table.flush(self.ping_timeout); + + if let Some(pos) = table.nodes.iter().position(|n| n.id == id) { + // Node is already in the bucket. + let mut existing = table.nodes.remove(pos); + let old_val = mem::replace(&mut existing.value, value); + if pos == 0 { + // If it's the first node of the bucket that we update, then we drop the node that + // was waiting for a ping. + table.nodes.truncate(MAX_NODES_PER_BUCKET - 1); + table.pending_node = None; + } + table.nodes.push(existing); + table.last_update = Instant::now(); + UpdateOutcome::Refreshed(old_val) + } else if table.nodes.len() < MAX_NODES_PER_BUCKET { + // Node not yet in the bucket, but there's plenty of space. + table.nodes.push(Node { + id: id, + value: value, + }); + table.last_update = Instant::now(); + UpdateOutcome::Added + } else { + // Not enough space to put the node, but we can add it to the end as "pending". We + // then need to tell the caller that we want it to ping the node at the top of the + // list. + if table.pending_node.is_none() { + table.pending_node = Some(( + Node { + id: id, + value: value, + }, + Instant::now(), + )); + UpdateOutcome::NeedPing(table.nodes[0].id.clone()) + } else { + UpdateOutcome::Discarded + } + } + } +} + +/// Return value of the `update()` method. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[must_use] +pub enum UpdateOutcome { + /// The node has been added to the bucket. + Added, + /// The node was already in the bucket and has been refreshed. + Refreshed(Val), + /// The node wasn't added. Instead we need to ping the node passed as parameter. + NeedPing(Id), + /// The node wasn't added at all because a node was already pending. + Discarded, +} + +/// Iterator giving access to a bucket. +pub struct BucketsIter<'a, Id: 'a, Val: 'a>(SliceIter<'a, Mutex>>, Duration); + +impl<'a, Id: 'a, Val: 'a> Iterator for BucketsIter<'a, Id, Val> { + type Item = Bucket<'a, Id, Val>; + + #[inline] + fn next(&mut self) -> Option { + self.0.next().map(|bucket| { + let mut bucket = bucket.lock(); + bucket.flush(self.1); + Bucket(bucket) + }) + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } +} + +impl<'a, Id: 'a, Val: 'a> ExactSizeIterator for BucketsIter<'a, Id, Val> {} + +/// Access to a bucket. +pub struct Bucket<'a, Id: 'a, Val: 'a>(MutexGuard<'a, KBucket>); + +impl<'a, Id: 'a, Val: 'a> Bucket<'a, Id, Val> { + /// Returns the number of entries in that bucket. + /// + /// > **Note**: Keep in mind that this operation can be racy. If `update()` is called on the + /// > table while this function is running, the `update()` may or may not be taken + /// > into account. + #[inline] + pub fn num_entries(&self) -> usize { + self.0.nodes.len() + } + + /// Returns true if this bucket has a pending node. + #[inline] + pub fn has_pending(&self) -> bool { + self.0.pending_node.is_some() + } + + /// Returns the time when any of the values in this bucket was last updated. + /// + /// If the bucket is empty, this returns the time when the whole table was created. + #[inline] + pub fn last_update(&self) -> Instant { + self.0.last_update.clone() + } +} + +#[cfg(test)] +mod tests { + extern crate rand; + use self::rand::random; + use kbucket::{KBucketsTable, UpdateOutcome, MAX_NODES_PER_BUCKET}; + use libp2p_peerstore::PeerId; + use std::thread; + use std::time::Duration; + + #[test] + fn basic_closest() { + let my_id = { + let mut bytes = vec![random(); 34]; + bytes[0] = 18; + bytes[1] = 32; + PeerId::from_bytes(bytes).unwrap() + }; + + let other_id = { + let mut bytes = vec![random(); 34]; + bytes[0] = 18; + bytes[1] = 32; + PeerId::from_bytes(bytes).unwrap() + }; + + let table = KBucketsTable::new(my_id, Duration::from_secs(5)); + let _ = table.update(other_id.clone(), ()); + + let res = table.find_closest(&other_id).collect::>(); + assert_eq!(res.len(), 1); + assert_eq!(res[0], other_id); + } + + #[test] + #[should_panic(expected = "tried to update our own node in the kbuckets table")] + fn update_local_id_panic() { + let my_id = { + let mut bytes = vec![random(); 34]; + bytes[0] = 18; + bytes[1] = 32; + PeerId::from_bytes(bytes).unwrap() + }; + + let table = KBucketsTable::new(my_id.clone(), Duration::from_secs(5)); + let _ = table.update(my_id, ()); + } + + #[test] + fn update_time_last_refresh() { + let my_id = { + let mut bytes = vec![random(); 34]; + bytes[0] = 18; + bytes[1] = 32; + PeerId::from_bytes(bytes).unwrap() + }; + + // Generate some other IDs varying by just one bit. + let other_ids = (0..random::() % 20) + .map(|_| { + let bit_num = random::() % 256; + let mut id = my_id.as_bytes().to_vec().clone(); + id[33 - (bit_num / 8)] ^= 1 << (bit_num % 8); + (PeerId::from_bytes(id).unwrap(), bit_num) + }) + .collect::>(); + + let table = KBucketsTable::new(my_id, Duration::from_secs(5)); + let before_update = table.buckets().map(|b| b.last_update()).collect::>(); + + thread::sleep(Duration::from_secs(2)); + for &(ref id, _) in &other_ids { + let _ = table.update(id.clone(), ()); + } + + let after_update = table.buckets().map(|b| b.last_update()).collect::>(); + + for (offset, (bef, aft)) in before_update.iter().zip(after_update.iter()).enumerate() { + if other_ids.iter().any(|&(_, bucket)| bucket == offset) { + assert_ne!(bef, aft); + } else { + assert_eq!(bef, aft); + } + } + } + + #[test] + fn full_kbucket() { + let my_id = { + let mut bytes = vec![random(); 34]; + bytes[0] = 18; + bytes[1] = 32; + PeerId::from_bytes(bytes).unwrap() + }; + + assert!(MAX_NODES_PER_BUCKET <= 251); // Test doesn't work otherwise. + let mut fill_ids = (0..MAX_NODES_PER_BUCKET + 3) + .map(|n| { + let mut id = my_id.clone().into_bytes(); + id[2] ^= 0x80; // Flip the first bit so that we get in the most distant bucket. + id[33] = id[33].wrapping_add(n as u8); + PeerId::from_bytes(id).unwrap() + }) + .collect::>(); + + let first_node = fill_ids[0].clone(); + let second_node = fill_ids[1].clone(); + + let table = KBucketsTable::new(my_id.clone(), Duration::from_secs(1)); + + for (num, id) in fill_ids.drain(..MAX_NODES_PER_BUCKET).enumerate() { + assert_eq!(table.update(id, ()), UpdateOutcome::Added); + assert_eq!(table.buckets().nth(255).unwrap().num_entries(), num + 1); + } + + assert_eq!( + table.buckets().nth(255).unwrap().num_entries(), + MAX_NODES_PER_BUCKET + ); + assert!(!table.buckets().nth(255).unwrap().has_pending()); + assert_eq!( + table.update(fill_ids.remove(0), ()), + UpdateOutcome::NeedPing(first_node) + ); + + assert_eq!( + table.buckets().nth(255).unwrap().num_entries(), + MAX_NODES_PER_BUCKET + ); + assert!(table.buckets().nth(255).unwrap().has_pending()); + assert_eq!( + table.update(fill_ids.remove(0), ()), + UpdateOutcome::Discarded + ); + + thread::sleep(Duration::from_secs(2)); + assert!(!table.buckets().nth(255).unwrap().has_pending()); + assert_eq!( + table.update(fill_ids.remove(0), ()), + UpdateOutcome::NeedPing(second_node) + ); + } +} diff --git a/libp2p-kad/src/lib.rs b/libp2p-kad/src/lib.rs new file mode 100644 index 00000000..60594a2d --- /dev/null +++ b/libp2p-kad/src/lib.rs @@ -0,0 +1,92 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Kademlia protocol. Allows peer discovery, records store and records fetch. +//! +//! # Usage +//! +//! Usage is done in the following steps: +//! +//! - Build a `KademliaConfig` that contains the way you want the Kademlia protocol to behave. +//! +//! - Build a `KademliaControllerPrototype` from that configuration object. +//! +//! - Build a `KademliaUpgrade` from that prototype. Then create a swarm (from the *swarm* crate) +//! and pass the `KademliaUpgrade` you built as part of the list of protocols. +//! +//! - Then turn the controller prototype into an actual `KademliaController` by passing to it the +//! swarm controller you got. +//! +//! - You can now perform operations using that controller. +//! + +// TODO: we allow dead_code for now because this library contains a lot of unused code that will +// be useful later for record store +#![allow(dead_code)] + +// # Crate organization +// +// The crate contains three levels of abstractions over the Kademlia protocol. +// +// - The first level of abstraction is in `protocol`. The API of this module lets you turn a raw +// bytes stream (`AsyncRead + AsyncWrite`) into a `Sink + Stream` of raw but strongly-typed +// Kademlia messages. +// +// - The second level of abstraction is in `kad_server`. Its API lets you upgrade a connection and +// obtain a future (that must be driven to completion), plus a controller. Processing the future +// will automatically respond to Kad requests received by the remote. The controller lets you +// send your own requests to this remote and obtain strongly-typed responses. +// +// - The third level of abstraction is in `high_level`. This module also provides a +// `ConnectionUpgrade`, but all the upgraded connections share informations through a struct in +// an `Arc`. The user has a single clonable controller that operates on all the upgraded +// connections. This controller lets you perform peer discovery and record load operations over +// the whole network. +// + +extern crate arrayvec; +extern crate base58; +extern crate bigint; +extern crate bytes; +extern crate datastore; +extern crate fnv; +extern crate futures; +extern crate libp2p_identify; +extern crate libp2p_peerstore; +extern crate libp2p_ping; +extern crate libp2p_swarm; +extern crate multiaddr; +extern crate parking_lot; +extern crate protobuf; +extern crate rand; +extern crate smallvec; +extern crate tokio_io; +extern crate tokio_timer; +extern crate varint; + +pub use self::high_level::{KademliaConfig, KademliaController, KademliaControllerPrototype}; +pub use self::high_level::KademliaUpgrade; + +mod high_level; +mod kad_server; +mod kbucket; +mod protobuf_structs; +mod protocol; +mod query; diff --git a/libp2p-kad/src/protobuf_structs/dht.rs b/libp2p-kad/src/protobuf_structs/dht.rs new file mode 100644 index 00000000..187c0f5d --- /dev/null +++ b/libp2p-kad/src/protobuf_structs/dht.rs @@ -0,0 +1,990 @@ +// This file is generated. Do not edit +// @generated + +// https://github.com/Manishearth/rust-clippy/issues/702 +#![allow(unknown_lints)] +#![allow(clippy)] + +#![cfg_attr(rustfmt, rustfmt_skip)] + +#![allow(box_pointers)] +#![allow(dead_code)] +#![allow(missing_docs)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(non_upper_case_globals)] +#![allow(trivial_casts)] +#![allow(unsafe_code)] +#![allow(unused_imports)] +#![allow(unused_results)] + +use protobuf::Message as Message_imported_for_functions; +use protobuf::ProtobufEnum as ProtobufEnum_imported_for_functions; + +#[derive(PartialEq,Clone,Default)] +pub struct Message { + // message fields + field_type: ::std::option::Option, + clusterLevelRaw: ::std::option::Option, + key: ::protobuf::SingularField<::std::vec::Vec>, + record: ::protobuf::SingularPtrField, + closerPeers: ::protobuf::RepeatedField, + providerPeers: ::protobuf::RepeatedField, + // special fields + unknown_fields: ::protobuf::UnknownFields, + cached_size: ::protobuf::CachedSize, +} + +// see codegen.rs for the explanation why impl Sync explicitly +unsafe impl ::std::marker::Sync for Message {} + +impl Message { + pub fn new() -> Message { + ::std::default::Default::default() + } + + pub fn default_instance() -> &'static Message { + static mut instance: ::protobuf::lazy::Lazy = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const Message, + }; + unsafe { + instance.get(Message::new) + } + } + + // optional .dht.pb.Message.MessageType type = 1; + + pub fn clear_field_type(&mut self) { + self.field_type = ::std::option::Option::None; + } + + pub fn has_field_type(&self) -> bool { + self.field_type.is_some() + } + + // Param is passed by value, moved + pub fn set_field_type(&mut self, v: Message_MessageType) { + self.field_type = ::std::option::Option::Some(v); + } + + pub fn get_field_type(&self) -> Message_MessageType { + self.field_type.unwrap_or(Message_MessageType::PUT_VALUE) + } + + fn get_field_type_for_reflect(&self) -> &::std::option::Option { + &self.field_type + } + + fn mut_field_type_for_reflect(&mut self) -> &mut ::std::option::Option { + &mut self.field_type + } + + // optional int32 clusterLevelRaw = 10; + + pub fn clear_clusterLevelRaw(&mut self) { + self.clusterLevelRaw = ::std::option::Option::None; + } + + pub fn has_clusterLevelRaw(&self) -> bool { + self.clusterLevelRaw.is_some() + } + + // Param is passed by value, moved + pub fn set_clusterLevelRaw(&mut self, v: i32) { + self.clusterLevelRaw = ::std::option::Option::Some(v); + } + + pub fn get_clusterLevelRaw(&self) -> i32 { + self.clusterLevelRaw.unwrap_or(0) + } + + fn get_clusterLevelRaw_for_reflect(&self) -> &::std::option::Option { + &self.clusterLevelRaw + } + + fn mut_clusterLevelRaw_for_reflect(&mut self) -> &mut ::std::option::Option { + &mut self.clusterLevelRaw + } + + // optional bytes key = 2; + + pub fn clear_key(&mut self) { + self.key.clear(); + } + + pub fn has_key(&self) -> bool { + self.key.is_some() + } + + // Param is passed by value, moved + pub fn set_key(&mut self, v: ::std::vec::Vec) { + self.key = ::protobuf::SingularField::some(v); + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_key(&mut self) -> &mut ::std::vec::Vec { + if self.key.is_none() { + self.key.set_default(); + } + self.key.as_mut().unwrap() + } + + // Take field + pub fn take_key(&mut self) -> ::std::vec::Vec { + self.key.take().unwrap_or_else(|| ::std::vec::Vec::new()) + } + + pub fn get_key(&self) -> &[u8] { + match self.key.as_ref() { + Some(v) => &v, + None => &[], + } + } + + fn get_key_for_reflect(&self) -> &::protobuf::SingularField<::std::vec::Vec> { + &self.key + } + + fn mut_key_for_reflect(&mut self) -> &mut ::protobuf::SingularField<::std::vec::Vec> { + &mut self.key + } + + // optional .record.pb.Record record = 3; + + pub fn clear_record(&mut self) { + self.record.clear(); + } + + pub fn has_record(&self) -> bool { + self.record.is_some() + } + + // Param is passed by value, moved + pub fn set_record(&mut self, v: super::record::Record) { + self.record = ::protobuf::SingularPtrField::some(v); + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_record(&mut self) -> &mut super::record::Record { + if self.record.is_none() { + self.record.set_default(); + } + self.record.as_mut().unwrap() + } + + // Take field + pub fn take_record(&mut self) -> super::record::Record { + self.record.take().unwrap_or_else(|| super::record::Record::new()) + } + + pub fn get_record(&self) -> &super::record::Record { + self.record.as_ref().unwrap_or_else(|| super::record::Record::default_instance()) + } + + fn get_record_for_reflect(&self) -> &::protobuf::SingularPtrField { + &self.record + } + + fn mut_record_for_reflect(&mut self) -> &mut ::protobuf::SingularPtrField { + &mut self.record + } + + // repeated .dht.pb.Message.Peer closerPeers = 8; + + pub fn clear_closerPeers(&mut self) { + self.closerPeers.clear(); + } + + // Param is passed by value, moved + pub fn set_closerPeers(&mut self, v: ::protobuf::RepeatedField) { + self.closerPeers = v; + } + + // Mutable pointer to the field. + pub fn mut_closerPeers(&mut self) -> &mut ::protobuf::RepeatedField { + &mut self.closerPeers + } + + // Take field + pub fn take_closerPeers(&mut self) -> ::protobuf::RepeatedField { + ::std::mem::replace(&mut self.closerPeers, ::protobuf::RepeatedField::new()) + } + + pub fn get_closerPeers(&self) -> &[Message_Peer] { + &self.closerPeers + } + + fn get_closerPeers_for_reflect(&self) -> &::protobuf::RepeatedField { + &self.closerPeers + } + + fn mut_closerPeers_for_reflect(&mut self) -> &mut ::protobuf::RepeatedField { + &mut self.closerPeers + } + + // repeated .dht.pb.Message.Peer providerPeers = 9; + + pub fn clear_providerPeers(&mut self) { + self.providerPeers.clear(); + } + + // Param is passed by value, moved + pub fn set_providerPeers(&mut self, v: ::protobuf::RepeatedField) { + self.providerPeers = v; + } + + // Mutable pointer to the field. + pub fn mut_providerPeers(&mut self) -> &mut ::protobuf::RepeatedField { + &mut self.providerPeers + } + + // Take field + pub fn take_providerPeers(&mut self) -> ::protobuf::RepeatedField { + ::std::mem::replace(&mut self.providerPeers, ::protobuf::RepeatedField::new()) + } + + pub fn get_providerPeers(&self) -> &[Message_Peer] { + &self.providerPeers + } + + fn get_providerPeers_for_reflect(&self) -> &::protobuf::RepeatedField { + &self.providerPeers + } + + fn mut_providerPeers_for_reflect(&mut self) -> &mut ::protobuf::RepeatedField { + &mut self.providerPeers + } +} + +impl ::protobuf::Message for Message { + fn is_initialized(&self) -> bool { + for v in &self.record { + if !v.is_initialized() { + return false; + } + }; + for v in &self.closerPeers { + if !v.is_initialized() { + return false; + } + }; + for v in &self.providerPeers { + if !v.is_initialized() { + return false; + } + }; + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? { + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + if wire_type != ::protobuf::wire_format::WireTypeVarint { + return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); + } + let tmp = is.read_enum()?; + self.field_type = ::std::option::Option::Some(tmp); + }, + 10 => { + if wire_type != ::protobuf::wire_format::WireTypeVarint { + return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); + } + let tmp = is.read_int32()?; + self.clusterLevelRaw = ::std::option::Option::Some(tmp); + }, + 2 => { + ::protobuf::rt::read_singular_bytes_into(wire_type, is, &mut self.key)?; + }, + 3 => { + ::protobuf::rt::read_singular_message_into(wire_type, is, &mut self.record)?; + }, + 8 => { + ::protobuf::rt::read_repeated_message_into(wire_type, is, &mut self.closerPeers)?; + }, + 9 => { + ::protobuf::rt::read_repeated_message_into(wire_type, is, &mut self.providerPeers)?; + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + if let Some(v) = self.field_type { + my_size += ::protobuf::rt::enum_size(1, v); + } + if let Some(v) = self.clusterLevelRaw { + my_size += ::protobuf::rt::value_size(10, v, ::protobuf::wire_format::WireTypeVarint); + } + if let Some(ref v) = self.key.as_ref() { + my_size += ::protobuf::rt::bytes_size(2, &v); + } + if let Some(ref v) = self.record.as_ref() { + let len = v.compute_size(); + my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len; + } + for value in &self.closerPeers { + let len = value.compute_size(); + my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len; + }; + for value in &self.providerPeers { + let len = value.compute_size(); + my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len; + }; + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream) -> ::protobuf::ProtobufResult<()> { + if let Some(v) = self.field_type { + os.write_enum(1, v.value())?; + } + if let Some(v) = self.clusterLevelRaw { + os.write_int32(10, v)?; + } + if let Some(ref v) = self.key.as_ref() { + os.write_bytes(2, &v)?; + } + if let Some(ref v) = self.record.as_ref() { + os.write_tag(3, ::protobuf::wire_format::WireTypeLengthDelimited)?; + os.write_raw_varint32(v.get_cached_size())?; + v.write_to_with_cached_sizes(os)?; + } + for v in &self.closerPeers { + os.write_tag(8, ::protobuf::wire_format::WireTypeLengthDelimited)?; + os.write_raw_varint32(v.get_cached_size())?; + v.write_to_with_cached_sizes(os)?; + }; + for v in &self.providerPeers { + os.write_tag(9, ::protobuf::wire_format::WireTypeLengthDelimited)?; + os.write_raw_varint32(v.get_cached_size())?; + v.write_to_with_cached_sizes(os)?; + }; + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &::std::any::Any { + self as &::std::any::Any + } + fn as_any_mut(&mut self) -> &mut ::std::any::Any { + self as &mut ::std::any::Any + } + fn into_any(self: Box) -> ::std::boxed::Box<::std::any::Any> { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + ::protobuf::MessageStatic::descriptor_static(None::) + } +} + +impl ::protobuf::MessageStatic for Message { + fn new() -> Message { + Message::new() + } + + fn descriptor_static(_: ::std::option::Option) -> &'static ::protobuf::reflect::MessageDescriptor { + static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const ::protobuf::reflect::MessageDescriptor, + }; + unsafe { + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_option_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( + "type", + Message::get_field_type_for_reflect, + Message::mut_field_type_for_reflect, + )); + fields.push(::protobuf::reflect::accessor::make_option_accessor::<_, ::protobuf::types::ProtobufTypeInt32>( + "clusterLevelRaw", + Message::get_clusterLevelRaw_for_reflect, + Message::mut_clusterLevelRaw_for_reflect, + )); + fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( + "key", + Message::get_key_for_reflect, + Message::mut_key_for_reflect, + )); + fields.push(::protobuf::reflect::accessor::make_singular_ptr_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage>( + "record", + Message::get_record_for_reflect, + Message::mut_record_for_reflect, + )); + fields.push(::protobuf::reflect::accessor::make_repeated_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage>( + "closerPeers", + Message::get_closerPeers_for_reflect, + Message::mut_closerPeers_for_reflect, + )); + fields.push(::protobuf::reflect::accessor::make_repeated_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage>( + "providerPeers", + Message::get_providerPeers_for_reflect, + Message::mut_providerPeers_for_reflect, + )); + ::protobuf::reflect::MessageDescriptor::new::( + "Message", + fields, + file_descriptor_proto() + ) + }) + } + } +} + +impl ::protobuf::Clear for Message { + fn clear(&mut self) { + self.clear_field_type(); + self.clear_clusterLevelRaw(); + self.clear_key(); + self.clear_record(); + self.clear_closerPeers(); + self.clear_providerPeers(); + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for Message { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for Message { + fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef { + ::protobuf::reflect::ProtobufValueRef::Message(self) + } +} + +#[derive(PartialEq,Clone,Default)] +pub struct Message_Peer { + // message fields + id: ::protobuf::SingularField<::std::vec::Vec>, + addrs: ::protobuf::RepeatedField<::std::vec::Vec>, + connection: ::std::option::Option, + // special fields + unknown_fields: ::protobuf::UnknownFields, + cached_size: ::protobuf::CachedSize, +} + +// see codegen.rs for the explanation why impl Sync explicitly +unsafe impl ::std::marker::Sync for Message_Peer {} + +impl Message_Peer { + pub fn new() -> Message_Peer { + ::std::default::Default::default() + } + + pub fn default_instance() -> &'static Message_Peer { + static mut instance: ::protobuf::lazy::Lazy = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const Message_Peer, + }; + unsafe { + instance.get(Message_Peer::new) + } + } + + // optional bytes id = 1; + + pub fn clear_id(&mut self) { + self.id.clear(); + } + + pub fn has_id(&self) -> bool { + self.id.is_some() + } + + // Param is passed by value, moved + pub fn set_id(&mut self, v: ::std::vec::Vec) { + self.id = ::protobuf::SingularField::some(v); + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_id(&mut self) -> &mut ::std::vec::Vec { + if self.id.is_none() { + self.id.set_default(); + } + self.id.as_mut().unwrap() + } + + // Take field + pub fn take_id(&mut self) -> ::std::vec::Vec { + self.id.take().unwrap_or_else(|| ::std::vec::Vec::new()) + } + + pub fn get_id(&self) -> &[u8] { + match self.id.as_ref() { + Some(v) => &v, + None => &[], + } + } + + fn get_id_for_reflect(&self) -> &::protobuf::SingularField<::std::vec::Vec> { + &self.id + } + + fn mut_id_for_reflect(&mut self) -> &mut ::protobuf::SingularField<::std::vec::Vec> { + &mut self.id + } + + // repeated bytes addrs = 2; + + pub fn clear_addrs(&mut self) { + self.addrs.clear(); + } + + // Param is passed by value, moved + pub fn set_addrs(&mut self, v: ::protobuf::RepeatedField<::std::vec::Vec>) { + self.addrs = v; + } + + // Mutable pointer to the field. + pub fn mut_addrs(&mut self) -> &mut ::protobuf::RepeatedField<::std::vec::Vec> { + &mut self.addrs + } + + // Take field + pub fn take_addrs(&mut self) -> ::protobuf::RepeatedField<::std::vec::Vec> { + ::std::mem::replace(&mut self.addrs, ::protobuf::RepeatedField::new()) + } + + pub fn get_addrs(&self) -> &[::std::vec::Vec] { + &self.addrs + } + + fn get_addrs_for_reflect(&self) -> &::protobuf::RepeatedField<::std::vec::Vec> { + &self.addrs + } + + fn mut_addrs_for_reflect(&mut self) -> &mut ::protobuf::RepeatedField<::std::vec::Vec> { + &mut self.addrs + } + + // optional .dht.pb.Message.ConnectionType connection = 3; + + pub fn clear_connection(&mut self) { + self.connection = ::std::option::Option::None; + } + + pub fn has_connection(&self) -> bool { + self.connection.is_some() + } + + // Param is passed by value, moved + pub fn set_connection(&mut self, v: Message_ConnectionType) { + self.connection = ::std::option::Option::Some(v); + } + + pub fn get_connection(&self) -> Message_ConnectionType { + self.connection.unwrap_or(Message_ConnectionType::NOT_CONNECTED) + } + + fn get_connection_for_reflect(&self) -> &::std::option::Option { + &self.connection + } + + fn mut_connection_for_reflect(&mut self) -> &mut ::std::option::Option { + &mut self.connection + } +} + +impl ::protobuf::Message for Message_Peer { + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? { + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + ::protobuf::rt::read_singular_bytes_into(wire_type, is, &mut self.id)?; + }, + 2 => { + ::protobuf::rt::read_repeated_bytes_into(wire_type, is, &mut self.addrs)?; + }, + 3 => { + if wire_type != ::protobuf::wire_format::WireTypeVarint { + return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); + } + let tmp = is.read_enum()?; + self.connection = ::std::option::Option::Some(tmp); + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + if let Some(ref v) = self.id.as_ref() { + my_size += ::protobuf::rt::bytes_size(1, &v); + } + for value in &self.addrs { + my_size += ::protobuf::rt::bytes_size(2, &value); + }; + if let Some(v) = self.connection { + my_size += ::protobuf::rt::enum_size(3, v); + } + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream) -> ::protobuf::ProtobufResult<()> { + if let Some(ref v) = self.id.as_ref() { + os.write_bytes(1, &v)?; + } + for v in &self.addrs { + os.write_bytes(2, &v)?; + }; + if let Some(v) = self.connection { + os.write_enum(3, v.value())?; + } + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &::std::any::Any { + self as &::std::any::Any + } + fn as_any_mut(&mut self) -> &mut ::std::any::Any { + self as &mut ::std::any::Any + } + fn into_any(self: Box) -> ::std::boxed::Box<::std::any::Any> { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + ::protobuf::MessageStatic::descriptor_static(None::) + } +} + +impl ::protobuf::MessageStatic for Message_Peer { + fn new() -> Message_Peer { + Message_Peer::new() + } + + fn descriptor_static(_: ::std::option::Option) -> &'static ::protobuf::reflect::MessageDescriptor { + static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const ::protobuf::reflect::MessageDescriptor, + }; + unsafe { + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( + "id", + Message_Peer::get_id_for_reflect, + Message_Peer::mut_id_for_reflect, + )); + fields.push(::protobuf::reflect::accessor::make_repeated_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( + "addrs", + Message_Peer::get_addrs_for_reflect, + Message_Peer::mut_addrs_for_reflect, + )); + fields.push(::protobuf::reflect::accessor::make_option_accessor::<_, ::protobuf::types::ProtobufTypeEnum>( + "connection", + Message_Peer::get_connection_for_reflect, + Message_Peer::mut_connection_for_reflect, + )); + ::protobuf::reflect::MessageDescriptor::new::( + "Message_Peer", + fields, + file_descriptor_proto() + ) + }) + } + } +} + +impl ::protobuf::Clear for Message_Peer { + fn clear(&mut self) { + self.clear_id(); + self.clear_addrs(); + self.clear_connection(); + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for Message_Peer { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for Message_Peer { + fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef { + ::protobuf::reflect::ProtobufValueRef::Message(self) + } +} + +#[derive(Clone,PartialEq,Eq,Debug,Hash)] +pub enum Message_MessageType { + PUT_VALUE = 0, + GET_VALUE = 1, + ADD_PROVIDER = 2, + GET_PROVIDERS = 3, + FIND_NODE = 4, + PING = 5, +} + +impl ::protobuf::ProtobufEnum for Message_MessageType { + fn value(&self) -> i32 { + *self as i32 + } + + fn from_i32(value: i32) -> ::std::option::Option { + match value { + 0 => ::std::option::Option::Some(Message_MessageType::PUT_VALUE), + 1 => ::std::option::Option::Some(Message_MessageType::GET_VALUE), + 2 => ::std::option::Option::Some(Message_MessageType::ADD_PROVIDER), + 3 => ::std::option::Option::Some(Message_MessageType::GET_PROVIDERS), + 4 => ::std::option::Option::Some(Message_MessageType::FIND_NODE), + 5 => ::std::option::Option::Some(Message_MessageType::PING), + _ => ::std::option::Option::None + } + } + + fn values() -> &'static [Self] { + static values: &'static [Message_MessageType] = &[ + Message_MessageType::PUT_VALUE, + Message_MessageType::GET_VALUE, + Message_MessageType::ADD_PROVIDER, + Message_MessageType::GET_PROVIDERS, + Message_MessageType::FIND_NODE, + Message_MessageType::PING, + ]; + values + } + + fn enum_descriptor_static(_: ::std::option::Option) -> &'static ::protobuf::reflect::EnumDescriptor { + static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::EnumDescriptor> = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const ::protobuf::reflect::EnumDescriptor, + }; + unsafe { + descriptor.get(|| { + ::protobuf::reflect::EnumDescriptor::new("Message_MessageType", file_descriptor_proto()) + }) + } + } +} + +impl ::std::marker::Copy for Message_MessageType { +} + +impl ::protobuf::reflect::ProtobufValue for Message_MessageType { + fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef { + ::protobuf::reflect::ProtobufValueRef::Enum(self.descriptor()) + } +} + +#[derive(Clone,PartialEq,Eq,Debug,Hash)] +pub enum Message_ConnectionType { + NOT_CONNECTED = 0, + CONNECTED = 1, + CAN_CONNECT = 2, + CANNOT_CONNECT = 3, +} + +impl ::protobuf::ProtobufEnum for Message_ConnectionType { + fn value(&self) -> i32 { + *self as i32 + } + + fn from_i32(value: i32) -> ::std::option::Option { + match value { + 0 => ::std::option::Option::Some(Message_ConnectionType::NOT_CONNECTED), + 1 => ::std::option::Option::Some(Message_ConnectionType::CONNECTED), + 2 => ::std::option::Option::Some(Message_ConnectionType::CAN_CONNECT), + 3 => ::std::option::Option::Some(Message_ConnectionType::CANNOT_CONNECT), + _ => ::std::option::Option::None + } + } + + fn values() -> &'static [Self] { + static values: &'static [Message_ConnectionType] = &[ + Message_ConnectionType::NOT_CONNECTED, + Message_ConnectionType::CONNECTED, + Message_ConnectionType::CAN_CONNECT, + Message_ConnectionType::CANNOT_CONNECT, + ]; + values + } + + fn enum_descriptor_static(_: ::std::option::Option) -> &'static ::protobuf::reflect::EnumDescriptor { + static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::EnumDescriptor> = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const ::protobuf::reflect::EnumDescriptor, + }; + unsafe { + descriptor.get(|| { + ::protobuf::reflect::EnumDescriptor::new("Message_ConnectionType", file_descriptor_proto()) + }) + } + } +} + +impl ::std::marker::Copy for Message_ConnectionType { +} + +impl ::protobuf::reflect::ProtobufValue for Message_ConnectionType { + fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef { + ::protobuf::reflect::ProtobufValueRef::Enum(self.descriptor()) + } +} + +static file_descriptor_proto_data: &'static [u8] = b"\ + \n\tdht.proto\x12\x06dht.pb\x1a\x0crecord.proto\"\xc7\x04\n\x07Message\ + \x12/\n\x04type\x18\x01\x20\x01(\x0e2\x1b.dht.pb.Message.MessageTypeR\ + \x04type\x12(\n\x0fclusterLevelRaw\x18\n\x20\x01(\x05R\x0fclusterLevelRa\ + w\x12\x10\n\x03key\x18\x02\x20\x01(\x0cR\x03key\x12)\n\x06record\x18\x03\ + \x20\x01(\x0b2\x11.record.pb.RecordR\x06record\x126\n\x0bcloserPeers\x18\ + \x08\x20\x03(\x0b2\x14.dht.pb.Message.PeerR\x0bcloserPeers\x12:\n\rprovi\ + derPeers\x18\t\x20\x03(\x0b2\x14.dht.pb.Message.PeerR\rproviderPeers\x1a\ + l\n\x04Peer\x12\x0e\n\x02id\x18\x01\x20\x01(\x0cR\x02id\x12\x14\n\x05add\ + rs\x18\x02\x20\x03(\x0cR\x05addrs\x12>\n\nconnection\x18\x03\x20\x01(\ + \x0e2\x1e.dht.pb.Message.ConnectionTypeR\nconnection\"i\n\x0bMessageType\ + \x12\r\n\tPUT_VALUE\x10\0\x12\r\n\tGET_VALUE\x10\x01\x12\x10\n\x0cADD_PR\ + OVIDER\x10\x02\x12\x11\n\rGET_PROVIDERS\x10\x03\x12\r\n\tFIND_NODE\x10\ + \x04\x12\x08\n\x04PING\x10\x05\"W\n\x0eConnectionType\x12\x11\n\rNOT_CON\ + NECTED\x10\0\x12\r\n\tCONNECTED\x10\x01\x12\x0f\n\x0bCAN_CONNECT\x10\x02\ + \x12\x12\n\x0eCANNOT_CONNECT\x10\x03J\xc9\x10\n\x06\x12\x04\0\0>\x01\n\ + \x08\n\x01\x0c\x12\x03\0\0\x12\n\x08\n\x01\x02\x12\x03\x01\x08\x0e\n\t\n\ + \x02\x03\0\x12\x03\x03\x07\x15\n\n\n\x02\x04\0\x12\x04\x05\0>\x01\n\n\n\ + \x03\x04\0\x01\x12\x03\x05\x08\x0f\n\x0c\n\x04\x04\0\x04\0\x12\x04\x06\ + \x08\r\t\n\x0c\n\x05\x04\0\x04\0\x01\x12\x03\x06\r\x18\n\r\n\x06\x04\0\ + \x04\0\x02\0\x12\x03\x07\x10\x1e\n\x0e\n\x07\x04\0\x04\0\x02\0\x01\x12\ + \x03\x07\x10\x19\n\x0e\n\x07\x04\0\x04\0\x02\0\x02\x12\x03\x07\x1c\x1d\n\ + \r\n\x06\x04\0\x04\0\x02\x01\x12\x03\x08\x10\x1e\n\x0e\n\x07\x04\0\x04\0\ + \x02\x01\x01\x12\x03\x08\x10\x19\n\x0e\n\x07\x04\0\x04\0\x02\x01\x02\x12\ + \x03\x08\x1c\x1d\n\r\n\x06\x04\0\x04\0\x02\x02\x12\x03\t\x10!\n\x0e\n\ + \x07\x04\0\x04\0\x02\x02\x01\x12\x03\t\x10\x1c\n\x0e\n\x07\x04\0\x04\0\ + \x02\x02\x02\x12\x03\t\x1f\x20\n\r\n\x06\x04\0\x04\0\x02\x03\x12\x03\n\ + \x10\"\n\x0e\n\x07\x04\0\x04\0\x02\x03\x01\x12\x03\n\x10\x1d\n\x0e\n\x07\ + \x04\0\x04\0\x02\x03\x02\x12\x03\n\x20!\n\r\n\x06\x04\0\x04\0\x02\x04\ + \x12\x03\x0b\x10\x1e\n\x0e\n\x07\x04\0\x04\0\x02\x04\x01\x12\x03\x0b\x10\ + \x19\n\x0e\n\x07\x04\0\x04\0\x02\x04\x02\x12\x03\x0b\x1c\x1d\n\r\n\x06\ + \x04\0\x04\0\x02\x05\x12\x03\x0c\x10\x19\n\x0e\n\x07\x04\0\x04\0\x02\x05\ + \x01\x12\x03\x0c\x10\x14\n\x0e\n\x07\x04\0\x04\0\x02\x05\x02\x12\x03\x0c\ + \x17\x18\n\x0c\n\x04\x04\0\x04\x01\x12\x04\x0f\x08\x1c\t\n\x0c\n\x05\x04\ + \0\x04\x01\x01\x12\x03\x0f\r\x1b\n^\n\x06\x04\0\x04\x01\x02\0\x12\x03\ + \x11\x10\"\x1aO\x20sender\x20does\x20not\x20have\x20a\x20connection\x20t\ + o\x20peer,\x20and\x20no\x20extra\x20information\x20(default)\n\n\x0e\n\ + \x07\x04\0\x04\x01\x02\0\x01\x12\x03\x11\x10\x1d\n\x0e\n\x07\x04\0\x04\ + \x01\x02\0\x02\x12\x03\x11\x20!\n5\n\x06\x04\0\x04\x01\x02\x01\x12\x03\ + \x14\x10\x1e\x1a&\x20sender\x20has\x20a\x20live\x20connection\x20to\x20p\ + eer\n\n\x0e\n\x07\x04\0\x04\x01\x02\x01\x01\x12\x03\x14\x10\x19\n\x0e\n\ + \x07\x04\0\x04\x01\x02\x01\x02\x12\x03\x14\x1c\x1d\n2\n\x06\x04\0\x04\ + \x01\x02\x02\x12\x03\x17\x10\x20\x1a#\x20sender\x20recently\x20connected\ + \x20to\x20peer\n\n\x0e\n\x07\x04\0\x04\x01\x02\x02\x01\x12\x03\x17\x10\ + \x1b\n\x0e\n\x07\x04\0\x04\x01\x02\x02\x02\x12\x03\x17\x1e\x1f\n\xa7\x01\ + \n\x06\x04\0\x04\x01\x02\x03\x12\x03\x1b\x10#\x1a\x97\x01\x20sender\x20r\ + ecently\x20tried\x20to\x20connect\x20to\x20peer\x20repeatedly\x20but\x20\ + failed\x20to\x20connect\n\x20(\"try\"\x20here\x20is\x20loose,\x20but\x20\ + this\x20should\x20signal\x20\"made\x20strong\x20effort,\x20failed\")\n\n\ + \x0e\n\x07\x04\0\x04\x01\x02\x03\x01\x12\x03\x1b\x10\x1e\n\x0e\n\x07\x04\ + \0\x04\x01\x02\x03\x02\x12\x03\x1b!\"\n\x0c\n\x04\x04\0\x03\0\x12\x04\ + \x1e\x08'\t\n\x0c\n\x05\x04\0\x03\0\x01\x12\x03\x1e\x10\x14\n$\n\x06\x04\ + \0\x03\0\x02\0\x12\x03\x20\x10&\x1a\x15\x20ID\x20of\x20a\x20given\x20pee\ + r.\n\n\x0e\n\x07\x04\0\x03\0\x02\0\x04\x12\x03\x20\x10\x18\n\x0e\n\x07\ + \x04\0\x03\0\x02\0\x05\x12\x03\x20\x19\x1e\n\x0e\n\x07\x04\0\x03\0\x02\0\ + \x01\x12\x03\x20\x1f!\n\x0e\n\x07\x04\0\x03\0\x02\0\x03\x12\x03\x20$%\n,\ + \n\x06\x04\0\x03\0\x02\x01\x12\x03#\x10)\x1a\x1d\x20multiaddrs\x20for\ + \x20a\x20given\x20peer\n\n\x0e\n\x07\x04\0\x03\0\x02\x01\x04\x12\x03#\ + \x10\x18\n\x0e\n\x07\x04\0\x03\0\x02\x01\x05\x12\x03#\x19\x1e\n\x0e\n\ + \x07\x04\0\x03\0\x02\x01\x01\x12\x03#\x1f$\n\x0e\n\x07\x04\0\x03\0\x02\ + \x01\x03\x12\x03#'(\nP\n\x06\x04\0\x03\0\x02\x02\x12\x03&\x107\x1aA\x20u\ + sed\x20to\x20signal\x20the\x20sender's\x20connection\x20capabilities\x20\ + to\x20the\x20peer\n\n\x0e\n\x07\x04\0\x03\0\x02\x02\x04\x12\x03&\x10\x18\ + \n\x0e\n\x07\x04\0\x03\0\x02\x02\x06\x12\x03&\x19'\n\x0e\n\x07\x04\0\x03\ + \0\x02\x02\x01\x12\x03&(2\n\x0e\n\x07\x04\0\x03\0\x02\x02\x03\x12\x03&56\ + \n2\n\x04\x04\0\x02\0\x12\x03*\x08&\x1a%\x20defines\x20what\x20type\x20o\ + f\x20message\x20it\x20is.\n\n\x0c\n\x05\x04\0\x02\0\x04\x12\x03*\x08\x10\ + \n\x0c\n\x05\x04\0\x02\0\x06\x12\x03*\x11\x1c\n\x0c\n\x05\x04\0\x02\0\ + \x01\x12\x03*\x1d!\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03*$%\nO\n\x04\x04\0\ + \x02\x01\x12\x03-\x08,\x1aB\x20defines\x20what\x20coral\x20cluster\x20le\ + vel\x20this\x20query/response\x20belongs\x20to.\n\n\x0c\n\x05\x04\0\x02\ + \x01\x04\x12\x03-\x08\x10\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03-\x11\x16\ + \n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03-\x17&\n\x0c\n\x05\x04\0\x02\x01\ + \x03\x12\x03-)+\nw\n\x04\x04\0\x02\x02\x12\x031\x08\x1f\x1aj\x20Used\x20\ + to\x20specify\x20the\x20key\x20associated\x20with\x20this\x20message.\n\ + \x20PUT_VALUE,\x20GET_VALUE,\x20ADD_PROVIDER,\x20GET_PROVIDERS\n\n\x0c\n\ + \x05\x04\0\x02\x02\x04\x12\x031\x08\x10\n\x0c\n\x05\x04\0\x02\x02\x05\ + \x12\x031\x11\x16\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x031\x17\x1a\n\x0c\n\ + \x05\x04\0\x02\x02\x03\x12\x031\x1d\x1e\n;\n\x04\x04\0\x02\x03\x12\x035\ + \x08-\x1a.\x20Used\x20to\x20return\x20a\x20value\n\x20PUT_VALUE,\x20GET_\ + VALUE\n\n\x0c\n\x05\x04\0\x02\x03\x04\x12\x035\x08\x10\n\x0c\n\x05\x04\0\ + \x02\x03\x06\x12\x035\x11!\n\x0c\n\x05\x04\0\x02\x03\x01\x12\x035\"(\n\ + \x0c\n\x05\x04\0\x02\x03\x03\x12\x035+,\nc\n\x04\x04\0\x02\x04\x12\x039\ + \x08&\x1aV\x20Used\x20to\x20return\x20peers\x20closer\x20to\x20a\x20key\ + \x20in\x20a\x20query\n\x20GET_VALUE,\x20GET_PROVIDERS,\x20FIND_NODE\n\n\ + \x0c\n\x05\x04\0\x02\x04\x04\x12\x039\x08\x10\n\x0c\n\x05\x04\0\x02\x04\ + \x06\x12\x039\x11\x15\n\x0c\n\x05\x04\0\x02\x04\x01\x12\x039\x16!\n\x0c\ + \n\x05\x04\0\x02\x04\x03\x12\x039$%\nO\n\x04\x04\0\x02\x05\x12\x03=\x08(\ + \x1aB\x20Used\x20to\x20return\x20Providers\n\x20GET_VALUE,\x20ADD_PROVID\ + ER,\x20GET_PROVIDERS\n\n\x0c\n\x05\x04\0\x02\x05\x04\x12\x03=\x08\x10\n\ + \x0c\n\x05\x04\0\x02\x05\x06\x12\x03=\x11\x15\n\x0c\n\x05\x04\0\x02\x05\ + \x01\x12\x03=\x16#\n\x0c\n\x05\x04\0\x02\x05\x03\x12\x03=&'\ +"; + +static mut file_descriptor_proto_lazy: ::protobuf::lazy::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const ::protobuf::descriptor::FileDescriptorProto, +}; + +fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto { + ::protobuf::parse_from_bytes(file_descriptor_proto_data).unwrap() +} + +pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto { + unsafe { + file_descriptor_proto_lazy.get(|| { + parse_descriptor_proto() + }) + } +} diff --git a/libp2p-kad/src/protobuf_structs/mod.rs b/libp2p-kad/src/protobuf_structs/mod.rs new file mode 100644 index 00000000..c5f182e2 --- /dev/null +++ b/libp2p-kad/src/protobuf_structs/mod.rs @@ -0,0 +1,22 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +pub mod dht; +pub mod record; diff --git a/libp2p-kad/src/protobuf_structs/record.rs b/libp2p-kad/src/protobuf_structs/record.rs new file mode 100644 index 00000000..f6ce5936 --- /dev/null +++ b/libp2p-kad/src/protobuf_structs/record.rs @@ -0,0 +1,498 @@ +// This file is generated. Do not edit +// @generated + +// https://github.com/Manishearth/rust-clippy/issues/702 +#![allow(unknown_lints)] +#![allow(clippy)] + +#![cfg_attr(rustfmt, rustfmt_skip)] + +#![allow(box_pointers)] +#![allow(dead_code)] +#![allow(missing_docs)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(non_upper_case_globals)] +#![allow(trivial_casts)] +#![allow(unsafe_code)] +#![allow(unused_imports)] +#![allow(unused_results)] + +use protobuf::Message as Message_imported_for_functions; +use protobuf::ProtobufEnum as ProtobufEnum_imported_for_functions; + +#[derive(PartialEq,Clone,Default)] +pub struct Record { + // message fields + key: ::protobuf::SingularField<::std::string::String>, + value: ::protobuf::SingularField<::std::vec::Vec>, + author: ::protobuf::SingularField<::std::string::String>, + signature: ::protobuf::SingularField<::std::vec::Vec>, + timeReceived: ::protobuf::SingularField<::std::string::String>, + // special fields + unknown_fields: ::protobuf::UnknownFields, + cached_size: ::protobuf::CachedSize, +} + +// see codegen.rs for the explanation why impl Sync explicitly +unsafe impl ::std::marker::Sync for Record {} + +impl Record { + pub fn new() -> Record { + ::std::default::Default::default() + } + + pub fn default_instance() -> &'static Record { + static mut instance: ::protobuf::lazy::Lazy = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const Record, + }; + unsafe { + instance.get(Record::new) + } + } + + // optional string key = 1; + + pub fn clear_key(&mut self) { + self.key.clear(); + } + + pub fn has_key(&self) -> bool { + self.key.is_some() + } + + // Param is passed by value, moved + pub fn set_key(&mut self, v: ::std::string::String) { + self.key = ::protobuf::SingularField::some(v); + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_key(&mut self) -> &mut ::std::string::String { + if self.key.is_none() { + self.key.set_default(); + } + self.key.as_mut().unwrap() + } + + // Take field + pub fn take_key(&mut self) -> ::std::string::String { + self.key.take().unwrap_or_else(|| ::std::string::String::new()) + } + + pub fn get_key(&self) -> &str { + match self.key.as_ref() { + Some(v) => &v, + None => "", + } + } + + fn get_key_for_reflect(&self) -> &::protobuf::SingularField<::std::string::String> { + &self.key + } + + fn mut_key_for_reflect(&mut self) -> &mut ::protobuf::SingularField<::std::string::String> { + &mut self.key + } + + // optional bytes value = 2; + + pub fn clear_value(&mut self) { + self.value.clear(); + } + + pub fn has_value(&self) -> bool { + self.value.is_some() + } + + // Param is passed by value, moved + pub fn set_value(&mut self, v: ::std::vec::Vec) { + self.value = ::protobuf::SingularField::some(v); + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_value(&mut self) -> &mut ::std::vec::Vec { + if self.value.is_none() { + self.value.set_default(); + } + self.value.as_mut().unwrap() + } + + // Take field + pub fn take_value(&mut self) -> ::std::vec::Vec { + self.value.take().unwrap_or_else(|| ::std::vec::Vec::new()) + } + + pub fn get_value(&self) -> &[u8] { + match self.value.as_ref() { + Some(v) => &v, + None => &[], + } + } + + fn get_value_for_reflect(&self) -> &::protobuf::SingularField<::std::vec::Vec> { + &self.value + } + + fn mut_value_for_reflect(&mut self) -> &mut ::protobuf::SingularField<::std::vec::Vec> { + &mut self.value + } + + // optional string author = 3; + + pub fn clear_author(&mut self) { + self.author.clear(); + } + + pub fn has_author(&self) -> bool { + self.author.is_some() + } + + // Param is passed by value, moved + pub fn set_author(&mut self, v: ::std::string::String) { + self.author = ::protobuf::SingularField::some(v); + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_author(&mut self) -> &mut ::std::string::String { + if self.author.is_none() { + self.author.set_default(); + } + self.author.as_mut().unwrap() + } + + // Take field + pub fn take_author(&mut self) -> ::std::string::String { + self.author.take().unwrap_or_else(|| ::std::string::String::new()) + } + + pub fn get_author(&self) -> &str { + match self.author.as_ref() { + Some(v) => &v, + None => "", + } + } + + fn get_author_for_reflect(&self) -> &::protobuf::SingularField<::std::string::String> { + &self.author + } + + fn mut_author_for_reflect(&mut self) -> &mut ::protobuf::SingularField<::std::string::String> { + &mut self.author + } + + // optional bytes signature = 4; + + pub fn clear_signature(&mut self) { + self.signature.clear(); + } + + pub fn has_signature(&self) -> bool { + self.signature.is_some() + } + + // Param is passed by value, moved + pub fn set_signature(&mut self, v: ::std::vec::Vec) { + self.signature = ::protobuf::SingularField::some(v); + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_signature(&mut self) -> &mut ::std::vec::Vec { + if self.signature.is_none() { + self.signature.set_default(); + } + self.signature.as_mut().unwrap() + } + + // Take field + pub fn take_signature(&mut self) -> ::std::vec::Vec { + self.signature.take().unwrap_or_else(|| ::std::vec::Vec::new()) + } + + pub fn get_signature(&self) -> &[u8] { + match self.signature.as_ref() { + Some(v) => &v, + None => &[], + } + } + + fn get_signature_for_reflect(&self) -> &::protobuf::SingularField<::std::vec::Vec> { + &self.signature + } + + fn mut_signature_for_reflect(&mut self) -> &mut ::protobuf::SingularField<::std::vec::Vec> { + &mut self.signature + } + + // optional string timeReceived = 5; + + pub fn clear_timeReceived(&mut self) { + self.timeReceived.clear(); + } + + pub fn has_timeReceived(&self) -> bool { + self.timeReceived.is_some() + } + + // Param is passed by value, moved + pub fn set_timeReceived(&mut self, v: ::std::string::String) { + self.timeReceived = ::protobuf::SingularField::some(v); + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_timeReceived(&mut self) -> &mut ::std::string::String { + if self.timeReceived.is_none() { + self.timeReceived.set_default(); + } + self.timeReceived.as_mut().unwrap() + } + + // Take field + pub fn take_timeReceived(&mut self) -> ::std::string::String { + self.timeReceived.take().unwrap_or_else(|| ::std::string::String::new()) + } + + pub fn get_timeReceived(&self) -> &str { + match self.timeReceived.as_ref() { + Some(v) => &v, + None => "", + } + } + + fn get_timeReceived_for_reflect(&self) -> &::protobuf::SingularField<::std::string::String> { + &self.timeReceived + } + + fn mut_timeReceived_for_reflect(&mut self) -> &mut ::protobuf::SingularField<::std::string::String> { + &mut self.timeReceived + } +} + +impl ::protobuf::Message for Record { + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? { + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + ::protobuf::rt::read_singular_string_into(wire_type, is, &mut self.key)?; + }, + 2 => { + ::protobuf::rt::read_singular_bytes_into(wire_type, is, &mut self.value)?; + }, + 3 => { + ::protobuf::rt::read_singular_string_into(wire_type, is, &mut self.author)?; + }, + 4 => { + ::protobuf::rt::read_singular_bytes_into(wire_type, is, &mut self.signature)?; + }, + 5 => { + ::protobuf::rt::read_singular_string_into(wire_type, is, &mut self.timeReceived)?; + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + if let Some(ref v) = self.key.as_ref() { + my_size += ::protobuf::rt::string_size(1, &v); + } + if let Some(ref v) = self.value.as_ref() { + my_size += ::protobuf::rt::bytes_size(2, &v); + } + if let Some(ref v) = self.author.as_ref() { + my_size += ::protobuf::rt::string_size(3, &v); + } + if let Some(ref v) = self.signature.as_ref() { + my_size += ::protobuf::rt::bytes_size(4, &v); + } + if let Some(ref v) = self.timeReceived.as_ref() { + my_size += ::protobuf::rt::string_size(5, &v); + } + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream) -> ::protobuf::ProtobufResult<()> { + if let Some(ref v) = self.key.as_ref() { + os.write_string(1, &v)?; + } + if let Some(ref v) = self.value.as_ref() { + os.write_bytes(2, &v)?; + } + if let Some(ref v) = self.author.as_ref() { + os.write_string(3, &v)?; + } + if let Some(ref v) = self.signature.as_ref() { + os.write_bytes(4, &v)?; + } + if let Some(ref v) = self.timeReceived.as_ref() { + os.write_string(5, &v)?; + } + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &::std::any::Any { + self as &::std::any::Any + } + fn as_any_mut(&mut self) -> &mut ::std::any::Any { + self as &mut ::std::any::Any + } + fn into_any(self: Box) -> ::std::boxed::Box<::std::any::Any> { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + ::protobuf::MessageStatic::descriptor_static(None::) + } +} + +impl ::protobuf::MessageStatic for Record { + fn new() -> Record { + Record::new() + } + + fn descriptor_static(_: ::std::option::Option) -> &'static ::protobuf::reflect::MessageDescriptor { + static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const ::protobuf::reflect::MessageDescriptor, + }; + unsafe { + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( + "key", + Record::get_key_for_reflect, + Record::mut_key_for_reflect, + )); + fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( + "value", + Record::get_value_for_reflect, + Record::mut_value_for_reflect, + )); + fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( + "author", + Record::get_author_for_reflect, + Record::mut_author_for_reflect, + )); + fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( + "signature", + Record::get_signature_for_reflect, + Record::mut_signature_for_reflect, + )); + fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( + "timeReceived", + Record::get_timeReceived_for_reflect, + Record::mut_timeReceived_for_reflect, + )); + ::protobuf::reflect::MessageDescriptor::new::( + "Record", + fields, + file_descriptor_proto() + ) + }) + } + } +} + +impl ::protobuf::Clear for Record { + fn clear(&mut self) { + self.clear_key(); + self.clear_value(); + self.clear_author(); + self.clear_signature(); + self.clear_timeReceived(); + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for Record { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for Record { + fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef { + ::protobuf::reflect::ProtobufValueRef::Message(self) + } +} + +static file_descriptor_proto_data: &'static [u8] = b"\ + \n\x0crecord.proto\x12\trecord.pb\"\x8a\x01\n\x06Record\x12\x10\n\x03key\ + \x18\x01\x20\x01(\tR\x03key\x12\x14\n\x05value\x18\x02\x20\x01(\x0cR\x05\ + value\x12\x16\n\x06author\x18\x03\x20\x01(\tR\x06author\x12\x1c\n\tsigna\ + ture\x18\x04\x20\x01(\x0cR\tsignature\x12\"\n\x0ctimeReceived\x18\x05\ + \x20\x01(\tR\x0ctimeReceivedJ\xac\x05\n\x06\x12\x04\0\0\x14\x01\n\x08\n\ + \x01\x0c\x12\x03\0\0\x12\n\x08\n\x01\x02\x12\x03\x01\x08\x11\nX\n\x02\ + \x04\0\x12\x04\x05\0\x14\x01\x1aL\x20Record\x20represents\x20a\x20dht\ + \x20record\x20that\x20contains\x20a\x20value\n\x20for\x20a\x20key\x20val\ + ue\x20pair\n\n\n\n\x03\x04\0\x01\x12\x03\x05\x08\x0e\n2\n\x04\x04\0\x02\ + \0\x12\x03\x07\x08\x20\x1a%\x20The\x20key\x20that\x20references\x20this\ + \x20record\n\n\x0c\n\x05\x04\0\x02\0\x04\x12\x03\x07\x08\x10\n\x0c\n\x05\ + \x04\0\x02\0\x05\x12\x03\x07\x11\x17\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\ + \x07\x18\x1b\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x07\x1e\x1f\n6\n\x04\ + \x04\0\x02\x01\x12\x03\n\x08!\x1a)\x20The\x20actual\x20value\x20this\x20\ + record\x20is\x20storing\n\n\x0c\n\x05\x04\0\x02\x01\x04\x12\x03\n\x08\ + \x10\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\n\x11\x16\n\x0c\n\x05\x04\0\ + \x02\x01\x01\x12\x03\n\x17\x1c\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\n\ + \x1f\x20\n-\n\x04\x04\0\x02\x02\x12\x03\r\x08#\x1a\x20\x20hash\x20of\x20\ + the\x20authors\x20public\x20key\n\n\x0c\n\x05\x04\0\x02\x02\x04\x12\x03\ + \r\x08\x10\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\r\x11\x17\n\x0c\n\x05\ + \x04\0\x02\x02\x01\x12\x03\r\x18\x1e\n\x0c\n\x05\x04\0\x02\x02\x03\x12\ + \x03\r!\"\n7\n\x04\x04\0\x02\x03\x12\x03\x10\x08%\x1a*\x20A\x20PKI\x20si\ + gnature\x20for\x20the\x20key+value+author\n\n\x0c\n\x05\x04\0\x02\x03\ + \x04\x12\x03\x10\x08\x10\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x10\x11\ + \x16\n\x0c\n\x05\x04\0\x02\x03\x01\x12\x03\x10\x17\x20\n\x0c\n\x05\x04\0\ + \x02\x03\x03\x12\x03\x10#$\n<\n\x04\x04\0\x02\x04\x12\x03\x13\x08)\x1a/\ + \x20Time\x20the\x20record\x20was\x20received,\x20set\x20by\x20receiver\n\ + \n\x0c\n\x05\x04\0\x02\x04\x04\x12\x03\x13\x08\x10\n\x0c\n\x05\x04\0\x02\ + \x04\x05\x12\x03\x13\x11\x17\n\x0c\n\x05\x04\0\x02\x04\x01\x12\x03\x13\ + \x18$\n\x0c\n\x05\x04\0\x02\x04\x03\x12\x03\x13'(\ +"; + +static mut file_descriptor_proto_lazy: ::protobuf::lazy::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const ::protobuf::descriptor::FileDescriptorProto, +}; + +fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto { + ::protobuf::parse_from_bytes(file_descriptor_proto_data).unwrap() +} + +pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto { + unsafe { + file_descriptor_proto_lazy.get(|| { + parse_descriptor_proto() + }) + } +} diff --git a/libp2p-kad/src/protocol.rs b/libp2p-kad/src/protocol.rs new file mode 100644 index 00000000..73947fb7 --- /dev/null +++ b/libp2p-kad/src/protocol.rs @@ -0,0 +1,382 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Provides the `KadMsg` enum of all the possible messages transmitted with the Kademlia protocol, +//! and the `KademliaProtocolConfig` connection upgrade whose output is a +//! `Stream + Sink`. +//! +//! The `Stream` component is used to poll the underlying transport, and the `Sink` component is +//! used to send messages. + +use bytes::Bytes; +use futures::{Sink, Stream}; +use futures::future; +use libp2p_peerstore::PeerId; +use libp2p_swarm::{ConnectionUpgrade, Endpoint, Multiaddr}; +use protobuf::{self, Message}; +use protobuf_structs; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use std::iter; +use tokio_io::{AsyncRead, AsyncWrite}; +use varint::VarintCodec; + +#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)] +pub enum ConnectionType { + /// Sender hasn't tried to connect to peer. + NotConnected = 0, + /// Sender is currently connected to peer. + Connected = 1, + /// Sender was recently connected to peer. + CanConnect = 2, + /// Sender tried to connect to peer but failed. + CannotConnect = 3, +} + +impl From for ConnectionType { + #[inline] + fn from(raw: protobuf_structs::dht::Message_ConnectionType) -> ConnectionType { + use protobuf_structs::dht::Message_ConnectionType::*; + match raw { + NOT_CONNECTED => ConnectionType::NotConnected, + CONNECTED => ConnectionType::Connected, + CAN_CONNECT => ConnectionType::CanConnect, + CANNOT_CONNECT => ConnectionType::CannotConnect, + } + } +} + +impl Into for ConnectionType { + #[inline] + fn into(self) -> protobuf_structs::dht::Message_ConnectionType { + use protobuf_structs::dht::Message_ConnectionType::*; + match self { + ConnectionType::NotConnected => NOT_CONNECTED, + ConnectionType::Connected => CONNECTED, + ConnectionType::CanConnect => CAN_CONNECT, + ConnectionType::CannotConnect => CANNOT_CONNECT, + } + } +} + +/// Information about a peer, as known by the sender. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Peer { + pub node_id: PeerId, + /// The multiaddresses that are known for that peer. + pub multiaddrs: Vec, + pub connection_ty: ConnectionType, +} + +impl<'a> From<&'a mut protobuf_structs::dht::Message_Peer> for Peer { + fn from(peer: &'a mut protobuf_structs::dht::Message_Peer) -> Peer { + let node_id = PeerId::from_bytes(peer.get_id().to_vec()).unwrap(); // TODO: don't unwrap + let addrs = peer.take_addrs() + .into_iter() + .map(|a| Multiaddr::from_bytes(a).unwrap()) // TODO: don't unwrap + .collect(); + let connection_ty = peer.get_connection().into(); + + Peer { + node_id: node_id, + multiaddrs: addrs, + connection_ty: connection_ty, + } + } +} + +impl Into for Peer { + fn into(self) -> protobuf_structs::dht::Message_Peer { + let mut out = protobuf_structs::dht::Message_Peer::new(); + out.set_id(self.node_id.into_bytes()); + for addr in self.multiaddrs { + out.mut_addrs().push(addr.into_bytes()); + } + out.set_connection(self.connection_ty.into()); + out + } +} + +/// Configuration for a Kademlia connection upgrade. When applied to a connection, turns this +/// connection into a `Stream + Sink` whose items are of type `KadMsg`. +#[derive(Debug, Default, Copy, Clone)] +pub struct KademliaProtocolConfig; + +impl ConnectionUpgrade for KademliaProtocolConfig +where + C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/ +{ + type Output = + Box>; + type Future = future::FutureResult; + type NamesIter = iter::Once<(Bytes, ())>; + type UpgradeIdentifier = (); + + #[inline] + fn protocol_names(&self) -> Self::NamesIter { + iter::once(("/ipfs/kad/1.0.0".into(), ())) + } + + #[inline] + fn upgrade(self, incoming: C, _: (), _: Endpoint, _: &Multiaddr) -> Self::Future { + future::ok(kademlia_protocol(incoming)) + } +} + +// Upgrades a socket to use the Kademlia protocol. +fn kademlia_protocol<'a, S>( + socket: S, +) -> Box + 'a> +where + S: AsyncRead + AsyncWrite + 'a, +{ + let wrapped = socket + .framed(VarintCodec::default()) + .from_err::() + .with(|request| -> Result<_, IoError> { + let proto_struct = msg_to_proto(request); + Ok(proto_struct.write_to_bytes().unwrap()) // TODO: error? + }) + .and_then(|bytes| { + let response = protobuf::parse_from_bytes(&bytes)?; + proto_to_msg(response) + }); + + Box::new(wrapped) +} + +/// Custom trait that derives `Sink` and `Stream`, so that we can box it. +pub trait KadStreamSink + : Stream + Sink + { +} +impl KadStreamSink for T +where + T: Stream + Sink, +{ +} + +/// Message that we can send to a peer or received from a peer. +// TODO: document the rest +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum KadMsg { + /// Ping request or response. + Ping, + /// Target must save the given record, can be queried later with `GetValueReq`. + PutValue { + /// Identifier of the record. + key: Vec, + /// The record itself. + record: (), //record: protobuf_structs::record::Record, // TODO: no + }, + GetValueReq { + /// Identifier of the record. + key: Vec, + }, + GetValueRes { + /// Identifier of the returned record. + key: Vec, + record: (), //record: Option, // TODO: no + closer_peers: Vec, + }, + /// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes + /// returned is not specified, but should be around 20. + FindNodeReq { + /// Identifier of the node. + key: Vec, + }, + /// Response to a `FindNodeReq`. + FindNodeRes { + /// Results of the request. + closer_peers: Vec, + }, +} + +// Turns a type-safe kadmelia message into the corresponding row protobuf message. +fn msg_to_proto(kad_msg: KadMsg) -> protobuf_structs::dht::Message { + match kad_msg { + KadMsg::Ping => { + let mut msg = protobuf_structs::dht::Message::new(); + msg.set_field_type(protobuf_structs::dht::Message_MessageType::PING); + msg + } + KadMsg::PutValue { key, .. } => { + let mut msg = protobuf_structs::dht::Message::new(); + msg.set_field_type(protobuf_structs::dht::Message_MessageType::PUT_VALUE); + msg.set_key(key); + //msg.set_record(record); // TODO: + msg + } + KadMsg::GetValueReq { key } => { + let mut msg = protobuf_structs::dht::Message::new(); + msg.set_field_type(protobuf_structs::dht::Message_MessageType::GET_VALUE); + msg.set_key(key); + msg.set_clusterLevelRaw(10); + msg + } + KadMsg::GetValueRes { .. } => unimplemented!(), + KadMsg::FindNodeReq { key } => { + let mut msg = protobuf_structs::dht::Message::new(); + msg.set_field_type(protobuf_structs::dht::Message_MessageType::FIND_NODE); + msg.set_key(key); + msg.set_clusterLevelRaw(10); + msg + } + KadMsg::FindNodeRes { closer_peers } => { + // TODO: if empty, the remote will think it's a request + assert!(!closer_peers.is_empty()); + let mut msg = protobuf_structs::dht::Message::new(); + msg.set_field_type(protobuf_structs::dht::Message_MessageType::FIND_NODE); + msg.set_clusterLevelRaw(9); + for peer in closer_peers { + msg.mut_closerPeers().push(peer.into()); + } + msg + } + } +} + +/// Turns a raw Kademlia message into a type-safe message. +fn proto_to_msg(mut message: protobuf_structs::dht::Message) -> Result { + match message.get_field_type() { + protobuf_structs::dht::Message_MessageType::PING => Ok(KadMsg::Ping), + + protobuf_structs::dht::Message_MessageType::PUT_VALUE => { + let key = message.take_key(); + let _record = message.take_record(); + Ok(KadMsg::PutValue { + key: key, + record: (), + }) + } + + protobuf_structs::dht::Message_MessageType::GET_VALUE => { + let key = message.take_key(); + Ok(KadMsg::GetValueReq { key: key }) + } + + protobuf_structs::dht::Message_MessageType::FIND_NODE => { + if message.get_closerPeers().is_empty() { + Ok(KadMsg::FindNodeReq { + key: message.take_key(), + }) + } else { + Ok(KadMsg::FindNodeRes { + closer_peers: message + .mut_closerPeers() + .iter_mut() + .map(|peer| peer.into()) + .collect(), + }) + } + } + + protobuf_structs::dht::Message_MessageType::GET_PROVIDERS + | protobuf_structs::dht::Message_MessageType::ADD_PROVIDER => { + // These messages don't seem to be used in the protocol in practice, so if we receive + // them we suppose that it's a mistake in the protocol usage. + Err(IoError::new( + IoErrorKind::InvalidData, + "received an unsupported kad message type", + )) + } + } +} + +#[cfg(test)] +mod tests { + extern crate libp2p_tcp_transport; + extern crate tokio_core; + + use self::libp2p_tcp_transport::TcpConfig; + use self::tokio_core::reactor::Core; + use futures::{Future, Sink, Stream}; + use libp2p_peerstore::PeerId; + use libp2p_swarm::Transport; + use protocol::{ConnectionType, KadMsg, KademliaProtocolConfig, Peer}; + use std::sync::mpsc; + use std::thread; + + #[test] + fn correct_transfer() { + // We open a server and a client, send a message between the two, and check that they were + // successfully received. + + test_one(KadMsg::Ping); + test_one(KadMsg::PutValue { + key: vec![1, 2, 3, 4], + record: (), + }); + test_one(KadMsg::GetValueReq { + key: vec![10, 11, 12], + }); + test_one(KadMsg::FindNodeReq { + key: vec![9, 12, 0, 245, 245, 201, 28, 95], + }); + test_one(KadMsg::FindNodeRes { + closer_peers: vec![ + Peer { + node_id: PeerId::from_public_key(&[93, 80, 12, 250]), + multiaddrs: vec!["/ip4/100.101.102.103/tcp/20105".parse().unwrap()], + connection_ty: ConnectionType::Connected, + }, + ], + }); + // TODO: all messages + + fn test_one(msg_server: KadMsg) { + let msg_client = msg_server.clone(); + let (tx, rx) = mpsc::channel(); + + let bg_thread = thread::spawn(move || { + let mut core = Core::new().unwrap(); + let transport = TcpConfig::new(core.handle()).with_upgrade(KademliaProtocolConfig); + + let (listener, addr) = transport + .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) + .unwrap(); + tx.send(addr).unwrap(); + + let future = listener + .into_future() + .map_err(|(err, _)| err) + .and_then(|(client, _)| client.unwrap().map(|v| v.0)) + .and_then(|proto| proto.into_future().map_err(|(err, _)| err).map(|(v, _)| v)) + .map(|recv_msg| { + assert_eq!(recv_msg.unwrap(), msg_server); + () + }); + + let _ = core.run(future).unwrap(); + }); + + let mut core = Core::new().unwrap(); + let transport = TcpConfig::new(core.handle()).with_upgrade(KademliaProtocolConfig); + + let future = transport + .dial(rx.recv().unwrap()) + .unwrap_or_else(|_| panic!()) + .and_then(|proto| proto.0.send(msg_client)) + .map(|_| ()); + + let _ = core.run(future).unwrap(); + bg_thread.join().unwrap(); + } + } +} diff --git a/libp2p-kad/src/query.rs b/libp2p-kad/src/query.rs new file mode 100644 index 00000000..e905365e --- /dev/null +++ b/libp2p-kad/src/query.rs @@ -0,0 +1,336 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! This module handles performing iterative queries about the network. + +use fnv::FnvHashSet; +use futures::{future, Future}; +use kad_server::KademliaServerController; +use kbucket::KBucketsPeerId; +use libp2p_peerstore::PeerId; +use multiaddr::{AddrComponent, Multiaddr}; +use protocol; +use rand; +use smallvec::SmallVec; +use std::cmp::Ordering; +use std::io::Error as IoError; +use std::mem; +use std::time::Duration; + +/// Interface that the query uses to communicate with the rest of the system. +pub trait QueryInterface: Clone { + /// Returns the peer ID of the local node. + fn local_id(&self) -> &PeerId; + + /// Finds the nodes closest to a peer ID. + fn kbuckets_find_closest(&self, addr: &PeerId) -> Vec; + + /// Adds new known multiaddrs for the given peer. + fn peer_add_addrs(&self, peer: &PeerId, multiaddrs: I, ttl: Duration) + where + I: Iterator; + + /// Returns the level of parallelism wanted for this query. + fn parallelism(&self) -> usize; + + /// Attempts to contact the given multiaddress, then calls `and_then` on success. Returns a + /// future that contains the output of `and_then`, or an error if we failed to contact the + /// remote. + // TODO: use HKTB once Rust supports that, to avoid boxing the future + fn send( + &self, + addr: Multiaddr, + and_then: F, + ) -> Box> + where + F: FnOnce(&KademliaServerController) -> FRet + 'static, + FRet: 'static; +} + +/// Starts a query for an iterative `FIND_NODE` request. +#[inline] +pub fn find_node<'a, I>( + query_interface: I, + searched_key: PeerId, +) -> Box, Error = IoError> + 'a> +where + I: QueryInterface + 'a, +{ + query(query_interface, searched_key, 20) // TODO: constant +} + +/// Refreshes a specific bucket by performing an iterative `FIND_NODE` on a random ID of this +/// bucket. +/// +/// Returns a dummy no-op future if `bucket_num` is out of range. +pub fn refresh<'a, I>( + query_interface: I, + bucket_num: usize, +) -> Box + 'a> +where + I: QueryInterface + 'a, +{ + let peer_id = match gen_random_id(&query_interface, bucket_num) { + Ok(p) => p, + Err(()) => return Box::new(future::ok(())), + }; + + let future = find_node(query_interface, peer_id).map(|_| ()); + Box::new(future) as Box<_> +} + +// Generates a random `PeerId` that belongs to the given bucket. +// +// Returns an error if `bucket_num` is out of range. +fn gen_random_id(query_interface: &I, bucket_num: usize) -> Result +where + I: ?Sized + QueryInterface, +{ + let my_id = query_interface.local_id(); + let my_id_len = my_id.as_bytes().len(); + + // TODO: this 2 is magic here ; it is the length of the hash of the multihash + let bits_diff = bucket_num + 1; + if bits_diff > 8 * (my_id_len - 2) { + return Err(()); + } + + let mut random_id = [0; 64]; + for byte in 0..my_id_len { + match byte.cmp(&(my_id_len - bits_diff / 8 - 1)) { + Ordering::Less => { + random_id[byte] = my_id.as_bytes()[byte]; + } + Ordering::Equal => { + let mask: u8 = (1 << (bits_diff % 8)) - 1; + random_id[byte] = (my_id.as_bytes()[byte] & !mask) | (rand::random::() & mask); + } + Ordering::Greater => { + random_id[byte] = rand::random(); + } + } + } + + let peer_id = PeerId::from_bytes(random_id[..my_id_len].to_owned()) + .expect("randomly-generated peer ID should always be valid"); + Ok(peer_id) +} + +// Generic query-performing function. +fn query<'a, I>( + query_interface: I, + searched_key: PeerId, + num_results: usize, +) -> Box, Error = IoError> + 'a> +where + I: QueryInterface + 'a, +{ + // State of the current iterative process. + struct State<'a> { + // If true, we are still in the first step of the algorithm where we try to find the + // closest node. If false, then we are contacting the k closest nodes in order to fill the + // list with enough results. + looking_for_closer: bool, + // Final output of the iteration. + result: Vec, + // For each open connection, a future with the response of the remote. + // Note that don't use a `SmallVec` here because `select_all` produces a `Vec`. + current_attempts_fut: Vec, Error = IoError> + 'a>>, + // For each open connection, the peer ID that we are connected to. + // Must always have the same length as `current_attempts_fut`. + current_attempts_addrs: SmallVec<[PeerId; 32]>, + // Nodes that need to be attempted. + pending_nodes: Vec, + // Peers that we tried to contact but failed. + failed_to_contact: FnvHashSet, + } + + let initial_state = State { + looking_for_closer: true, + result: Vec::with_capacity(num_results), + current_attempts_fut: Vec::new(), + current_attempts_addrs: SmallVec::new(), + pending_nodes: query_interface.kbuckets_find_closest(&searched_key), + failed_to_contact: Default::default(), + }; + + let parallelism = query_interface.parallelism(); + + // Start of the iterative process. + let stream = future::loop_fn(initial_state, move |mut state| { + let searched_key = searched_key.clone(); + let query_interface = query_interface.clone(); + let query_interface2 = query_interface.clone(); + + // Find out which nodes to contact at this iteration. + let to_contact = { + let wanted_len = if state.looking_for_closer { + parallelism.saturating_sub(state.current_attempts_fut.len()) + } else { + num_results.saturating_sub(state.current_attempts_fut.len()) + }; + let mut to_contact = SmallVec::<[_; 16]>::new(); + while to_contact.len() < wanted_len && !state.pending_nodes.is_empty() { + // Move the first element of `pending_nodes` to `to_contact`, but ignore nodes that + // are already part of the results or of a current attempt or if we failed to + // contact it before. + let peer = state.pending_nodes.remove(0); + if state.result.iter().any(|p| p == &peer) { + continue; + } + if state.current_attempts_addrs.iter().any(|p| p == &peer) { + continue; + } + if state.failed_to_contact.iter().any(|p| p == &peer) { + continue; + } + to_contact.push(peer); + } + to_contact + }; + + // For each node in `to_contact`, start an RPC query and a corresponding entry in the two + // `state.current_attempts_*` fields. + for peer in to_contact { + let multiaddr: Multiaddr = AddrComponent::P2P(peer.clone().into_bytes()).into(); + + let searched_key2 = searched_key.clone(); + let resp_rx = + query_interface.send(multiaddr.clone(), move |ctl| ctl.find_node(&searched_key2)); + state.current_attempts_addrs.push(peer.clone()); + let current_attempt = resp_rx.flatten(); + state + .current_attempts_fut + .push(Box::new(current_attempt) as Box<_>); + } + debug_assert_eq!( + state.current_attempts_addrs.len(), + state.current_attempts_fut.len() + ); + + // Extract `current_attempts_fut` so that we can pass it to `select_all`. We will push the + // values back when inside the loop. + let current_attempts_fut = mem::replace(&mut state.current_attempts_fut, Vec::new()); + if current_attempts_fut.is_empty() { + // If `current_attempts_fut` is empty, then `select_all` would panic. It attempts + // when we have no additional node to query. + let future = future::ok(future::Loop::Break(state)); + return future::Either::A(future); + } + + // This is the future that continues or breaks the `loop_fn`. + let future = future::select_all(current_attempts_fut.into_iter()).then(move |result| { + let (message, trigger_idx, other_current_attempts) = match result { + Err((err, trigger_idx, other_current_attempts)) => { + (Err(err), trigger_idx, other_current_attempts) + } + Ok((message, trigger_idx, other_current_attempts)) => { + (Ok(message), trigger_idx, other_current_attempts) + } + }; + + // Putting back the extracted elements in `state`. + let remote_id = state.current_attempts_addrs.remove(trigger_idx); + debug_assert!(state.current_attempts_fut.is_empty()); + state.current_attempts_fut = other_current_attempts; + + // `message` contains the reason why the current future was woken up. + let closer_peers = match message { + Ok(msg) => msg, + Err(_) => { + state.failed_to_contact.insert(remote_id); + return Ok(future::Loop::Continue(state)); + } + }; + + // Update `state.result` with the fact that we received a valid message from a node. + if let Some(insert_pos) = state.result.iter().position(|e| { + e.distance_with(&searched_key) >= remote_id.distance_with(&searched_key) + }) { + if state.result[insert_pos] != remote_id { + state.result.pop(); + state.result.insert(insert_pos, remote_id); + } + } else if state.result.len() < num_results { + state.result.push(remote_id); + } + + // The loop below will set this variable to `true` if we find a new element to put at + // the top of the result. This would mean that we have to continue looping. + let mut local_nearest_node_updated = false; + + // Update `state` with the actual content of the message. + for mut peer in closer_peers { + // Update the peerstore with the information sent by + // the remote. + { + let valid_multiaddrs = peer.multiaddrs.drain(..); + query_interface2.peer_add_addrs( + &peer.node_id, + valid_multiaddrs, + Duration::from_secs(3600), + ); // TODO: which TTL? + } + + if peer.node_id.distance_with(&searched_key) + <= state.result[0].distance_with(&searched_key) + { + local_nearest_node_updated = true; + } + + if state.result.iter().any(|ma| ma == &peer.node_id) { + continue; + } + + // Insert the node into `pending_nodes` at the right position, or do not + // insert it if it is already in there. + if let Some(insert_pos) = state.pending_nodes.iter().position(|e| { + e.distance_with(&searched_key) >= peer.node_id.distance_with(&searched_key) + }) { + if state.pending_nodes[insert_pos] != peer.node_id { + state.pending_nodes.insert(insert_pos, peer.node_id.clone()); + } + } else { + state.pending_nodes.push(peer.node_id.clone()); + } + } + + if state.result.len() >= num_results + || (!state.looking_for_closer && state.current_attempts_fut.is_empty()) + { + // Check that our `Vec::with_capacity` is correct. + debug_assert_eq!(state.result.capacity(), num_results); + + Ok(future::Loop::Break(state)) + } else { + if !local_nearest_node_updated { + state.looking_for_closer = false; + } + + Ok(future::Loop::Continue(state)) + } + }); + + future::Either::B(future) + }); + + let stream = stream.map(|state| state.result); + Box::new(stream) as Box<_> +} diff --git a/libp2p-swarm/src/connection_reuse.rs b/libp2p-swarm/src/connection_reuse.rs index 49bf8daa..92cc1a75 100644 --- a/libp2p-swarm/src/connection_reuse.rs +++ b/libp2p-swarm/src/connection_reuse.rs @@ -48,6 +48,7 @@ use multiaddr::Multiaddr; use muxing::StreamMuxer; use parking_lot::Mutex; use std::io::Error as IoError; +use std::mem; use std::sync::Arc; use transport::{ConnectionUpgrade, MuxedTransport, Transport, UpgradedNode}; diff --git a/libp2p-swarm/src/swarm.rs b/libp2p-swarm/src/swarm.rs index 300917a2..ecc10b57 100644 --- a/libp2p-swarm/src/swarm.rs +++ b/libp2p-swarm/src/swarm.rs @@ -18,6 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use std::fmt; use std::io::Error as IoError; use futures::{future, Async, Future, IntoFuture, Poll, Stream}; use futures::sync::mpsc; @@ -93,6 +94,18 @@ where new_toprocess: mpsc::UnboundedSender>>, } +impl fmt::Debug for SwarmController +where + T: fmt::Debug + MuxedTransport + 'static, // TODO: 'static :-/ + C: fmt::Debug + ConnectionUpgrade + 'static, // TODO: 'static :-/ +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_tuple("SwarmController") + .field(&self.upgraded) + .finish() + } +} + impl Clone for SwarmController where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/ diff --git a/varint-rs/src/lib.rs b/varint-rs/src/lib.rs index 216b1aee..6898deb3 100644 --- a/varint-rs/src/lib.rs +++ b/varint-rs/src/lib.rs @@ -467,6 +467,7 @@ where fn encode(&mut self, item: D, dst: &mut BytesMut) -> Result<(), io::Error> { let encoded_len = encode(item.as_ref().len()); + dst.reserve(encoded_len.len() + item.as_ref().len()); dst.put(encoded_len); dst.put(item); Ok(())