Merge branch 'master' into fluence_master

# Conflicts:
#	CHANGELOG.md
#	Cargo.toml
#	core/Cargo.toml
#	misc/core-derive/Cargo.toml
#	misc/peer-id-generator/Cargo.toml
#	muxers/mplex/Cargo.toml
#	muxers/yamux/Cargo.toml
#	protocols/deflate/Cargo.toml
#	protocols/floodsub/Cargo.toml
#	protocols/gossipsub/Cargo.toml
#	protocols/identify/Cargo.toml
#	protocols/kad/Cargo.toml
#	protocols/kad/src/behaviour.rs
#	protocols/kad/src/behaviour/test.rs
#	protocols/mdns/Cargo.toml
#	protocols/noise/Cargo.toml
#	protocols/ping/Cargo.toml
#	protocols/plaintext/Cargo.toml
#	protocols/pnet/Cargo.toml
#	protocols/secio/Cargo.toml
#	swarm/Cargo.toml
#	swarm/src/lib.rs
#	transports/dns/Cargo.toml
#	transports/tcp/Cargo.toml
#	transports/uds/Cargo.toml
#	transports/wasm-ext/Cargo.toml
#	transports/websocket/Cargo.toml
This commit is contained in:
folex
2020-04-20 19:29:58 +03:00
38 changed files with 507 additions and 257 deletions

View File

@ -2,7 +2,7 @@
name = "libp2p-deflate"
edition = "2018"
description = "Deflate encryption protocol for libp2p"
version = "0.17.0"
version = "0.18.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -11,11 +11,11 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
futures = "0.3.1"
libp2p-core = { version = "0.17.0", path = "../../core" }
libp2p-core = { version = "0.18.0", path = "../../core" }
flate2 = "1.0"
[dev-dependencies]
async-std = "1.0"
libp2p-tcp = { version = "0.17.0", path = "../../transports/tcp" }
libp2p-tcp = { version = "0.18.0", path = "../../transports/tcp" }
rand = "0.7"
quickcheck = "0.9"

View File

@ -2,7 +2,7 @@
name = "libp2p-floodsub"
edition = "2018"
description = "Floodsub protocol for libp2p"
version = "0.17.0"
version = "0.18.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -13,8 +13,8 @@ categories = ["network-programming", "asynchronous"]
cuckoofilter = "0.3.2"
fnv = "1.0"
futures = "0.3.1"
libp2p-core = { version = "0.17.0", path = "../../core" }
libp2p-swarm = { version = "0.17.0", path = "../../swarm" }
libp2p-core = { version = "0.18.0", path = "../../core" }
libp2p-swarm = { version = "0.18.0", path = "../../swarm" }
prost = "0.6.1"
rand = "0.7"
smallvec = "1.0"

View File

