Update TrustGraph

This commit is contained in:
folex 2021-03-09 17:47:13 +03:00
parent 372ef232bd
commit 47f5452f66
21 changed files with 121 additions and 60 deletions

View File

@ -97,7 +97,7 @@ libp2p-websocket = { version = "0.28.0", path = "transports/websocket", optional
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.8.1"
tokio = { version = "1.0.1", features = ["io-util", "io-std", "macros", "rt", "rt-multi-thread"] }
trust-graph = "0.2.0"
trust-graph = "0.2.5"
[workspace]
members = [

View File

@ -64,7 +64,7 @@ use libp2p::{
swarm::NetworkBehaviourEventProcess
};
use std::{error::Error, task::{Context, Poll}};
use trust_graph::TrustGraph;
use trust_graph::{TrustGraph, InMemoryStorage};
fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
@ -159,7 +159,11 @@ fn main() -> Result<(), Box<dyn Error>> {
libp2p::identity::Keypair::Ed25519(kp) => kp,
_ => unreachable!("only ed25519 supported"),
};
let kademlia = Kademlia::new(local_key, local_peer_id.clone(), store, TrustGraph::new(vec![]));
let trust = {
let storage = InMemoryStorage::new_in_memory(vec![]);
TrustGraph::new(storage)
};
let kademlia = Kademlia::new(local_key, local_peer_id.clone(), store, trust);
let mdns = task::block_on(Mdns::new(MdnsConfig::default()))?;
let behaviour = MyBehaviour { kademlia, mdns };
Swarm::new(transport, behaviour, local_peer_id)

View File

@ -16,7 +16,7 @@ name = "libp2p_mplex"
bytes = "1"
futures = "0.3.1"
asynchronous-codec = "0.6"
libp2p-core = { version = "0.27.0", path = "../../core", package = "fluence-fork-libp2p-core" }
libp2p-core = { version = "0.27.1", path = "../../core", package = "fluence-fork-libp2p-core" }
log = "0.4"
nohash-hasher = "0.2"
parking_lot = "0.11"

View File

@ -16,7 +16,7 @@ name = "libp2p_floodsub"
cuckoofilter = "0.5.0"
fnv = "1.0"
futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core", package = "fluence-fork-libp2p-core" }
libp2p-core = { version = "0.27.1", path = "../../core", package = "fluence-fork-libp2p-core" }
libp2p-swarm = { version = "0.28.0", path = "../../swarm", package = "fluence-fork-libp2p-swarm" }
log = "0.4"
prost = "0.7"

View File

@ -14,7 +14,7 @@ name = "libp2p_gossipsub"
[dependencies]
libp2p-swarm = { version = "0.28.0", path = "../../swarm", package = "fluence-fork-libp2p-swarm" }
libp2p-core = { version = "0.27.0", path = "../../core", package = "fluence-fork-libp2p-core" }
libp2p-core = { version = "0.27.1", path = "../../core", package = "fluence-fork-libp2p-core" }
bytes = "1.0"
byteorder = "1.3.4"
fnv = "1.0.7"

View File

@ -14,7 +14,7 @@ name = "libp2p_identify"
[dependencies]
futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core", package = "fluence-fork-libp2p-core" }
libp2p-core = { version = "0.27.1", path = "../../core", package = "fluence-fork-libp2p-core" }
libp2p-swarm = { version = "0.28.0", path = "../../swarm", package = "fluence-fork-libp2p-swarm" }
log = "0.4.1"
prost = "0.7"

View File

@ -20,7 +20,7 @@ fnv = "1.0"
asynchronous-codec = "0.6"
futures = "0.3.1"
log = "0.4"
libp2p-core = { version = "0.27.0", path = "../../core", package = "fluence-fork-libp2p-core" }
libp2p-core = { version = "0.27.1", path = "../../core", package = "fluence-fork-libp2p-core" }
libp2p-swarm = { version = "0.28.0", path = "../../swarm", package = "fluence-fork-libp2p-swarm" }
prost = "0.7"
rand = "0.7.2"
@ -33,7 +33,8 @@ void = "1.0"
bs58 = "0.3.0"
derivative = "2.0.2"
trust-graph = "0.2.0"
trust-graph = "0.2.5"
fluence-identity = "0.2.4"
prometheus = "0.9.0"
[dev-dependencies]

View File

@ -58,12 +58,14 @@ use std::task::{Context, Poll};
use std::vec;
use wasm_timer::Instant;
use libp2p_core::identity::ed25519::{Keypair, PublicKey};
use trust_graph::{TrustGraph, Certificate};
use trust_graph::{Certificate};
use derivative::Derivative;
use crate::metrics::Metrics;
pub use crate::query::QueryStats;
type TrustGraph = trust_graph::TrustGraph<trust_graph::InMemoryStorage>;
/// `Kademlia` is a `NetworkBehaviour` that implements the libp2p
/// Kademlia protocol.
pub struct Kademlia<TStore> {
@ -766,7 +768,7 @@ where
bs58::encode(target.as_ref()).into_string(), // sha256
);
let provider_key = self.kbuckets.local_public_key();
let certificates = self.trust.get_all_certs(&provider_key, &[]);
let certificates = self.get_certificates(&provider_key);
let peers = Self::closest_keys(&mut self.kbuckets, &target);
let context = AddProviderContext::Publish;
let info = QueryInfo::AddProvider {
@ -849,13 +851,12 @@ where
if let Some(query) = self.queries.get_mut(query_id) {
log::trace!("Request to {:?} in query {:?} succeeded.", source, query_id);
for peer in others_iter.clone() {
log::trace!("Peer {:?} reported by {:?} in query {:?}.",
peer, source, query_id);
log::trace!("Peer {:?} reported by {:?} in query {:?}.", peer, source, query_id);
query.inner.contacts.insert(peer.node_id, peer.clone().into());
}
query.on_success(source, others_iter.map(|kp| WeightedPeer {
peer_id: kp.node_id.clone().into(),
weight: trust.weight(&kp.public_key).unwrap_or_default()
weight: get_weight(trust, &kp.public_key),
}))
}
}
@ -874,7 +875,7 @@ where
.map(KadPeer::from)
.collect();
peers.iter_mut().for_each(|mut peer|
peer.certificates = self.trust.get_all_certs(&peer.public_key, &[])
peer.certificates = self.get_certificates(&peer.public_key)
);
peers
}
@ -907,14 +908,12 @@ where
// The provider is either the local node and we fill in
// the local addresses on demand,
let self_key = kbuckets.local_public_key();
let certificates = trust.get_all_certs(&self_key, &[]);
let multiaddrs = local_addrs.iter().cloned().collect::<Vec<_>>();
Some(KadPeer {
public_key: self_key,
node_id,
multiaddrs,
connection_ty,
certificates
multiaddrs: local_addrs.iter().cloned().collect::<Vec<_>>(),
certificates: get_certificates(&trust, &self_key),
public_key: self_key,
})
} else {
let key = kbucket::Key::from(node_id);
@ -927,16 +926,16 @@ where
} else {
p.addresses
};
let certificates = node_id.as_public_key().and_then(|provider_pk|
match provider_pk {
libp2p_core::identity::PublicKey::Ed25519(pk) =>
Some(trust.get_all_certs(pk, &[])),
let certificates = {
match node_id.as_public_key() {
Some(libp2p_core::identity::PublicKey::Ed25519(pk)) =>
get_certificates(&trust, &pk),
key => {
log::warn!("Provider {} has a non-Ed25519 public key: {:?}", node_id, key);
None
vec![]
}
}
).unwrap_or_default();
};
KadPeer {
node_id,
@ -969,7 +968,7 @@ where
/// Starts an iterative `ADD_PROVIDER` query for the given key.
fn start_add_provider(&mut self, key: record::Key, context: AddProviderContext) {
let provider_key = self.kbuckets.local_public_key();
let certificates = self.trust.get_all_certs(&provider_key, &[]);
let certificates = self.get_certificates(&provider_key);
let info = QueryInfo::AddProvider {
context,
key: key.clone(),
@ -1071,7 +1070,7 @@ where
{
let addresses = contact.addresses.clone();
let peer = entry.key().preimage().clone();
let weight = trust.weight(contact.public_key.clone()).unwrap_or(0);
let weight = get_weight(&trust, &contact.public_key);
debug!(
"Calculated weight for {} pk {}: {}",
entry.key().preimage(),
@ -1223,7 +1222,7 @@ where
let provider_id = *params.local_peer_id();
let external_addresses = params.external_addresses().map(|r| r.addr).collect();
let provider_key = self.kbuckets.local_public_key();
let certificates = self.trust.get_all_certs(&provider_key, &[]);
let certificates = self.get_certificates(&provider_key);
let inner = QueryInner::new(QueryInfo::AddProvider {
context,
key,
@ -1240,7 +1239,7 @@ where
let peers = result.peers.into_iter().map(|peer_id| {
let weight = contacts
.get(&peer_id)
.and_then(|c| trust.weight(&c.public_key))
.map(|c| get_weight(&trust, &c.public_key))
.unwrap_or_default();
WeightedPeer {
peer_id: peer_id.into(),
@ -1298,7 +1297,8 @@ where
let trust = &self.trust;
let weight =
result.inner.contacts.get(peer_id)
.and_then(|c| trust.weight(&c.public_key)).unwrap_or_default();
.map(|c| get_weight(&trust, &c.public_key))
.unwrap_or_default();
let peer = WeightedPeer {
weight,
peer_id: cache_key
@ -1341,9 +1341,10 @@ where
let trust = &self.trust;
let peers = result.peers.into_iter().map(|peer_id| {
let weight =
contacts.get(&peer_id).and_then(|c|
trust.weight(&c.public_key)
).unwrap_or_default();
contacts
.get(&peer_id)
.map(|c| get_weight(&trust, &c.public_key))
.unwrap_or_default();
WeightedPeer {
peer_id: peer_id.into(),
@ -1737,6 +1738,26 @@ where
log!("\n{}", buckets);
}
}
fn get_certificates(&self, key: &PublicKey) -> Vec<Certificate> {
get_certificates(&self.trust, key)
}
fn get_weight(&self, key: &PublicKey) -> u32 {
get_weight(&self.trust, key)
}
}
fn get_certificates(trust: &TrustGraph, key: &PublicKey) -> Vec<Certificate> {
fluence_identity::PublicKey::from_libp2p(&key).map(|key|
trust.get_all_certs(&key, &[]).unwrap_or_default()
).unwrap_or_default()
}
fn get_weight(trust: &TrustGraph, key: &PublicKey) -> u32 {
fluence_identity::PublicKey::from_libp2p(&key).map(|key|
trust.weight(&key).unwrap_or_default().unwrap_or_default()
).unwrap_or(0)
}
/// Exponentially decrease the given duration (base 2).

View File

@ -48,6 +48,7 @@ use quickcheck::*;
use rand::{Rng, random, thread_rng, rngs::StdRng, SeedableRng};
use std::{collections::{HashSet, HashMap}, time::Duration, num::NonZeroUsize, u64};
use libp2p_core::identity::ed25519;
use trust_graph::InMemoryStorage;
type TestSwarm = Swarm<Kademlia<MemoryStore>>;
@ -67,8 +68,12 @@ fn build_node_with_config(cfg: KademliaConfig) -> (ed25519::Keypair, Multiaddr,
.boxed();
let local_id = local_public_key.clone().into_peer_id();
let trust = {
let pk = fluence_identity::PublicKey::from_libp2p(&ed25519_key.public()).unwrap();
let storage = InMemoryStorage::new_in_memory(vec![(pk, 1)]);
TrustGraph::new(storage)
};
let store = MemoryStore::new(local_id.clone());
let trust = TrustGraph::new(vec![(ed25519_key.public(), 1)]);
let behaviour = Kademlia::with_config(ed25519_key.clone(), local_id.clone(), store, cfg.clone(), trust);
let mut swarm = Swarm::new(transport, behaviour, local_id);
@ -1192,7 +1197,8 @@ fn make_swarms(total: usize, config: KademliaConfig) -> Vec<(Keypair, Multiaddr,
#[cfg(test)]
mod certificates {
use super::*;
use trust_graph::{KeyPair, current_time};
use trust_graph::current_time;
use fluence_identity::{KeyPair, PublicKey};
fn gen_root_cert(from: &KeyPair, to: PublicKey) -> Certificate {
let cur_time = current_time();
@ -1224,7 +1230,7 @@ mod certificates {
}
fn bs(pk: PublicKey) -> String {
bs58::encode(pk.encode()).into_string()
bs58::encode(pk.to_bytes()).into_string()
}
#[test]
@ -1240,15 +1246,19 @@ mod certificates {
// Set same weights to all nodes, so they store each other's certificates
let weights = swarms.iter().map(|(kp, _, _)| (kp.public(), 1)).collect::<Vec<_>>();
for swarm in swarms.iter_mut() {
swarm.2.trust.add_root_weights(weights.clone());
for (pk, weight) in weights.iter() {
let pk = fluence_identity::PublicKey::from_libp2p(&pk).unwrap();
swarm.2.trust.add_root_weight(pk, *weight);
}
}
let mut swarms = swarms.into_iter();
let (first_kp, _, first) = swarms.next().unwrap();
// issue certs from each swarm to the first swarm, so all swarms trust the first one
let mut swarms = swarms.map(|(kp, _, mut swarm)| {
let pk = fluence_identity::PublicKey::from_libp2p(&first_kp.public()).unwrap();
// root cert, its chain is [self-signed: swarm -> swarm, swarm -> first]
let root = gen_root_cert(&kp.clone().into(), first_kp.public());
let root = gen_root_cert(&kp.clone().into(), pk);
swarm.trust.add(&root, current_time()).unwrap();
SwarmWithKeypair { swarm, kp }
});
@ -1259,16 +1269,25 @@ mod certificates {
// issue cert from the first swarm to the second (will be later disseminated via kademlia)
// chain: 0 -> 1
let cert_0_1 = gen_root_cert(&swarm0.kp.clone().into(), swarm1.kp.public());
let cert_0_1 = {
let pk = fluence_identity::PublicKey::from_libp2p(&swarm1.kp.public()).unwrap();
gen_root_cert(&swarm0.kp.clone().into(), pk)
};
swarm0.swarm.trust.add(&cert_0_1, current_time()).unwrap();
let cert_0_1_check = swarm0.swarm.trust.get_all_certs(&swarm1.kp.public(), &[]);
let cert_0_1_check = {
let pk = fluence_identity::PublicKey::from_libp2p(&swarm1.kp.public()).unwrap();
swarm0.swarm.trust.get_all_certs(pk, &[]).unwrap()
};
assert_eq!(cert_0_1_check.len(), 1);
let cert_0_1_check = cert_0_1_check.into_iter().nth(0).unwrap();
assert_eq!(cert_0_1, cert_0_1_check);
// check that this certificate (with root prepended) can be added to trust graph of any other node
// chain: (2 -> 0)
let mut cert_2_0_1 = gen_root_cert(&swarm2.kp.clone().into(), swarm0.kp.public());
let mut cert_2_0_1 = {
let pk = fluence_identity::PublicKey::from_libp2p(&swarm0.kp.public()).unwrap();
gen_root_cert(&swarm2.kp.clone().into(), pk)
};
// chain: (2 -> 0) ++ (0 -> 1)
cert_2_0_1.chain.extend_from_slice(&cert_0_1.chain[1..]);
swarm2.swarm.trust.add(cert_2_0_1, current_time()).unwrap();
@ -1306,13 +1325,26 @@ mod certificates {
// check that certificates for `swarm[1].kp` were disseminated
for swarm in swarms.iter().skip(2) {
let disseminated = swarm.swarm.trust.get_all_certs(kp_1.clone(), &[]);
let disseminated = {
let pk = fluence_identity::PublicKey::from_libp2p(&kp_1).unwrap();
swarm.swarm.trust.get_all_certs(&pk, &[]).unwrap()
};
// take only certificate converging to current `swarm` public key
let disseminated = disseminated.into_iter().find(|c| &c.chain[0].issued_for == &swarm.kp.public()).unwrap();
let disseminated = {
let pk = fluence_identity::PublicKey::from_libp2p(&swarm.kp.public()).unwrap();
disseminated.into_iter().find(|c| &c.chain[0].issued_for == &pk).unwrap()
};
// swarm -> swarm0 -> swarm1
assert_eq!(disseminated.chain.len(), 3);
let pubkeys = disseminated.chain.iter().map(|c| &c.issued_for).collect::<Vec<_>>();
assert_eq!(pubkeys, vec![&swarm.kp.public(), &swarms[0].kp.public(), &swarms[1].kp.public()]);
assert_eq!(
pubkeys,
vec![
&fluence_identity::PublicKey::from_libp2p(&swarm.kp.public()).unwrap(),
&fluence_identity::PublicKey::from_libp2p(&swarms[0].kp.public()).unwrap(),
&fluence_identity::PublicKey::from_libp2p(&swarms[1].kp.public()).unwrap(),
]
);
// last trust in the certificate must be equal to previously generated (0 -> 1) trust
let last = disseminated.chain.last().unwrap();

View File

@ -133,13 +133,16 @@ impl TryFrom<proto::message::Peer> for KadPeer {
for cert in peer.certificates.into_iter() {
let mut chain = Vec::with_capacity(cert.chain.len());
for trust in cert.chain.into_iter() {
let issued_for = PublicKey::decode(trust.issued_for.as_slice())
let issued_for = fluence_identity::PublicKey::from_bytes(trust.issued_for.as_slice())
.map_err(|e|
invalid_data(format!("invalid issued_for: {}", e).as_str())
)?;
let expires_at: Duration = Duration::from_secs(trust.expires_at_secs);
let issued_at: Duration = Duration::from_secs(trust.issued_at_secs);
let signature: Vec<u8> = trust.signature;
let signature = fluence_identity::Signature::from_bytes(&trust.signature)
.map_err(|e|
invalid_data(format!("invalid signature: {}", e).as_str())
)?;
let trust = Trust::new(issued_for, expires_at, issued_at, signature);
chain.push(trust);
@ -163,9 +166,9 @@ impl Into<proto::message::Peer> for KadPeer {
proto::Certificate {
chain: cert.chain.into_iter().map(|trust| {
proto::Trust {
issued_for: trust.issued_for.encode().to_vec(),
issued_for: trust.issued_for.to_bytes().to_vec(),
expires_at_secs: trust.expires_at.as_secs(),
signature: trust.signature,
signature: trust.signature.to_bytes().to_vec(),
issued_at_secs: trust.issued_at.as_secs(),
}
}).collect(),

View File

@ -19,7 +19,7 @@ dns-parser = "0.8.0"
futures = "0.3.13"
if-watch = "0.2.0"
lazy_static = "1.4.0"
libp2p-core = { version = "0.27.0", path = "../../core", package = "fluence-fork-libp2p-core" }
libp2p-core = { version = "0.27.1", path = "../../core", package = "fluence-fork-libp2p-core" }
libp2p-swarm = { version = "0.28.0", path = "../../swarm", package = "fluence-fork-libp2p-swarm" }
log = "0.4.14"
rand = "0.8.3"

View File

@ -14,7 +14,7 @@ name = "libp2p_ping"
[dependencies]
futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core", package = "fluence-fork-libp2p-core" }
libp2p-core = { version = "0.27.1", path = "../../core", package = "fluence-fork-libp2p-core" }
libp2p-swarm = { version = "0.28.0", path = "../../swarm", package = "fluence-fork-libp2p-swarm" }
log = "0.4.1"
rand = "0.7.2"

View File

@ -16,7 +16,7 @@ name = "libp2p_request_response"
async-trait = "0.1"
bytes = "1"
futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core", package = "fluence-fork-libp2p-core" }
libp2p-core = { version = "0.27.1", path = "../../core", package = "fluence-fork-libp2p-core" }
libp2p-swarm = { version = "0.28.0", path = "../../swarm", package = "fluence-fork-libp2p-swarm" }
log = "0.4.11"
lru = "0.6"

View File

@ -10,7 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]
[lib]
name = "libp2p_core_derive"
name = "libp2p_swarm_derive"
proc-macro = true
[dependencies]

View File

@ -15,7 +15,7 @@ name = "libp2p_swarm"
[dependencies]
either = "1.6.0"
futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../core", package = "fluence-fork-libp2p-core" }
libp2p-core = { version = "0.27.1", path = "../core", package = "fluence-fork-libp2p-core" }
log = "0.4"
rand = "0.7"
smallvec = "1.0"

View File

@ -14,7 +14,7 @@ name = "libp2p_deflate"
[dependencies]
futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core", package = "fluence-fork-libp2p-core" }
libp2p-core = { version = "0.27.1", path = "../../core", package = "fluence-fork-libp2p-core" }
flate2 = "1.0"
[dev-dependencies]

View File

@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
name = "libp2p_dns"
[dependencies]
libp2p-core = { version = "0.27.0", path = "../../core", package = "fluence-fork-libp2p-core" }
libp2p-core = { version = "0.27.1", path = "../../core", package = "fluence-fork-libp2p-core" }
log = "0.4.1"
futures = "0.3.1"

View File

@ -15,7 +15,7 @@ bytes = "1"
curve25519-dalek = "3.0.0"
futures = "0.3.1"
lazy_static = "1.2"
libp2p-core = { version = "0.27.0", path = "../../core", package = "fluence-fork-libp2p-core" }
libp2p-core = { version = "0.27.1", path = "../../core", package = "fluence-fork-libp2p-core" }
log = "0.4"
prost = "0.7"
rand = "0.7.2"

View File

@ -16,7 +16,7 @@ name = "libp2p_plaintext"
bytes = "1"
futures = "0.3.1"
asynchronous-codec = "0.6"
libp2p-core = { version = "0.27.0", path = "../../core", package = "fluence-fork-libp2p-core" }
libp2p-core = { version = "0.27.1", path = "../../core", package = "fluence-fork-libp2p-core" }
log = "0.4.8"
prost = "0.7"
unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] }

View File

@ -15,7 +15,7 @@ name = "libp2p_uds"
[target.'cfg(all(unix, not(target_os = "emscripten")))'.dependencies]
async-std = { version = "1.6.2", optional = true }
libp2p-core = { package = "fluence-fork-libp2p-core", version = "0.27.0", path = "../../core" }
libp2p-core = { package = "fluence-fork-libp2p-core", version = "0.27.1", path = "../../core" }
log = "0.4.1"
futures = "0.3.1"
tokio = { version = "1.0.1", default-features = false, features = ["net"], optional = true }

View File

@ -16,7 +16,7 @@ name = "libp2p_websocket"
futures-rustls = "0.21"
either = "1.5.3"
futures = "0.3.1"
libp2p-core = { version = "0.27.0", path = "../../core", package = "fluence-fork-libp2p-core" }
libp2p-core = { version = "0.27.1", path = "../../core", package = "fluence-fork-libp2p-core" }
log = "0.4.8"
quicksink = "0.1"
rw-stream-sink = "0.2.0"