libp2p 0.19 + fluence fork

This commit is contained in:
folex 2020-05-20 17:38:19 +03:00
parent 57301a8bcf
commit 2e06f5a325
4 changed files with 647 additions and 727 deletions

1180
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -9,11 +9,12 @@ edition = "2018"
[dependencies] [dependencies]
async-std = "1.0" async-std = "1.0"
cidr = "0.1" cidr = "0.1"
csv = "1" csv = "1.1.3"
env_logger = "0.7.1" env_logger = "0.7.1"
futures = "0.3.1" futures = "0.3.5"
libp2p = { git = "https://github.com/mxinden/rust-libp2p", branch = "disjoint-paths-2" } libp2p = { git = "https://github.com/fluencelabs/rust-libp2p", branch = "trust_graph_libp2p_git_fix" }
libp2p-kad = { git = "https://github.com/mxinden/rust-libp2p", branch = "disjoint-paths-2" } libp2p-kad = { git = "https://github.com/fluencelabs/rust-libp2p", branch = "trust_graph_libp2p_git_fix" }
trust-graph = { git = "https://github.com/fluencelabs/fluence", branch = "trust_graph_libp2p_git_fix" }
log = "0.4.1" log = "0.4.1"
prometheus = "0.7" prometheus = "0.7"
void = "1.0.2" void = "1.0.2"
@ -23,8 +24,11 @@ ctrlc = "3"
structopt = "0.3" structopt = "0.3"
futures-timer = "3" futures-timer = "3"
maxminddb = "*" maxminddb = "*"
serde = "1.0.110"
# [patch."https://github.com/mxinden/rust-libp2p"] # [patch."https://github.com/mxinden/rust-libp2p"]
# libp2p = { path = "/home/mxinden/code/github.com/libp2p/rust-libp2p" } # libp2p = { path = "/home/mxinden/code/github.com/libp2p/rust-libp2p" }
# libp2p-kad = { path = "/home/mxinden/code/github.com/libp2p/rust-libp2p/protocols/kad" } # libp2p-kad = { path = "/home/mxinden/code/github.com/libp2p/rust-libp2p/protocols/kad" }
#[patch.crates-io]
#serde = { version = "1.0.110" }

View File