@ -18,8 +18,9 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction};
use crate::protocol::{FloodsubProtocol, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction};
use crate::topic::Topic;
use crate::FloodsubConfig;
use cuckoofilter::CuckooFilter;
use fnv::FnvHashSet;
use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId};
@ -43,8 +44,7 @@ pub struct Floodsub {
/// Events that need to be yielded to the outside when polling.
events: VecDeque<NetworkBehaviourAction<FloodsubRpc, FloodsubEvent>>,
/// Peer id of the local node. Used for the source of the messages that we publish.
local_peer_id: PeerId,
config: FloodsubConfig,
/// List of peers to send messages to.
target_peers: FnvHashSet<PeerId>,
@ -64,11 +64,16 @@ pub struct Floodsub {
}
impl Floodsub {
/// Creates a `Floodsub`.
/// Creates a `Floodsub` with default configuration.
pub fn new(local_peer_id: PeerId) -> Self {
Self::from_config(FloodsubConfig::new(local_peer_id))
}
/// Creates a `Floodsub` with the given configuration.
pub fn from_config(config: FloodsubConfig) -> Self {
Floodsub {
events: VecDeque::new(),
local_peer_id,
config,
target_peers: FnvHashSet::default(),
connected_peers: HashMap::new(),
subscribed_topics: SmallVec::new(),
@ -190,7 +195,7 @@ impl Floodsub {
fn publish_many_inner(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>, check_self_subscriptions: bool) {
let message = FloodsubMessage {
source: self.local_peer_id.clone(),
source: self.config.local_peer_id.clone(),
data: data.into(),
// If the sequence numbers are predictable, then an attacker could flood the network
// with packets with the predetermined sequence numbers and absorb our legitimate
@ -202,6 +207,10 @@ impl Floodsub {
let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u));
if self_subscribed {
self.received.add(&message);
if self.config.subscribe_local_messages {
self.events.push_back(
NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Message(message.clone())));
}
}
// Don't publish the message if we have to check subscriptions
// and we're not subscribed ourselves to any of the topics.
@ -228,7 +237,7 @@ impl Floodsub {
}
impl NetworkBehaviour for Floodsub {
type ProtocolsHandler = OneShotHandler<FloodsubConfig, FloodsubRpc, InnerMessage>;
type ProtocolsHandler = OneShotHandler<FloodsubProtocol, FloodsubRpc, InnerMessage>;
type OutEvent = FloodsubEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {

View File

@ -21,6 +21,8 @@
//! Implements the floodsub protocol, see also the:
//! [spec](https://github.com/libp2p/specs/tree/master/pubsub).
use libp2p_core::PeerId;
pub mod protocol;
mod layer;
@ -33,3 +35,22 @@ mod rpc_proto {
pub use self::layer::{Floodsub, FloodsubEvent};
pub use self::protocol::{FloodsubMessage, FloodsubRpc};
pub use self::topic::Topic;
/// Configuration options for the Floodsub protocol.
pub struct FloodsubConfig {
/// Peer id of the local node. Used for the source of the messages that we publish.
pub local_peer_id: PeerId,
/// `true` if messages published by local node should be propagated as messages received from
/// the network, `false` by default.
pub subscribe_local_messages: bool,
}
impl FloodsubConfig {
pub fn new(local_peer_id: PeerId) -> Self {
Self {
local_peer_id,
subscribe_local_messages: false
}
}
}

View File

@ -27,16 +27,16 @@ use futures::{Future, io::{AsyncRead, AsyncWrite}};
/// Implementation of `ConnectionUpgrade` for the floodsub protocol.
#[derive(Debug, Clone, Default)]
pub struct FloodsubConfig {}
pub struct FloodsubProtocol {}
impl FloodsubConfig {
/// Builds a new `FloodsubConfig`.
pub fn new() -> FloodsubConfig {
FloodsubConfig {}
impl FloodsubProtocol {
/// Builds a new `FloodsubProtocol`.
pub fn new() -> FloodsubProtocol {
FloodsubProtocol {}
}
}
impl UpgradeInfo for FloodsubConfig {
impl UpgradeInfo for FloodsubProtocol {
type Info = &'static [u8];
type InfoIter = iter::Once<Self::Info>;
@ -45,7 +45,7 @@ impl UpgradeInfo for FloodsubConfig {
}
}
impl<TSocket> InboundUpgrade<TSocket> for FloodsubConfig
impl<TSocket> InboundUpgrade<TSocket> for FloodsubProtocol
where
TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{

View File

@ -2,7 +2,7 @@
name = "libp2p-gossipsub"
edition = "2018"
description = "Gossipsub protocol for libp2p"
version = "0.17.0"
version = "0.18.0"
authors = ["Age Manning <Age@AgeManning.com>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -10,8 +10,8 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]
[dependencies]
libp2p-swarm = { version = "0.17.0", path = "../../swarm" }
libp2p-core = { version = "0.17.0", path = "../../core" }
libp2p-swarm = { version = "0.18.0", path = "../../swarm" }
libp2p-core = { version = "0.18.0", path = "../../core" }
bytes = "0.5.4"
byteorder = "1.3.2"
fnv = "1.0.6"
@ -30,8 +30,8 @@ prost = "0.6.1"
[dev-dependencies]
async-std = "1.4.0"
env_logger = "0.7.1"
libp2p-plaintext = { version = "0.17.0", path = "../plaintext" }
libp2p-yamux = { version = "0.17.0", path = "../../muxers/yamux" }
libp2p-plaintext = { version = "0.18.0", path = "../plaintext" }
libp2p-yamux = { version = "0.18.0", path = "../../muxers/yamux" }
quickcheck = "0.9.2"
[build-dependencies]

View File

@ -2,7 +2,7 @@
name = "libp2p-identify"
edition = "2018"
description = "Nodes identifcation protocol for libp2p"
version = "0.17.0"
version = "0.18.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -11,8 +11,8 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
futures = "0.3.1"
libp2p-core = { version = "0.17.0", path = "../../core" }
libp2p-swarm = { version = "0.17.0", path = "../../swarm" }
libp2p-core = { version = "0.18.0", path = "../../core" }
libp2p-swarm = { version = "0.18.0", path = "../../swarm" }
log = "0.4.1"
prost = "0.6.1"
smallvec = "1.0"
@ -20,9 +20,9 @@ wasm-timer = "0.2"
[dev-dependencies]
async-std = "1.0"
libp2p-mplex = { version = "0.17.0", path = "../../muxers/mplex" }
libp2p-secio = { version = "0.17.0", path = "../../protocols/secio" }
libp2p-tcp = { version = "0.17.0", path = "../../transports/tcp" }
libp2p-mplex = { version = "0.18.0", path = "../../muxers/mplex" }
libp2p-secio = { version = "0.18.0", path = "../../protocols/secio" }
libp2p-tcp = { version = "0.18.0", path = "../../transports/tcp" }
[build-dependencies]
prost-build = "0.6"

View File

@ -2,7 +2,7 @@
name = "libp2p-kad"
edition = "2018"
description = "Kademlia protocol for libp2p"
version = "0.17.0"
version = "0.18.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -17,8 +17,8 @@ fnv = "1.0"
futures_codec = "0.3.4"
futures = "0.3.1"
log = "0.4"
libp2p-core = { version = "0.17.0", path = "../../core" }
libp2p-swarm = { version = "0.17.0", path = "../../swarm" }
libp2p-core = { version = "0.18.0", path = "../../core" }
libp2p-swarm = { version = "0.18.0", path = "../../swarm" }
multihash = "0.10"
prost = "0.6.1"
rand = "0.7.2"
@ -34,8 +34,8 @@ derivative = "2.0.2"
trust-graph = { git = "ssh://git@github.com/fluencelabs/arqada.git", branch = "master" }
[dev-dependencies]
libp2p-secio = { version = "0.17.0", path = "../secio" }
libp2p-yamux = { version = "0.17.0", path = "../../muxers/yamux" }
libp2p-secio = { version = "0.18.0", path = "../secio" }
libp2p-yamux = { version = "0.18.0", path = "../../muxers/yamux" }
quickcheck = "0.9.0"
[build-dependencies]

View File

@ -1426,13 +1426,7 @@ where
None => None
};
// If no record is found, at least report known closer peers.
let closer_peers =
if record.is_none() {
self.find_closest(&kbucket::Key::new(key), &source)
} else {
Vec::new()
};
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: source,

View File

@ -22,7 +22,7 @@
use super::*;
use crate::K_VALUE;
use crate::{ALPHA_VALUE, K_VALUE};
use crate::kbucket::Distance;
use crate::record::store::MemoryStore;
use futures::{
@ -35,7 +35,7 @@ use libp2p_core::{
Transport,
identity,
transport::MemoryTransport,
multiaddr::{Protocol, multiaddr},
multiaddr::{Protocol, Multiaddr, multiaddr},
muxing::StreamMuxerBox,
upgrade
};
@ -50,64 +50,86 @@ use libp2p_core::identity::ed25519;
type TestSwarm = Swarm<Kademlia<MemoryStore>>;
fn build_node() -> (Multiaddr, TestSwarm) {
build_node_with_config(Default::default())
}
fn build_node_with_config(cfg: KademliaConfig) -> (Ed25519::Keypair, Multiaddr, TestSwarm) {
let ed25519_key = ed25519::Keypair::generate();
let local_key = identity::Keypair::Ed25519(ed25519_key.clone());
let local_public_key = local_key.public();
let transport = MemoryTransport::default()
.upgrade(upgrade::Version::V1)
.authenticate(SecioConfig::new(local_key))
.multiplex(yamux::Config::default())
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))
.map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); })
.boxed();
let local_id = local_public_key.clone().into_peer_id();
let store = MemoryStore::new(local_id.clone());
let trust = TrustGraph::new(Vec::new());
let behaviour = Kademlia::with_config(ed25519_key.clone(), local_id.clone(), store, cfg.clone(), trust);
let mut swarm = Swarm::new(transport, behaviour, local_id);
let address: Multiaddr = Protocol::Memory(random::<u64>()).into();
Swarm::listen_on(&mut swarm, address.clone()).unwrap();
(ed25519_key, address, swarm)
}
/// Builds swarms, each listening on a port. Does *not* connect the nodes together.
fn build_nodes(num: usize) -> (u64, Vec<(ed25519::Keypair, TestSwarm)>) {
fn build_nodes(num: usize) -> Vec<(Multiaddr, TestSwarm)> {
build_nodes_with_config(num, Default::default())
}
/// Builds swarms, each listening on a port. Does *not* connect the nodes together.
fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> (u64, Vec<(ed25519::Keypair, TestSwarm)>) {
let port_base = 1 + random::<u64>() % (u64::MAX - num as u64);
let mut result: Vec<(ed25519::Keypair, Swarm<_, _>)> = Vec::with_capacity(num);
for _ in 0 .. num {
let ed25519_key = ed25519::Keypair::generate();
let local_key = identity::Keypair::Ed25519(ed25519_key.clone());
let local_public_key = local_key.public();
let transport = MemoryTransport::default()
.upgrade(upgrade::Version::V1)
.authenticate(SecioConfig::new(local_key))
.multiplex(yamux::Config::default())
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))
.map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); })
.boxed();
let local_id = local_public_key.clone().into_peer_id();
let store = MemoryStore::new(local_id.clone());
let trust = TrustGraph::new(Vec::new());
let behaviour = Kademlia::with_config(ed25519_key.clone(), local_id.clone(), store, cfg.clone(), trust);
result.push((ed25519_key, Swarm::new(transport, behaviour, local_id)));
}
for (i, (_, s)) in result.iter_mut().enumerate() {
Swarm::listen_on(s, Protocol::Memory(port_base + i as u64).into()).unwrap();
}
(port_base, result)
fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> Vec<(Multiaddr, TestSwarm)> {
(0..num).map(|_| build_node_with_config(cfg.clone())).collect()
}
fn build_connected_nodes(total: usize, step: usize) -> (Vec<PeerId>, Vec<(ed25519::Keypair, TestSwarm)>) {
fn build_connected_nodes(total: usize, step: usize) -> Vec<(Multiaddr, TestSwarm)> {
build_connected_nodes_with_config(total, step, Default::default())
}
fn build_connected_nodes_with_config(total: usize, step: usize, cfg: KademliaConfig)
-> (Vec<PeerId>, Vec<(ed25519::Keypair, TestSwarm)>)
-> Vec<(Multiaddr, TestSwarm)>
{
let (port_base, mut swarms) = build_nodes_with_config(total, cfg);
let swarm_ids: Vec<_> = swarms.iter().map(|(_, s)| s).map(Swarm::local_peer_id).cloned().collect();
let mut swarms = build_nodes_with_config(total, cfg);
let swarm_ids: Vec<_> = swarms.iter()
.map(|(addr, swarm)| (addr.clone(), Swarm::local_peer_id(swarm).clone()))
.collect();
let mut i = 0;
for (j, peer) in swarm_ids.iter().enumerate().skip(1) {
for (j, (addr, peer_id)) in swarm_ids.iter().enumerate().skip(1) {
if i < swarm_ids.len() {
let public = swarms[i].0.public();
swarms[i].1.add_address(&peer, Protocol::Memory(port_base + j as u64).into(), public);
swarms[i].1.add_address(peer_id, addr.clone(), public);
}
if j % step == 0 {
i += step;
}
}
(swarm_ids, swarms)
swarms
}
fn build_fully_connected_nodes_with_config(total: usize, cfg: KademliaConfig)
-> Vec<(Multiaddr, TestSwarm)>
{
let mut swarms = build_nodes_with_config(total, cfg);
let swarm_addr_and_peer_id: Vec<_> = swarms.iter()
.map(|(addr, swarm)| (addr.clone(), Swarm::local_peer_id(swarm).clone()))
.collect();
for (_addr, swarm) in swarms.iter_mut() {
for (addr, peer) in &swarm_addr_and_peer_id {
swarm.add_address(&peer, addr.clone());
}
}
swarms
}
fn random_multihash() -> Multihash {
@ -118,8 +140,17 @@ fn random_multihash() -> Multihash {
fn bootstrap() {
fn run(rng: &mut impl Rng) {
let num_total = rng.gen_range(2, 20);
let num_group = rng.gen_range(1, num_total);
let (swarm_ids, mut swarms) = build_connected_nodes(num_total, num_group);
// When looking for the closest node to a key, Kademlia considers ALPHA_VALUE nodes to query
// at initialization. If `num_groups` is larger than ALPHA_VALUE the remaining locally known
// nodes will not be considered. Given that no other node is aware of them, they would be
// lost entirely. To prevent the above restrict `num_groups` to be equal or smaller than
// ALPHA_VALUE.
let num_group = rng.gen_range(1, (num_total % ALPHA_VALUE.get()) + 2);
let mut swarms = build_connected_nodes(num_total, num_group).into_iter()
.map(|(_a, s)| s)
.collect::<Vec<_>>();
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
swarms[0].1.bootstrap();
@ -170,7 +201,10 @@ fn query_iter() {
fn run(rng: &mut impl Rng) {
let num_total = rng.gen_range(2, 20);
let (swarm_ids, mut swarms) = build_connected_nodes(num_total, 1);
let mut swarms = build_connected_nodes(num_total, 1).into_iter()
.map(|(_a, s)| s)
.collect::<Vec<_>>();
let swarm_ids: Vec<_> = swarms.iter().map(Swarm::local_peer_id).cloned().collect();
// Ask the first peer in the list to search a random peer. The search should
// propagate forwards through the list of peers.
@ -222,7 +256,9 @@ fn unresponsive_not_returned_direct() {
// Build one node. It contains fake addresses to non-existing nodes. We ask it to find a
// random peer. We make sure that no fake address is returned.
let (_, mut swarms) = build_nodes(1);
let mut swarms = build_nodes(1).into_iter()
.map(|(_a, s)| s)
.collect::<Vec<_>>();
// Add fake addresses.
for _ in 0 .. 10 {
@ -263,18 +299,22 @@ fn unresponsive_not_returned_indirect() {
// non-existing nodes. We ask node #2 to find a random peer. We make sure that no fake address
// is returned.
let (port_base, mut swarms) = build_nodes(2);
let mut swarms = build_nodes(2);
// Add fake addresses to first.
let first_peer_id = Swarm::local_peer_id(&swarms[0].1).clone();
for _ in 0 .. 10 {
let public0 = swarms[0].0.public();
swarms[0].1.add_address(&PeerId::random(), multiaddr![Udp(10u16)], public0);
swarms[0].1.add_address(&PeerId::random(), multiaddr![Udp(10u16)]);
}
// Connect second to first.
let first_peer_id = Swarm::local_peer_id(&swarms[0].1).clone();
let first_address = swarms[0].0.clone();
let public1 = swarms[1].0.public();
swarms[1].1.add_address(&first_peer_id, Protocol::Memory(port_base).into(), public1);
swarms[1].1.add_address(&first_peer_id, first_address);
// Drop the swarm addresses.
let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::<Vec<_>>();
// Ask second to search a random value.
let search_target = PeerId::random();
@ -306,14 +346,21 @@ fn unresponsive_not_returned_indirect() {
#[test]
fn get_record_not_found() {
let (port_base, mut swarms) = build_nodes(3);
let mut swarms = build_nodes(3);
let swarm_ids: Vec<_> = swarms.iter().map(|(_, s)| s).map(Swarm::local_peer_id).cloned().collect();
let swarm_ids: Vec<_> = swarms.iter()
.map(|(_addr, swarm)| Swarm::local_peer_id(swarm))
.cloned()
.collect();
let public0 = swarms[0].0.public();
swarms[0].1.add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into(), public0);
let public1 = swarms[1].0.public();
swarms[1].1.add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into(), public1);
let (second, third) = (swarms[1].0.clone(), swarms[2].0.clone());
swarms[0].1.add_address(&swarm_ids[1], second);
swarms[1].1.add_address(&swarm_ids[2], third);
// Drop the swarm addresses.
let mut swarms = swarms.into_iter().map(|(_addr, swarm)| swarm).collect::<Vec<_>>();
let target_key = record::Key::from(random_multihash());
swarms[0].1.get_record(&target_key, Quorum::One);
@ -347,16 +394,35 @@ fn get_record_not_found() {
)
}
/// A node joining a fully connected network via a single bootnode should be able to put a record to
/// the X closest nodes of the network where X is equal to the configured replication factor.
#[test]
fn put_record() {
fn prop(replication_factor: usize, records: Vec<Record>) {
let replication_factor = NonZeroUsize::new(replication_factor % (K_VALUE.get() / 2) + 1).unwrap();
let num_total = replication_factor.get() * 2;
let num_group = replication_factor.get();
let mut config = KademliaConfig::default();
config.set_replication_factor(replication_factor);
let (swarm_ids, mut swarms) = build_connected_nodes_with_config(num_total, num_group, config);
let mut swarms = {
let mut fully_connected_swarms = build_fully_connected_nodes_with_config(
num_total - 1,
config.clone(),
);
let mut single_swarm = build_node_with_config(config);
single_swarm.1.add_address(
Swarm::local_peer_id(&fully_connected_swarms[0].1),
fully_connected_swarms[0].0.clone(),
);
let mut swarms = vec![single_swarm];
swarms.append(&mut fully_connected_swarms);
// Drop the swarm addresses.
swarms.into_iter().map(|(_addr, swarm)| swarm).collect::<Vec<_>>()
};
let records = records.into_iter()
.take(num_total)
@ -387,7 +453,7 @@ fn put_record() {
Poll::Ready(Some(KademliaEvent::PutRecordResult(res))) |
Poll::Ready(Some(KademliaEvent::RepublishRecordResult(res))) => {
match res {
Err(e) => panic!(e),
Err(e) => panic!("{:?}", e),
Ok(ok) => {
assert!(records.contains_key(&ok.key));
let record = swarm.store.get(&ok.key).unwrap();
@ -417,10 +483,14 @@ fn put_record() {
assert_eq!(r.key, expected.key);
assert_eq!(r.value, expected.value);
assert_eq!(r.expires, expected.expires);
assert_eq!(r.publisher.as_ref(), Some(&swarm_ids[0]));
assert_eq!(r.publisher.as_ref(), Some(Swarm::local_peer_id(&swarms[0])));
let key = kbucket::Key::new(r.key.clone());
let mut expected = swarm_ids.clone().split_off(1);
let mut expected = swarms.iter()
.skip(1)
.map(Swarm::local_peer_id)
.cloned()
.collect::<Vec<_>>();
expected.sort_by(|id1, id2|
kbucket::Key::new(id1.clone()).distance(&key).cmp(
&kbucket::Key::new(id2.clone()).distance(&key)));
@ -430,17 +500,32 @@ fn put_record() {
.take(replication_factor.get())
.collect::<HashSet<_>>();
let actual = swarms.iter().enumerate().skip(1)
.filter_map(|(i, (_, s))|
if s.store.get(key.preimage()).is_some() {
Some(swarm_ids[i].clone())
let actual = swarms.iter()
.skip(1)
.filter_map(|(_, swarm)|
if swarm.store.get(key.preimage()).is_some() {
Some(Swarm::local_peer_id(swarm).clone())
} else {
None
})
.collect::<HashSet<_>>();
assert_eq!(actual.len(), replication_factor.get());
assert_eq!(actual, expected);
let actual_not_expected = actual.difference(&expected)
.collect::<Vec<&PeerId>>();
assert!(
actual_not_expected.is_empty(),
"Did not expect records to be stored on nodes {:?}.",
actual_not_expected,
);
let expected_not_actual = expected.difference(&actual)
.collect::<Vec<&PeerId>>();
assert!(expected_not_actual.is_empty(),
"Expected record to be stored on nodes {:?}.",
expected_not_actual,
);
}
if republished {
@ -461,22 +546,27 @@ fn put_record() {
)
}
QuickCheck::new().tests(3).quickcheck(prop as fn(_,_))
QuickCheck::new().tests(3).quickcheck(prop as fn(_,_) -> _)
}
#[test]
fn get_value() {
let (port_base, mut swarms) = build_nodes(3);
let mut swarms = build_nodes(3);
let swarm_ids: Vec<_> = swarms.iter().map(|(_, s)| s).map(Swarm::local_peer_id).cloned().collect();
// Let first peer know of second peer and second peer know of third peer.
for i in 0..2 {
let (peer_id, address) = (|(_, s)| s).map(Swarm::local_peer_id(&swarms[i+1].1).clone(), swarms[i+1].0.clone());
swarms[i].1.add_address(&peer_id, address);
}
let public0 = swarms[0].0.public();
swarms[0].1.add_address(&swarm_ids[1], Protocol::Memory(port_base + 1).into(), public0);
let public1 = swarms[1].0.public();
swarms[1].1.add_address(&swarm_ids[2], Protocol::Memory(port_base + 2).into(), public1);
// Drop the swarm addresses.
let mut swarms = swarms.into_iter().map(|(_key, _addr, swarm)| swarm).collect::<Vec<_>>();
let record = Record::new(random_multihash(), vec![4,5,6]);
let public0 = swarms[0].0.public();
let public1 = swarms[1].0.public();
swarms[1].1.store.put(record.clone()).unwrap();
swarms[0].1.get_record(&record.key, Quorum::One);
@ -507,7 +597,9 @@ fn get_value() {
fn get_value_many() {
// TODO: Randomise
let num_nodes = 12;
let (_, mut swarms) = build_connected_nodes(num_nodes, num_nodes);
let mut swarms = build_connected_nodes(num_nodes, 3).into_iter()
.map(|(_addr, swarm)| swarm)
.collect::<Vec<_>>();
let num_results = 10;
let record = Record::new(random_multihash(), vec![4,5,6]);
@ -541,17 +633,36 @@ fn get_value_many() {
)
}
/// A node joining a fully connected network via a single bootnode should be able to add itself as a
/// provider to the X closest nodes of the network where X is equal to the configured replication
/// factor.
#[test]
fn add_provider() {
fn prop(replication_factor: usize, keys: Vec<record::Key>) {
let replication_factor = NonZeroUsize::new(replication_factor % (K_VALUE.get() / 2) + 1).unwrap();
let num_total = replication_factor.get() * 2;
let num_group = replication_factor.get();
let mut config = KademliaConfig::default();
config.set_replication_factor(replication_factor);
let (swarm_ids, mut swarms) = build_connected_nodes_with_config(num_total, num_group, config);
let mut swarms = {
let mut fully_connected_swarms = build_fully_connected_nodes_with_config(
num_total - 1,
config.clone(),
);
let mut single_swarm = build_node_with_config(config);
single_swarm.1.add_address(
Swarm::local_peer_id(&fully_connected_swarms[0].1),
fully_connected_swarms[0].0.clone(),
);
let mut swarms = vec![single_swarm];
swarms.append(&mut fully_connected_swarms);
// Drop addresses before returning.
swarms.into_iter().map(|(_addr, swarm)| swarm).collect::<Vec<_>>()
};
let keys: HashSet<_> = keys.into_iter().take(num_total).collect();
@ -605,10 +716,10 @@ fn add_provider() {
// each key was published to the `replication_factor` closest peers.
while let Some(key) = results.pop() {
// Collect the nodes that have a provider record for `key`.
let actual = swarms.iter().enumerate().skip(1)
.filter_map(|(i, (_, s))|
if s.store.providers(&key).len() == 1 {
Some(swarm_ids[i].clone())
let actual = swarms.iter().skip(1)
.filter_map(|(_, swarm)|
if swarm.store.providers(&key).len() == 1 {
Some(Swarm::local_peer_id(&swarm).clone())
} else {
None
})
@ -620,7 +731,11 @@ fn add_provider() {
return Poll::Pending
}
let mut expected = swarm_ids.clone().split_off(1);
let mut expected = swarms.iter()
.skip(1)
.map(Swarm::local_peer_id)
.cloned()
.collect::<Vec<_>>();
let kbucket_key = kbucket::Key::new(key);
expected.sort_by(|id1, id2|
kbucket::Key::new(id1.clone()).distance(&kbucket_key).cmp(
@ -636,8 +751,8 @@ fn add_provider() {
// One round of publishing is complete.
assert!(results.is_empty());
for (_, s) in &swarms {
assert_eq!(s.queries.size(), 0);
for (_, swarm) in &swarms {
assert_eq!(swarm.queries.size(), 0);
}
if republished {
@ -667,19 +782,19 @@ fn add_provider() {
/// arithmetic overflow, see https://github.com/libp2p/rust-libp2p/issues/1290.
#[test]
fn exceed_jobs_max_queries() {
let (_, mut swarms) = build_nodes(1);
let (_addr, mut swarm) = build_node();
let num = JOBS_MAX_QUERIES + 1;
for _ in 0 .. num {
swarms[0].1.bootstrap();
swarm.1.bootstrap();
}
assert_eq!(swarms[0].1.queries.size(), num);
assert_eq!(swarm.1.queries.size(), num);
block_on(
poll_fn(move |ctx| {
for _ in 0 .. num {
// There are no other nodes, so the queries finish instantly.
if let Poll::Ready(Some(e)) = swarms[0].1.poll_next_unpin(ctx) {
if let Poll::Ready(Some(e)) = swarm.1.poll_next_unpin(ctx) {
if let KademliaEvent::BootstrapResult(r) = e {
assert!(r.is_ok(), "Unexpected error")
} else {

View File

@ -112,7 +112,7 @@ impl ClosestPeersIter {
let state = PeerState::NotContacted;
(distance, Peer { key, state })
})
.take(config.num_results));
.take(K_VALUE.into()));
// The iterator initially makes progress by iterating towards the target.
let state = State::Iterating { no_progress : 0 };
@ -401,7 +401,9 @@ impl ClosestPeersIter {
/// k closest nodes it has not already queried".
fn at_capacity(&self) -> bool {
match self.state {
State::Stalled => self.num_waiting >= self.config.num_results,
State::Stalled => self.num_waiting >= usize::max(
self.config.num_results, self.config.parallelism
),
State::Iterating { .. } => self.num_waiting >= self.config.parallelism,
State::Finished => true
}
@ -726,4 +728,49 @@ mod tests {
QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _)
}
#[test]
fn without_success_try_up_to_k_peers() {
fn prop(mut iter: ClosestPeersIter) {
let now = Instant::now();
for _ in 0..(usize::min(iter.closest_peers.len(), K_VALUE.get())) {
match iter.next(now) {
PeersIterState::Waiting(Some(p)) => {
let peer = p.clone().into_owned();
iter.on_failure(&peer);
},
_ => panic!("Expected iterator to yield another peer to query."),
}
}
assert_eq!(PeersIterState::Finished, iter.next(now));
}
QuickCheck::new().tests(10).quickcheck(prop as fn(_))
}
fn stalled_at_capacity() {
fn prop(mut iter: ClosestPeersIter) {
iter.state = State::Stalled;
for i in 0..usize::max(iter.config.parallelism, iter.config.num_results) {
iter.num_waiting = i;
assert!(
!iter.at_capacity(),
"Iterator should not be at capacity if less than \
`max(parallelism, num_results)` requests are waiting.",
)
}
iter.num_waiting = usize::max(iter.config.parallelism, iter.config.num_results);
assert!(
iter.at_capacity(),
"Iterator should be at capacity if `max(parallelism, num_results)` requests are \
waiting.",
)
}
QuickCheck::new().tests(10).quickcheck(prop as fn(_))
}
}

View File

@ -1,7 +1,7 @@
[package]
name = "libp2p-mdns"
edition = "2018"
version = "0.17.0"
version = "0.18.0"
description = "Implementation of the libp2p mDNS discovery method"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
@ -16,8 +16,8 @@ dns-parser = "0.8"
either = "1.5.3"
futures = "0.3.1"
lazy_static = "1.2"
libp2p-core = { version = "0.17.0", path = "../../core" }
libp2p-swarm = { version = "0.17.0", path = "../../swarm" }
libp2p-core = { version = "0.18.0", path = "../../core" }
libp2p-swarm = { version = "0.18.0", path = "../../swarm" }
log = "0.4"
net2 = "0.2"
rand = "0.7"

View File

@ -1,7 +1,7 @@
[package]
name = "libp2p-noise"
description = "Cryptographic handshake protocol using the noise framework."
version = "0.17.0"
version = "0.18.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -11,7 +11,7 @@ edition = "2018"
curve25519-dalek = "2.0.0"
futures = "0.3.1"
lazy_static = "1.2"
libp2p-core = { version = "0.17.0", path = "../../core" }
libp2p-core = { version = "0.18.0", path = "../../core" }
log = "0.4"
prost = "0.6.1"
rand = "0.7.2"
@ -28,7 +28,7 @@ snow = { version = "0.6.1", features = ["default-resolver"], default-features =
[dev-dependencies]
env_logger = "0.7.1"
libp2p-tcp = { version = "0.17.0", path = "../../transports/tcp" }
libp2p-tcp = { version = "0.18.0", path = "../../transports/tcp" }
quickcheck = "0.9.0"
sodiumoxide = "^0.2.5"

View File

@ -2,7 +2,7 @@
name = "libp2p-ping"
edition = "2018"
description = "Ping protocol for libp2p"
version = "0.17.0"
version = "0.18.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -11,8 +11,8 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
futures = "0.3.1"
libp2p-core = { version = "0.17.0", path = "../../core" }
libp2p-swarm = { version = "0.17.0", path = "../../swarm" }
libp2p-core = { version = "0.18.0", path = "../../core" }
libp2p-swarm = { version = "0.18.0", path = "../../swarm" }
log = "0.4.1"
rand = "0.7.2"
void = "1.0"
@ -20,7 +20,7 @@ wasm-timer = "0.2"
[dev-dependencies]
async-std = "1.0"
libp2p-tcp = { version = "0.17.0", path = "../../transports/tcp" }
libp2p-secio = { version = "0.17.0", path = "../../protocols/secio" }
libp2p-yamux = { version = "0.17.0", path = "../../muxers/yamux" }
libp2p-tcp = { version = "0.18.0", path = "../../transports/tcp" }
libp2p-secio = { version = "0.18.0", path = "../../protocols/secio" }
libp2p-yamux = { version = "0.18.0", path = "../../muxers/yamux" }
quickcheck = "0.9.0"

View File

@ -2,7 +2,7 @@
name = "libp2p-plaintext"
edition = "2018"
description = "Plaintext encryption dummy protocol for libp2p"
version = "0.17.0"
version = "0.18.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"]
bytes = "0.5"
futures = "0.3.1"
futures_codec = "0.3.4"
libp2p-core = { version = "0.17.0", path = "../../core" }
libp2p-core = { version = "0.18.0", path = "../../core" }
log = "0.4.8"
prost = "0.6.1"
rw-stream-sink = "0.2.0"

View File

@ -2,7 +2,7 @@
name = "libp2p-pnet"
edition = "2018"
description = "Private swarm support for libp2p"
version = "0.17.0"
version = "0.18.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"

View File

@ -2,7 +2,7 @@
name = "libp2p-secio"
edition = "2018"
description = "Secio encryption protocol for libp2p"
version = "0.17.0"
version = "0.18.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
@ -16,7 +16,7 @@ ctr = "0.3"
futures = "0.3.1"
hmac = "0.7.0"
lazy_static = "1.2.0"
libp2p-core = { version = "0.17.0", path = "../../core" }
libp2p-core = { version = "0.18.0", path = "../../core" }
log = "0.4.6"
prost = "0.6.1"
pin-project = "0.4.6"
@ -48,8 +48,8 @@ aes-all = ["aesni"]
[dev-dependencies]
async-std = "1.0"
criterion = "0.3"
libp2p-mplex = { version = "0.17.0", path = "../../muxers/mplex" }
libp2p-tcp = { version = "0.17.0", path = "../../transports/tcp" }
libp2p-mplex = { version = "0.18.0", path = "../../muxers/mplex" }
libp2p-tcp = { version = "0.18.0", path = "../../transports/tcp" }
[[bench]]
name = "bench"