diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 44715bfd..6a904536 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,26 +11,26 @@ jobs: name: Build and test runs-on: ubuntu-latest steps: - - uses: actions/checkout@v1 - - name: Cache cargo registry - uses: actions/cache@v1 - with: - path: ~/.cargo/registry - key: cargo-registry-${{ hashFiles('Cargo.toml') }} - - name: Cache cargo index - uses: actions/cache@v1 - with: - path: ~/.cargo/git - key: cargo-index-${{ hashFiles('Cargo.toml') }} - - name: Cache cargo build - uses: actions/cache@v1 - with: - path: target - key: cargo-build-target-${{ hashFiles('Cargo.toml') }} - - name: Run tests, with no feature - run: cargo test --workspace --no-default-features - - name: Run tests, with all features - run: cargo test --workspace --all-features + - uses: actions/checkout@v1 + - name: Cache cargo registry + uses: actions/cache@v1 + with: + path: ~/.cargo/registry + key: cargo-registry-${{ hashFiles('Cargo.toml') }} + - name: Cache cargo index + uses: actions/cache@v1 + with: + path: ~/.cargo/git + key: cargo-index-${{ hashFiles('Cargo.toml') }} + - name: Cache cargo build + uses: actions/cache@v1 + with: + path: target + key: cargo-build-target-${{ hashFiles('Cargo.toml') }} + - name: Run tests, with no feature + run: cargo test --workspace --no-default-features + - name: Run tests, with all features + run: cargo test --workspace --all-features test-wasm: name: Build on WASM @@ -40,40 +40,40 @@ jobs: env: CC: clang-10 steps: - - uses: actions/checkout@v1 - - name: Install Rust - uses: actions-rs/toolchain@v1 - with: - toolchain: stable - target: wasm32-unknown-unknown - override: true - - name: Install a recent version of clang - run: | - wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key | apt-key add - - echo "deb http://apt.llvm.org/bionic/ llvm-toolchain-bionic-10 main" >> /etc/apt/sources.list - apt-get update - apt-get install -y clang-10 - - name: Install CMake - run: apt-get install -y cmake - - name: Cache cargo registry - uses: actions/cache@v1 - with: - path: ~/.cargo/registry - key: wasm-cargo-registry-${{ hashFiles('Cargo.toml') }} - - name: Cache cargo index - uses: actions/cache@v1 - with: - path: ~/.cargo/git - key: wasm-cargo-index-${{ hashFiles('Cargo.toml') }} - - name: Cache cargo build - uses: actions/cache@v1 - with: - path: target - key: wasm-cargo-build-target-${{ hashFiles('Cargo.toml') }} - - name: Build on WASM - # TODO: also run `cargo test` - # TODO: ideally we would build `--workspace`, but not all crates compile for WASM - run: cargo build --target=wasm32-unknown-unknown + - uses: actions/checkout@v1 + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + target: wasm32-unknown-unknown + override: true + - name: Install a recent version of clang + run: | + wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key | apt-key add - + echo "deb http://apt.llvm.org/bionic/ llvm-toolchain-bionic-10 main" >> /etc/apt/sources.list + apt-get update + apt-get install -y clang-10 + - name: Install CMake + run: apt-get install -y cmake + - name: Cache cargo registry + uses: actions/cache@v1 + with: + path: ~/.cargo/registry + key: wasm-cargo-registry-${{ hashFiles('Cargo.toml') }} + - name: Cache cargo index + uses: actions/cache@v1 + with: + path: ~/.cargo/git + key: wasm-cargo-index-${{ hashFiles('Cargo.toml') }} + - name: Cache cargo build + uses: actions/cache@v1 + with: + path: target + key: wasm-cargo-build-target-${{ hashFiles('Cargo.toml') }} + - name: Build on WASM + # TODO: also run `cargo test` + # TODO: ideally we would build `--workspace`, but not all crates compile for WASM + run: cargo build --target=wasm32-unknown-unknown check-rustdoc-links: name: Check rustdoc intra-doc links @@ -81,13 +81,13 @@ jobs: container: image: rust steps: - - uses: actions/checkout@v1 - - name: Install nightly Rust - # TODO: intra-doc links are available on nightly only - # see https://doc.rust-lang.org/nightly/rustdoc/lints.html#intra_doc_link_resolution_failure - run: rustup default nightly - - name: Check rustdoc links - run: RUSTDOCFLAGS="--deny intra_doc_link_resolution_failure" cargo +nightly doc --verbose --workspace --no-deps --document-private-items + - uses: actions/checkout@v1 + - name: Install nightly Rust + # TODO: intra-doc links are available on nightly only + # see https://doc.rust-lang.org/nightly/rustdoc/lints.html#intra_doc_link_resolution_failure + run: rustup default nightly-2020-05-20 + - name: Check rustdoc links + run: RUSTDOCFLAGS="--deny intra_doc_link_resolution_failure" cargo doc --verbose --workspace --no-deps --document-private-items integration-test: name: Integration tests @@ -95,21 +95,21 @@ jobs: container: image: rust steps: - - uses: actions/checkout@v1 - - name: Cache cargo registry - uses: actions/cache@v1 - with: - path: ~/.cargo/registry - key: cargo-registry-${{ hashFiles('Cargo.toml') }} - - name: Cache cargo index - uses: actions/cache@v1 - with: - path: ~/.cargo/git - key: cargo-index-${{ hashFiles('Cargo.toml') }} - - name: Cache cargo build - uses: actions/cache@v1 - with: - path: target - key: cargo-build-target-${{ hashFiles('Cargo.toml') }} - - name: Run ipfs-kad example - run: RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run --example ipfs-kad + - uses: actions/checkout@v1 + - name: Cache cargo registry + uses: actions/cache@v1 + with: + path: ~/.cargo/registry + key: cargo-registry-${{ hashFiles('Cargo.toml') }} + - name: Cache cargo index + uses: actions/cache@v1 + with: + path: ~/.cargo/git + key: cargo-index-${{ hashFiles('Cargo.toml') }} + - name: Cache cargo build + uses: actions/cache@v1 + with: + path: target + key: cargo-build-target-${{ hashFiles('Cargo.toml') }} + - name: Run ipfs-kad example + run: RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run --example ipfs-kad diff --git a/Cargo.toml b/Cargo.toml index 26d2bd78..f5e6abf4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,7 +87,7 @@ libp2p-tcp = { version = "0.19.0", path = "transports/tcp", optional = true } libp2p-websocket = { version = "0.19.0", path = "transports/websocket", optional = true } [dev-dependencies] -async-std = "1.0" +async-std = "~1.5.0" env_logger = "0.7.1" [workspace] diff --git a/core/Cargo.toml b/core/Cargo.toml index c2f8166b..98db09f0 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -39,7 +39,7 @@ zeroize = "1" ring = { version = "0.16.9", features = ["alloc", "std"], default-features = false } [dev-dependencies] -async-std = "1.0" +async-std = "~1.5.0" libp2p-mplex = { version = "0.19.0", path = "../muxers/mplex" } libp2p-secio = { version = "0.19.0", path = "../protocols/secio" } libp2p-tcp = { version = "0.19.0", path = "../transports/tcp" } diff --git a/misc/multistream-select/Cargo.toml b/misc/multistream-select/Cargo.toml index fa4f03e9..fa7233e5 100644 --- a/misc/multistream-select/Cargo.toml +++ b/misc/multistream-select/Cargo.toml @@ -18,7 +18,7 @@ smallvec = "1.0" unsigned-varint = "0.3.2" [dev-dependencies] -async-std = "1.2" +async-std = "~1.5.0" quickcheck = "0.9.0" rand = "0.7.2" rw-stream-sink = "0.2.1" diff --git a/muxers/mplex/Cargo.toml b/muxers/mplex/Cargo.toml index 23398a67..1a00aeb8 100644 --- a/muxers/mplex/Cargo.toml +++ b/muxers/mplex/Cargo.toml @@ -20,5 +20,5 @@ parking_lot = "0.10" unsigned-varint = { version = "0.3", features = ["futures-codec"] } [dev-dependencies] -async-std = "1.0" +async-std = "~1.5.0" libp2p-tcp = { version = "0.19.0", path = "../../transports/tcp" } diff --git a/protocols/deflate/Cargo.toml b/protocols/deflate/Cargo.toml index ab7f0018..dc7b5938 100644 --- a/protocols/deflate/Cargo.toml +++ b/protocols/deflate/Cargo.toml @@ -15,7 +15,7 @@ libp2p-core = { version = "0.19.0", path = "../../core" } flate2 = "1.0" [dev-dependencies] -async-std = "1.0" +async-std = "~1.5.0" libp2p-tcp = { version = "0.19.0", path = "../../transports/tcp" } rand = "0.7" quickcheck = "0.9" diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 8d91104d..9944963d 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -28,7 +28,7 @@ smallvec = "1.1.0" prost = "0.6.1" [dev-dependencies] -async-std = "1.4.0" +async-std = "~1.5.0" env_logger = "0.7.1" libp2p-plaintext = { version = "0.19.0", path = "../plaintext" } libp2p-yamux = { version = "0.19.0", path = "../../muxers/yamux" } diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 77f3dec9..5b680848 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -19,7 +19,7 @@ smallvec = "1.0" wasm-timer = "0.2" [dev-dependencies] -async-std = "1.0" +async-std = "~1.5.0" libp2p-mplex = { version = "0.19.0", path = "../../muxers/mplex" } libp2p-secio = { version = "0.19.0", path = "../../protocols/secio" } libp2p-tcp = { version = "0.19.0", path = "../../transports/tcp" } diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index d2a46b26..c16a4ef5 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -31,12 +31,13 @@ void = "1.0" bs58 = "0.3.0" derivative = "2.0.2" -trust-graph = { git = "https://github.com/fluencelabs/fluence", branch = "master" } +trust-graph = { git = "https://github.com/fluencelabs/fluence", branch = "weighted_routing" } [dev-dependencies] libp2p-secio = { version = "0.19.0", path = "../secio" } libp2p-yamux = { version = "0.19.0", path = "../../muxers/yamux" } quickcheck = "0.9.0" +env_logger = "0.7.1" [build-dependencies] prost-build = "0.6" diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 1e432822..16c584ab 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -1,5 +1,4 @@ // Copyright 2018 Parity Technologies (UK) Ltd. -#![allow(clippy::needless_lifetimes)] // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), @@ -21,15 +20,17 @@ //! Implementation of the `Kademlia` network behaviour. +#![allow(clippy::needless_lifetimes)] + mod test; use crate::K_VALUE; use crate::addresses::{Addresses, Remove}; use crate::handler::{KademliaHandler, KademliaHandlerConfig, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn}; use crate::jobs::*; -use crate::kbucket::{self, KBucketsTable, NodeStatus, KBucketRef}; +use crate::kbucket::{self, KBucketsTable, NodeStatus, KBucketRef, KeyBytes}; use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer}; -use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState}; +use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState, WeightedPeer}; use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord}; use crate::contact::Contact; use fnv::{FnvHashMap, FnvHashSet}; @@ -52,7 +53,7 @@ use std::task::{Context, Poll}; use std::vec; use wasm_timer::Instant; use libp2p_core::identity::ed25519::{Keypair, PublicKey}; -use trust_graph::TrustGraph; +use trust_graph::{TrustGraph, Certificate}; use derivative::Derivative; pub use crate::query::QueryStats; @@ -412,7 +413,7 @@ where { let info = QueryInfo::GetClosestPeers { key: key.borrow().to_vec() }; let target = kbucket::Key::new(key); - let peers = self.kbuckets.closest_keys(&target); + let peers = Self::closest_keys(&mut self.kbuckets, &target); let inner = QueryInner::new(info); self.queries.add_iter_closest(target.clone(), peers, inner) } @@ -436,7 +437,7 @@ where let done = records.len() >= quorum.get(); let target = kbucket::Key::new(key.clone()); let info = QueryInfo::GetRecord { key: key.clone(), records, quorum, cache_at: None }; - let peers = self.kbuckets.closest_keys(&target); + let peers = Self::closest_keys(&mut self.kbuckets, &target); let inner = QueryInner::new(info); let id = self.queries.add_iter_closest(target.clone(), peers, inner); // (*) @@ -472,7 +473,8 @@ where self.record_ttl.map(|ttl| Instant::now() + ttl)); let quorum = quorum.eval(self.queries.config().replication_factor); let target = kbucket::Key::new(record.key.clone()); - let peers = self.kbuckets.closest_keys(&target); + + let peers = Self::closest_keys(&mut self.kbuckets, &target); let context = PutRecordContext::Publish; let info = QueryInfo::PutRecord { context, @@ -531,7 +533,7 @@ where peer: local_key.preimage().clone(), remaining: None }; - let peers = self.kbuckets.closest_keys(&local_key).collect::>(); + let peers = Self::closest_keys(&mut self.kbuckets, &local_key).collect::>(); if peers.is_empty() { Err(NoKnownPeers()) } else { @@ -574,13 +576,15 @@ where bs58::encode(target.as_ref()).into_string(), // sha256 ); let provider_key = self.kbuckets.local_public_key(); - let peers = self.kbuckets.closest_keys(&target); + let certificates = self.trust.get_all_certs(&provider_key, &[]); + let peers = Self::closest_keys(&mut self.kbuckets, &target); let context = AddProviderContext::Publish; let info = QueryInfo::AddProvider { context, key, phase: AddProviderPhase::GetClosestPeers, - provider_key + provider_key, + certificates }; let inner = QueryInner::new(info); let id = self.queries.add_iter_closest(target.clone(), peers, inner); @@ -617,7 +621,7 @@ where bs58::encode(target.preimage().as_ref()).into_string(), // peer id bs58::encode(target.as_ref()).into_string(), // sha256 ); - let peers = self.kbuckets.closest_keys(&target); + let peers = Self::closest_keys(&mut self.kbuckets, &target); let inner = QueryInner::new(info); self.queries.add_iter_closest(target.clone(), peers, inner) } @@ -627,6 +631,16 @@ where where I: Iterator + Clone { + // Add certificates to trust graph + let cur_time = trust_graph::current_time(); + for peer in peers.clone() { + for cert in peer.certificates.iter() { + self.trust.add(cert, cur_time).unwrap_or_else(|err| { + log::warn!("Unable to add certificate for peer {}: {}", peer.node_id, err); + }) + } + } + let local_id = self.kbuckets.local_key().preimage().clone(); let others_iter = peers.filter(|p| p.node_id != local_id); @@ -641,6 +655,8 @@ where )); } + let trust = &self.trust; + if let Some(query) = self.queries.get_mut(query_id) { log::trace!("Request to {:?} in query {:?} succeeded.", source, query_id); for peer in others_iter.clone() { @@ -648,7 +664,10 @@ where peer, source, query_id); query.inner.contacts.insert(peer.node_id.clone(), peer.clone().into()); } - query.on_success(source, others_iter.cloned().map(|kp| kp.node_id)) + query.on_success(source, others_iter.map(|kp| WeightedPeer { + peer_id: kp.node_id.clone().into(), + weight: trust.weight(&kp.public_key).unwrap_or_default() + })) } } @@ -659,23 +678,26 @@ where if target == self.kbuckets.local_key() { Vec::new() } else { - self.kbuckets + let mut peers: Vec<_> = self.kbuckets .closest(target) .filter(|e| e.node.key.preimage() != source) .take(self.queries.config().replication_factor.get()) .map(KadPeer::from) - .collect() + .collect(); + peers.iter_mut().for_each(|mut peer| + peer.certificates = self.trust.get_all_certs(&peer.public_key, &[]) + ); + peers } } /// Collects all peers who are known to be providers of the value for a given `Multihash`. fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec { let kbuckets = &mut self.kbuckets; - self.store.providers(key) + let mut peers = self.store.providers(key) .into_iter() .filter_map(move |p| { - - let entry = if &p.provider != source { + let kad_peer = if &p.provider != source { let key = kbucket::Key::new(p.provider.clone()); kbuckets.entry(&key).view().map(|e| KadPeer::from(e)) } else { @@ -686,25 +708,33 @@ where bs58::encode(key).into_string(), p.provider, source, - entry.is_some() + kad_peer.is_some() ); - entry + kad_peer }) .take(self.queries.config().replication_factor.get()) - .collect() + .collect::>(); + + peers.iter_mut().for_each(|peer| + peer.certificates = self.trust.get_all_certs(&peer.public_key, &[]) + ); + + peers } /// Starts an iterative `ADD_PROVIDER` query for the given key. fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) { let provider_key = self.kbuckets.local_public_key(); + let certificates = self.trust.get_all_certs(&provider_key, &[]); let info = QueryInfo::AddProvider { context, key: key.clone(), phase: AddProviderPhase::GetClosestPeers, - provider_key + provider_key, + certificates }; let target = kbucket::Key::new(key); - let peers = self.kbuckets.closest_keys(&target); + let peers = Self::closest_keys(&mut self.kbuckets, &target); let inner = QueryInner::new(info); self.queries.add_iter_closest(target.clone(), peers, inner); } @@ -713,7 +743,7 @@ where fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) { let quorum = quorum.eval(self.queries.config().replication_factor); let target = kbucket::Key::new(record.key.clone()); - let peers = self.kbuckets.closest_keys(&target); + let peers = Self::closest_keys(&mut self.kbuckets, &target); let info = QueryInfo::PutRecord { record, quorum, context, phase: PutRecordPhase::GetClosestPeers }; @@ -874,7 +904,7 @@ where peer: target.clone().into_preimage(), remaining: Some(remaining) }; - let peers = self.kbuckets.closest_keys(&target); + let peers = Self::closest_keys(&mut self.kbuckets, &target); let inner = QueryInner::new(info); self.queries.continue_iter_closest(query_id, target.clone(), peers, inner); } @@ -919,6 +949,7 @@ where let provider_id = params.local_peer_id().clone(); let external_addresses = params.external_addresses().collect(); let provider_key = self.kbuckets.local_public_key(); + let certificates = self.trust.get_all_certs(&provider_key, &[]); let inner = QueryInner::new(QueryInfo::AddProvider { context, key, @@ -927,9 +958,22 @@ where external_addresses, get_closest_peers_stats: result.stats }, - provider_key + provider_key, + certificates }); - self.queries.continue_fixed(query_id, result.peers, inner); + let contacts = &result.inner.contacts; + let trust = &self.trust; + let peers = result.peers.into_iter().map(|peer_id| { + let weight = contacts + .get(&peer_id) + .and_then(|c| trust.weight(&c.public_key)) + .unwrap_or_default(); + WeightedPeer { + peer_id: peer_id.into(), + weight, + } + }); + self.queries.continue_fixed(query_id, peers, inner); None } @@ -976,7 +1020,16 @@ where } }; let inner = QueryInner::new(info); - self.queries.add_fixed(iter::once(cache_key.into_preimage()), inner); + let peer_id = cache_key.preimage(); + let trust = &self.trust; + let weight = + result.inner.contacts.get(peer_id) + .and_then(|c| trust.weight(&c.public_key)).unwrap_or_default(); + let peer = WeightedPeer { + weight, + peer_id: cache_key + }; + self.queries.add_fixed(iter::once(peer), inner); } Ok(GetRecordOk { records }) } else if records.is_empty() { @@ -1010,7 +1063,20 @@ where } }; let inner = QueryInner::new(info); - self.queries.continue_fixed(query_id, result.peers, inner); + let contacts = &result.inner.contacts; + let trust = &self.trust; + let peers = result.peers.into_iter().map(|peer_id| { + let weight = + contacts.get(&peer_id).and_then(|c| + trust.weight(&c.public_key) + ).unwrap_or_default(); + + WeightedPeer { + peer_id: peer_id.into(), + weight, + } + }); + self.queries.continue_fixed(query_id, peers, inner); None } @@ -1069,7 +1135,7 @@ where peer: target.clone().into_preimage(), remaining: Some(remaining) }; - let peers = self.kbuckets.closest_keys(&target); + let peers = Self::closest_keys(&mut self.kbuckets, &target); let inner = QueryInner::new(info); self.queries.continue_iter_closest(query_id, target.clone(), peers, inner); } @@ -1282,8 +1348,28 @@ where }) } + fn closest_keys<'a, T>(table: &'a mut KBucketsTable, Contact>, target: &'a T) + -> impl Iterator + 'a + where + T: Clone + AsRef, + { + table.closest(target).map(|e| WeightedPeer { + peer_id: e.node.key, + // TODO: is node weight up to date? + weight: e.node.weight + }) + } + /// Processes a provider record received from a peer. fn provider_received(&mut self, key: record::Key, provider: KadPeer) { + // Add certificates to trust graph + let cur_time = trust_graph::current_time(); + for cert in provider.certificates.iter() { + self.trust.add(cert, cur_time).unwrap_or_else(|err| { + log::warn!("unable to add certificate for peer {}: {}", provider.node_id, err); + }); + } + self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent( KademliaEvent::Discovered { peer_id: provider.node_id.clone(), @@ -2127,7 +2213,8 @@ impl From, Contact>> for KadPeer connection_ty: match e.status { NodeStatus::Connected => KadConnectionType::Connected, NodeStatus::Disconnected => KadConnectionType::NotConnected - } + }, + certificates: vec![] } } } @@ -2142,7 +2229,8 @@ impl From, Contact>> for KadPeer { connection_ty: match e.status { NodeStatus::Connected => KadConnectionType::Connected, NodeStatus::Disconnected => KadConnectionType::NotConnected - } + }, + certificates: vec![] } } } @@ -2223,7 +2311,10 @@ pub enum QueryInfo { phase: AddProviderPhase, /// The execution context of the query. context: AddProviderContext, - provider_key: PublicKey + /// Public key of the provider + provider_key: PublicKey, + /// Certificates known for the provider + certificates: Vec, }, /// A (repeated) query initiated by [`Kademlia::put_record`]. @@ -2270,7 +2361,7 @@ impl QueryInfo { key: key.clone(), user_data: query_id, }, - QueryInfo::AddProvider { key, phase, provider_key, .. } => match phase { + QueryInfo::AddProvider { key, phase, provider_key, certificates, .. } => match phase { AddProviderPhase::GetClosestPeers => KademliaHandlerIn::FindNodeReq { key: key.to_vec(), user_data: query_id, @@ -2281,12 +2372,13 @@ impl QueryInfo { .. } => { KademliaHandlerIn::AddProvider { - key: key.clone(), - provider: crate::protocol::KadPeer { - public_key: provider_key.clone(), + key: key.clone(), + provider: crate::protocol::KadPeer { + public_key: provider_key.clone(), node_id: provider_id.clone(), multiaddrs: external_addresses.clone(), connection_ty: crate::protocol::KadConnectionType::Connected, + certificates: certificates.clone(), } } } diff --git a/protocols/kad/src/dht.proto b/protocols/kad/src/dht.proto index 63950f1f..4be383ca 100644 --- a/protocols/kad/src/dht.proto +++ b/protocols/kad/src/dht.proto @@ -28,6 +28,21 @@ message Record { uint32 ttl = 777; }; +message Certificate { + repeated Trust chain = 1; +} + +message Trust { + // For whom this certificate is issued + bytes issuedFor = 1; + // Expiration date of a trust + uint64 expires_at_secs = 2; + // Signature of a previous trust in a chain + bytes signature = 3; + // Signature is self-signed if it is a root trust + uint64 issued_at_secs = 4; +} + message Message { enum MessageType { PUT_VALUE = 0; @@ -63,7 +78,11 @@ message Message { // used to signal the sender's connection capabilities to the peer ConnectionType connection = 3; + // public key of the peer bytes publicKey = 4; + + // known certificates of the peer + repeated Certificate certificates = 5; } // defines what type of message it is. diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 05774252..d7facddc 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -36,7 +36,7 @@ use libp2p_core::{ either::EitherOutput, upgrade::{self, InboundUpgrade, OutboundUpgrade} }; -use log::warn; +use log::trace; use std::{error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration}; use wasm_timer::Instant; @@ -832,11 +832,11 @@ fn advance_substream( false, ), Poll::Ready(None) => { - warn!("Inbound substream id {:?}: EOF", id); + trace!("Inbound substream id {:?}: EOF", id); (None, None, false) } Poll::Ready(Some(Err(e))) => { - warn!("Inbound substream error id {:?}: {:?}", id, e); + trace!("Inbound substream error id {:?}: {:?}", id, e); (None, None, false) }, }, diff --git a/protocols/kad/src/kbucket.rs b/protocols/kad/src/kbucket.rs index bbc0ccdc..0923eb2a 100644 --- a/protocols/kad/src/kbucket.rs +++ b/protocols/kad/src/kbucket.rs @@ -72,17 +72,19 @@ mod key; mod sub_bucket; mod swamp; mod weighted; +mod weighted_iter; pub use entry::*; pub use sub_bucket::*; +use crate::kbucket::weighted_iter::WeightedIter; use bucket::KBucket; +use libp2p_core::identity::ed25519; use libp2p_core::identity::ed25519::{Keypair, PublicKey}; -use std::collections::{VecDeque}; +use log::debug; +use std::collections::VecDeque; use std::fmt::Debug; use std::time::Duration; -use libp2p_core::identity::ed25519; -use log::debug; /// Maximum number of k-buckets. const NUM_BUCKETS: usize = 256; @@ -94,7 +96,7 @@ pub struct KBucketsTable { /// The key identifying the local peer that owns the routing table. local_key: TKey, /// The buckets comprising the routing table. - buckets: Vec>, + pub(super) buckets: Vec>, /// The list of evicted entries that have been replaced with pending /// entries since the last call to [`KBucketsTable::take_applied_pending`]. applied_pending: VecDeque>, @@ -114,6 +116,9 @@ impl BucketIndex { /// `local_key` is the `local_key` itself, which does not belong in any /// bucket. fn new(d: &Distance) -> Option { + // local = 101010101010; + // target = 101010101011; + // xor = 000000000001; (NUM_BUCKETS - d.0.leading_zeros() as usize) .checked_sub(1) .map(BucketIndex) @@ -175,7 +180,11 @@ where pub fn entry<'a>(&'a mut self, key: &'a TKey) -> Entry<'a, TKey, TVal> { let index = BucketIndex::new(&self.local_key.as_ref().distance(key)); if let Some(i) = index { - debug!("Node {} belongs to bucket {}", bs58::encode(key.as_ref()).into_string(), i.get()); + debug!( + "Node {} belongs to bucket {}", + bs58::encode(key.as_ref()).into_string(), + i.get() + ); let bucket = &mut self.buckets[i.get()]; self.applied_pending.extend(bucket.apply_pending()); Entry::new(bucket, key) @@ -238,15 +247,7 @@ where T: Clone + AsRef, { let distance = self.local_key.as_ref().distance(target); - ClosestIter { - target, - iter: None, - table: self, - buckets_iter: ClosestBucketsIter::new(distance), - fmap: |b: &KBucket| -> Vec<_> { - b.iter().map(|(n, _)| n.key.clone()).collect() - }, - } + WeightedIter::new(self, distance, target.as_ref()).map(|(n, _)| n.key.clone()) } /// Returns an iterator over the nodes closest to the `target` key, ordered by @@ -260,20 +261,10 @@ where TVal: Clone, { let distance = self.local_key.as_ref().distance(target); - ClosestIter { - target, - iter: None, - table: self, - buckets_iter: ClosestBucketsIter::new(distance), - fmap: |b: &KBucket<_, TVal>| -> Vec<_> { - b.iter() - .map(|(n, status)| EntryView { - node: n.clone(), - status, - }) - .collect() - }, - } + WeightedIter::new(self, distance, target.as_ref()).map(|(n, status)| EntryView { + node: n.clone(), + status, + }) } /// Counts the number of nodes between the local node and the node @@ -331,7 +322,7 @@ struct ClosestIter<'a, TTarget, TKey, TVal, TMap, TOut> { /// An iterator over the bucket indices, in the order determined by the `Distance` of /// a target from the `local_key`, such that the entries in the buckets are incrementally /// further away from the target, starting with the bucket covering the target. -struct ClosestBucketsIter { +pub(super) struct ClosestBucketsIter { /// The distance to the `local_key`. distance: Distance, /// The current state of the iterator. @@ -339,11 +330,12 @@ struct ClosestBucketsIter { } /// Operating states of a `ClosestBucketsIter`. +#[derive(Debug)] enum ClosestBucketsIterState { /// The starting state of the iterator yields the first bucket index and /// then transitions to `ZoomIn`. Start(BucketIndex), - /// The iterator "zooms in" to to yield the next bucket cotaining nodes that + /// The iterator "zooms in" to to yield the next bucket containing nodes that /// are incrementally closer to the local node but further from the `target`. /// These buckets are identified by a `1` in the corresponding bit position /// of the distance bit string. When bucket `0` is reached, the iterator @@ -360,17 +352,20 @@ enum ClosestBucketsIterState { } impl ClosestBucketsIter { - fn new(distance: Distance) -> Self { + pub(super) fn new(distance: Distance) -> Self { let state = match BucketIndex::new(&distance) { Some(i) => ClosestBucketsIterState::Start(i), None => ClosestBucketsIterState::Start(BucketIndex(0)), }; + // println!("ClosestBucketsIter new: distance {} {}, state {:?}", Self::u256_binary(&distance.0, 256), distance.0.leading_zeros(), state); Self { distance, state } } - fn next_in(&self, i: BucketIndex) -> Option { - (0..i.get()).rev().find_map(|i| { - if self.distance.0.bit(i) { + fn next_in(&self, idx: BucketIndex) -> Option { + (0..idx.get()).rev().find_map(|i| { + let bit = self.distance.0.bit(i); + if bit { + // println!("next_in {} [{}th = {}] bucket_idx: {:?}", Self::u256_binary(&self.distance.0, i), i, (bit as usize), idx); Some(BucketIndex(i)) } else { None @@ -378,15 +373,35 @@ impl ClosestBucketsIter { }) } - fn next_out(&self, i: BucketIndex) -> Option { - (i.get() + 1..NUM_BUCKETS).find_map(|i| { - if !self.distance.0.bit(i) { + fn next_out(&self, idx: BucketIndex) -> Option { + (idx.get() + 1..NUM_BUCKETS).find_map(|i| { + let bit = self.distance.0.bit(i); + if !bit { + // println!("next_out {} [{}th = !{}] bucket_idx: {:?}", Self::u256_binary(&self.distance.0, i), i, (bit as usize), idx); Some(BucketIndex(i)) } else { None } }) } + + fn u256_binary(u: &U256, highlight: usize) -> String { + let mut arr: [u8; 256] = [0; 256]; + for i in 0..256 { + arr[i] = u.bit(i) as u8; + } + + arr.iter() + .enumerate() + .map(|(i, u)| { + if i == highlight { + format!("-[{}]-", u) + } else { + u.to_string() + } + }) + .collect() + } } impl Iterator for ClosestBucketsIter { @@ -395,57 +410,29 @@ impl Iterator for ClosestBucketsIter { fn next(&mut self) -> Option { match self.state { ClosestBucketsIterState::Start(i) => { - debug!( - "ClosestBucketsIter: distance = {}; Start({}) -> ZoomIn({})", - BucketIndex::new(&self.distance).unwrap_or(BucketIndex(0)).0, i.0, i.0 - ); self.state = ClosestBucketsIterState::ZoomIn(i); Some(i) } ClosestBucketsIterState::ZoomIn(i) => { - let old_i = i.0; if let Some(i) = self.next_in(i) { - debug!( - "ClosestBucketsIter: distance = {}; ZoomIn({}) -> ZoomIn({})", - BucketIndex::new(&self.distance).unwrap_or(BucketIndex(0)).0, old_i, i.0 - ); self.state = ClosestBucketsIterState::ZoomIn(i); Some(i) } else { - debug!( - "ClosestBucketsIter: distance = {}; ZoomIn({}) -> ZoomOut(0)", - BucketIndex::new(&self.distance).unwrap_or(BucketIndex(0)).0, i.0 - ); let i = BucketIndex(0); self.state = ClosestBucketsIterState::ZoomOut(i); Some(i) } } ClosestBucketsIterState::ZoomOut(i) => { - let old_i = i.0; if let Some(i) = self.next_out(i) { - debug!( - "ClosestBucketsIter: distance = {}; ZoomOut({}) -> ZoomOut({})", - BucketIndex::new(&self.distance).unwrap_or(BucketIndex(0)).0, old_i, i.0 - ); self.state = ClosestBucketsIterState::ZoomOut(i); Some(i) } else { - debug!( - "ClosestBucketsIter: distance = {}; ZoomOut({}) -> Done", - BucketIndex::new(&self.distance).unwrap_or(BucketIndex(0)).0, i.0 - ); self.state = ClosestBucketsIterState::Done; None } } - ClosestBucketsIterState::Done => { - debug!( - "ClosestBucketsIter: distance = {}; Done", - BucketIndex::new(&self.distance).unwrap_or(BucketIndex(0)).0 - ); - None - }, + ClosestBucketsIterState::Done => None, } } } @@ -470,8 +457,8 @@ where bs58::encode(&self.target.as_ref()).into_string(), bs58::encode(k.as_ref()).into_string() ); - return Some(k) - }, + return Some(k); + } None => self.iter = None, }, None => { @@ -482,13 +469,14 @@ where v.sort_by(|a, b| { Ord::cmp( &self.target.as_ref().distance(a.as_ref()), - &self.target.as_ref().distance(b.as_ref()) + &self.target.as_ref().distance(b.as_ref()), ) }); debug!( "ClosestIter: target = {}; next bucket {} with {} nodes", bs58::encode(&self.target.as_ref()).into_string(), - i.0, v.len() + i.0, + v.len() ); self.iter = Some(v.into_iter()); } else { @@ -544,10 +532,10 @@ where #[cfg(test)] mod tests { use super::*; + use libp2p_core::identity; use libp2p_core::PeerId; use quickcheck::*; use rand::Rng; - use libp2p_core::identity; use std::time::Instant; type TestTable = KBucketsTable; @@ -565,14 +553,18 @@ mod tests { let ix = BucketIndex(i); let num = g.gen_range(0, usize::min(K_VALUE.get(), num_total) + 1); num_total -= num; - for _ in 0 .. num { + for _ in 0..num { let distance = ix.rand_distance(g); let key = local_key.for_distance(distance); - let node = Node { key: key.clone(), value: (), weight: 0 }; // TODO: arbitrary weight + let node = Node { + key: key.clone(), + value: (), + weight: 0, + }; // TODO: arbitrary weight let status = NodeStatus::arbitrary(g); match b.insert(node, status) { InsertResult::Inserted => {} - _ => panic!() + _ => panic!(), } } } @@ -606,7 +598,7 @@ mod tests { if let Entry::Absent(entry) = table.entry(&other_id) { match entry.insert((), NodeStatus::Connected, other_weight) { InsertResult::Inserted => (), - _ => panic!() + _ => panic!(), } } else { panic!() @@ -622,7 +614,8 @@ mod tests { let keypair = ed25519::Keypair::generate(); let public_key = identity::PublicKey::Ed25519(keypair.public()); let local_key = Key::from(PeerId::from(public_key)); - let mut table = KBucketsTable::<_, ()>::new(keypair, local_key.clone(), Duration::from_secs(5)); + let mut table = + KBucketsTable::<_, ()>::new(keypair, local_key.clone(), Duration::from_secs(5)); match table.entry(&local_key) { Entry::SelfEntry => (), _ => panic!(), @@ -637,10 +630,13 @@ mod tests { let mut table = KBucketsTable::<_, ()>::new(keypair, local_key, Duration::from_secs(5)); let mut count = 0; loop { - if count == 100 { break; } + if count == 100 { + break; + } let key = Key::from(PeerId::random()); if let Entry::Absent(e) = table.entry(&key) { - match e.insert((), NodeStatus::Connected, 0) { // TODO: random weight + match e.insert((), NodeStatus::Connected, 0) { + // TODO: random weight InsertResult::Inserted => count += 1, _ => continue, } @@ -649,12 +645,13 @@ mod tests { } } - let mut expected_keys: Vec<_> = table.buckets + let mut expected_keys: Vec<_> = table + .buckets .iter() - .flat_map(|t| t.iter().map(|(n,_)| n.key.clone())) + .flat_map(|t| t.iter().map(|(n, _)| n.key.clone())) .collect(); - for _ in 0 .. 10 { + for _ in 0..10 { let target_key = Key::from(PeerId::random()); let keys = table.closest_keys(&target_key).collect::>(); // The list of keys is expected to match the result of a full-table scan. @@ -668,33 +665,48 @@ mod tests { let keypair = ed25519::Keypair::generate(); let public_key = identity::PublicKey::Ed25519(keypair.public()); let local_key = Key::from(PeerId::from(public_key)); - let mut table = KBucketsTable::<_, ()>::new(keypair, local_key.clone(), Duration::from_millis(1)); + let mut table = + KBucketsTable::<_, ()>::new(keypair, local_key.clone(), Duration::from_millis(1)); let expected_applied; let full_bucket_index; loop { let key = Key::from(PeerId::random()); // generate random peer_id - if let Entry::Absent(e) = table.entry(&key) { // check it's not yet in any bucket + if let Entry::Absent(e) = table.entry(&key) { + // check it's not yet in any bucket // TODO: random weight - match e.insert((), NodeStatus::Disconnected, 0) { // insert it into some bucket (node Disconnected status) - InsertResult::Full => { // keep inserting until some random bucket is full (see continue below) - if let Entry::Absent(e) = table.entry(&key) { // insertion didn't succeeded => no such key in a table + match e.insert((), NodeStatus::Disconnected, 0) { + // insert it into some bucket (node Disconnected status) + InsertResult::Full => { + // keep inserting until some random bucket is full (see continue below) + if let Entry::Absent(e) = table.entry(&key) { + // insertion didn't succeeded => no such key in a table // TODO: random weight - match e.insert((), NodeStatus::Connected, 0) { // insert it but now with Connected status - InsertResult::Pending { disconnected } => { // insertion of a connected node into full bucket should produce Pending + match e.insert((), NodeStatus::Connected, 0) { + // insert it but now with Connected status + InsertResult::Pending { disconnected } => { + // insertion of a connected node into full bucket should produce Pending expected_applied = AppliedPending { - inserted: Node { key: key.clone(), value: (), weight: 0 }, // TODO: random weight - evicted: Some(Node { key: disconnected, value: (), weight: 0 }) // TODO: random weight + inserted: Node { + key: key.clone(), + value: (), + weight: 0, + }, // TODO: random weight + evicted: Some(Node { + key: disconnected, + value: (), + weight: 0, + }), // TODO: random weight }; full_bucket_index = BucketIndex::new(&key.distance(&local_key)); - break - }, - _ => panic!() + break; + } + _ => panic!(), } } else { panic!() } - }, + } _ => continue, } } else { @@ -705,17 +717,20 @@ mod tests { // Expire the timeout for the pending entry on the full bucket.` let full_bucket = &mut table.buckets[full_bucket_index.unwrap().get()]; let elapsed = Instant::now() - Duration::from_secs(1); - full_bucket.pending_mut(&expected_applied.inserted.key).unwrap().set_ready_at(elapsed); + full_bucket + .pending_mut(&expected_applied.inserted.key) + .unwrap() + .set_ready_at(elapsed); // Calling table.entry() has a side-effect of applying pending nodes match table.entry(&expected_applied.inserted.key) { Entry::Present(_, NodeStatus::Connected) => {} - x => panic!("Unexpected entry: {:?}", x) + x => panic!("Unexpected entry: {:?}", x), } match table.entry(&expected_applied.evicted.as_ref().unwrap().key) { Entry::Absent(_) => {} - x => panic!("Unexpected entry: {:?}", x) + x => panic!("Unexpected entry: {:?}", x), } assert_eq!(Some(expected_applied), table.take_applied_pending()); @@ -743,6 +758,8 @@ mod tests { }) } - QuickCheck::new().tests(10).quickcheck(prop as fn(_,_) -> _) + QuickCheck::new() + .tests(10) + .quickcheck(prop as fn(_, _) -> _) } } diff --git a/protocols/kad/src/kbucket/bucket.rs b/protocols/kad/src/kbucket/bucket.rs index fd2bf12d..b933684c 100644 --- a/protocols/kad/src/kbucket/bucket.rs +++ b/protocols/kad/src/kbucket/bucket.rs @@ -180,7 +180,17 @@ where /// Returns an iterator over the nodes in the bucket, together with their status. pub fn iter(&self) -> impl Iterator, NodeStatus)> { - Iterator::chain(self.weighted.iter(), self.swamp.iter()) + Iterator::chain(self.weighted(), self.swamp()) + } + + /// Returns an iterator over the weighted nodes in the bucket, together with their status. + pub fn weighted(&self) -> impl Iterator, NodeStatus)> { + self.weighted.iter() + } + + /// Returns an iterator over the swamp nodes in the bucket, together with their status. + pub fn swamp(&self) -> impl Iterator, NodeStatus)> { + self.swamp.iter() } /// Inserts the pending node into the bucket, if its timeout has elapsed, diff --git a/protocols/kad/src/kbucket/weighted_iter.rs b/protocols/kad/src/kbucket/weighted_iter.rs new file mode 100644 index 00000000..d965ac01 --- /dev/null +++ b/protocols/kad/src/kbucket/weighted_iter.rs @@ -0,0 +1,305 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::kbucket::{ + BucketIndex, ClosestBucketsIter, Distance, KBucketsTable, KeyBytes, Node, NodeStatus, +}; + +#[derive(Copy, Clone, Debug)] +struct Progress { + need: usize, + got: usize, +} + +#[derive(Copy, Clone, Debug)] +enum State { + Nowhere, + Weighted(Progress), + Swamp { + /// After taking one element from swamp, go back to weighted, keeping saved progress + saved: Option, + }, + Empty, +} + +pub struct WeightedIter<'a, TKey, TVal> { + target: &'a KeyBytes, + start: BucketIndex, + weighted_buckets: ClosestBucketsIter, + weighted_iter: Option>, + swamp_buckets: ClosestBucketsIter, + swamp_iter: Option>, + table: &'a KBucketsTable, // TODO: make table &mut and call apply_pending? + state: State, +} + +type Entry<'a, TKey, TVal> = (&'a Node, NodeStatus); +type DynIter<'a, TKey, TVal> = Box, NodeStatus)> + 'a>; + +impl<'a, TKey, TVal> WeightedIter<'a, TKey, TVal> +where + TKey: Clone + AsRef, + TVal: Clone, +{ + pub fn new( + table: &'a KBucketsTable, + distance: Distance, + target: &'a KeyBytes, + ) -> impl Iterator> + 'a + where + TKey: Clone + AsRef, + TVal: Clone, + { + let start = BucketIndex::new(&distance).unwrap_or(BucketIndex(0)); + WeightedIter { + target, + start: start.clone(), + weighted_buckets: ClosestBucketsIter::new(distance), + weighted_iter: None, + swamp_buckets: ClosestBucketsIter::new(distance), + swamp_iter: None, + table, + state: State::Nowhere, + } + } + + pub fn num_weighted(&self, idx: BucketIndex) -> usize { + let distance = (self.start.get() as isize - idx.get() as isize).abs(); + // After distance > 2, result will always be equal to 1 + let pow = (distance + 2).min(4); + // 16/2^(distance+2) + 16 / 2usize.pow(pow as u32) // TODO: check overflows? + } + + // Take iterator for next weighted bucket, save it to self.weighted_iter, and return + pub fn next_weighted_bucket(&mut self) -> Option<(BucketIndex, &mut DynIter<'a, TKey, TVal>)> { + let idx = self.weighted_buckets.next()?; + let bucket = self.table.buckets.get(idx.get()); + let v = bucket.map(|b| self.sort_bucket(b.weighted().collect::>())); + self.weighted_iter = v; + // self.weighted_iter = bucket.map(|b| Box::new(b.weighted()) as _); + self.weighted_iter.as_mut().map(|iter| (idx, iter)) + } + + // Take next weighted element, return None when there's no weighted elements + pub fn next_weighted(&mut self) -> Option> { + // Take current weighted_iter or the next one + if let Some(iter) = self.weighted_iter.as_deref_mut() { + return iter.next(); + } + + let iter = self.next_weighted_bucket().map(|(_, i)| i)?; + iter.next() + } + + pub fn next_swamp_bucket(&mut self) -> Option<&'_ mut DynIter<'a, TKey, TVal>> { + let idx = self.swamp_buckets.next()?; + let bucket = self.table.buckets.get(idx.get()); + let v = bucket.map(|b| self.sort_bucket(b.swamp().collect::>())); + self.swamp_iter = v; + self.swamp_iter.as_mut() + } + + pub fn next_swamp(&mut self) -> Option> { + if let Some(iter) = self.swamp_iter.as_deref_mut() { + return iter.next(); + } + + let iter = self.next_swamp_bucket()?; + iter.next() + } + + pub fn sort_bucket(&self, mut bucket: Vec>) -> DynIter<'a, TKey, TVal> { + bucket.sort_by_cached_key(|(e, _)| self.target.distance(e.key.as_ref())); + + Box::new(bucket.into_iter()) + } +} + +impl<'a, TKey, TVal> Iterator for WeightedIter<'a, TKey, TVal> +where + TKey: Clone + AsRef, + TVal: Clone, +{ + type Item = Entry<'a, TKey, TVal>; + + fn next(&mut self) -> Option { + use State::*; + + loop { + let (state, result) = match self.state { + // Not yet started, or just finished a bucket + // Here we decide where to go next + Nowhere => { + // If there is a weighted bucket, try take element from it + if let Some((idx, iter)) = self.next_weighted_bucket() { + if let Some(elem) = iter.next() { + // Found weighted element + let state = Weighted(Progress { + got: 1, + need: self.num_weighted(idx), + }); + (state, Some(elem)) + } else { + // Weighted bucket was empty: go decide again + (Nowhere, None) + } + } else { + // There are no weighted buckets, go to swamp + (Swamp { saved: None }, None) + } + } + // Iterating through a weighted bucket, need more elements + Weighted(Progress { got, need }) if got < need => { + if let Some(elem) = self.next_weighted() { + // Found weighted element, go take more + let state = Weighted(Progress { got: got + 1, need }); + (state, Some(elem)) + } else { + // Current bucket is empty, we're nowhere: need to decide where to go next + (Nowhere, None) + } + } + // Got enough weighted, go to swamp (saving progress, to return back with it) + Weighted(Progress { need, .. }) => { + ( + Swamp { + // Set 'got' to zero, so when we get element from swarm, we start afresh + saved: Some(Progress { need, got: 0 }), + }, + None, + ) + } + // Take one element from swamp, and go to Weighted + Swamp { saved: Some(saved) } => { + if let Some(elem) = self.next_swamp() { + // We always take just a single element from the swamp + // And then go back to weighted + (Weighted(saved), Some(elem)) + } else if self.next_swamp_bucket().is_some() { + // Current bucket was empty, take next one + (Swamp { saved: Some(saved) }, None) + } else { + // No more swamp buckets, go drain weighted + (Weighted(saved), None) + } + } + // Weighted buckets are empty + Swamp { saved } => { + if let Some(elem) = self.next_swamp() { + // Keep draining bucket until it's empty + (Swamp { saved }, Some(elem)) + } else if self.next_swamp_bucket().is_some() { + // Current bucket was empty, go try next one + (Swamp { saved }, None) + } else { + // Routing table is empty + (Empty, None) + } + } + Empty => (Empty, None), + }; + + self.state = state; + + if result.is_some() { + return result; + } + + if let Empty = &self.state { + return None; + } + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use libp2p_core::identity::ed25519; + use libp2p_core::{identity, PeerId}; + + use crate::kbucket::{Entry, InsertResult, Key}; + + use super::*; + + #[test] + /// Insert: + /// 1 weighted far away + /// lots of swamp near + /// + /// Expect: + /// weighted still in the results + fn weighted_first() { + env_logger::builder() + .filter_level(log::LevelFilter::Debug) + .try_init() + .ok(); + + let keypair = ed25519::Keypair::generate(); + let public_key = identity::PublicKey::Ed25519(keypair.public()); + let local_key = Key::from(PeerId::from(public_key)); + + let mut table = + KBucketsTable::<_, ()>::new(keypair, local_key.clone(), Duration::from_secs(5)); + + let mut insert = |id: Key, weight: u32| { + if let Entry::Absent(e) = table.entry(&id) { + if let InsertResult::Inserted = e.insert((), NodeStatus::Connected, weight) { + return Some(id); + } + } + None + }; + + let target_min_distance = 250; + // Generate weighted target + let target = loop { + let id = Key::from(PeerId::random()); + let distance = local_key.distance(&id); + let idx = BucketIndex::new(&distance).unwrap_or(BucketIndex(0)).get(); + if idx > target_min_distance { + let target = insert(id, 10).expect("inserted"); + break target; + } + }; + + let mut swamp = 100; + let max_distance = 252; + while swamp > 0 { + let id = Key::from(PeerId::random()); + let distance = local_key.distance(&id); + let idx = BucketIndex::new(&distance).unwrap_or(BucketIndex(0)).get(); + if idx < max_distance && insert(id, 0).is_some() { + swamp -= 1; + } + } + + // Start from bucket 0 + let closest = table.closest_keys(&local_key).take(1).collect::>(); + assert!(closest.contains(&target)); + + // Start from target + let closest = table.closest_keys(&target).take(1).collect::>(); + assert!(closest.contains(&target)); + + // Start from random + let random = Key::from(PeerId::random()); + let closest = table.closest_keys(&random).take(1).collect::>(); + assert!(closest.contains(&target)); + } +} diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index a440cf3b..f5f74b77 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -42,6 +42,7 @@ use wasm_timer::Instant; use derivative::Derivative; use libp2p_core::identity::ed25519::PublicKey; +use trust_graph::{Certificate, Trust}; /// The protocol name used for negotiating with multistream-select. pub const DEFAULT_PROTO_NAME: &[u8] = b"/ipfs/kad/1.0.0"; @@ -95,6 +96,7 @@ pub struct KadPeer { pub multiaddrs: Vec, /// How the sender is connected to that remote. pub connection_ty: KadConnectionType, + pub certificates: Vec, } // Builds a `KadPeer` from a corresponding protobuf message. @@ -123,17 +125,50 @@ impl TryFrom for KadPeer { invalid_data(format!("invalid public key: {}", e).as_str()) )?; + + let mut certificates = Vec::with_capacity(peer.certificates.len()); + for cert in peer.certificates.into_iter() { + let mut chain = Vec::with_capacity(cert.chain.len()); + for trust in cert.chain.into_iter() { + let issued_for = PublicKey::decode(trust.issued_for.as_slice()) + .map_err(|e| + invalid_data(format!("invalid issued_for: {}", e).as_str()) + )?; + let expires_at: Duration = Duration::from_secs(trust.expires_at_secs); + let issued_at: Duration = Duration::from_secs(trust.issued_at_secs); + let signature: Vec = trust.signature; + + let trust = Trust::new(issued_for, expires_at, issued_at, signature); + chain.push(trust); + } + certificates.push(Certificate::new_unverified(chain)); + } + Ok(KadPeer { public_key, node_id, multiaddrs: addrs, - connection_ty + connection_ty, + certificates }) } } impl Into for KadPeer { fn into(self) -> proto::message::Peer { + let certificates = self.certificates.into_iter().map(|cert| + proto::Certificate { + chain: cert.chain.into_iter().map(|trust| { + proto::Trust { + issued_for: trust.issued_for.encode().to_vec(), + expires_at_secs: trust.expires_at.as_secs(), + signature: trust.signature, + issued_at_secs: trust.issued_at.as_secs(), + } + }).collect(), + } + ).collect(); + proto::message::Peer { id: self.node_id.into_bytes(), addrs: self.multiaddrs.into_iter().map(|a| a.to_vec()).collect(), @@ -141,7 +176,8 @@ impl Into for KadPeer { let ct: proto::message::ConnectionType = self.connection_ty.into(); ct as i32 }, - public_key: self.public_key.encode().to_vec() + public_key: self.public_key.encode().to_vec(), + certificates } } } diff --git a/protocols/kad/src/query.rs b/protocols/kad/src/query.rs index 706ca622..c334aba1 100644 --- a/protocols/kad/src/query.rs +++ b/protocols/kad/src/query.rs @@ -32,6 +32,14 @@ use libp2p_core::PeerId; use std::{time::Duration, num::NonZeroUsize}; use wasm_timer::Instant; +/// Peer along with its weight +pub struct WeightedPeer { + /// Kademlia key & id of the peer + pub peer_id: Key, + /// Weight, calculated locally + pub weight: u32 +} + /// A `QueryPool` provides an aggregate state machine for driving `Query`s to completion. /// /// Internally, a `Query` is in turn driven by an underlying `QueryPeerIter` @@ -89,7 +97,7 @@ impl QueryPool { /// Adds a query to the pool that contacts a fixed set of peers. pub fn add_fixed(&mut self, peers: I, inner: TInner) -> QueryId where - I: IntoIterator + I: IntoIterator { let id = self.next_query_id(); self.continue_fixed(id, peers, inner); @@ -101,20 +109,27 @@ impl QueryPool { /// earlier. pub fn continue_fixed(&mut self, id: QueryId, peers: I, inner: TInner) where - I: IntoIterator + I: IntoIterator { assert!(!self.queries.contains_key(&id)); + // TODO: why not alpha? let parallelism = self.config.replication_factor.get(); - let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism)); - let query = Query::new(id, peer_iter, inner); + + let (swamp, weighted) = peers.into_iter().partition::, _>(|p| p.weight == 0); + let swamp = swamp.into_iter().map(|p| p.peer_id.into_preimage()); + let weighted = weighted.into_iter().map(|p| p.peer_id.into_preimage()); + + let weighted_iter = QueryPeerIter::Fixed(FixedPeersIter::new(weighted, parallelism)); + let swamp_iter = QueryPeerIter::Fixed(FixedPeersIter::new(swamp, parallelism)); + let query = Query::new(id, weighted_iter, swamp_iter, inner); self.queries.insert(id, query); } /// Adds a query to the pool that iterates towards the closest peers to the target. pub fn add_iter_closest(&mut self, target: T, peers: I, inner: TInner) -> QueryId where - T: Into, - I: IntoIterator> + T: Into + Clone, + I: IntoIterator { let id = self.next_query_id(); self.continue_iter_closest(id, target, peers, inner); @@ -124,15 +139,21 @@ impl QueryPool { /// Adds a query to the pool that iterates towards the closest peers to the target. pub fn continue_iter_closest(&mut self, id: QueryId, target: T, peers: I, inner: TInner) where - T: Into, - I: IntoIterator> + T: Into + Clone, + I: IntoIterator { let cfg = ClosestPeersIterConfig { num_results: self.config.replication_factor.get(), .. ClosestPeersIterConfig::default() }; - let peer_iter = QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers)); - let query = Query::new(id, peer_iter, inner); + + let (swamp, weighted) = peers.into_iter().partition::, _>(|p| p.weight == 0); + let swamp = swamp.into_iter().map(|p| p.peer_id); + let weighted = weighted.into_iter().map(|p| p.peer_id); + + let weighted_iter = QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg.clone(), target.clone(), weighted)); + let swamp_iter = QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, swamp)); + let query = Query::new(id, weighted_iter, swamp_iter, inner); self.queries.insert(id, query); } @@ -230,7 +251,9 @@ pub struct Query { /// The unique ID of the query. id: QueryId, /// The peer iterator that drives the query state. - peer_iter: QueryPeerIter, + weighted_iter: QueryPeerIter, + /// The peer iterator that drives the query state. + swamp_iter: QueryPeerIter, /// Execution statistics of the query. stats: QueryStats, /// The opaque inner query state. @@ -245,8 +268,8 @@ enum QueryPeerIter { impl Query { /// Creates a new query without starting it. - fn new(id: QueryId, peer_iter: QueryPeerIter, inner: TInner) -> Self { - Query { id, inner, peer_iter, stats: QueryStats::empty() } + fn new(id: QueryId, weighted_iter: QueryPeerIter, swamp_iter: QueryPeerIter, inner: TInner) -> Self { + Query { id, inner, weighted_iter, swamp_iter, stats: QueryStats::empty() } } /// Gets the unique ID of the query. @@ -261,11 +284,17 @@ impl Query { /// Informs the query that the attempt to contact `peer` failed. pub fn on_failure(&mut self, peer: &PeerId) { - let updated = match &mut self.peer_iter { + let updated_swamp = match &mut self.swamp_iter { QueryPeerIter::Closest(iter) => iter.on_failure(peer), - QueryPeerIter::Fixed(iter) => iter.on_failure(peer) + QueryPeerIter::Fixed(iter) => iter.on_failure(peer), }; - if updated { + + let updated_weighted = match &mut self.weighted_iter { + QueryPeerIter::Closest(iter) => iter.on_failure(peer), + QueryPeerIter::Fixed(iter) => iter.on_failure(peer), + }; + + if updated_swamp || updated_weighted { self.stats.failure += 1; } } @@ -275,37 +304,77 @@ impl Query { /// the query, if applicable. pub fn on_success(&mut self, peer: &PeerId, new_peers: I) where - I: IntoIterator + I: IntoIterator { - let updated = match &mut self.peer_iter { - QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers), - QueryPeerIter::Fixed(iter) => iter.on_success(peer) + let (swamp, weighted) = new_peers.into_iter().partition::, _>(|p| p.weight == 0); + + let updated_swamp = match &mut self.swamp_iter { + QueryPeerIter::Closest(iter) => iter.on_success(peer, swamp.into_iter().map(|p| p.peer_id)), + QueryPeerIter::Fixed(iter) => iter.on_success(peer), }; - if updated { + + let updated_weighted = match &mut self.weighted_iter { + QueryPeerIter::Closest(iter) => iter.on_success(peer, weighted.into_iter().map(|p| p.peer_id)), + QueryPeerIter::Fixed(iter) => iter.on_success(peer), + }; + + if updated_swamp || updated_weighted { self.stats.success += 1; } } /// Checks whether the query is currently waiting for a result from `peer`. pub fn is_waiting(&self, peer: &PeerId) -> bool { - match &self.peer_iter { + let weighted_waiting = match &self.weighted_iter { QueryPeerIter::Closest(iter) => iter.is_waiting(peer), QueryPeerIter::Fixed(iter) => iter.is_waiting(peer) - } + }; + + let swamp_waiting = match &self.swamp_iter { + QueryPeerIter::Closest(iter) => iter.is_waiting(peer), + QueryPeerIter::Fixed(iter) => iter.is_waiting(peer) + }; + + debug_assert_ne!(weighted_waiting, swamp_waiting); + + weighted_waiting || swamp_waiting } /// Advances the state of the underlying peer iterator. fn next(&mut self, now: Instant) -> PeersIterState { - let state = match &mut self.peer_iter { + use PeersIterState::*; + + // First query weighted iter + let weighted_state = match &mut self.weighted_iter { QueryPeerIter::Closest(iter) => iter.next(now), QueryPeerIter::Fixed(iter) => iter.next() }; - if let PeersIterState::Waiting(Some(_)) = state { + // If there's a new weighted peer to send rpc to, return it + if let Waiting(Some(_)) = weighted_state { self.stats.requests += 1; + return weighted_state; } - state + // If there was no new weighted peer, check swamp + let swamp_state = match &mut self.swamp_iter { + QueryPeerIter::Closest(iter) => iter.next(now), + QueryPeerIter::Fixed(iter) => iter.next() + }; + + // If there's a new swamp peer to send rpc to, return it + if let Waiting(Some(_)) = swamp_state { + self.stats.requests += 1; + return swamp_state; + } + + // Return remaining state: weighted has higher priority + match (weighted_state, swamp_state) { + // If weighted finished, return swamp state + (Finished, swamp) => swamp, + // Otherwise, return weighted state first + (weighted, _) => weighted + } } /// Finishes the query prematurely. @@ -313,10 +382,15 @@ impl Query { /// A finished query immediately stops yielding new peers to contact and will be /// reported by [`QueryPool::poll`] via [`QueryPoolState::Finished`]. pub fn finish(&mut self) { - match &mut self.peer_iter { + match &mut self.weighted_iter { QueryPeerIter::Closest(iter) => iter.finish(), QueryPeerIter::Fixed(iter) => iter.finish() - } + }; + + match &mut self.swamp_iter { + QueryPeerIter::Closest(iter) => iter.finish(), + QueryPeerIter::Fixed(iter) => iter.finish() + }; } /// Checks whether the query has finished. @@ -324,18 +398,32 @@ impl Query { /// A finished query is eventually reported by `QueryPool::next()` and /// removed from the pool. pub fn is_finished(&self) -> bool { - match &self.peer_iter { + let weighted_finished = match &self.weighted_iter { QueryPeerIter::Closest(iter) => iter.is_finished(), QueryPeerIter::Fixed(iter) => iter.is_finished() - } + }; + + let swamp_finished = match &self.swamp_iter { + QueryPeerIter::Closest(iter) => iter.is_finished(), + QueryPeerIter::Fixed(iter) => iter.is_finished() + }; + + weighted_finished && swamp_finished } /// Consumes the query, producing the final `QueryResult`. pub fn into_result(self) -> QueryResult> { - let peers = match self.peer_iter { + let weighted = match self.weighted_iter { QueryPeerIter::Closest(iter) => Either::Left(iter.into_result()), QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()) }; + + let swamp = match self.swamp_iter { + QueryPeerIter::Closest(iter) => Either::Left(iter.into_result()), + QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result()) + }; + + let peers = weighted.chain(swamp); QueryResult { peers, inner: self.inner, stats: self.stats } } } diff --git a/protocols/kad/src/query/peers/closest.rs b/protocols/kad/src/query/peers/closest.rs index e0cb1912..fa57eeb0 100644 --- a/protocols/kad/src/query/peers/closest.rs +++ b/protocols/kad/src/query/peers/closest.rs @@ -117,7 +117,8 @@ impl ClosestPeersIter { let log = vec![(Instant::now(), state.clone())]; (distance, Peer { key, state, log }) }) - .take(K_VALUE.into())); + .take(K_VALUE.into()) + ); // The iterator initially makes progress by iterating towards the target. let state = State::Iterating { no_progress : 0 }; @@ -150,7 +151,7 @@ impl ClosestPeersIter { /// calling this function has no effect and `false` is returned. pub fn on_success(&mut self, peer: &PeerId, closer_peers: I) -> bool where - I: IntoIterator + I: IntoIterator> { if let State::Finished = self.state { return false @@ -161,7 +162,10 @@ impl ClosestPeersIter { // Mark the peer as succeeded. match self.closest_peers.entry(distance) { - Entry::Vacant(..) => return false, + Entry::Vacant(..) => { + log::warn!("peer {} not found in closest_peers, ignoring result", peer); + return false + }, Entry::Occupied(mut e) => match e.get().state { PeerState::Waiting(..) => { debug_assert!(self.num_waiting > 0); @@ -173,7 +177,10 @@ impl ClosestPeersIter { } PeerState::NotContacted | PeerState::Failed - | PeerState::Succeeded => return false + | PeerState::Succeeded => { + log::warn!("peer {} is incorrect state {:?}, ignoring result", peer, e.get().state); + return false + } } } @@ -181,8 +188,7 @@ impl ClosestPeersIter { let mut progress = false; // Incorporate the reported closer peers into the iterator. - for peer in closer_peers { - let key = peer.into(); + for key in closer_peers { let distance = self.target.distance(&key); let peer = Peer { key, state: PeerState::NotContacted, log: vec![(Instant::now(), PeerState::NotContacted)] }; self.closest_peers.entry(distance).or_insert(peer); @@ -470,6 +476,8 @@ enum State { /// results is increased to `num_results` in an attempt to finish the iterator. /// If the iterator can make progress again upon receiving the remaining /// results, it switches back to `Iterating`. Otherwise it will be finished. + /// + /// // TODO: is it used? Stalled, /// The iterator is finished. @@ -652,8 +660,11 @@ mod tests { for (i, k) in expected.iter().enumerate() { if rng.gen_bool(0.75) { let num_closer = rng.gen_range(0, iter.config.num_results + 1); - let closer_peers = random_peers(num_closer, &mut rng); - remaining.extend(closer_peers.iter().cloned().map(Key::from)); + let closer_peers = random_peers(num_closer, &mut rng) + .into_iter() + .map(|p| p.into()) + .collect::>(); + remaining.extend(closer_peers.clone()); iter.on_success(k.preimage(), closer_peers); } else { num_failures += 1; @@ -709,7 +720,10 @@ mod tests { let now = Instant::now(); let mut rng = StdRng::from_seed(seed.0); - let closer = random_peers(1, &mut rng); + let closer = random_peers(1, &mut rng) + .into_iter() + .map(|p| p.into()) + .collect::>(); // A first peer reports a "closer" peer. let peer1 = match iter.next(now) { @@ -731,7 +745,7 @@ mod tests { }; // The "closer" peer must only be in the iterator once. - let n = iter.closest_peers.values().filter(|e| e.key.preimage() == &closer[0]).count(); + let n = iter.closest_peers.values().filter(|e| e.key == closer[0]).count(); assert_eq!(n, 1); true diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index f61572d1..66890a8c 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -10,7 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -async-std = "1.0" +async-std = "~1.5.0" data-encoding = "2.0" dns-parser = "0.8" either = "1.5.3" diff --git a/protocols/ping/Cargo.toml b/protocols/ping/Cargo.toml index 0b6a9960..73a13183 100644 --- a/protocols/ping/Cargo.toml +++ b/protocols/ping/Cargo.toml @@ -19,7 +19,7 @@ void = "1.0" wasm-timer = "0.2" [dev-dependencies] -async-std = "1.0" +async-std = "~1.5.0" libp2p-tcp = { version = "0.19.0", path = "../../transports/tcp" } libp2p-secio = { version = "0.19.0", path = "../../protocols/secio" } libp2p-yamux = { version = "0.19.0", path = "../../muxers/yamux" } diff --git a/protocols/secio/Cargo.toml b/protocols/secio/Cargo.toml index a24f4990..ced1078f 100644 --- a/protocols/secio/Cargo.toml +++ b/protocols/secio/Cargo.toml @@ -46,7 +46,7 @@ secp256k1 = [] aes-all = ["aesni"] [dev-dependencies] -async-std = "1.0" +async-std = "~1.5.0" criterion = "0.3" libp2p-mplex = { version = "0.19.0", path = "../../muxers/mplex" } libp2p-tcp = { version = "0.19.0", path = "../../transports/tcp" } diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index 8b6a5af4..f7b096b6 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -10,7 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -async-std = { version = "1.0", optional = true } +async-std = { version = "~1.5.0", optional = true } futures = "0.3.1" futures-timer = "3.0" get_if_addrs = "0.5.3" diff --git a/transports/uds/Cargo.toml b/transports/uds/Cargo.toml index 0fcfa6ff..ac298c0d 100644 --- a/transports/uds/Cargo.toml +++ b/transports/uds/Cargo.toml @@ -10,7 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [target.'cfg(all(unix, not(any(target_os = "emscripten", target_os = "unknown"))))'.dependencies] -async-std = { version = "1.0", optional = true } +async-std = { version = "~1.5.0", optional = true } libp2p-core = { version = "0.19.0", path = "../../core" } log = "0.4.1" futures = "0.3.1"