Weighted routing (#17)

This commit is contained in:
folex 2020-06-05 20:04:53 +03:00 committed by GitHub
parent 0bd3d9b6b6
commit 3e2ed64faa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 855 additions and 273 deletions

View File

@ -11,26 +11,26 @@ jobs:
name: Build and test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: Cache cargo registry
uses: actions/cache@v1
with:
path: ~/.cargo/registry
key: cargo-registry-${{ hashFiles('Cargo.toml') }}
- name: Cache cargo index
uses: actions/cache@v1
with:
path: ~/.cargo/git
key: cargo-index-${{ hashFiles('Cargo.toml') }}
- name: Cache cargo build
uses: actions/cache@v1
with:
path: target
key: cargo-build-target-${{ hashFiles('Cargo.toml') }}
- name: Run tests, with no feature
run: cargo test --workspace --no-default-features
- name: Run tests, with all features
run: cargo test --workspace --all-features
- uses: actions/checkout@v1
- name: Cache cargo registry
uses: actions/cache@v1
with:
path: ~/.cargo/registry
key: cargo-registry-${{ hashFiles('Cargo.toml') }}
- name: Cache cargo index
uses: actions/cache@v1
with:
path: ~/.cargo/git
key: cargo-index-${{ hashFiles('Cargo.toml') }}
- name: Cache cargo build
uses: actions/cache@v1
with:
path: target
key: cargo-build-target-${{ hashFiles('Cargo.toml') }}
- name: Run tests, with no feature
run: cargo test --workspace --no-default-features
- name: Run tests, with all features
run: cargo test --workspace --all-features
test-wasm:
name: Build on WASM
@ -40,40 +40,40 @@ jobs:
env:
CC: clang-10
steps:
- uses: actions/checkout@v1
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
target: wasm32-unknown-unknown
override: true
- name: Install a recent version of clang
run: |
wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key | apt-key add -
echo "deb http://apt.llvm.org/bionic/ llvm-toolchain-bionic-10 main" >> /etc/apt/sources.list
apt-get update
apt-get install -y clang-10
- name: Install CMake
run: apt-get install -y cmake
- name: Cache cargo registry
uses: actions/cache@v1
with:
path: ~/.cargo/registry
key: wasm-cargo-registry-${{ hashFiles('Cargo.toml') }}
- name: Cache cargo index
uses: actions/cache@v1
with:
path: ~/.cargo/git
key: wasm-cargo-index-${{ hashFiles('Cargo.toml') }}
- name: Cache cargo build
uses: actions/cache@v1
with:
path: target
key: wasm-cargo-build-target-${{ hashFiles('Cargo.toml') }}
- name: Build on WASM
# TODO: also run `cargo test`
# TODO: ideally we would build `--workspace`, but not all crates compile for WASM
run: cargo build --target=wasm32-unknown-unknown
- uses: actions/checkout@v1
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
target: wasm32-unknown-unknown
override: true
- name: Install a recent version of clang
run: |
wget -O - https://apt.llvm.org/llvm-snapshot.gpg.key | apt-key add -
echo "deb http://apt.llvm.org/bionic/ llvm-toolchain-bionic-10 main" >> /etc/apt/sources.list
apt-get update
apt-get install -y clang-10
- name: Install CMake
run: apt-get install -y cmake
- name: Cache cargo registry
uses: actions/cache@v1
with:
path: ~/.cargo/registry
key: wasm-cargo-registry-${{ hashFiles('Cargo.toml') }}
- name: Cache cargo index
uses: actions/cache@v1
with:
path: ~/.cargo/git
key: wasm-cargo-index-${{ hashFiles('Cargo.toml') }}
- name: Cache cargo build
uses: actions/cache@v1
with:
path: target
key: wasm-cargo-build-target-${{ hashFiles('Cargo.toml') }}
- name: Build on WASM
# TODO: also run `cargo test`
# TODO: ideally we would build `--workspace`, but not all crates compile for WASM
run: cargo build --target=wasm32-unknown-unknown
check-rustdoc-links:
name: Check rustdoc intra-doc links
@ -81,13 +81,13 @@ jobs:
container:
image: rust
steps:
- uses: actions/checkout@v1
- name: Install nightly Rust
# TODO: intra-doc links are available on nightly only
# see https://doc.rust-lang.org/nightly/rustdoc/lints.html#intra_doc_link_resolution_failure
run: rustup default nightly
- name: Check rustdoc links
run: RUSTDOCFLAGS="--deny intra_doc_link_resolution_failure" cargo +nightly doc --verbose --workspace --no-deps --document-private-items
- uses: actions/checkout@v1
- name: Install nightly Rust
# TODO: intra-doc links are available on nightly only
# see https://doc.rust-lang.org/nightly/rustdoc/lints.html#intra_doc_link_resolution_failure
run: rustup default nightly-2020-05-20
- name: Check rustdoc links
run: RUSTDOCFLAGS="--deny intra_doc_link_resolution_failure" cargo doc --verbose --workspace --no-deps --document-private-items
integration-test:
name: Integration tests
@ -95,21 +95,21 @@ jobs:
container:
image: rust
steps:
- uses: actions/checkout@v1
- name: Cache cargo registry
uses: actions/cache@v1
with:
path: ~/.cargo/registry
key: cargo-registry-${{ hashFiles('Cargo.toml') }}
- name: Cache cargo index
uses: actions/cache@v1
with:
path: ~/.cargo/git
key: cargo-index-${{ hashFiles('Cargo.toml') }}
- name: Cache cargo build
uses: actions/cache@v1
with:
path: target
key: cargo-build-target-${{ hashFiles('Cargo.toml') }}
- name: Run ipfs-kad example
run: RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run --example ipfs-kad
- uses: actions/checkout@v1
- name: Cache cargo registry
uses: actions/cache@v1
with:
path: ~/.cargo/registry
key: cargo-registry-${{ hashFiles('Cargo.toml') }}
- name: Cache cargo index
uses: actions/cache@v1
with:
path: ~/.cargo/git
key: cargo-index-${{ hashFiles('Cargo.toml') }}
- name: Cache cargo build
uses: actions/cache@v1
with:
path: target
key: cargo-build-target-${{ hashFiles('Cargo.toml') }}
- name: Run ipfs-kad example
run: RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run --example ipfs-kad

View File

@ -87,7 +87,7 @@ libp2p-tcp = { version = "0.19.0", path = "transports/tcp", optional = true }
libp2p-websocket = { version = "0.19.0", path = "transports/websocket", optional = true }
[dev-dependencies]
async-std = "1.0"
async-std = "~1.5.0"
env_logger = "0.7.1"
[workspace]

View File

@ -39,7 +39,7 @@ zeroize = "1"
ring = { version = "0.16.9", features = ["alloc", "std"], default-features = false }
[dev-dependencies]
async-std = "1.0"
async-std = "~1.5.0"
libp2p-mplex = { version = "0.19.0", path = "../muxers/mplex" }
libp2p-secio = { version = "0.19.0", path = "../protocols/secio" }
libp2p-tcp = { version = "0.19.0", path = "../transports/tcp" }

View File

@ -18,7 +18,7 @@ smallvec = "1.0"
unsigned-varint = "0.3.2"
[dev-dependencies]
async-std = "1.2"
async-std = "~1.5.0"
quickcheck = "0.9.0"
rand = "0.7.2"
rw-stream-sink = "0.2.1"

View File

@ -20,5 +20,5 @@ parking_lot = "0.10"
unsigned-varint = { version = "0.3", features = ["futures-codec"] }
[dev-dependencies]
async-std = "1.0"
async-std = "~1.5.0"
libp2p-tcp = { version = "0.19.0", path = "../../transports/tcp" }

View File

@ -15,7 +15,7 @@ libp2p-core = { version = "0.19.0", path = "../../core" }
flate2 = "1.0"
[dev-dependencies]
async-std = "1.0"
async-std = "~1.5.0"
libp2p-tcp = { version = "0.19.0", path = "../../transports/tcp" }
rand = "0.7"
quickcheck = "0.9"

View File

@ -28,7 +28,7 @@ smallvec = "1.1.0"
prost = "0.6.1"
[dev-dependencies]
async-std = "1.4.0"
async-std = "~1.5.0"
env_logger = "0.7.1"
libp2p-plaintext = { version = "0.19.0", path = "../plaintext" }
libp2p-yamux = { version = "0.19.0", path = "../../muxers/yamux" }

View File

@ -19,7 +19,7 @@ smallvec = "1.0"
wasm-timer = "0.2"
[dev-dependencies]
async-std = "1.0"
async-std = "~1.5.0"
libp2p-mplex = { version = "0.19.0", path = "../../muxers/mplex" }
libp2p-secio = { version = "0.19.0", path = "../../protocols/secio" }
libp2p-tcp = { version = "0.19.0", path = "../../transports/tcp" }

View File

@ -31,12 +31,13 @@ void = "1.0"
bs58 = "0.3.0"
derivative = "2.0.2"
trust-graph = { git = "https://github.com/fluencelabs/fluence", branch = "master" }
trust-graph = { git = "https://github.com/fluencelabs/fluence", branch = "weighted_routing" }
[dev-dependencies]
libp2p-secio = { version = "0.19.0", path = "../secio" }
libp2p-yamux = { version = "0.19.0", path = "../../muxers/yamux" }
quickcheck = "0.9.0"
env_logger = "0.7.1"
[build-dependencies]
prost-build = "0.6"

View File

@ -1,5 +1,4 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
#![allow(clippy::needless_lifetimes)]
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
@ -21,15 +20,17 @@
//! Implementation of the `Kademlia` network behaviour.
#![allow(clippy::needless_lifetimes)]
mod test;
use crate::K_VALUE;
use crate::addresses::{Addresses, Remove};
use crate::handler::{KademliaHandler, KademliaHandlerConfig, KademliaRequestId, KademliaHandlerEvent, KademliaHandlerIn};
use crate::jobs::*;
use crate::kbucket::{self, KBucketsTable, NodeStatus, KBucketRef};
use crate::kbucket::{self, KBucketsTable, NodeStatus, KBucketRef, KeyBytes};
use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer};
use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState};
use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState, WeightedPeer};
use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord};
use crate::contact::Contact;
use fnv::{FnvHashMap, FnvHashSet};
@ -52,7 +53,7 @@ use std::task::{Context, Poll};
use std::vec;
use wasm_timer::Instant;
use libp2p_core::identity::ed25519::{Keypair, PublicKey};
use trust_graph::TrustGraph;
use trust_graph::{TrustGraph, Certificate};
use derivative::Derivative;
pub use crate::query::QueryStats;
@ -412,7 +413,7 @@ where
{
let info = QueryInfo::GetClosestPeers { key: key.borrow().to_vec() };
let target = kbucket::Key::new(key);
let peers = self.kbuckets.closest_keys(&target);
let peers = Self::closest_keys(&mut self.kbuckets, &target);
let inner = QueryInner::new(info);
self.queries.add_iter_closest(target.clone(), peers, inner)
}
@ -436,7 +437,7 @@ where
let done = records.len() >= quorum.get();
let target = kbucket::Key::new(key.clone());
let info = QueryInfo::GetRecord { key: key.clone(), records, quorum, cache_at: None };
let peers = self.kbuckets.closest_keys(&target);
let peers = Self::closest_keys(&mut self.kbuckets, &target);
let inner = QueryInner::new(info);
let id = self.queries.add_iter_closest(target.clone(), peers, inner); // (*)
@ -472,7 +473,8 @@ where
self.record_ttl.map(|ttl| Instant::now() + ttl));
let quorum = quorum.eval(self.queries.config().replication_factor);
let target = kbucket::Key::new(record.key.clone());
let peers = self.kbuckets.closest_keys(&target);
let peers = Self::closest_keys(&mut self.kbuckets, &target);
let context = PutRecordContext::Publish;
let info = QueryInfo::PutRecord {
context,
@ -531,7 +533,7 @@ where
peer: local_key.preimage().clone(),
remaining: None
};
let peers = self.kbuckets.closest_keys(&local_key).collect::<Vec<_>>();
let peers = Self::closest_keys(&mut self.kbuckets, &local_key).collect::<Vec<_>>();
if peers.is_empty() {
Err(NoKnownPeers())
} else {
@ -574,13 +576,15 @@ where
bs58::encode(target.as_ref()).into_string(), // sha256
);
let provider_key = self.kbuckets.local_public_key();
let peers = self.kbuckets.closest_keys(&target);
let certificates = self.trust.get_all_certs(&provider_key, &[]);
let peers = Self::closest_keys(&mut self.kbuckets, &target);
let context = AddProviderContext::Publish;
let info = QueryInfo::AddProvider {
context,
key,
phase: AddProviderPhase::GetClosestPeers,
provider_key
provider_key,
certificates
};
let inner = QueryInner::new(info);
let id = self.queries.add_iter_closest(target.clone(), peers, inner);
@ -617,7 +621,7 @@ where
bs58::encode(target.preimage().as_ref()).into_string(), // peer id
bs58::encode(target.as_ref()).into_string(), // sha256
);
let peers = self.kbuckets.closest_keys(&target);
let peers = Self::closest_keys(&mut self.kbuckets, &target);
let inner = QueryInner::new(info);
self.queries.add_iter_closest(target.clone(), peers, inner)
}
@ -627,6 +631,16 @@ where
where
I: Iterator<Item = &'a KadPeer> + Clone
{
// Add certificates to trust graph
let cur_time = trust_graph::current_time();
for peer in peers.clone() {
for cert in peer.certificates.iter() {
self.trust.add(cert, cur_time).unwrap_or_else(|err| {
log::warn!("Unable to add certificate for peer {}: {}", peer.node_id, err);
})
}
}
let local_id = self.kbuckets.local_key().preimage().clone();
let others_iter = peers.filter(|p| p.node_id != local_id);
@ -641,6 +655,8 @@ where
));
}
let trust = &self.trust;
if let Some(query) = self.queries.get_mut(query_id) {
log::trace!("Request to {:?} in query {:?} succeeded.", source, query_id);
for peer in others_iter.clone() {
@ -648,7 +664,10 @@ where
peer, source, query_id);
query.inner.contacts.insert(peer.node_id.clone(), peer.clone().into());
}
query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
query.on_success(source, others_iter.map(|kp| WeightedPeer {
peer_id: kp.node_id.clone().into(),
weight: trust.weight(&kp.public_key).unwrap_or_default()
}))
}
}
@ -659,23 +678,26 @@ where
if target == self.kbuckets.local_key() {
Vec::new()
} else {
self.kbuckets
let mut peers: Vec<_> = self.kbuckets
.closest(target)
.filter(|e| e.node.key.preimage() != source)
.take(self.queries.config().replication_factor.get())
.map(KadPeer::from)
.collect()
.collect();
peers.iter_mut().for_each(|mut peer|
peer.certificates = self.trust.get_all_certs(&peer.public_key, &[])
);
peers
}
}
/// Collects all peers who are known to be providers of the value for a given `Multihash`.
fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
let kbuckets = &mut self.kbuckets;
self.store.providers(key)
let mut peers = self.store.providers(key)
.into_iter()
.filter_map(move |p| {
let entry = if &p.provider != source {
let kad_peer = if &p.provider != source {
let key = kbucket::Key::new(p.provider.clone());
kbuckets.entry(&key).view().map(|e| KadPeer::from(e))
} else {
@ -686,25 +708,33 @@ where
bs58::encode(key).into_string(),
p.provider,
source,
entry.is_some()
kad_peer.is_some()
);
entry
kad_peer
})
.take(self.queries.config().replication_factor.get())
.collect()
.collect::<Vec<_>>();
peers.iter_mut().for_each(|peer|
peer.certificates = self.trust.get_all_certs(&peer.public_key, &[])
);
peers
}
/// Starts an iterative `ADD_PROVIDER` query for the given key.
fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
let provider_key = self.kbuckets.local_public_key();
let certificates = self.trust.get_all_certs(&provider_key, &[]);
let info = QueryInfo::AddProvider {
context,
key: key.clone(),
phase: AddProviderPhase::GetClosestPeers,
provider_key
provider_key,
certificates
};
let target = kbucket::Key::new(key);
let peers = self.kbuckets.closest_keys(&target);
let peers = Self::closest_keys(&mut self.kbuckets, &target);
let inner = QueryInner::new(info);
self.queries.add_iter_closest(target.clone(), peers, inner);
}
@ -713,7 +743,7 @@ where
fn start_put_record(&mut self, record: Record, quorum: Quorum, context: PutRecordContext) {
let quorum = quorum.eval(self.queries.config().replication_factor);
let target = kbucket::Key::new(record.key.clone());
let peers = self.kbuckets.closest_keys(&target);
let peers = Self::closest_keys(&mut self.kbuckets, &target);
let info = QueryInfo::PutRecord {
record, quorum, context, phase: PutRecordPhase::GetClosestPeers
};
@ -874,7 +904,7 @@ where
peer: target.clone().into_preimage(),
remaining: Some(remaining)
};
let peers = self.kbuckets.closest_keys(&target);
let peers = Self::closest_keys(&mut self.kbuckets, &target);
let inner = QueryInner::new(info);
self.queries.continue_iter_closest(query_id, target.clone(), peers, inner);
}
@ -919,6 +949,7 @@ where
let provider_id = params.local_peer_id().clone();
let external_addresses = params.external_addresses().collect();
let provider_key = self.kbuckets.local_public_key();
let certificates = self.trust.get_all_certs(&provider_key, &[]);
let inner = QueryInner::new(QueryInfo::AddProvider {
context,
key,
@ -927,9 +958,22 @@ where
external_addresses,
get_closest_peers_stats: result.stats
},
provider_key
provider_key,
certificates
});
self.queries.continue_fixed(query_id, result.peers, inner);
let contacts = &result.inner.contacts;
let trust = &self.trust;
let peers = result.peers.into_iter().map(|peer_id| {
let weight = contacts
.get(&peer_id)
.and_then(|c| trust.weight(&c.public_key))
.unwrap_or_default();
WeightedPeer {
peer_id: peer_id.into(),
weight,
}
});
self.queries.continue_fixed(query_id, peers, inner);
None
}
@ -976,7 +1020,16 @@ where
}
};
let inner = QueryInner::new(info);
self.queries.add_fixed(iter::once(cache_key.into_preimage()), inner);
let peer_id = cache_key.preimage();
let trust = &self.trust;
let weight =
result.inner.contacts.get(peer_id)
.and_then(|c| trust.weight(&c.public_key)).unwrap_or_default();
let peer = WeightedPeer {
weight,
peer_id: cache_key
};
self.queries.add_fixed(iter::once(peer), inner);
}
Ok(GetRecordOk { records })
} else if records.is_empty() {
@ -1010,7 +1063,20 @@ where
}
};
let inner = QueryInner::new(info);
self.queries.continue_fixed(query_id, result.peers, inner);
let contacts = &result.inner.contacts;
let trust = &self.trust;
let peers = result.peers.into_iter().map(|peer_id| {
let weight =
contacts.get(&peer_id).and_then(|c|
trust.weight(&c.public_key)
).unwrap_or_default();
WeightedPeer {
peer_id: peer_id.into(),
weight,
}
});
self.queries.continue_fixed(query_id, peers, inner);
None
}
@ -1069,7 +1135,7 @@ where
peer: target.clone().into_preimage(),
remaining: Some(remaining)
};
let peers = self.kbuckets.closest_keys(&target);
let peers = Self::closest_keys(&mut self.kbuckets, &target);
let inner = QueryInner::new(info);
self.queries.continue_iter_closest(query_id, target.clone(), peers, inner);
}
@ -1282,8 +1348,28 @@ where
})
}
fn closest_keys<'a, T>(table: &'a mut KBucketsTable<kbucket::Key<PeerId>, Contact>, target: &'a T)
-> impl Iterator<Item = WeightedPeer> + 'a
where
T: Clone + AsRef<KeyBytes>,
{
table.closest(target).map(|e| WeightedPeer {
peer_id: e.node.key,
// TODO: is node weight up to date?
weight: e.node.weight
})
}
/// Processes a provider record received from a peer.
fn provider_received(&mut self, key: record::Key, provider: KadPeer) {
// Add certificates to trust graph
let cur_time = trust_graph::current_time();
for cert in provider.certificates.iter() {
self.trust.add(cert, cur_time).unwrap_or_else(|err| {
log::warn!("unable to add certificate for peer {}: {}", provider.node_id, err);
});
}
self.queued_events.push_back(NetworkBehaviourAction::GenerateEvent(
KademliaEvent::Discovered {
peer_id: provider.node_id.clone(),
@ -2127,7 +2213,8 @@ impl From<kbucket::EntryRefView<'_, kbucket::Key<PeerId>, Contact>> for KadPeer
connection_ty: match e.status {
NodeStatus::Connected => KadConnectionType::Connected,
NodeStatus::Disconnected => KadConnectionType::NotConnected
}
},
certificates: vec![]
}
}
}
@ -2142,7 +2229,8 @@ impl From<kbucket::EntryView<kbucket::Key<PeerId>, Contact>> for KadPeer {
connection_ty: match e.status {
NodeStatus::Connected => KadConnectionType::Connected,
NodeStatus::Disconnected => KadConnectionType::NotConnected
}
},
certificates: vec![]
}
}
}
@ -2223,7 +2311,10 @@ pub enum QueryInfo {
phase: AddProviderPhase,
/// The execution context of the query.
context: AddProviderContext,
provider_key: PublicKey
/// Public key of the provider
provider_key: PublicKey,
/// Certificates known for the provider
certificates: Vec<Certificate>,
},
/// A (repeated) query initiated by [`Kademlia::put_record`].
@ -2270,7 +2361,7 @@ impl QueryInfo {
key: key.clone(),
user_data: query_id,
},
QueryInfo::AddProvider { key, phase, provider_key, .. } => match phase {
QueryInfo::AddProvider { key, phase, provider_key, certificates, .. } => match phase {
AddProviderPhase::GetClosestPeers => KademliaHandlerIn::FindNodeReq {
key: key.to_vec(),
user_data: query_id,
@ -2281,12 +2372,13 @@ impl QueryInfo {
..
} => {
KademliaHandlerIn::AddProvider {
key: key.clone(),
provider: crate::protocol::KadPeer {
public_key: provider_key.clone(),
key: key.clone(),
provider: crate::protocol::KadPeer {
public_key: provider_key.clone(),
node_id: provider_id.clone(),
multiaddrs: external_addresses.clone(),
connection_ty: crate::protocol::KadConnectionType::Connected,
certificates: certificates.clone(),
}
}
}

View File

@ -28,6 +28,21 @@ message Record {
uint32 ttl = 777;
};
message Certificate {
repeated Trust chain = 1;
}
message Trust {
// For whom this certificate is issued
bytes issuedFor = 1;
// Expiration date of a trust
uint64 expires_at_secs = 2;
// Signature of a previous trust in a chain
bytes signature = 3;
// Signature is self-signed if it is a root trust
uint64 issued_at_secs = 4;
}
message Message {
enum MessageType {
PUT_VALUE = 0;
@ -63,7 +78,11 @@ message Message {
// used to signal the sender's connection capabilities to the peer
ConnectionType connection = 3;
// public key of the peer
bytes publicKey = 4;
// known certificates of the peer
repeated Certificate certificates = 5;
}
// defines what type of message it is.

View File

@ -36,7 +36,7 @@ use libp2p_core::{
either::EitherOutput,
upgrade::{self, InboundUpgrade, OutboundUpgrade}
};
use log::warn;
use log::trace;
use std::{error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration};
use wasm_timer::Instant;
@ -832,11 +832,11 @@ fn advance_substream<TUserData>(
false,
),
Poll::Ready(None) => {
warn!("Inbound substream id {:?}: EOF", id);
trace!("Inbound substream id {:?}: EOF", id);
(None, None, false)
}
Poll::Ready(Some(Err(e))) => {
warn!("Inbound substream error id {:?}: {:?}", id, e);
trace!("Inbound substream error id {:?}: {:?}", id, e);
(None, None, false)
},
},

View File

@ -72,17 +72,19 @@ mod key;
mod sub_bucket;
mod swamp;
mod weighted;
mod weighted_iter;
pub use entry::*;
pub use sub_bucket::*;
use crate::kbucket::weighted_iter::WeightedIter;
use bucket::KBucket;
use libp2p_core::identity::ed25519;
use libp2p_core::identity::ed25519::{Keypair, PublicKey};
use std::collections::{VecDeque};
use log::debug;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::time::Duration;
use libp2p_core::identity::ed25519;
use log::debug;
/// Maximum number of k-buckets.
const NUM_BUCKETS: usize = 256;
@ -94,7 +96,7 @@ pub struct KBucketsTable<TKey, TVal> {
/// The key identifying the local peer that owns the routing table.
local_key: TKey,
/// The buckets comprising the routing table.
buckets: Vec<KBucket<TKey, TVal>>,
pub(super) buckets: Vec<KBucket<TKey, TVal>>,
/// The list of evicted entries that have been replaced with pending
/// entries since the last call to [`KBucketsTable::take_applied_pending`].
applied_pending: VecDeque<AppliedPending<TKey, TVal>>,
@ -114,6 +116,9 @@ impl BucketIndex {
/// `local_key` is the `local_key` itself, which does not belong in any
/// bucket.
fn new(d: &Distance) -> Option<BucketIndex> {
// local = 101010101010;
// target = 101010101011;
// xor = 000000000001;
(NUM_BUCKETS - d.0.leading_zeros() as usize)
.checked_sub(1)
.map(BucketIndex)
@ -175,7 +180,11 @@ where
pub fn entry<'a>(&'a mut self, key: &'a TKey) -> Entry<'a, TKey, TVal> {
let index = BucketIndex::new(&self.local_key.as_ref().distance(key));
if let Some(i) = index {
debug!("Node {} belongs to bucket {}", bs58::encode(key.as_ref()).into_string(), i.get());
debug!(
"Node {} belongs to bucket {}",
bs58::encode(key.as_ref()).into_string(),
i.get()
);
let bucket = &mut self.buckets[i.get()];
self.applied_pending.extend(bucket.apply_pending());
Entry::new(bucket, key)
@ -238,15 +247,7 @@ where
T: Clone + AsRef<KeyBytes>,
{
let distance = self.local_key.as_ref().distance(target);
ClosestIter {
target,
iter: None,
table: self,
buckets_iter: ClosestBucketsIter::new(distance),
fmap: |b: &KBucket<TKey, _>| -> Vec<_> {
b.iter().map(|(n, _)| n.key.clone()).collect()
},
}
WeightedIter::new(self, distance, target.as_ref()).map(|(n, _)| n.key.clone())
}
/// Returns an iterator over the nodes closest to the `target` key, ordered by
@ -260,20 +261,10 @@ where
TVal: Clone,
{
let distance = self.local_key.as_ref().distance(target);
ClosestIter {
target,
iter: None,
table: self,
buckets_iter: ClosestBucketsIter::new(distance),
fmap: |b: &KBucket<_, TVal>| -> Vec<_> {
b.iter()
.map(|(n, status)| EntryView {
node: n.clone(),
status,
})
.collect()
},
}
WeightedIter::new(self, distance, target.as_ref()).map(|(n, status)| EntryView {
node: n.clone(),
status,
})
}
/// Counts the number of nodes between the local node and the node
@ -331,7 +322,7 @@ struct ClosestIter<'a, TTarget, TKey, TVal, TMap, TOut> {
/// An iterator over the bucket indices, in the order determined by the `Distance` of
/// a target from the `local_key`, such that the entries in the buckets are incrementally
/// further away from the target, starting with the bucket covering the target.
struct ClosestBucketsIter {
pub(super) struct ClosestBucketsIter {
/// The distance to the `local_key`.
distance: Distance,
/// The current state of the iterator.
@ -339,11 +330,12 @@ struct ClosestBucketsIter {
}
/// Operating states of a `ClosestBucketsIter`.
#[derive(Debug)]
enum ClosestBucketsIterState {
/// The starting state of the iterator yields the first bucket index and
/// then transitions to `ZoomIn`.
Start(BucketIndex),
/// The iterator "zooms in" to to yield the next bucket cotaining nodes that
/// The iterator "zooms in" to to yield the next bucket containing nodes that
/// are incrementally closer to the local node but further from the `target`.
/// These buckets are identified by a `1` in the corresponding bit position
/// of the distance bit string. When bucket `0` is reached, the iterator
@ -360,17 +352,20 @@ enum ClosestBucketsIterState {
}
impl ClosestBucketsIter {
fn new(distance: Distance) -> Self {
pub(super) fn new(distance: Distance) -> Self {
let state = match BucketIndex::new(&distance) {
Some(i) => ClosestBucketsIterState::Start(i),
None => ClosestBucketsIterState::Start(BucketIndex(0)),
};
// println!("ClosestBucketsIter new: distance {} {}, state {:?}", Self::u256_binary(&distance.0, 256), distance.0.leading_zeros(), state);
Self { distance, state }
}
fn next_in(&self, i: BucketIndex) -> Option<BucketIndex> {
(0..i.get()).rev().find_map(|i| {
if self.distance.0.bit(i) {
fn next_in(&self, idx: BucketIndex) -> Option<BucketIndex> {
(0..idx.get()).rev().find_map(|i| {
let bit = self.distance.0.bit(i);
if bit {
// println!("next_in {} [{}th = {}] bucket_idx: {:?}", Self::u256_binary(&self.distance.0, i), i, (bit as usize), idx);
Some(BucketIndex(i))
} else {
None
@ -378,15 +373,35 @@ impl ClosestBucketsIter {
})
}
fn next_out(&self, i: BucketIndex) -> Option<BucketIndex> {
(i.get() + 1..NUM_BUCKETS).find_map(|i| {
if !self.distance.0.bit(i) {
fn next_out(&self, idx: BucketIndex) -> Option<BucketIndex> {
(idx.get() + 1..NUM_BUCKETS).find_map(|i| {
let bit = self.distance.0.bit(i);
if !bit {
// println!("next_out {} [{}th = !{}] bucket_idx: {:?}", Self::u256_binary(&self.distance.0, i), i, (bit as usize), idx);
Some(BucketIndex(i))
} else {
None
}
})
}
fn u256_binary(u: &U256, highlight: usize) -> String {
let mut arr: [u8; 256] = [0; 256];
for i in 0..256 {
arr[i] = u.bit(i) as u8;
}
arr.iter()
.enumerate()
.map(|(i, u)| {
if i == highlight {
format!("-[{}]-", u)
} else {
u.to_string()
}
})
.collect()
}
}
impl Iterator for ClosestBucketsIter {
@ -395,57 +410,29 @@ impl Iterator for ClosestBucketsIter {
fn next(&mut self) -> Option<Self::Item> {
match self.state {
ClosestBucketsIterState::Start(i) => {
debug!(
"ClosestBucketsIter: distance = {}; Start({}) -> ZoomIn({})",
BucketIndex::new(&self.distance).unwrap_or(BucketIndex(0)).0, i.0, i.0
);
self.state = ClosestBucketsIterState::ZoomIn(i);
Some(i)
}
ClosestBucketsIterState::ZoomIn(i) => {
let old_i = i.0;
if let Some(i) = self.next_in(i) {
debug!(
"ClosestBucketsIter: distance = {}; ZoomIn({}) -> ZoomIn({})",
BucketIndex::new(&self.distance).unwrap_or(BucketIndex(0)).0, old_i, i.0
);
self.state = ClosestBucketsIterState::ZoomIn(i);
Some(i)
} else {
debug!(
"ClosestBucketsIter: distance = {}; ZoomIn({}) -> ZoomOut(0)",
BucketIndex::new(&self.distance).unwrap_or(BucketIndex(0)).0, i.0
);
let i = BucketIndex(0);
self.state = ClosestBucketsIterState::ZoomOut(i);
Some(i)
}
}
ClosestBucketsIterState::ZoomOut(i) => {
let old_i = i.0;
if let Some(i) = self.next_out(i) {
debug!(
"ClosestBucketsIter: distance = {}; ZoomOut({}) -> ZoomOut({})",
BucketIndex::new(&self.distance).unwrap_or(BucketIndex(0)).0, old_i, i.0
);
self.state = ClosestBucketsIterState::ZoomOut(i);
Some(i)
} else {
debug!(
"ClosestBucketsIter: distance = {}; ZoomOut({}) -> Done",
BucketIndex::new(&self.distance).unwrap_or(BucketIndex(0)).0, i.0
);
self.state = ClosestBucketsIterState::Done;
None
}
}
ClosestBucketsIterState::Done => {
debug!(
"ClosestBucketsIter: distance = {}; Done",
BucketIndex::new(&self.distance).unwrap_or(BucketIndex(0)).0
);
None
},
ClosestBucketsIterState::Done => None,
}
}
}
@ -470,8 +457,8 @@ where
bs58::encode(&self.target.as_ref()).into_string(),
bs58::encode(k.as_ref()).into_string()
);
return Some(k)
},
return Some(k);
}
None => self.iter = None,
},
None => {
@ -482,13 +469,14 @@ where
v.sort_by(|a, b| {
Ord::cmp(
&self.target.as_ref().distance(a.as_ref()),
&self.target.as_ref().distance(b.as_ref())
&self.target.as_ref().distance(b.as_ref()),
)
});
debug!(
"ClosestIter: target = {}; next bucket {} with {} nodes",
bs58::encode(&self.target.as_ref()).into_string(),
i.0, v.len()
i.0,
v.len()
);
self.iter = Some(v.into_iter());
} else {
@ -544,10 +532,10 @@ where
#[cfg(test)]
mod tests {
use super::*;
use libp2p_core::identity;
use libp2p_core::PeerId;
use quickcheck::*;
use rand::Rng;
use libp2p_core::identity;
use std::time::Instant;
type TestTable = KBucketsTable<KeyBytes, ()>;
@ -565,14 +553,18 @@ mod tests {
let ix = BucketIndex(i);
let num = g.gen_range(0, usize::min(K_VALUE.get(), num_total) + 1);
num_total -= num;
for _ in 0 .. num {
for _ in 0..num {
let distance = ix.rand_distance(g);
let key = local_key.for_distance(distance);
let node = Node { key: key.clone(), value: (), weight: 0 }; // TODO: arbitrary weight
let node = Node {
key: key.clone(),
value: (),
weight: 0,
}; // TODO: arbitrary weight
let status = NodeStatus::arbitrary(g);
match b.insert(node, status) {
InsertResult::Inserted => {}
_ => panic!()
_ => panic!(),
}
}
}
@ -606,7 +598,7 @@ mod tests {
if let Entry::Absent(entry) = table.entry(&other_id) {
match entry.insert((), NodeStatus::Connected, other_weight) {
InsertResult::Inserted => (),
_ => panic!()
_ => panic!(),
}
} else {
panic!()
@ -622,7 +614,8 @@ mod tests {
let keypair = ed25519::Keypair::generate();
let public_key = identity::PublicKey::Ed25519(keypair.public());
let local_key = Key::from(PeerId::from(public_key));
let mut table = KBucketsTable::<_, ()>::new(keypair, local_key.clone(), Duration::from_secs(5));
let mut table =
KBucketsTable::<_, ()>::new(keypair, local_key.clone(), Duration::from_secs(5));
match table.entry(&local_key) {
Entry::SelfEntry => (),
_ => panic!(),
@ -637,10 +630,13 @@ mod tests {
let mut table = KBucketsTable::<_, ()>::new(keypair, local_key, Duration::from_secs(5));
let mut count = 0;
loop {
if count == 100 { break; }
if count == 100 {
break;
}
let key = Key::from(PeerId::random());
if let Entry::Absent(e) = table.entry(&key) {
match e.insert((), NodeStatus::Connected, 0) { // TODO: random weight
match e.insert((), NodeStatus::Connected, 0) {
// TODO: random weight
InsertResult::Inserted => count += 1,
_ => continue,
}
@ -649,12 +645,13 @@ mod tests {
}
}
let mut expected_keys: Vec<_> = table.buckets
let mut expected_keys: Vec<_> = table
.buckets
.iter()
.flat_map(|t| t.iter().map(|(n,_)| n.key.clone()))
.flat_map(|t| t.iter().map(|(n, _)| n.key.clone()))
.collect();
for _ in 0 .. 10 {
for _ in 0..10 {
let target_key = Key::from(PeerId::random());
let keys = table.closest_keys(&target_key).collect::<Vec<_>>();
// The list of keys is expected to match the result of a full-table scan.
@ -668,33 +665,48 @@ mod tests {
let keypair = ed25519::Keypair::generate();
let public_key = identity::PublicKey::Ed25519(keypair.public());
let local_key = Key::from(PeerId::from(public_key));
let mut table = KBucketsTable::<_, ()>::new(keypair, local_key.clone(), Duration::from_millis(1));
let mut table =
KBucketsTable::<_, ()>::new(keypair, local_key.clone(), Duration::from_millis(1));
let expected_applied;
let full_bucket_index;
loop {
let key = Key::from(PeerId::random()); // generate random peer_id
if let Entry::Absent(e) = table.entry(&key) { // check it's not yet in any bucket
if let Entry::Absent(e) = table.entry(&key) {
// check it's not yet in any bucket
// TODO: random weight
match e.insert((), NodeStatus::Disconnected, 0) { // insert it into some bucket (node Disconnected status)
InsertResult::Full => { // keep inserting until some random bucket is full (see continue below)
if let Entry::Absent(e) = table.entry(&key) { // insertion didn't succeeded => no such key in a table
match e.insert((), NodeStatus::Disconnected, 0) {
// insert it into some bucket (node Disconnected status)
InsertResult::Full => {
// keep inserting until some random bucket is full (see continue below)
if let Entry::Absent(e) = table.entry(&key) {
// insertion didn't succeeded => no such key in a table
// TODO: random weight
match e.insert((), NodeStatus::Connected, 0) { // insert it but now with Connected status
InsertResult::Pending { disconnected } => { // insertion of a connected node into full bucket should produce Pending
match e.insert((), NodeStatus::Connected, 0) {
// insert it but now with Connected status
InsertResult::Pending { disconnected } => {
// insertion of a connected node into full bucket should produce Pending
expected_applied = AppliedPending {
inserted: Node { key: key.clone(), value: (), weight: 0 }, // TODO: random weight
evicted: Some(Node { key: disconnected, value: (), weight: 0 }) // TODO: random weight
inserted: Node {
key: key.clone(),
value: (),
weight: 0,
}, // TODO: random weight
evicted: Some(Node {
key: disconnected,
value: (),
weight: 0,
}), // TODO: random weight
};
full_bucket_index = BucketIndex::new(&key.distance(&local_key));
break
},
_ => panic!()
break;
}
_ => panic!(),
}
} else {
panic!()
}
},
}
_ => continue,
}
} else {
@ -705,17 +717,20 @@ mod tests {
// Expire the timeout for the pending entry on the full bucket.`
let full_bucket = &mut table.buckets[full_bucket_index.unwrap().get()];
let elapsed = Instant::now() - Duration::from_secs(1);
full_bucket.pending_mut(&expected_applied.inserted.key).unwrap().set_ready_at(elapsed);
full_bucket
.pending_mut(&expected_applied.inserted.key)
.unwrap()
.set_ready_at(elapsed);
// Calling table.entry() has a side-effect of applying pending nodes
match table.entry(&expected_applied.inserted.key) {
Entry::Present(_, NodeStatus::Connected) => {}
x => panic!("Unexpected entry: {:?}", x)
x => panic!("Unexpected entry: {:?}", x),
}
match table.entry(&expected_applied.evicted.as_ref().unwrap().key) {
Entry::Absent(_) => {}
x => panic!("Unexpected entry: {:?}", x)
x => panic!("Unexpected entry: {:?}", x),
}
assert_eq!(Some(expected_applied), table.take_applied_pending());
@ -743,6 +758,8 @@ mod tests {
})
}
QuickCheck::new().tests(10).quickcheck(prop as fn(_,_) -> _)
QuickCheck::new()
.tests(10)
.quickcheck(prop as fn(_, _) -> _)
}
}

View File

@ -180,7 +180,17 @@ where
/// Returns an iterator over the nodes in the bucket, together with their status.
pub fn iter(&self) -> impl Iterator<Item = (&Node<TKey, TVal>, NodeStatus)> {
Iterator::chain(self.weighted.iter(), self.swamp.iter())
Iterator::chain(self.weighted(), self.swamp())
}
/// Returns an iterator over the weighted nodes in the bucket, together with their status.
pub fn weighted(&self) -> impl Iterator<Item = (&Node<TKey, TVal>, NodeStatus)> {
self.weighted.iter()
}
/// Returns an iterator over the swamp nodes in the bucket, together with their status.
pub fn swamp(&self) -> impl Iterator<Item = (&Node<TKey, TVal>, NodeStatus)> {
self.swamp.iter()
}
/// Inserts the pending node into the bucket, if its timeout has elapsed,

View File

@ -0,0 +1,305 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::kbucket::{
BucketIndex, ClosestBucketsIter, Distance, KBucketsTable, KeyBytes, Node, NodeStatus,
};
#[derive(Copy, Clone, Debug)]
struct Progress {
need: usize,
got: usize,
}
#[derive(Copy, Clone, Debug)]
enum State {
Nowhere,
Weighted(Progress),
Swamp {
/// After taking one element from swamp, go back to weighted, keeping saved progress
saved: Option<Progress>,
},
Empty,
}
pub struct WeightedIter<'a, TKey, TVal> {
target: &'a KeyBytes,
start: BucketIndex,
weighted_buckets: ClosestBucketsIter,
weighted_iter: Option<DynIter<'a, TKey, TVal>>,
swamp_buckets: ClosestBucketsIter,
swamp_iter: Option<DynIter<'a, TKey, TVal>>,
table: &'a KBucketsTable<TKey, TVal>, // TODO: make table &mut and call apply_pending?
state: State,
}
type Entry<'a, TKey, TVal> = (&'a Node<TKey, TVal>, NodeStatus);
type DynIter<'a, TKey, TVal> = Box<dyn Iterator<Item = (&'a Node<TKey, TVal>, NodeStatus)> + 'a>;
impl<'a, TKey, TVal> WeightedIter<'a, TKey, TVal>
where
TKey: Clone + AsRef<KeyBytes>,
TVal: Clone,
{
pub fn new(
table: &'a KBucketsTable<TKey, TVal>,
distance: Distance,
target: &'a KeyBytes,
) -> impl Iterator<Item = Entry<'a, TKey, TVal>> + 'a
where
TKey: Clone + AsRef<KeyBytes>,
TVal: Clone,
{
let start = BucketIndex::new(&distance).unwrap_or(BucketIndex(0));
WeightedIter {
target,
start: start.clone(),
weighted_buckets: ClosestBucketsIter::new(distance),
weighted_iter: None,
swamp_buckets: ClosestBucketsIter::new(distance),
swamp_iter: None,
table,
state: State::Nowhere,
}
}
pub fn num_weighted(&self, idx: BucketIndex) -> usize {
let distance = (self.start.get() as isize - idx.get() as isize).abs();
// After distance > 2, result will always be equal to 1
let pow = (distance + 2).min(4);
// 16/2^(distance+2)
16 / 2usize.pow(pow as u32) // TODO: check overflows?
}
// Take iterator for next weighted bucket, save it to self.weighted_iter, and return
pub fn next_weighted_bucket(&mut self) -> Option<(BucketIndex, &mut DynIter<'a, TKey, TVal>)> {
let idx = self.weighted_buckets.next()?;
let bucket = self.table.buckets.get(idx.get());
let v = bucket.map(|b| self.sort_bucket(b.weighted().collect::<Vec<_>>()));
self.weighted_iter = v;
// self.weighted_iter = bucket.map(|b| Box::new(b.weighted()) as _);
self.weighted_iter.as_mut().map(|iter| (idx, iter))
}
// Take next weighted element, return None when there's no weighted elements
pub fn next_weighted(&mut self) -> Option<Entry<'a, TKey, TVal>> {
// Take current weighted_iter or the next one
if let Some(iter) = self.weighted_iter.as_deref_mut() {
return iter.next();
}
let iter = self.next_weighted_bucket().map(|(_, i)| i)?;
iter.next()
}
pub fn next_swamp_bucket(&mut self) -> Option<&'_ mut DynIter<'a, TKey, TVal>> {
let idx = self.swamp_buckets.next()?;
let bucket = self.table.buckets.get(idx.get());
let v = bucket.map(|b| self.sort_bucket(b.swamp().collect::<Vec<_>>()));
self.swamp_iter = v;
self.swamp_iter.as_mut()
}
pub fn next_swamp(&mut self) -> Option<Entry<'a, TKey, TVal>> {
if let Some(iter) = self.swamp_iter.as_deref_mut() {
return iter.next();
}
let iter = self.next_swamp_bucket()?;
iter.next()
}
pub fn sort_bucket(&self, mut bucket: Vec<Entry<'a, TKey, TVal>>) -> DynIter<'a, TKey, TVal> {
bucket.sort_by_cached_key(|(e, _)| self.target.distance(e.key.as_ref()));
Box::new(bucket.into_iter())
}
}
impl<'a, TKey, TVal> Iterator for WeightedIter<'a, TKey, TVal>
where
TKey: Clone + AsRef<KeyBytes>,
TVal: Clone,
{
type Item = Entry<'a, TKey, TVal>;
fn next(&mut self) -> Option<Self::Item> {
use State::*;
loop {
let (state, result) = match self.state {
// Not yet started, or just finished a bucket
// Here we decide where to go next
Nowhere => {
// If there is a weighted bucket, try take element from it
if let Some((idx, iter)) = self.next_weighted_bucket() {
if let Some(elem) = iter.next() {
// Found weighted element
let state = Weighted(Progress {
got: 1,
need: self.num_weighted(idx),
});
(state, Some(elem))
} else {
// Weighted bucket was empty: go decide again
(Nowhere, None)
}
} else {
// There are no weighted buckets, go to swamp
(Swamp { saved: None }, None)
}
}
// Iterating through a weighted bucket, need more elements
Weighted(Progress { got, need }) if got < need => {
if let Some(elem) = self.next_weighted() {
// Found weighted element, go take more
let state = Weighted(Progress { got: got + 1, need });
(state, Some(elem))
} else {
// Current bucket is empty, we're nowhere: need to decide where to go next
(Nowhere, None)
}
}
// Got enough weighted, go to swamp (saving progress, to return back with it)
Weighted(Progress { need, .. }) => {
(
Swamp {
// Set 'got' to zero, so when we get element from swarm, we start afresh
saved: Some(Progress { need, got: 0 }),
},
None,
)
}
// Take one element from swamp, and go to Weighted
Swamp { saved: Some(saved) } => {
if let Some(elem) = self.next_swamp() {
// We always take just a single element from the swamp
// And then go back to weighted
(Weighted(saved), Some(elem))
} else if self.next_swamp_bucket().is_some() {
// Current bucket was empty, take next one
(Swamp { saved: Some(saved) }, None)
} else {
// No more swamp buckets, go drain weighted
(Weighted(saved), None)
}
}
// Weighted buckets are empty
Swamp { saved } => {
if let Some(elem) = self.next_swamp() {
// Keep draining bucket until it's empty
(Swamp { saved }, Some(elem))
} else if self.next_swamp_bucket().is_some() {
// Current bucket was empty, go try next one
(Swamp { saved }, None)
} else {
// Routing table is empty
(Empty, None)
}
}
Empty => (Empty, None),
};
self.state = state;
if result.is_some() {
return result;
}
if let Empty = &self.state {
return None;
}
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use libp2p_core::identity::ed25519;
use libp2p_core::{identity, PeerId};
use crate::kbucket::{Entry, InsertResult, Key};
use super::*;
#[test]
/// Insert:
/// 1 weighted far away
/// lots of swamp near
///
/// Expect:
/// weighted still in the results
fn weighted_first() {
env_logger::builder()
.filter_level(log::LevelFilter::Debug)
.try_init()
.ok();
let keypair = ed25519::Keypair::generate();
let public_key = identity::PublicKey::Ed25519(keypair.public());
let local_key = Key::from(PeerId::from(public_key));
let mut table =
KBucketsTable::<_, ()>::new(keypair, local_key.clone(), Duration::from_secs(5));
let mut insert = |id: Key<PeerId>, weight: u32| {
if let Entry::Absent(e) = table.entry(&id) {
if let InsertResult::Inserted = e.insert((), NodeStatus::Connected, weight) {
return Some(id);
}
}
None
};
let target_min_distance = 250;
// Generate weighted target
let target = loop {
let id = Key::from(PeerId::random());
let distance = local_key.distance(&id);
let idx = BucketIndex::new(&distance).unwrap_or(BucketIndex(0)).get();
if idx > target_min_distance {
let target = insert(id, 10).expect("inserted");
break target;
}
};
let mut swamp = 100;
let max_distance = 252;
while swamp > 0 {
let id = Key::from(PeerId::random());
let distance = local_key.distance(&id);
let idx = BucketIndex::new(&distance).unwrap_or(BucketIndex(0)).get();
if idx < max_distance && insert(id, 0).is_some() {
swamp -= 1;
}
}
// Start from bucket 0
let closest = table.closest_keys(&local_key).take(1).collect::<Vec<_>>();
assert!(closest.contains(&target));
// Start from target
let closest = table.closest_keys(&target).take(1).collect::<Vec<_>>();
assert!(closest.contains(&target));
// Start from random
let random = Key::from(PeerId::random());
let closest = table.closest_keys(&random).take(1).collect::<Vec<_>>();
assert!(closest.contains(&target));
}
}

View File

@ -42,6 +42,7 @@ use wasm_timer::Instant;
use derivative::Derivative;
use libp2p_core::identity::ed25519::PublicKey;
use trust_graph::{Certificate, Trust};
/// The protocol name used for negotiating with multistream-select.
pub const DEFAULT_PROTO_NAME: &[u8] = b"/ipfs/kad/1.0.0";
@ -95,6 +96,7 @@ pub struct KadPeer {
pub multiaddrs: Vec<Multiaddr>,
/// How the sender is connected to that remote.
pub connection_ty: KadConnectionType,
pub certificates: Vec<Certificate>,
}
// Builds a `KadPeer` from a corresponding protobuf message.
@ -123,17 +125,50 @@ impl TryFrom<proto::message::Peer> for KadPeer {
invalid_data(format!("invalid public key: {}", e).as_str())
)?;
let mut certificates = Vec::with_capacity(peer.certificates.len());
for cert in peer.certificates.into_iter() {
let mut chain = Vec::with_capacity(cert.chain.len());
for trust in cert.chain.into_iter() {
let issued_for = PublicKey::decode(trust.issued_for.as_slice())
.map_err(|e|
invalid_data(format!("invalid issued_for: {}", e).as_str())
)?;
let expires_at: Duration = Duration::from_secs(trust.expires_at_secs);
let issued_at: Duration = Duration::from_secs(trust.issued_at_secs);
let signature: Vec<u8> = trust.signature;
let trust = Trust::new(issued_for, expires_at, issued_at, signature);
chain.push(trust);
}
certificates.push(Certificate::new_unverified(chain));
}
Ok(KadPeer {
public_key,
node_id,
multiaddrs: addrs,
connection_ty
connection_ty,
certificates
})
}
}
impl Into<proto::message::Peer> for KadPeer {
fn into(self) -> proto::message::Peer {
let certificates = self.certificates.into_iter().map(|cert|
proto::Certificate {
chain: cert.chain.into_iter().map(|trust| {
proto::Trust {
issued_for: trust.issued_for.encode().to_vec(),
expires_at_secs: trust.expires_at.as_secs(),
signature: trust.signature,
issued_at_secs: trust.issued_at.as_secs(),
}
}).collect(),
}
).collect();
proto::message::Peer {
id: self.node_id.into_bytes(),
addrs: self.multiaddrs.into_iter().map(|a| a.to_vec()).collect(),
@ -141,7 +176,8 @@ impl Into<proto::message::Peer> for KadPeer {
let ct: proto::message::ConnectionType = self.connection_ty.into();
ct as i32
},
public_key: self.public_key.encode().to_vec()
public_key: self.public_key.encode().to_vec(),
certificates
}
}
}

View File

@ -32,6 +32,14 @@ use libp2p_core::PeerId;
use std::{time::Duration, num::NonZeroUsize};
use wasm_timer::Instant;
/// Peer along with its weight
pub struct WeightedPeer {
/// Kademlia key & id of the peer
pub peer_id: Key<PeerId>,
/// Weight, calculated locally
pub weight: u32
}
/// A `QueryPool` provides an aggregate state machine for driving `Query`s to completion.
///
/// Internally, a `Query` is in turn driven by an underlying `QueryPeerIter`
@ -89,7 +97,7 @@ impl<TInner> QueryPool<TInner> {
/// Adds a query to the pool that contacts a fixed set of peers.
pub fn add_fixed<I>(&mut self, peers: I, inner: TInner) -> QueryId
where
I: IntoIterator<Item = PeerId>
I: IntoIterator<Item = WeightedPeer>
{
let id = self.next_query_id();
self.continue_fixed(id, peers, inner);
@ -101,20 +109,27 @@ impl<TInner> QueryPool<TInner> {
/// earlier.
pub fn continue_fixed<I>(&mut self, id: QueryId, peers: I, inner: TInner)
where
I: IntoIterator<Item = PeerId>
I: IntoIterator<Item = WeightedPeer>
{
assert!(!self.queries.contains_key(&id));
// TODO: why not alpha?
let parallelism = self.config.replication_factor.get();
let peer_iter = QueryPeerIter::Fixed(FixedPeersIter::new(peers, parallelism));
let query = Query::new(id, peer_iter, inner);
let (swamp, weighted) = peers.into_iter().partition::<Vec<_>, _>(|p| p.weight == 0);
let swamp = swamp.into_iter().map(|p| p.peer_id.into_preimage());
let weighted = weighted.into_iter().map(|p| p.peer_id.into_preimage());
let weighted_iter = QueryPeerIter::Fixed(FixedPeersIter::new(weighted, parallelism));
let swamp_iter = QueryPeerIter::Fixed(FixedPeersIter::new(swamp, parallelism));
let query = Query::new(id, weighted_iter, swamp_iter, inner);
self.queries.insert(id, query);
}
/// Adds a query to the pool that iterates towards the closest peers to the target.
pub fn add_iter_closest<T, I>(&mut self, target: T, peers: I, inner: TInner) -> QueryId
where
T: Into<KeyBytes>,
I: IntoIterator<Item = Key<PeerId>>
T: Into<KeyBytes> + Clone,
I: IntoIterator<Item = WeightedPeer>
{
let id = self.next_query_id();
self.continue_iter_closest(id, target, peers, inner);
@ -124,15 +139,21 @@ impl<TInner> QueryPool<TInner> {
/// Adds a query to the pool that iterates towards the closest peers to the target.
pub fn continue_iter_closest<T, I>(&mut self, id: QueryId, target: T, peers: I, inner: TInner)
where
T: Into<KeyBytes>,
I: IntoIterator<Item = Key<PeerId>>
T: Into<KeyBytes> + Clone,
I: IntoIterator<Item = WeightedPeer>
{
let cfg = ClosestPeersIterConfig {
num_results: self.config.replication_factor.get(),
.. ClosestPeersIterConfig::default()
};
let peer_iter = QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, peers));
let query = Query::new(id, peer_iter, inner);
let (swamp, weighted) = peers.into_iter().partition::<Vec<_>, _>(|p| p.weight == 0);
let swamp = swamp.into_iter().map(|p| p.peer_id);
let weighted = weighted.into_iter().map(|p| p.peer_id);
let weighted_iter = QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg.clone(), target.clone(), weighted));
let swamp_iter = QueryPeerIter::Closest(ClosestPeersIter::with_config(cfg, target, swamp));
let query = Query::new(id, weighted_iter, swamp_iter, inner);
self.queries.insert(id, query);
}
@ -230,7 +251,9 @@ pub struct Query<TInner> {
/// The unique ID of the query.
id: QueryId,
/// The peer iterator that drives the query state.
peer_iter: QueryPeerIter,
weighted_iter: QueryPeerIter,
/// The peer iterator that drives the query state.
swamp_iter: QueryPeerIter,
/// Execution statistics of the query.
stats: QueryStats,
/// The opaque inner query state.
@ -245,8 +268,8 @@ enum QueryPeerIter {
impl<TInner> Query<TInner> {
/// Creates a new query without starting it.
fn new(id: QueryId, peer_iter: QueryPeerIter, inner: TInner) -> Self {
Query { id, inner, peer_iter, stats: QueryStats::empty() }
fn new(id: QueryId, weighted_iter: QueryPeerIter, swamp_iter: QueryPeerIter, inner: TInner) -> Self {
Query { id, inner, weighted_iter, swamp_iter, stats: QueryStats::empty() }
}
/// Gets the unique ID of the query.
@ -261,11 +284,17 @@ impl<TInner> Query<TInner> {
/// Informs the query that the attempt to contact `peer` failed.
pub fn on_failure(&mut self, peer: &PeerId) {
let updated = match &mut self.peer_iter {
let updated_swamp = match &mut self.swamp_iter {
QueryPeerIter::Closest(iter) => iter.on_failure(peer),
QueryPeerIter::Fixed(iter) => iter.on_failure(peer)
QueryPeerIter::Fixed(iter) => iter.on_failure(peer),
};
if updated {
let updated_weighted = match &mut self.weighted_iter {
QueryPeerIter::Closest(iter) => iter.on_failure(peer),
QueryPeerIter::Fixed(iter) => iter.on_failure(peer),
};
if updated_swamp || updated_weighted {
self.stats.failure += 1;
}
}
@ -275,37 +304,77 @@ impl<TInner> Query<TInner> {
/// the query, if applicable.
pub fn on_success<I>(&mut self, peer: &PeerId, new_peers: I)
where
I: IntoIterator<Item = PeerId>
I: IntoIterator<Item = WeightedPeer>
{
let updated = match &mut self.peer_iter {
QueryPeerIter::Closest(iter) => iter.on_success(peer, new_peers),
QueryPeerIter::Fixed(iter) => iter.on_success(peer)
let (swamp, weighted) = new_peers.into_iter().partition::<Vec<_>, _>(|p| p.weight == 0);
let updated_swamp = match &mut self.swamp_iter {
QueryPeerIter::Closest(iter) => iter.on_success(peer, swamp.into_iter().map(|p| p.peer_id)),
QueryPeerIter::Fixed(iter) => iter.on_success(peer),
};
if updated {
let updated_weighted = match &mut self.weighted_iter {
QueryPeerIter::Closest(iter) => iter.on_success(peer, weighted.into_iter().map(|p| p.peer_id)),
QueryPeerIter::Fixed(iter) => iter.on_success(peer),
};
if updated_swamp || updated_weighted {
self.stats.success += 1;
}
}
/// Checks whether the query is currently waiting for a result from `peer`.
pub fn is_waiting(&self, peer: &PeerId) -> bool {
match &self.peer_iter {
let weighted_waiting = match &self.weighted_iter {
QueryPeerIter::Closest(iter) => iter.is_waiting(peer),
QueryPeerIter::Fixed(iter) => iter.is_waiting(peer)
}
};
let swamp_waiting = match &self.swamp_iter {
QueryPeerIter::Closest(iter) => iter.is_waiting(peer),
QueryPeerIter::Fixed(iter) => iter.is_waiting(peer)
};
debug_assert_ne!(weighted_waiting, swamp_waiting);
weighted_waiting || swamp_waiting
}
/// Advances the state of the underlying peer iterator.
fn next(&mut self, now: Instant) -> PeersIterState {
let state = match &mut self.peer_iter {
use PeersIterState::*;
// First query weighted iter
let weighted_state = match &mut self.weighted_iter {
QueryPeerIter::Closest(iter) => iter.next(now),
QueryPeerIter::Fixed(iter) => iter.next()
};
if let PeersIterState::Waiting(Some(_)) = state {
// If there's a new weighted peer to send rpc to, return it
if let Waiting(Some(_)) = weighted_state {
self.stats.requests += 1;
return weighted_state;
}
state
// If there was no new weighted peer, check swamp
let swamp_state = match &mut self.swamp_iter {
QueryPeerIter::Closest(iter) => iter.next(now),
QueryPeerIter::Fixed(iter) => iter.next()
};
// If there's a new swamp peer to send rpc to, return it
if let Waiting(Some(_)) = swamp_state {
self.stats.requests += 1;
return swamp_state;
}
// Return remaining state: weighted has higher priority
match (weighted_state, swamp_state) {
// If weighted finished, return swamp state
(Finished, swamp) => swamp,
// Otherwise, return weighted state first
(weighted, _) => weighted
}
}
/// Finishes the query prematurely.
@ -313,10 +382,15 @@ impl<TInner> Query<TInner> {
/// A finished query immediately stops yielding new peers to contact and will be
/// reported by [`QueryPool::poll`] via [`QueryPoolState::Finished`].
pub fn finish(&mut self) {
match &mut self.peer_iter {
match &mut self.weighted_iter {
QueryPeerIter::Closest(iter) => iter.finish(),
QueryPeerIter::Fixed(iter) => iter.finish()
}
};
match &mut self.swamp_iter {
QueryPeerIter::Closest(iter) => iter.finish(),
QueryPeerIter::Fixed(iter) => iter.finish()
};
}
/// Checks whether the query has finished.
@ -324,18 +398,32 @@ impl<TInner> Query<TInner> {
/// A finished query is eventually reported by `QueryPool::next()` and
/// removed from the pool.
pub fn is_finished(&self) -> bool {
match &self.peer_iter {
let weighted_finished = match &self.weighted_iter {
QueryPeerIter::Closest(iter) => iter.is_finished(),
QueryPeerIter::Fixed(iter) => iter.is_finished()
}
};
let swamp_finished = match &self.swamp_iter {
QueryPeerIter::Closest(iter) => iter.is_finished(),
QueryPeerIter::Fixed(iter) => iter.is_finished()
};
weighted_finished && swamp_finished
}
/// Consumes the query, producing the final `QueryResult`.
pub fn into_result(self) -> QueryResult<TInner, impl Iterator<Item = PeerId>> {
let peers = match self.peer_iter {
let weighted = match self.weighted_iter {
QueryPeerIter::Closest(iter) => Either::Left(iter.into_result()),
QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result())
};
let swamp = match self.swamp_iter {
QueryPeerIter::Closest(iter) => Either::Left(iter.into_result()),
QueryPeerIter::Fixed(iter) => Either::Right(iter.into_result())
};
let peers = weighted.chain(swamp);
QueryResult { peers, inner: self.inner, stats: self.stats }
}
}

View File

@ -117,7 +117,8 @@ impl ClosestPeersIter {
let log = vec![(Instant::now(), state.clone())];
(distance, Peer { key, state, log })
})
.take(K_VALUE.into()));
.take(K_VALUE.into())
);
// The iterator initially makes progress by iterating towards the target.
let state = State::Iterating { no_progress : 0 };
@ -150,7 +151,7 @@ impl ClosestPeersIter {
/// calling this function has no effect and `false` is returned.
pub fn on_success<I>(&mut self, peer: &PeerId, closer_peers: I) -> bool
where
I: IntoIterator<Item = PeerId>
I: IntoIterator<Item = Key<PeerId>>
{
if let State::Finished = self.state {
return false
@ -161,7 +162,10 @@ impl ClosestPeersIter {
// Mark the peer as succeeded.
match self.closest_peers.entry(distance) {
Entry::Vacant(..) => return false,
Entry::Vacant(..) => {
log::warn!("peer {} not found in closest_peers, ignoring result", peer);
return false
},
Entry::Occupied(mut e) => match e.get().state {
PeerState::Waiting(..) => {
debug_assert!(self.num_waiting > 0);
@ -173,7 +177,10 @@ impl ClosestPeersIter {
}
PeerState::NotContacted
| PeerState::Failed
| PeerState::Succeeded => return false
| PeerState::Succeeded => {
log::warn!("peer {} is incorrect state {:?}, ignoring result", peer, e.get().state);
return false
}
}
}
@ -181,8 +188,7 @@ impl ClosestPeersIter {
let mut progress = false;
// Incorporate the reported closer peers into the iterator.
for peer in closer_peers {
let key = peer.into();
for key in closer_peers {
let distance = self.target.distance(&key);
let peer = Peer { key, state: PeerState::NotContacted, log: vec![(Instant::now(), PeerState::NotContacted)] };
self.closest_peers.entry(distance).or_insert(peer);
@ -470,6 +476,8 @@ enum State {
/// results is increased to `num_results` in an attempt to finish the iterator.
/// If the iterator can make progress again upon receiving the remaining
/// results, it switches back to `Iterating`. Otherwise it will be finished.
///
/// // TODO: is it used?
Stalled,
/// The iterator is finished.
@ -652,8 +660,11 @@ mod tests {
for (i, k) in expected.iter().enumerate() {
if rng.gen_bool(0.75) {
let num_closer = rng.gen_range(0, iter.config.num_results + 1);
let closer_peers = random_peers(num_closer, &mut rng);
remaining.extend(closer_peers.iter().cloned().map(Key::from));
let closer_peers = random_peers(num_closer, &mut rng)
.into_iter()
.map(|p| p.into())
.collect::<Vec<_>>();
remaining.extend(closer_peers.clone());
iter.on_success(k.preimage(), closer_peers);
} else {
num_failures += 1;
@ -709,7 +720,10 @@ mod tests {
let now = Instant::now();
let mut rng = StdRng::from_seed(seed.0);
let closer = random_peers(1, &mut rng);
let closer = random_peers(1, &mut rng)
.into_iter()
.map(|p| p.into())
.collect::<Vec<_>>();
// A first peer reports a "closer" peer.
let peer1 = match iter.next(now) {
@ -731,7 +745,7 @@ mod tests {
};
// The "closer" peer must only be in the iterator once.
let n = iter.closest_peers.values().filter(|e| e.key.preimage() == &closer[0]).count();
let n = iter.closest_peers.values().filter(|e| e.key == closer[0]).count();
assert_eq!(n, 1);
true

View File

@ -10,7 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]
[dependencies]
async-std = "1.0"
async-std = "~1.5.0"
data-encoding = "2.0"
dns-parser = "0.8"
either = "1.5.3"

View File

@ -19,7 +19,7 @@ void = "1.0"
wasm-timer = "0.2"
[dev-dependencies]
async-std = "1.0"
async-std = "~1.5.0"
libp2p-tcp = { version = "0.19.0", path = "../../transports/tcp" }
libp2p-secio = { version = "0.19.0", path = "../../protocols/secio" }
libp2p-yamux = { version = "0.19.0", path = "../../muxers/yamux" }

View File

@ -46,7 +46,7 @@ secp256k1 = []
aes-all = ["aesni"]
[dev-dependencies]
async-std = "1.0"
async-std = "~1.5.0"
criterion = "0.3"
libp2p-mplex = { version = "0.19.0", path = "../../muxers/mplex" }
libp2p-tcp = { version = "0.19.0", path = "../../transports/tcp" }

View File

@ -10,7 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]
[dependencies]
async-std = { version = "1.0", optional = true }
async-std = { version = "~1.5.0", optional = true }
futures = "0.3.1"
futures-timer = "3.0"
get_if_addrs = "0.5.3"

View File

@ -10,7 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]
[target.'cfg(all(unix, not(any(target_os = "emscripten", target_os = "unknown"))))'.dependencies]
async-std = { version = "1.0", optional = true }
async-std = { version = "~1.5.0", optional = true }
libp2p-core = { version = "0.19.0", path = "../../core" }
log = "0.4.1"
futures = "0.3.1"