@ -4,7 +4,7 @@ use futures::prelude::*;
use futures_timer::Delay; use futures_timer::Delay;
use libp2p::{ use libp2p::{
identify::IdentifyEvent, identify::IdentifyEvent,
kad::{GetClosestPeersOk, KademliaEvent}, kad::{GetClosestPeersOk, KademliaEvent, QueryResult},
multiaddr::{Multiaddr, Protocol}, multiaddr::{Multiaddr, Protocol},
ping::{PingEvent, PingSuccess}, ping::{PingEvent, PingSuccess},
PeerId, PeerId,
@ -74,8 +74,10 @@ impl Exporter {
}) })
.collect(); .collect();
let nodes_to_probe_periodically = let nodes_to_probe_periodically = dhts
dhts.into_iter().map(|(name, _, _)| (name, vec![])).collect(); .into_iter()
.map(|(name, _, _)| (name, vec![]))
.collect();
Ok(Exporter { Ok(Exporter {
clients, clients,
@ -163,68 +165,70 @@ impl Exporter {
} }
}, },
client::Event::Kademlia(event) => match *event { client::Event::Kademlia(event) => match *event {
KademliaEvent::BootstrapResult(_) => { KademliaEvent::QueryResult { result, .. } => match result {
self.metrics QueryResult::Bootstrap(_) => {
.event_counter self.metrics
.with_label_values(&[&name, "kad", "bootstrap"]) .event_counter
.inc(); .with_label_values(&[&name, "kad", "bootstrap"])
} .inc();
KademliaEvent::GetClosestPeersResult(res) => { }
self.metrics QueryResult::GetClosestPeers(res) => {
.event_counter self.metrics
.with_label_values(&[&name, "kad", "get_closest_peers"]) .event_counter
.inc(); .with_label_values(&[&name, "kad", "get_closest_peers"])
.inc();
// Record lookup latency. // Record lookup latency.
let result_label = if res.is_ok() { "ok" } else { "error" }; let result_label = if res.is_ok() { "ok" } else { "error" };
let peer_id = PeerId::from_bytes(match res { let peer_id = PeerId::from_bytes(match res {
Ok(GetClosestPeersOk { key, .. }) => key, Ok(GetClosestPeersOk { key, .. }) => key,
Err(err) => err.into_key(), Err(err) => err.into_key(),
}) })
.unwrap(); .unwrap();
let duration = let duration =
Instant::now() - self.in_flight_lookups.remove(&peer_id).unwrap(); Instant::now() - self.in_flight_lookups.remove(&peer_id).unwrap();
self.metrics self.metrics
.random_node_lookup_duration .random_node_lookup_duration
.with_label_values(&[&name, result_label]) .with_label_values(&[&name, result_label])
.observe(duration.as_secs_f64()); .observe(duration.as_secs_f64());
} }
KademliaEvent::GetProvidersResult(_) => { QueryResult::GetProviders(_) => {
self.metrics self.metrics
.event_counter .event_counter
.with_label_values(&[&name, "kad", "get_providers"]) .with_label_values(&[&name, "kad", "get_providers"])
.inc(); .inc();
} }
KademliaEvent::StartProvidingResult(_) => { QueryResult::StartProviding(_) => {
self.metrics self.metrics
.event_counter .event_counter
.with_label_values(&[&name, "kad", "start_providing"]) .with_label_values(&[&name, "kad", "start_providing"])
.inc(); .inc();
} }
KademliaEvent::RepublishProviderResult(_) => { QueryResult::RepublishProvider(_) => {
self.metrics self.metrics
.event_counter .event_counter
.with_label_values(&[&name, "kad", "republish_provider"]) .with_label_values(&[&name, "kad", "republish_provider"])
.inc(); .inc();
} }
KademliaEvent::GetRecordResult(_) => { QueryResult::GetRecord(_) => {
self.metrics self.metrics
.event_counter .event_counter
.with_label_values(&[&name, "kad", "get_record"]) .with_label_values(&[&name, "kad", "get_record"])
.inc(); .inc();
} }
KademliaEvent::PutRecordResult(_) => { QueryResult::PutRecord(_) => {
self.metrics self.metrics
.event_counter .event_counter
.with_label_values(&[&name, "kad", "put_record"]) .with_label_values(&[&name, "kad", "put_record"])
.inc(); .inc();
} }
KademliaEvent::RepublishRecordResult(_) => { QueryResult::RepublishRecord(_) => {
self.metrics self.metrics
.event_counter .event_counter
.with_label_values(&[&name, "kad", "republish_record"]) .with_label_values(&[&name, "kad", "republish_record"])
.inc(); .inc();
} }
},
// Note: Do not interpret Discovered event as a proof of a node // Note: Do not interpret Discovered event as a proof of a node
// being online. // being online.
KademliaEvent::Discovered { .. } => { KademliaEvent::Discovered { .. } => {
@ -320,7 +324,6 @@ impl Exporter {
None None
} }
} }
impl Future for Exporter { impl Future for Exporter {
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {

View File

@ -1,9 +1,9 @@
use futures::prelude::*; use futures::prelude::*;
use libp2p::identity::PublicKey;
use libp2p::{ use libp2p::{
core::{ core::{
self, connection::ConnectionLimit, either::EitherError, either::EitherOutput, self, either::EitherError, either::EitherOutput, multiaddr::Protocol,
multiaddr::Protocol, muxing::StreamMuxerBox, transport::boxed::Boxed, transport::Transport, muxing::StreamMuxerBox, transport::boxed::Boxed, transport::Transport, upgrade, Multiaddr,
upgrade, Multiaddr,
}, },
dns, dns,
identify::{Identify, IdentifyEvent}, identify::{Identify, IdentifyEvent},
@ -12,7 +12,10 @@ use libp2p::{
mplex, noise, mplex, noise,
ping::{Ping, PingConfig, PingEvent}, ping::{Ping, PingConfig, PingEvent},
secio, secio,
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters, SwarmBuilder}, swarm::{
DialError, NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters,
SwarmBuilder,
},
tcp, yamux, InboundUpgradeExt, NetworkBehaviour, OutboundUpgradeExt, PeerId, Swarm, tcp, yamux, InboundUpgradeExt, NetworkBehaviour, OutboundUpgradeExt, PeerId, Swarm,
}; };
use std::{ use std::{
@ -23,6 +26,7 @@ use std::{
time::Duration, time::Duration,
usize, usize,
}; };
use trust_graph::TrustGraph;
mod global_only; mod global_only;
@ -32,13 +36,16 @@ pub struct Client {
} }
impl Client { impl Client {
pub fn new(mut bootnode: Multiaddr, use_disjoint_paths: bool) -> Result<Client, Box<dyn Error>> { pub fn new(
mut bootnode: Multiaddr,
use_disjoint_paths: bool,
) -> Result<Client, Box<dyn Error>> {
// Create a random key for ourselves. // Create a random key for ourselves.
let local_key = Keypair::generate_ed25519(); let local_key = Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public()); let local_peer_id = PeerId::from(local_key.public());
let behaviour = MyBehaviour::new(local_key.clone(), use_disjoint_paths)?; let behaviour = MyBehaviour::new(local_key.clone(), use_disjoint_paths)?;
let transport = build_transport(local_key); let transport = build_transport(local_key.clone());
let mut swarm = SwarmBuilder::new(transport, behaviour, local_peer_id) let mut swarm = SwarmBuilder::new(transport, behaviour, local_peer_id)
.incoming_connection_limit(10) .incoming_connection_limit(10)
.outgoing_connection_limit(10) .outgoing_connection_limit(10)
@ -53,8 +60,17 @@ impl Client {
panic!("expected peer id"); panic!("expected peer id");
}; };
swarm.kademlia.add_address(&bootnode_peer_id, bootnode); let public = match local_key.public() {
swarm.kademlia.bootstrap(); PublicKey::Ed25519(pk) => pk,
_ => unreachable!("generate_ed25519 was used"),
};
swarm
.kademlia
.add_address(&bootnode_peer_id, bootnode, public);
if let Err(err) = swarm.kademlia.bootstrap() {
log::error!("Bootstrap failed: {}", err);
}
Ok(Client { Ok(Client {
swarm, swarm,
@ -66,7 +82,7 @@ impl Client {
self.swarm.kademlia.get_closest_peers(peer_id); self.swarm.kademlia.get_closest_peers(peer_id);
} }
pub fn dial(&mut self, peer_id: &PeerId) -> Result<bool, ConnectionLimit> { pub fn dial(&mut self, peer_id: &PeerId) -> Result<(), DialError> {
Swarm::dial(&mut self.swarm, peer_id) Swarm::dial(&mut self.swarm, peer_id)
} }
} }
@ -125,9 +141,16 @@ impl MyBehaviour {
// kind: PermissionDenied, error: "len > max" })` // kind: PermissionDenied, error: "len > max" })`
kademlia_config.set_max_packet_size(8000); kademlia_config.set_max_packet_size(8000);
if use_disjoint_paths { if use_disjoint_paths {
kademlia_config.use_disjoint_path_queries(); log::warn!("Disjoint paths aren't supported yet");
// kademlia_config.use_disjoint_path_queries();
} }
let kademlia = Kademlia::with_config(local_peer_id, store, kademlia_config); let trust = TrustGraph::default();
let kp = match &local_key {
Keypair::Ed25519(kp) => kp,
_ => unreachable!("only ed25519 is supported"),
};
let kademlia =
Kademlia::with_config(kp.clone(), local_peer_id, store, kademlia_config, trust);
let ping = Ping::new(PingConfig::new().with_keep_alive(true)); let ping = Ping::new(PingConfig::new().with_keep_alive(true));
@ -184,7 +207,9 @@ fn build_transport(keypair: Keypair) -> Boxed<(PeerId, StreamMuxerBox), impl Err
let global_only_tcp = global_only::GlobalIpOnly::new(tcp); let global_only_tcp = global_only::GlobalIpOnly::new(tcp);
let transport = dns::DnsConfig::new(global_only_tcp).unwrap(); let transport = dns::DnsConfig::new(global_only_tcp).unwrap();
let noise_keypair = noise::Keypair::new().into_authentic(&keypair).unwrap(); let noise_keypair = noise::Keypair::<noise::X25519>::new()
.into_authentic(&keypair)
.unwrap();
let noise_config = noise::NoiseConfig::ix(noise_keypair); let noise_config = noise::NoiseConfig::ix(noise_keypair);
let secio_config = secio::SecioConfig::new(keypair).max_frame_len(1024 * 1024); let secio_config = secio::SecioConfig::new(keypair).max_frame_len(1024 * 1024);