Gossipsub Protocol (#898)

* Create gossipsub crate - Basic template, borrowed from floodsub

* Add a GossipsubConfig struct and set up basic structures in the Gossipsub struct

* Begin implementation of join. Adds get_random_peers helper function and adds tests

* Implements gossipsub leave()

* Update publishMany to incorporate gossipsub mesh and fanout logic

* Use the gossipsub mesh for determining peer subscription

* Remove subscribed_topics field from the Gossipsub struct

* Rename gossipsubconfig to ProtocolConfig

* Implement the gossipsub control messages in the Codec's Encode/Decode and modify GossipsubRpc

* Modify GossipsubActions to enums for succinctness.

* Modify the memcache to store Gossipsub messages

* Implement control message handling.

* Update control message handling to handle multiple messages.

* Handle received gossipsub messages using pre-built handlers.

* Remove excess connected peer hashmap

* Add extra peer mapping and consistent topic naming.

* Implement heartbeat, emit_gossip and send_graft_prune.

* Group logic in forwarding messages. Add messages to memcache.

* Add heartbeat timer and move location of helper function.

* Add gossipsub to the libp2p workspace, make layer structs public

* Add logging to gossipsub

- Adds the log crate and implements logging macros
- Specifies versions for external crates

* Add example chat for debugging purposes

* Implement #868 for gossipsub.

* Add rust documentation to gossipsub crate.

- Adds basic documentation, overview and examples to the gossipsub
crate.

* Re-introduce the initial heartbeat time config.

This commit also adds the inject_connected test.

* Add subscribe tests.

- Modifies `handle_received_subscriptions` to take a reference of
subscriptions
- Adds `test_subscribe`
- Adds `test_handle_received_subscriptions`
- Adds tests for the filter in `get_random_peers`

* Add bug fixes and further testing for gossipsub.

- Corrects the tuple use of topic_hashes
- Corrects JOIN logic around fanout and adding peers to the mesh
- Adds test_unsubscribe
- Adds test_join

* Rename GossipsubMessage::msg_id -> id

* Add bug fix for handling disconnected peers.

* Implements (partially) #889 for Gossipsub.

* handle_iwant event count tests

* handle_ihave event count tests

* Move layer.rs tests into separate file.

* Implement clippy suggestions for gossipsub.

* Modify control message tests for specific types.

* Implement builder pattern for GossipsubConfig.

As suggested by @twittner, the builder pattern is implemented for constructing the
GossipsubConfig struct.

* Package version updates as suggested by @twittner.

* Correct line lengths in gossipsub.

* Correct braces found by @twittner.

* Implement @twittner's suggestions.

- Uses `HashSet` where applicable
- Update `FnvHashMap` to standard `HashMap`
- Uses `min` function in code simplification.

* Add NodeList struct to clarify topic_peers.

* Cleaner handling of messagelist

Co-Authored-By: AgeManning <Age@AgeManning.com>

* Cleaner handling of added peers.

Co-Authored-By: AgeManning <Age@AgeManning.com>

* handle_prune peer removed test

* basic grafting tests

* multiple topic grafting test

* Convert &vec to slice.

Co-Authored-By: AgeManning <Age@AgeManning.com>

* Convert to lazy insert.

Co-Authored-By: AgeManning <Age@AgeManning.com>

* Cleaner topic handling.

Co-Authored-By: AgeManning <Age@AgeManning.com>

* Control pool piggybacking

Uses HashMap::drain() in control_pool_flush.

(Going to squash this.)

* Add Debug derives to gossipsub and correct tests.

* Changes from PR review (to be squashed)

All tests passing, but some still need to be reconsidered.

Test reform.

* Implements Arc for GossipsubRpc events

* Remove support for floodsub nodes

* Reconnect to disconnected peers to mitigate timeouts

* Use ReadOne WriteOne with configurable max gossip sizes

* Remove length delimitation from the RPC encoding

* Prevent peer duplication in mesh

* Allow oneshot handler's inactivity_timeout to be configurable

* Correct peer-duplication bug in the mesh

* Remove auto-reconnect to allow for user-level disconnects

* Use single long-lived inbound/outbound streams to match the Go implementation

* Allow gossipsub topics to be optionally hashable

* Improves gossipsub stream handling

- Corrects the handler's keep alive.
- Correct the chat example.
- Instantly add peers to the mesh on subscription if the mesh is low.

* Allows message validation in gossipsub

* Replaces Cuckoofilter with LRUCache

The cuckoo filter's false-positive rate was unacceptably high, causing messages to be wrongly rejected.

* Renames configuration parameter and corrects logic

* Removes peer from fanout on disconnection

* Add publish and fanout tests

* Apply @mxinden suggestions

* Resend message once the outbound stream is negotiated

- Downgrades log warnings

* Implement further reviewer suggestions

- Created associated functions to avoid unnecessary cloning
- Messages are rejected if their sequence numbers are not u64
- `GossipsubConfigBuilder` has the same defaults as `GossipsubConfig`
- Miscellaneous typos

* Add MessageId type and remove unnecessary comments

* Add a return value to propagate_message function

* Adds user-customised gossipsub message ids

* Adds the message id to GossipsubEvent

* Implement Debug for GossipsubConfig

* protocols/gossipsub: Add basic smoke test

Implement a basic smoke test that:

1. Builds a fully connected graph of size N.

2. Subscribes each node to the same topic.

3. Publishes a single message.

4. Waits for all nodes to receive the above message.

N and the structure of the graph are reproducibly randomized via
Quickcheck.

* Corrections pointed out by @mxinden

* Add option to remove source id publishing

* protocols/gossipsub/tests/smoke: Remove unused variable

* Merge latest master

* protocols/gossipsub: Move to stable futures

* examples/gossipsub-chat.rs: Move to stable futures

* protocols/gossipsub/src/behaviour/tests: Update to stable futures

* protocols/gossipsub/tests: Update to stable futures

* protocols/gossipsub: Log substream errors

* protocols/gossipsub: Log outbound substream errors

* Remove rust-fmt formatting

* Shift to prost for protobuf compiling

* Use wasm_timer for wasm compatibility

Co-authored-by: Grant Wuerker <gwuerker@gmail.com>
Co-authored-by: Toralf Wittner <tw@dtex.org>
Co-authored-by: Pawan Dhananjay <pawandhananjay@gmail.com>
Co-authored-by: Max Inden <mail@max-inden.de>
Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
Commit 37c7d73b11 (parent 0cb3cd4262)
Authored by Age Manning on 2020-01-25 02:16:02 +11:00; committed by Pierre Krieger.
15 changed files with 4160 additions and 0 deletions.

@@ -23,6 +23,7 @@ libp2p-mplex = { version = "0.14.0-alpha.1", path = "muxers/mplex" }
libp2p-identify = { version = "0.14.0-alpha.1", path = "protocols/identify" }
libp2p-kad = { version = "0.14.0-alpha.1", path = "protocols/kad" }
libp2p-floodsub = { version = "0.14.0-alpha.1", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.14.0-alpha.1", path = "./protocols/gossipsub" }
libp2p-ping = { version = "0.14.0-alpha.1", path = "protocols/ping" }
libp2p-plaintext = { version = "0.14.0-alpha.1", path = "protocols/plaintext" }
libp2p-core = { version = "0.14.0-alpha.1", path = "core" }
@@ -62,6 +63,7 @@ members = [
"muxers/mplex",
"muxers/yamux",
"protocols/floodsub",
"protocols/gossipsub",
"protocols/identify",
"protocols/kad",
"protocols/noise",

examples/gossipsub-chat.rs (new file, 154 lines)

@@ -0,0 +1,154 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! A basic chat application with logs demonstrating libp2p and the gossipsub protocol.
//!
//! Using two terminal windows, start two instances. Type a message in either terminal and hit return: the
//! message is sent and printed in the other terminal. Close with Ctrl-c.
//!
//! You can of course open more terminal windows and add more participants.
//! Dialing any of the other peers will propagate the new participant to all
//! chat members and everyone will receive all messages.
//!
//! In order to get the nodes to connect, take note of the listening address of the first
//! instance and start the second with this address as the first argument. In the first terminal
//! window, run:
//!
//! ```sh
//! cargo run --example gossipsub-chat
//! ```
//!
//! It will print the PeerId and the listening address, e.g. `Listening on
//! "/ip4/0.0.0.0/tcp/24915"`
//!
//! In the second terminal window, start a new instance of the example with:
//!
//! ```sh
//! cargo run --example gossipsub-chat -- /ip4/127.0.0.1/tcp/24915
//! ```
//!
//! The two nodes should then connect.
use async_std::{io, task};
use env_logger::{Builder, Env};
use futures::prelude::*;
use libp2p::gossipsub::protocol::MessageId;
use libp2p::gossipsub::{GossipsubEvent, GossipsubMessage, Topic};
use libp2p::{
gossipsub, identity,
PeerId,
};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::time::Duration;
use std::{error::Error, task::{Context, Poll}};
fn main() -> Result<(), Box<dyn Error>> {
Builder::from_env(Env::default().default_filter_or("info")).init();
// Create a random PeerId
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
println!("Local peer id: {:?}", local_peer_id);
// Set up an encrypted TCP Transport over the Mplex and Yamux protocols
let transport = libp2p::build_development_transport(local_key)?;
// Create a Gossipsub topic
let topic = Topic::new("test-net".into());
// Create a Swarm to manage peers and events
let mut swarm = {
// to set default parameters for gossipsub use:
// let gossipsub_config = gossipsub::GossipsubConfig::default();
// To content-address message, we can take the hash of message and use it as an ID.
let message_id_fn = |message: &GossipsubMessage| {
let mut s = DefaultHasher::new();
message.data.hash(&mut s);
MessageId(s.finish().to_string())
};
// set custom gossipsub
let gossipsub_config = gossipsub::GossipsubConfigBuilder::new()
.heartbeat_interval(Duration::from_secs(10))
.message_id_fn(message_id_fn) // content-address messages. No two messages of the
// same content will be propagated.
.build();
// build a gossipsub network behaviour
let mut gossipsub = gossipsub::Gossipsub::new(local_peer_id.clone(), gossipsub_config);
gossipsub.subscribe(topic.clone());
libp2p::Swarm::new(transport, gossipsub, local_peer_id)
};
// Listen on all interfaces and whatever port the OS assigns
libp2p::Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap();
// Reach out to another node if specified
if let Some(to_dial) = std::env::args().nth(1) {
let dialing = to_dial.clone();
match to_dial.parse() {
Ok(to_dial) => match libp2p::Swarm::dial_addr(&mut swarm, to_dial) {
Ok(_) => println!("Dialed {:?}", dialing),
Err(e) => println!("Dial {:?} failed: {:?}", dialing, e),
},
Err(err) => println!("Failed to parse address to dial: {:?}", err),
}
}
// Read full lines from stdin
let mut stdin = io::BufReader::new(io::stdin()).lines();
// Kick it off
let mut listening = false;
task::block_on(future::poll_fn(move |cx: &mut Context| {
loop {
match stdin.try_poll_next_unpin(cx)? {
Poll::Ready(Some(line)) => swarm.publish(&topic, line.as_bytes()),
Poll::Ready(None) => panic!("Stdin closed"),
Poll::Pending => break,
};
}
loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(gossip_event)) => match gossip_event {
GossipsubEvent::Message(peer_id, id, message) => println!(
"Got message: {} with id: {} from peer: {:?}",
String::from_utf8_lossy(&message.data),
id,
peer_id
),
_ => {}
},
Poll::Ready(None) | Poll::Pending => break,
}
}
if !listening {
for addr in libp2p::Swarm::listeners(&swarm) {
println!("Listening on {:?}", addr);
listening = true;
}
}
Poll::Pending
}))
}

@@ -0,0 +1,35 @@
[package]
name = "libp2p-gossipsub"
edition = "2018"
version = "0.14.0-alpha.1"
authors = ["Age Manning <Age@AgeManning.com>"]
license = "MIT"
[dependencies]
libp2p-swarm = { version = "0.4.0-alpha.1", path = "../../swarm" }
libp2p-core = { version = "0.14.0-alpha.1", path = "../../core" }
bs58 = "0.3.0"
bytes = "0.5.4"
byteorder = "1.3.2"
fnv = "1.0.6"
futures = "0.3.1"
rand = "0.7.3"
futures_codec = "0.3.4"
wasm-timer = "0.2.4"
unsigned-varint = { version = "0.3.0", features = ["futures-codec"] }
log = "0.4.8"
sha2 = "0.8.1"
base64 = "0.11.0"
lru = "0.4.3"
smallvec = "1.1.0"
prost = "0.6.1"
[dev-dependencies]
async-std = "1.4.0"
env_logger = "0.7.1"
libp2p-plaintext = { version = "0.14.0-alpha.1", path = "../plaintext" }
libp2p-yamux = { version = "0.14.0-alpha.1", path = "../../muxers/yamux" }
quickcheck = "0.9.2"
[build-dependencies]
prost-build = "0.6"

@@ -0,0 +1,24 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
fn main() {
prost_build::compile_protos(&["src/rpc.proto"], &["src"]).unwrap();
}
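
A hedged sketch of how the prost-build output above is typically consumed elsewhere in the crate. The protobuf `package` name, and therefore the generated file name under `OUT_DIR`, is an assumption here, since `src/rpc.proto` itself is not shown in this diff:

// Illustrative only: prost-build writes `<package>.rs` into OUT_DIR at build time.
// The package name `gossipsub.pb` is assumed, not taken from this diff.
mod rpc_proto {
    include!(concat!(env!("OUT_DIR"), "/gossipsub.pb.rs"));
}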

(File diff suppressed because it is too large.)

@@ -0,0 +1,864 @@
// Copyright 2020 Sigma Prime Pty Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
// collection of tests for the gossipsub network behaviour
#[cfg(test)]
mod tests {
use super::super::*;
use async_std::net::TcpStream;
// helper functions for testing
// This function creates a Gossipsub instance subscribed to `topics`, then generates `peer_no`
// random PeerIds and injects them as connected peers. If `to_subscribe` is set, every injected
// peer is subscribed to all topics. All nodes are considered gossipsub nodes.
fn build_and_inject_nodes(
peer_no: usize,
topics: Vec<String>,
to_subscribe: bool,
) -> (
Gossipsub<TcpStream>,
Vec<PeerId>,
Vec<TopicHash>,
) {
// generate a default GossipsubConfig
let gs_config = GossipsubConfig::default();
// create a gossipsub struct
let mut gs: Gossipsub<TcpStream> = Gossipsub::new(PeerId::random(), gs_config);
let mut topic_hashes = vec![];
// subscribe to the topics
for t in topics {
let topic = Topic::new(t);
gs.subscribe(topic.clone());
topic_hashes.push(topic.no_hash().clone());
}
// build and connect peer_no random peers
let mut peers = vec![];
let dummy_connected_point = ConnectedPoint::Dialer {
address: "/ip4/0.0.0.0/tcp/0".parse().unwrap(),
};
for _ in 0..peer_no {
let peer = PeerId::random();
peers.push(peer.clone());
<Gossipsub<TcpStream> as NetworkBehaviour>::inject_connected(
&mut gs,
peer.clone(),
dummy_connected_point.clone(),
);
if to_subscribe {
gs.handle_received_subscriptions(
&topic_hashes
.iter()
.cloned()
.map(|t| GossipsubSubscription {
action: GossipsubSubscriptionAction::Subscribe,
topic_hash: t,
})
.collect::<Vec<_>>(),
&peer,
);
};
}
return (gs, peers, topic_hashes);
}
#[test]
/// Test local node subscribing to a topic
fn test_subscribe() {
// The node should:
// - Create an empty vector in mesh[topic]
// - Send subscription request to all peers
// - run JOIN(topic)
let subscribe_topic = vec![String::from("test_subscribe")];
let (gs, _, topic_hashes) = build_and_inject_nodes(20, subscribe_topic, true);
assert!(
gs.mesh.get(&topic_hashes[0]).is_some(),
"Subscribe should add a new entry to the mesh[topic] hashmap"
);
// collect all the subscriptions
let subscriptions =
gs.events
.iter()
.fold(vec![], |mut collected_subscriptions, e| match e {
NetworkBehaviourAction::SendEvent { peer_id: _, event } => {
for s in &event.subscriptions {
match s.action {
GossipsubSubscriptionAction::Subscribe => {
collected_subscriptions.push(s.clone())
}
_ => {}
};
}
collected_subscriptions
}
_ => collected_subscriptions,
});
// we sent a subscribe to all known peers
assert!(
subscriptions.len() == 20,
"Should send a subscription to all known peers"
);
}
#[test]
/// Test unsubscribe.
fn test_unsubscribe() {
// Unsubscribe should:
// - Remove the mesh entry for topic
// - Send UNSUBSCRIBE to all known peers
// - Call Leave
let topic_strings = vec![String::from("topic1"), String::from("topic2")];
let topics = topic_strings
.iter()
.map(|t| Topic::new(t.clone()))
.collect::<Vec<Topic>>();
// subscribe to topic_strings
let (mut gs, _, topic_hashes) = build_and_inject_nodes(20, topic_strings, true);
for topic_hash in &topic_hashes {
assert!(
gs.topic_peers.get(&topic_hash).is_some(),
"Topic_peers contain a topic entry"
);
assert!(
gs.mesh.get(&topic_hash).is_some(),
"mesh should contain a topic entry"
);
}
// unsubscribe from both topics
assert!(
gs.unsubscribe(topics[0].clone()),
"should be able to unsubscribe successfully from each topic",
);
assert!(
gs.unsubscribe(topics[1].clone()),
"should be able to unsubscribe successfully from each topic",
);
let subscriptions =
gs.events
.iter()
.fold(vec![], |mut collected_subscriptions, e| match e {
NetworkBehaviourAction::SendEvent { peer_id: _, event } => {
for s in &event.subscriptions {
match s.action {
GossipsubSubscriptionAction::Unsubscribe => {
collected_subscriptions.push(s.clone())
}
_ => {}
};
}
collected_subscriptions
}
_ => collected_subscriptions,
});
// we sent an unsubscribe to all known peers, for two topics
assert!(
subscriptions.len() == 40,
"Should send an unsubscribe event to all known peers"
);
// check we clean up internal structures
for topic_hash in &topic_hashes {
assert!(
gs.mesh.get(&topic_hash).is_none(),
"All topics should have been removed from the mesh"
);
}
}
#[test]
/// Test JOIN(topic) functionality.
fn test_join() {
// The Join function should:
// - Remove peers from fanout[topic]
// - Add any fanout[topic] peers to the mesh (up to mesh_n)
// - Fill up to mesh_n peers from known gossipsub peers in the topic
// - Send GRAFT messages to all nodes added to the mesh
// This test is not an isolated unit test; rather, it uses the higher-level
// subscribe/unsubscribe calls to exercise the join logic.
let topic_strings = vec![String::from("topic1"), String::from("topic2")];
let topics = topic_strings
.iter()
.map(|t| Topic::new(t.clone()))
.collect::<Vec<Topic>>();
let (mut gs, _, topic_hashes) = build_and_inject_nodes(20, topic_strings, true);
// unsubscribe, then call join to invoke functionality
assert!(
gs.unsubscribe(topics[0].clone()),
"should be able to unsubscribe successfully"
);
assert!(
gs.unsubscribe(topics[1].clone()),
"should be able to unsubscribe successfully"
);
// re-subscribe - there should be peers associated with the topic
assert!(
gs.subscribe(topics[0].clone()),
"should be able to subscribe successfully"
);
// should have added mesh_n nodes to the mesh
assert!(
gs.mesh.get(&topic_hashes[0]).unwrap().len() == 6,
"Should have added 6 nodes to the mesh"
);
// there should be mesh_n GRAFT messages.
let graft_messages =
gs.control_pool
.iter()
.fold(vec![], |mut collected_grafts, (_, controls)| {
for c in controls.iter() {
match c {
GossipsubControlAction::Graft { topic_hash: _ } => {
collected_grafts.push(c.clone())
}
_ => {}
}
}
collected_grafts
});
assert_eq!(
graft_messages.len(),
6,
"There should be 6 grafts messages sent to peers"
);
// verify fanout nodes
// add 3 random peers to the fanout of the second topic (topic_hashes[1])
gs.fanout.insert(topic_hashes[1].clone(), vec![]);
let new_peers = vec![];
for _ in 0..3 {
let fanout_peers = gs.fanout.get_mut(&topic_hashes[1]).unwrap();
fanout_peers.push(PeerId::random());
}
// subscribe to the second topic (topics[1])
gs.subscribe(topics[1].clone());
// the three new peers should have been added, along with 3 more from the pool.
assert!(
gs.mesh.get(&topic_hashes[1]).unwrap().len() == 6,
"Should have added 6 nodes to the mesh"
);
let mesh_peers = gs.mesh.get(&topic_hashes[1]).unwrap();
for new_peer in new_peers {
assert!(
mesh_peers.contains(new_peer),
"Fanout peer should be included in the mesh"
);
}
// there should now be 12 graft messages to be sent
let graft_messages =
gs.control_pool
.iter()
.fold(vec![], |mut collected_grafts, (_, controls)| {
for c in controls.iter() {
match c {
GossipsubControlAction::Graft { topic_hash: _ } => {
collected_grafts.push(c.clone())
}
_ => {}
}
}
collected_grafts
});
assert!(
graft_messages.len() == 12,
"There should be 12 grafts messages sent to peers"
);
}
/// Test local node publish to subscribed topic
#[test]
fn test_publish() {
// node should:
// - Send publish message to all peers
// - Insert message into gs.mcache and gs.received
let publish_topic = String::from("test_publish");
let (mut gs, _, topic_hashes) =
build_and_inject_nodes(20, vec![publish_topic.clone()], true);
assert!(
gs.mesh.get(&topic_hashes[0]).is_some(),
"Subscribe should add a new entry to the mesh[topic] hashmap"
);
// publish on topic
let publish_data = vec![0; 42];
gs.publish(&Topic::new(publish_topic), publish_data);
// Collect all publish messages
let publishes = gs
.events
.iter()
.fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::SendEvent { peer_id: _, event } => {
for s in &event.messages {
collected_publish.push(s.clone());
}
collected_publish
}
_ => collected_publish,
});
let msg_id =
(gs.config.message_id_fn)(&publishes.first().expect("Should contain > 0 entries"));
assert!(
publishes.len() == 20,
"Should send a publish message to all known peers"
);
assert!(
gs.mcache.get(&msg_id).is_some(),
"Message cache should contain published message"
);
assert!(
gs.received.get(&msg_id).is_some(),
"Received cache should contain published message"
);
}
/// Test local node publish to unsubscribed topic
#[test]
fn test_fanout() {
// node should:
// - Populate fanout peers
// - Send publish message to fanout peers
// - Insert message into gs.mcache and gs.received
let fanout_topic = String::from("test_fanout");
let (mut gs, _, topic_hashes) =
build_and_inject_nodes(20, vec![fanout_topic.clone()], true);
assert!(
gs.mesh.get(&topic_hashes[0]).is_some(),
"Subscribe should add a new entry to the mesh[topic] hashmap"
);
// Unsubscribe from topic
assert!(
gs.unsubscribe(Topic::new(fanout_topic.clone())),
"should be able to unsubscribe successfully from topic"
);
// Publish on unsubscribed topic
let publish_data = vec![0; 42];
gs.publish(&Topic::new(fanout_topic.clone()), publish_data);
assert_eq!(
gs.fanout
.get(&TopicHash::from_raw(fanout_topic.clone()))
.unwrap()
.len(),
gs.config.mesh_n,
"Fanout should contain `mesh_n` peers for fanout topic"
);
// Collect all publish messages
let publishes = gs
.events
.iter()
.fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::SendEvent { peer_id: _, event } => {
for s in &event.messages {
collected_publish.push(s.clone());
}
collected_publish
}
_ => collected_publish,
});
let msg_id =
(gs.config.message_id_fn)(&publishes.first().expect("Should contain > 0 entries"));
assert_eq!(
publishes.len(),
gs.config.mesh_n,
"Should send a publish message to `mesh_n` fanout peers"
);
assert!(
gs.mcache.get(&msg_id).is_some(),
"Message cache should contain published message"
);
assert!(
gs.received.get(&msg_id).is_some(),
"Received cache should contain published message"
);
}
#[test]
/// Test the gossipsub NetworkBehaviour peer connection logic.
fn test_inject_connected() {
let (gs, peers, topic_hashes) = build_and_inject_nodes(
20,
vec![String::from("topic1"), String::from("topic2")],
true,
);
// check that our subscriptions are sent to each of the peers
// collect all the SendEvents
let send_events: Vec<&NetworkBehaviourAction<Arc<GossipsubRpc>, GossipsubEvent>> = gs
.events
.iter()
.filter(|e| match e {
NetworkBehaviourAction::SendEvent {
peer_id: _,
event: _,
} => true,
_ => false,
})
.collect();
// check that there are two subscriptions sent to each peer
for sevent in send_events.clone() {
match sevent {
NetworkBehaviourAction::SendEvent { peer_id: _, event } => {
assert!(
event.subscriptions.len() == 2,
"There should be two subscriptions sent to each peer (1 for each topic)."
);
}
_ => {}
};
}
// check that there are 20 send events created
assert!(
send_events.len() == 20,
"There should be a subscription event sent to each peer."
);
// the injected peers should have been added to `peer_topics`; since they subscribed, each entry lists all topics
for peer in peers {
let known_topics = gs.peer_topics.get(&peer).unwrap();
assert!(
known_topics == &topic_hashes,
"The topics for each node should all topics"
);
}
}
#[test]
/// Test subscription handling
fn test_handle_received_subscriptions() {
// For every subscription:
// SUBSCRIBE: - Add subscribed topic to peer_topics for peer.
// - Add peer to topics_peer.
// UNSUBSCRIBE - Remove topic from peer_topics for peer.
// - Remove peer from topic_peers.
let topics = vec!["topic1", "topic2", "topic3", "topic4"]
.iter()
.map(|&t| String::from(t))
.collect();
let (mut gs, peers, topic_hashes) = build_and_inject_nodes(20, topics, false);
// The first peer sends 3 subscriptions and 1 unsubscription
let mut subscriptions = topic_hashes[..3]
.iter()
.map(|topic_hash| GossipsubSubscription {
action: GossipsubSubscriptionAction::Subscribe,
topic_hash: topic_hash.clone(),
})
.collect::<Vec<GossipsubSubscription>>();
subscriptions.push(GossipsubSubscription {
action: GossipsubSubscriptionAction::Unsubscribe,
topic_hash: topic_hashes[topic_hashes.len() - 1].clone(),
});
let unknown_peer = PeerId::random();
// process the subscriptions
// first and second peers send subscriptions
gs.handle_received_subscriptions(&subscriptions, &peers[0]);
gs.handle_received_subscriptions(&subscriptions, &peers[1]);
// unknown peer sends the same subscriptions
gs.handle_received_subscriptions(&subscriptions, &unknown_peer);
// verify the result
let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone();
assert!(
peer_topics == topic_hashes[..3].to_vec(),
"First peer should be subscribed to three topics"
);
let peer_topics = gs.peer_topics.get(&peers[1]).unwrap().clone();
assert!(
peer_topics == topic_hashes[..3].to_vec(),
"Second peer should be subscribed to three topics"
);
assert!(
gs.peer_topics.get(&unknown_peer).is_none(),
"Unknown peer should not have been added"
);
for topic_hash in topic_hashes[..3].iter() {
let topic_peers = gs.topic_peers.get(topic_hash).unwrap().clone();
assert!(
topic_peers == peers[..2].to_vec(),
"Two peers should be added to the first three topics"
);
}
// Peer 0 unsubscribes from the first topic
gs.handle_received_subscriptions(
&vec![GossipsubSubscription {
action: GossipsubSubscriptionAction::Unsubscribe,
topic_hash: topic_hashes[0].clone(),
}],
&peers[0],
);
let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone();
assert!(
peer_topics == topic_hashes[1..3].to_vec(),
"Peer should be subscribed to two topics"
);
let topic_peers = gs.topic_peers.get(&topic_hashes[0]).unwrap().clone(); // only gossipsub at the moment
assert!(
topic_peers == peers[1..2].to_vec(),
"Only the second peers should be in the first topic"
);
}
#[test]
/// Test Gossipsub.get_random_peers() function
fn test_get_random_peers() {
// generate a default GossipsubConfig
let gs_config = GossipsubConfig::default();
// create a gossipsub struct
let mut gs: Gossipsub<usize> = Gossipsub::new(PeerId::random(), gs_config);
// create a topic and fill it with some peers
let topic_hash = Topic::new("Test".into()).no_hash().clone();
let mut peers = vec![];
for _ in 0..20 {
peers.push(PeerId::random())
}
gs.topic_peers.insert(topic_hash.clone(), peers.clone());
let random_peers =
Gossipsub::<usize>::get_random_peers(&gs.topic_peers, &topic_hash, 5, { |_| true });
assert!(random_peers.len() == 5, "Expected 5 peers to be returned");
let random_peers =
Gossipsub::<usize>::get_random_peers(&gs.topic_peers, &topic_hash, 30, { |_| true });
assert!(random_peers.len() == 20, "Expected 20 peers to be returned");
assert!(random_peers == peers, "Expected no shuffling");
let random_peers =
Gossipsub::<usize>::get_random_peers(&gs.topic_peers, &topic_hash, 20, { |_| true });
assert!(random_peers.len() == 20, "Expected 20 peers to be returned");
assert!(random_peers == peers, "Expected no shuffling");
let random_peers =
Gossipsub::<usize>::get_random_peers(&gs.topic_peers, &topic_hash, 0, { |_| true });
assert!(random_peers.len() == 0, "Expected 0 peers to be returned");
// test the filter
let random_peers =
Gossipsub::<usize>::get_random_peers(&gs.topic_peers, &topic_hash, 5, { |_| false });
assert!(random_peers.len() == 0, "Expected 0 peers to be returned");
let random_peers =
Gossipsub::<usize>::get_random_peers(&gs.topic_peers, &topic_hash, 10, {
|peer| peers.contains(peer)
});
assert!(random_peers.len() == 10, "Expected 10 peers to be returned");
}
/// Tests that the correct message is sent when a peer asks for a message in our cache.
#[test]
fn test_handle_iwant_msg_cached() {
let (mut gs, peers, _) = build_and_inject_nodes(20, Vec::new(), true);
let id = gs.config.message_id_fn;
let message = GossipsubMessage {
source: peers[11].clone(),
data: vec![1, 2, 3, 4],
sequence_number: 1u64,
topics: Vec::new(),
};
let msg_id = id(&message);
gs.mcache.put(message.clone());
gs.handle_iwant(&peers[7], vec![msg_id.clone()]);
// the messages we are sending
let sent_messages = gs
.events
.iter()
.fold(vec![], |mut collected_messages, e| match e {
NetworkBehaviourAction::SendEvent { peer_id: _, event } => {
for c in &event.messages {
collected_messages.push(c.clone())
}
collected_messages
}
_ => collected_messages,
});
assert!(
sent_messages.iter().any(|msg| id(msg) == msg_id),
"Expected the cached message to be sent to an IWANT peer"
);
}
/// Tests that messages are sent correctly depending on the shifting of the message cache.
#[test]
fn test_handle_iwant_msg_cached_shifted() {
let (mut gs, peers, _) = build_and_inject_nodes(20, Vec::new(), true);
let id = gs.config.message_id_fn;
// shift the message cache up to 9 times and check when the message leaves it
for shift in 1..10 {
let message = GossipsubMessage {
source: peers[11].clone(),
data: vec![1, 2, 3, 4],
sequence_number: shift,
topics: Vec::new(),
};
let msg_id = id(&message);
gs.mcache.put(message.clone());
for _ in 0..shift {
gs.mcache.shift();
}
gs.handle_iwant(&peers[7], vec![msg_id.clone()]);
// is the message being sent?
let message_exists = gs.events.iter().any(|e| match e {
NetworkBehaviourAction::SendEvent { peer_id: _, event } => {
event.messages.iter().any(|msg| id(msg) == msg_id)
}
_ => false,
});
// default history_length is 5, expect no messages once shift >= 5
if shift < 5 {
assert!(
message_exists,
"Expected the cached message to be sent to an IWANT peer before 5 shifts"
);
} else {
assert!(
!message_exists,
"Expected the cached message to not be sent to an IWANT peer after 5 shifts"
);
}
}
}
#[test]
// tests that an event is not created when a peer asks for a message not in our cache
fn test_handle_iwant_msg_not_cached() {
let (mut gs, peers, _) = build_and_inject_nodes(20, Vec::new(), true);
let events_before = gs.events.len();
gs.handle_iwant(&peers[7], vec![MessageId(String::from("unknown id"))]);
let events_after = gs.events.len();
assert_eq!(
events_before, events_after,
"Expected event count to stay the same"
);
}
#[test]
// tests that an event is created when a peer shares that it has a message we want
fn test_handle_ihave_subscribed_and_msg_not_cached() {
let (mut gs, peers, topic_hashes) =
build_and_inject_nodes(20, vec![String::from("topic1")], true);
gs.handle_ihave(
&peers[7],
vec![(
topic_hashes[0].clone(),
vec![MessageId(String::from("unknown id"))],
)],
);
// check that we sent an IWANT request for `unknown id`
let iwant_exists = match gs.control_pool.get(&peers[7]) {
Some(controls) => controls.iter().any(|c| match c {
GossipsubControlAction::IWant { message_ids } => message_ids
.iter()
.any(|m| *m.0 == String::from("unknown id")),
_ => false,
}),
_ => false,
};
assert!(
iwant_exists,
"Expected to send an IWANT control message for unkown message id"
);
}
#[test]
// tests that an event is not created when a peer shares that it has a message that
// we already have
fn test_handle_ihave_subscribed_and_msg_cached() {
let (mut gs, peers, topic_hashes) =
build_and_inject_nodes(20, vec![String::from("topic1")], true);
let msg_id = MessageId(String::from("known id"));
gs.received.put(msg_id.clone(), ());
let events_before = gs.events.len();
gs.handle_ihave(&peers[7], vec![(topic_hashes[0].clone(), vec![msg_id])]);
let events_after = gs.events.len();
assert_eq!(
events_before, events_after,
"Expected event count to stay the same"
)
}
#[test]
// test that an event is not created when a peer shares that it has a message in
// a topic that we are not subscribed to
fn test_handle_ihave_not_subscribed() {
let (mut gs, peers, _) = build_and_inject_nodes(20, vec![], true);
let events_before = gs.events.len();
gs.handle_ihave(
&peers[7],
vec![(
TopicHash::from_raw(String::from("unsubscribed topic")),
vec![MessageId(String::from("irrelevant id"))],
)],
);
let events_after = gs.events.len();
assert_eq!(
events_before, events_after,
"Expected event count to stay the same"
)
}
#[test]
// tests that a peer is added to our mesh when we are both subscribed
// to the same topic
fn test_handle_graft_is_subscribed() {
let (mut gs, peers, topic_hashes) =
build_and_inject_nodes(20, vec![String::from("topic1")], true);
gs.handle_graft(&peers[7], topic_hashes.clone());
assert!(
gs.mesh.get(&topic_hashes[0]).unwrap().contains(&peers[7]),
"Expected peer to have been added to mesh"
);
}
#[test]
// tests that a peer is not added to our mesh when they are subscribed to
// a topic that we are not
fn test_handle_graft_is_not_subscribed() {
let (mut gs, peers, topic_hashes) =
build_and_inject_nodes(20, vec![String::from("topic1")], true);
gs.handle_graft(
&peers[7],
vec![TopicHash::from_raw(String::from("unsubscribed topic"))],
);
assert!(
gs.mesh.get(&topic_hashes[0]).unwrap().contains(&peers[7]),
"Expected peer to have been added to mesh"
);
}
#[test]
// tests multiple topics in a single graft message
fn test_handle_graft_multiple_topics() {
let topics: Vec<String> = vec!["topic1", "topic2", "topic3", "topic4"]
.iter()
.map(|&t| String::from(t))
.collect();
let (mut gs, peers, topic_hashes) = build_and_inject_nodes(20, topics.clone(), true);
let mut their_topics = topic_hashes.clone();
// their_topics = [topic1, topic2, topic3]
// our_topics = [topic1, topic2, topic4]
their_topics.pop();
gs.leave(&their_topics[2]);
gs.handle_graft(&peers[7], their_topics.clone());
for i in 0..2 {
assert!(
gs.mesh.get(&topic_hashes[i]).unwrap().contains(&peers[7]),
"Expected peer to be in the mesh for the first 2 topics"
);
}
assert!(
gs.mesh.get(&topic_hashes[2]).is_none(),
"Expected the second topic to not be in the mesh"
);
}
#[test]
// tests that a peer is removed from our mesh
fn test_handle_prune_peer_in_mesh() {
let (mut gs, peers, topic_hashes) =
build_and_inject_nodes(20, vec![String::from("topic1")], true);
// insert peer into our mesh for 'topic1'
gs.mesh.insert(topic_hashes[0].clone(), peers.clone());
assert!(
gs.mesh.get(&topic_hashes[0]).unwrap().contains(&peers[7]),
"Expected peer to be in mesh"
);
gs.handle_prune(&peers[7], topic_hashes.clone());
assert!(
!gs.mesh.get(&topic_hashes[0]).unwrap().contains(&peers[7]),
"Expected peer to be removed from mesh"
);
}
}
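
These behaviour tests are compiled into the crate itself (a `#[cfg(test)]` module importing `super::super::*`), which is what allows them to inspect internal state such as `mesh`, `fanout`, `topic_peers` and `control_pool` directly. Given the package name in the crate's Cargo.toml above, they can be run with, for example:

cargo test -p libp2p-gossipsub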

@@ -0,0 +1,251 @@
// Copyright 2020 Sigma Prime Pty Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::protocol::{GossipsubMessage, MessageId};
use std::borrow::Cow;
use std::time::Duration;
/// If the `no_source_id` flag is set, the IDENTITY_SOURCE value is used as the source of the
/// packet.
pub const IDENTITY_SOURCE: [u8; 3] = [0, 1, 0];
/// Configuration parameters that define the performance of the gossipsub network.
#[derive(Clone)]
pub struct GossipsubConfig {
/// The protocol id to negotiate this protocol (default is `/meshsub/1.0.0`).
pub protocol_id: Cow<'static, [u8]>,
// Overlay network parameters.
/// Number of heartbeats to keep in the `memcache` (default is 5).
pub history_length: usize,
/// Number of past heartbeats to gossip about (default is 3).
pub history_gossip: usize,
/// Target number of peers for the mesh network (D in the spec, default is 6).
pub mesh_n: usize,
/// Minimum number of peers in mesh network before adding more (D_lo in the spec, default is 4).
pub mesh_n_low: usize,
/// Maximum number of peers in mesh network before removing some (D_high in the spec, default
/// is 12).
pub mesh_n_high: usize,
/// Number of peers to emit gossip to during a heartbeat (D_lazy in the spec, default is 6).
pub gossip_lazy: usize,
/// Initial delay before the first heartbeat (default is 5 seconds).
pub heartbeat_initial_delay: Duration,
/// Time between each heartbeat (default is 1 second).
pub heartbeat_interval: Duration,
/// Time to live for fanout peers (default is 60 seconds).
pub fanout_ttl: Duration,
/// The maximum byte size for each gossip (default is 2048 bytes).
pub max_transmit_size: usize,
/// Flag determining if gossipsub topics are hashed or sent as plain strings (default is false).
pub hash_topics: bool,
/// When set, all published messages will have a 0 source `PeerId` (default is false).
pub no_source_id: bool,
/// When set to `true`, prevents automatic forwarding of all received messages. This setting
/// allows a user to validate the messages before propagating them to their peers. If set to
/// true, the user must manually call `propagate_message()` on the behaviour to forward message
/// once validated (default is false).
pub manual_propagation: bool,
/// A user-defined function allowing the user to specify the message id of a gossipsub message.
/// The default value is to concatenate the source peer id with a sequence number. Setting this
/// parameter allows the user to address packets arbitrarily. One example is content based
/// addressing, where this function may be set to `hash(message)`. This would prevent messages
/// of the same content from being duplicated.
///
/// The function takes a `GossipsubMessage` as input and outputs a `MessageId` to be used as
/// the message id.
pub message_id_fn: fn(&GossipsubMessage) -> MessageId,
}
impl Default for GossipsubConfig {
fn default() -> GossipsubConfig {
GossipsubConfig {
protocol_id: Cow::Borrowed(b"/meshsub/1.0.0"),
history_length: 5,
history_gossip: 3,
mesh_n: 6,
mesh_n_low: 4,
mesh_n_high: 12,
gossip_lazy: 6, // default to mesh_n
heartbeat_initial_delay: Duration::from_secs(5),
heartbeat_interval: Duration::from_secs(1),
fanout_ttl: Duration::from_secs(60),
max_transmit_size: 2048,
hash_topics: false, // default compatibility with floodsub
no_source_id: false,
manual_propagation: false,
message_id_fn: |message| {
// default message id is: source + sequence number
let mut source_string = message.source.to_base58();
source_string.push_str(&message.sequence_number.to_string());
MessageId(source_string)
},
}
}
}
pub struct GossipsubConfigBuilder {
config: GossipsubConfig,
}
impl Default for GossipsubConfigBuilder {
fn default() -> GossipsubConfigBuilder {
GossipsubConfigBuilder {
config: GossipsubConfig::default(),
}
}
}
impl GossipsubConfigBuilder {
// set default values
pub fn new() -> GossipsubConfigBuilder {
GossipsubConfigBuilder::default()
}
pub fn protocol_id(&mut self, protocol_id: impl Into<Cow<'static, [u8]>>) -> &mut Self {
self.config.protocol_id = protocol_id.into();
self
}
pub fn history_length(&mut self, history_length: usize) -> &mut Self {
assert!(
history_length >= self.config.history_gossip,
"The history_length must be greater than or equal to the history_gossip length"
);
self.config.history_length = history_length;
self
}
pub fn history_gossip(&mut self, history_gossip: usize) -> &mut Self {
assert!(
self.config.history_length >= history_gossip,
"The history_length must be greater than or equal to the history_gossip length"
);
self.config.history_gossip = history_gossip;
self
}
pub fn mesh_n(&mut self, mesh_n: usize) -> &mut Self {
assert!(
self.config.mesh_n_low <= mesh_n && mesh_n <= self.config.mesh_n_high,
"The following equality doesn't hold mesh_n_low <= mesh_n <= mesh_n_high"
);
self.config.mesh_n = mesh_n;
self
}
pub fn mesh_n_low(&mut self, mesh_n_low: usize) -> &mut Self {
assert!(
mesh_n_low <= self.config.mesh_n && self.config.mesh_n <= self.config.mesh_n_high,
"The following equality doesn't hold mesh_n_low <= mesh_n <= mesh_n_high"
);
self.config.mesh_n_low = mesh_n_low;
self
}
pub fn mesh_n_high(&mut self, mesh_n_high: usize) -> &mut Self {
assert!(
self.config.mesh_n_low <= self.config.mesh_n && self.config.mesh_n <= mesh_n_high,
"The following equality doesn't hold mesh_n_low <= mesh_n <= mesh_n_high"
);
self.config.mesh_n_high = mesh_n_high;
self
}
pub fn gossip_lazy(&mut self, gossip_lazy: usize) -> &mut Self {
self.config.gossip_lazy = gossip_lazy;
self
}
pub fn heartbeat_initial_delay(&mut self, heartbeat_initial_delay: Duration) -> &mut Self {
self.config.heartbeat_initial_delay = heartbeat_initial_delay;
self
}
pub fn heartbeat_interval(&mut self, heartbeat_interval: Duration) -> &mut Self {
self.config.heartbeat_interval = heartbeat_interval;
self
}
pub fn fanout_ttl(&mut self, fanout_ttl: Duration) -> &mut Self {
self.config.fanout_ttl = fanout_ttl;
self
}
pub fn max_transmit_size(&mut self, max_transmit_size: usize) -> &mut Self {
self.config.max_transmit_size = max_transmit_size;
self
}
pub fn hash_topics(&mut self) -> &mut Self {
self.config.hash_topics = true;
self
}
pub fn no_source_id(&mut self) -> &mut Self {
self.config.no_source_id = true;
self
}
pub fn manual_propagation(&mut self) -> &mut Self {
self.config.manual_propagation = true;
self
}
pub fn message_id_fn(&mut self, id_fn: fn(&GossipsubMessage) -> MessageId) -> &mut Self {
self.config.message_id_fn = id_fn;
self
}
pub fn build(&self) -> GossipsubConfig {
self.config.clone()
}
}
impl std::fmt::Debug for GossipsubConfig {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let mut builder = f.debug_struct("GossipsubConfig");
let _ = builder.field("protocol_id", &self.protocol_id);
let _ = builder.field("history_length", &self.history_length);
let _ = builder.field("history_gossip", &self.history_gossip);
let _ = builder.field("mesh_n", &self.mesh_n);
let _ = builder.field("mesh_n_low", &self.mesh_n_low);
let _ = builder.field("mesh_n_high", &self.mesh_n_high);
let _ = builder.field("gossip_lazy", &self.gossip_lazy);
let _ = builder.field("heartbeat_initial_delay", &self.heartbeat_initial_delay);
let _ = builder.field("heartbeat_interval", &self.heartbeat_interval);
let _ = builder.field("fanout_ttl", &self.fanout_ttl);
let _ = builder.field("max_transmit_size", &self.max_transmit_size);
let _ = builder.field("hash_topics", &self.hash_topics);
let _ = builder.field("no_source_id", &self.no_source_id);
let _ = builder.field("manual_propagation", &self.manual_propagation);
builder.finish()
}
}
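
As a short illustration of the builder defined above, a sketch of constructing a customised configuration; the `libp2p::gossipsub` re-export path is borrowed from the chat example earlier in this diff:

use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder};
use std::time::Duration;

// Override a few of the defaults listed above; unset parameters keep their default values.
fn custom_config() -> GossipsubConfig {
    GossipsubConfigBuilder::new()
        .heartbeat_interval(Duration::from_secs(1))
        .mesh_n(6)               // must satisfy mesh_n_low <= mesh_n <= mesh_n_high
        .max_transmit_size(4096) // maximum byte size per gossip frame
        .manual_propagation()    // received messages must be validated by the application
        .build()
}

With `manual_propagation` enabled, received messages are not forwarded automatically; as the field documentation above notes, the application is expected to validate each message and then call `propagate_message()` on the behaviour.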

@@ -0,0 +1,359 @@
// Copyright 2020 Sigma Prime Pty Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::behaviour::GossipsubRpc;
use crate::protocol::{GossipsubCodec, ProtocolConfig};
use futures::prelude::*;
use futures_codec::Framed;
use libp2p_core::upgrade::{InboundUpgrade, Negotiated, OutboundUpgrade};
use libp2p_swarm::protocols_handler::{
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
};
use log::{debug, trace, warn};
use smallvec::SmallVec;
use std::{
borrow::Cow,
io,
pin::Pin,
task::{Context, Poll},
};
/// Protocol Handler that manages a single long-lived substream with a peer.
pub struct GossipsubHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
/// Upgrade configuration for the gossipsub protocol.
listen_protocol: SubstreamProtocol<ProtocolConfig>,
/// The single long-lived outbound substream.
outbound_substream: Option<OutboundSubstreamState<TSubstream>>,
/// The single long-lived inbound substream.
inbound_substream: Option<InboundSubstreamState<TSubstream>>,
/// Queue of values that we want to send to the remote.
send_queue: SmallVec<[GossipsubRpc; 16]>,
/// Flag determining whether to maintain the connection to the peer.
keep_alive: KeepAlive,
}
/// State of the inbound substream, opened either by us or by the remote.
enum InboundSubstreamState<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
/// Waiting for a message from the remote. The idle state for an inbound substream.
WaitingInput(Framed<Negotiated<TSubstream>, GossipsubCodec>),
/// The substream is being closed.
Closing(Framed<Negotiated<TSubstream>, GossipsubCodec>),
/// An error occurred during processing.
Poisoned,
}
/// State of the outbound substream, opened either by us or by the remote.
enum OutboundSubstreamState<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
/// Waiting for the user to send a message. The idle state for an outbound substream.
WaitingOutput(Framed<Negotiated<TSubstream>, GossipsubCodec>),
/// Waiting to send a message to the remote.
PendingSend(Framed<Negotiated<TSubstream>, GossipsubCodec>, GossipsubRpc),
/// Waiting to flush the substream so that the data arrives to the remote.
PendingFlush(Framed<Negotiated<TSubstream>, GossipsubCodec>),
/// The substream is being closed. Used by either substream.
_Closing(Framed<Negotiated<TSubstream>, GossipsubCodec>),
/// An error occurred during processing.
Poisoned,
}
impl<TSubstream> GossipsubHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
/// Builds a new `GossipsubHandler`.
pub fn new(protocol_id: impl Into<Cow<'static, [u8]>>, max_transmit_size: usize) -> Self {
GossipsubHandler {
listen_protocol: SubstreamProtocol::new(ProtocolConfig::new(
protocol_id,
max_transmit_size,
)),
inbound_substream: None,
outbound_substream: None,
send_queue: SmallVec::new(),
keep_alive: KeepAlive::Yes,
}
}
}
impl<TSubstream> Default for GossipsubHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
fn default() -> Self {
GossipsubHandler {
listen_protocol: SubstreamProtocol::new(ProtocolConfig::default()),
inbound_substream: None,
outbound_substream: None,
send_queue: SmallVec::new(),
keep_alive: KeepAlive::Yes,
}
}
}
impl<TSubstream> ProtocolsHandler for GossipsubHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type InEvent = GossipsubRpc;
type OutEvent = GossipsubRpc;
type Error = io::Error;
type Substream = TSubstream;
type InboundProtocol = ProtocolConfig;
type OutboundProtocol = ProtocolConfig;
type OutboundOpenInfo = GossipsubRpc;
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
self.listen_protocol.clone()
}
fn inject_fully_negotiated_inbound(
&mut self,
substream: <Self::InboundProtocol as InboundUpgrade<Negotiated<TSubstream>>>::Output,
) {
// new inbound substream. Replace the current one, if it exists.
trace!("New inbound substream request");
self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream));
}
fn inject_fully_negotiated_outbound(
&mut self,
substream: <Self::OutboundProtocol as OutboundUpgrade<Negotiated<TSubstream>>>::Output,
message: Self::OutboundOpenInfo,
) {
// Should never establish a new outbound substream if one already exists.
// If this happens, an outbound message is not sent.
if self.outbound_substream.is_some() {
warn!("Established an outbound substream with one already available");
// Add the message back to the send queue
self.send_queue.push(message);
} else {
self.outbound_substream = Some(OutboundSubstreamState::PendingSend(substream, message));
}
}
fn inject_event(&mut self, message: GossipsubRpc) {
self.send_queue.push(message);
}
fn inject_dial_upgrade_error(
&mut self,
_: Self::OutboundOpenInfo,
_: ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error,
>,
) {
// Ignore upgrade errors for now.
// If a peer doesn't support this protocol, this will just ignore them, but not disconnect
// them.
}
fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}
fn poll(
&mut self,
cx: &mut Context,
) -> Poll<
ProtocolsHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::Error,
>,
> {
// determine if we need to create the stream
if !self.send_queue.is_empty() && self.outbound_substream.is_none() {
let message = self.send_queue.remove(0);
self.send_queue.shrink_to_fit();
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: self.listen_protocol.clone(),
info: message,
});
}
loop {
match std::mem::replace(
&mut self.inbound_substream,
Some(InboundSubstreamState::Poisoned),
) {
// inbound idle state
Some(InboundSubstreamState::WaitingInput(mut substream)) => {
match substream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(message))) => {
self.inbound_substream =
Some(InboundSubstreamState::WaitingInput(substream));
return Poll::Ready(ProtocolsHandlerEvent::Custom(message));
}
Poll::Ready(Some(Err(e))) => {
debug!("Inbound substream error while awaiting input: {:?}", e);
self.inbound_substream =
Some(InboundSubstreamState::Closing(substream));
}
// peer closed the stream
Poll::Ready(None) => {
self.inbound_substream =
Some(InboundSubstreamState::Closing(substream));
}
Poll::Pending => {
self.inbound_substream =
Some(InboundSubstreamState::WaitingInput(substream));
break;
}
}
}
Some(InboundSubstreamState::Closing(mut substream)) => {
match Sink::poll_close(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => {
self.inbound_substream = None;
if self.outbound_substream.is_none() {
self.keep_alive = KeepAlive::No;
}
break;
}
Poll::Ready(Err(e)) => {
debug!("Inbound substream error while closing: {:?}", e);
return Poll::Ready(ProtocolsHandlerEvent::Close(io::Error::new(
io::ErrorKind::BrokenPipe,
"Failed to close stream",
)));
}
Poll::Pending => {
self.inbound_substream =
Some(InboundSubstreamState::Closing(substream));
break;
}
}
}
None => {
self.inbound_substream = None;
break;
}
Some(InboundSubstreamState::Poisoned) => {
panic!("Error occurred during inbound stream processing")
}
}
}
loop {
match std::mem::replace(
&mut self.outbound_substream,
Some(OutboundSubstreamState::Poisoned),
) {
// outbound idle state
Some(OutboundSubstreamState::WaitingOutput(substream)) => {
if !self.send_queue.is_empty() {
let message = self.send_queue.remove(0);
self.send_queue.shrink_to_fit();
self.outbound_substream =
Some(OutboundSubstreamState::PendingSend(substream, message));
} else {
self.outbound_substream =
Some(OutboundSubstreamState::WaitingOutput(substream));
break;
}
}
Some(OutboundSubstreamState::PendingSend(mut substream, message)) => {
match Sink::poll_ready(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => {
match Sink::start_send(Pin::new(&mut substream), message) {
Ok(()) => {
self.outbound_substream =
Some(OutboundSubstreamState::PendingFlush(substream))
}
Err(e) => {
return Poll::Ready(ProtocolsHandlerEvent::Close(e));
}
}
}
Poll::Ready(Err(e)) => {
return Poll::Ready(ProtocolsHandlerEvent::Close(e));
}
Poll::Pending => {
self.outbound_substream =
Some(OutboundSubstreamState::PendingSend(substream, message));
break;
}
}
}
Some(OutboundSubstreamState::PendingFlush(mut substream)) => {
match Sink::poll_flush(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => {
self.outbound_substream =
Some(OutboundSubstreamState::WaitingOutput(substream))
}
Poll::Ready(Err(e)) => return Poll::Ready(ProtocolsHandlerEvent::Close(e)),
Poll::Pending => {
self.outbound_substream =
Some(OutboundSubstreamState::PendingFlush(substream));
break;
}
}
}
// Currently never used - manual shutdown may implement this in the future
Some(OutboundSubstreamState::_Closing(mut substream)) => {
match Sink::poll_close(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => {
self.outbound_substream = None;
if self.inbound_substream.is_none() {
self.keep_alive = KeepAlive::No;
}
break;
}
Poll::Ready(Err(e)) => {
debug!("Outbound substream error while closing: {:?}", e);
return Poll::Ready(ProtocolsHandlerEvent::Close(io::Error::new(
io::ErrorKind::BrokenPipe,
"Failed to close outbound substream",
)));
}
Poll::Pending => {
self.outbound_substream =
Some(OutboundSubstreamState::_Closing(substream));
break;
}
}
}
None => {
self.outbound_substream = None;
break;
}
Some(OutboundSubstreamState::Poisoned) => {
panic!("Error occurred during outbound stream processing")
}
}
}
Poll::Pending
}
}

@@ -0,0 +1,153 @@
// Copyright 2020 Sigma Prime Pty Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Gossipsub is a P2P pubsub (publish/subscription) routing layer designed to extend upon
//! floodsub and meshsub routing protocols.
//!
//! # Overview
//!
//! *Note: The gossipsub protocol specifications
//! (https://github.com/libp2p/specs/tree/master/pubsub/gossipsub) provide an outline for the
//! routing protocol. They should be consulted for further detail.*
//!
//! Gossipsub is a blend of meshsub for data and randomsub for mesh metadata. It provides bounded
//! degree and amplification factor with the meshsub construction and augments it using gossip
//! propagation of metadata with the randomsub technique.
//!
//! The router maintains an overlay mesh network of peers on which to efficiently send messages and
//! metadata. Peers use control messages to broadcast and request known messages and
//! subscribe/unsubscribe from topics in the mesh network.
//!
//! # Important Discrepancies
//!
//! This section outlines the current implementation's potential discrepancies from that of other
//! implementations, due to undefined elements in the current specification.
//!
//! - **Topics** - In gossipsub, topics are configurable via the `hash_topics` configuration
//! parameter. Topics are of type `TopicHash`. The current go implementation uses raw utf-8
//! strings, and this is the default configuration in rust-libp2p. Topics can be hashed (SHA256
//! hashed then base64 encoded) by setting the `hash_topics` configuration parameter to true (see
//! the sketch after this list).
//!
//! - **Sequence Numbers** - A message on the gossipsub network is identified by the source
//! `PeerId` and a nonce (sequence number) of the message. The sequence numbers in this
//! implementation are sent as raw bytes across the wire. They are 64-bit big-endian unsigned
//! integers. They are chosen at random in this implementation of gossipsub, but are sequential in
//! the current go implementation.
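//!
//! For illustration, the two topic representations can be obtained from this crate's `Topic`
//! type as follows (a minimal sketch using the `Topic::no_hash` and `Topic::sha256_hash`
//! helpers defined in this crate):
//!
//! ```ignore
//! use libp2p::gossipsub::Topic;
//!
//! let topic = Topic::new("example".into());
//! let plain = topic.no_hash();        // raw utf-8 string (the default)
//! let hashed = topic.sha256_hash();   // base64(SHA256(TopicDescriptor))
//! ```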
//!
//! # Using Gossipsub
//!
//! ## GossipsubConfig
//!
//! The [`GossipsubConfig`] struct specifies various network performance/tuning configuration
//! parameters. Specifically it specifies:
//!
//! [`GossipsubConfig`]: struct.GossipsubConfig.html
//!
//! - `protocol_id` - The protocol id that this implementation will accept connections on.
//! - `history_length` - The number of heartbeats for which past messages are kept in the cache (default: 5).
//! - `history_gossip` - The number of past heartbeats that the node will send gossip metadata
//! about (default: 3).
//! - `mesh_n` - The target number of peers to store in the local mesh network (default: 6).
//! - `mesh_n_low` - The minimum number of peers in the local mesh network before
//! trying to add more peers to the mesh from the connected peer pool (default: 4).
//! - `mesh_n_high` - The maximum number of peers in the local mesh network before removing peers to
//! reach `mesh_n` peers (default: 12).
//! - `gossip_lazy` - The number of peers that the local node will gossip to during a heartbeat (default: `mesh_n` = 6).
//! - `heartbeat_initial_delay` - The initial time delay before starting the first heartbeat (default: 5 seconds).
//! - `heartbeat_interval` - The time between each heartbeat (default: 1 second).
//! - `fanout_ttl` - The fanout time-to-live: the time that must elapse before peers are removed from the fanout
//! for a given topic (default: 1 minute).
//! - `max_transmit_size` - The maximum transmission size (in bytes) of a full gossipsub message on the network.
//! - `hash_topics` - Whether to hash the topics using base64(SHA256(topic)) or to leave as plain utf-8 strings.
//! - `manual_propagation` - Whether gossipsub should immediately forward received messages on the
//! network. For applications requiring message validation, this should be set to false, then the
//! application should call `propagate_message(message_id, propagation_source)` once validated, to
//! propagate the message to peers.
//!
//! This struct implements the `Default` trait and can be initialised via
//! `GossipsubConfig::default()`.
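//!
//! A custom configuration can also be assembled through the re-exported
//! [`GossipsubConfigBuilder`]. The builder method names below are assumed to mirror the
//! parameters listed above, so treat this as a sketch rather than an exact API reference:
//!
//! [`GossipsubConfigBuilder`]: struct.GossipsubConfigBuilder.html
//!
//! ```ignore
//! use std::time::Duration;
//! use libp2p::gossipsub::GossipsubConfigBuilder;
//!
//! // assumed builder methods; consult the GossipsubConfigBuilder documentation
//! let gossipsub_config = GossipsubConfigBuilder::new()
//!     .heartbeat_interval(Duration::from_secs(1))
//!     .mesh_n(6)
//!     .build();
//! ```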
//!
//!
//! ## Gossipsub
//!
//! The [`Gossipsub`] struct implements the `NetworkBehaviour` trait allowing it to act as the
//! routing behaviour in a `Swarm`. This struct requires an instance of `PeerId` and
//! [`GossipsubConfig`].
//!
//! [`Gossipsub`]: struct.Gossipsub.html
//! ## Example
//!
//! An example of initialising a gossipsub compatible swarm:
//!
//! ```ignore
//! # use libp2p::gossipsub::{GossipsubEvent, Topic};
//! # use libp2p::{gossipsub, identity};
//! let local_key = identity::Keypair::generate_ed25519();
//! let local_peer_id = local_key.public().into_peer_id();
//!
//! // Set up an encrypted TCP Transport over the Mplex and Yamux protocols
//! let transport = libp2p::build_development_transport(local_key);
//!
//! // Create a Gossipsub topic
//! let topic = Topic::new("example".into());
//!
//! // Create a Swarm to manage peers and events
//! let mut swarm = {
//! // set default parameters for gossipsub
//! let gossipsub_config = gossipsub::GossipsubConfig::default();
//! // build a gossipsub network behaviour
//! let mut gossipsub =
//! gossipsub::Gossipsub::new(local_peer_id.clone(), gossipsub_config);
//! gossipsub.subscribe(topic.clone());
//! libp2p::Swarm::new(transport, gossipsub, local_peer_id)
//! };
//!
//! // Listen on all interfaces and whatever port the OS assigns
//! let addr = libp2p::Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap();
//! println!("Listening on {:?}", addr);
//! ```
pub mod protocol;
mod behaviour;
mod config;
mod handler;
mod mcache;
mod topic;
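// Protobuf types generated at build time by prost from the gossipsub rpc definitions.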
mod rpc_proto {
include!(concat!(env!("OUT_DIR"), "/gossipsub.pb.rs"));
}
pub use self::behaviour::{Gossipsub, GossipsubEvent, GossipsubRpc};
pub use self::config::{GossipsubConfig, GossipsubConfigBuilder};
pub use self::protocol::{GossipsubMessage, MessageId};
pub use self::topic::{Topic, TopicHash};

@@ -0,0 +1,314 @@
// Copyright 2020 Sigma Prime Pty Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
extern crate fnv;
use crate::protocol::{GossipsubMessage, MessageId};
use crate::topic::TopicHash;
use std::collections::HashMap;
/// CacheEntry stored in the history.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CacheEntry {
mid: MessageId,
topics: Vec<TopicHash>,
}
/// MessageCache struct holding history of messages.
#[derive(Clone)]
pub struct MessageCache {
/// Messages currently held in the cache, indexed by their id.
msgs: HashMap<MessageId, GossipsubMessage>,
/// Rolling history windows of cache entries; index 0 is the most recent heartbeat.
history: Vec<Vec<CacheEntry>>,
/// The number of history windows to gossip about.
gossip: usize,
/// The function used to compute a `MessageId` from a `GossipsubMessage`.
msg_id: fn(&GossipsubMessage) -> MessageId,
}
/// Implementation of the MessageCache.
impl MessageCache {
pub fn new(
gossip: usize,
history_capacity: usize,
msg_id: fn(&GossipsubMessage) -> MessageId,
) -> MessageCache {
MessageCache {
gossip,
msgs: HashMap::default(),
history: vec![Vec::new(); history_capacity],
msg_id,
}
}
/// Creates a `MessageCache` with a default message id function.
#[allow(dead_code)]
pub fn new_default(gossip: usize, history_capacity: usize) -> MessageCache {
let default_id = |message: &GossipsubMessage| {
// default message id is: source + sequence number
let mut source_string = message.source.to_base58();
source_string.push_str(&message.sequence_number.to_string());
MessageId(source_string)
};
MessageCache {
gossip,
msgs: HashMap::default(),
history: vec![Vec::new(); history_capacity],
msg_id: default_id,
}
}
/// Put a message into the memory cache
pub fn put(&mut self, msg: GossipsubMessage) {
let message_id = (self.msg_id)(&msg);
let cache_entry = CacheEntry {
mid: message_id.clone(),
topics: msg.topics.clone(),
};
self.msgs.insert(message_id, msg);
self.history[0].push(cache_entry);
}
/// Get a message with `message_id`
pub fn get(&self, message_id: &MessageId) -> Option<&GossipsubMessage> {
self.msgs.get(message_id)
}
/// Get a list of `MessageId`s for a given topic, drawn from the most recent `gossip` history windows
pub fn get_gossip_ids(&self, topic: &TopicHash) -> Vec<MessageId> {
self.history[..self.gossip]
.iter()
.fold(vec![], |mut current_entries, entries| {
// search for entries with desired topic
let mut found_entries: Vec<MessageId> = entries
.iter()
.filter_map(|entry| {
if entry.topics.iter().any(|t| t == topic) {
Some(entry.mid.clone())
} else {
None
}
})
.collect();
// generate the list
current_entries.append(&mut found_entries);
current_entries
})
}
/// Shift the history array down one and delete messages associated with the
/// last entry
pub fn shift(&mut self) {
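// The last history window holds the oldest entries; drop their messages from the cache.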
for entry in self.history.pop().expect("history is always > 1") {
self.msgs.remove(&entry.mid);
}
// Insert an empty vec in position 0
self.history.insert(0, Vec::new());
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{Topic, TopicHash};
use libp2p_core::PeerId;
fn gen_testm(x: u64, topics: Vec<TopicHash>) -> GossipsubMessage {
let u8x: u8 = x as u8;
let source = PeerId::random();
let data: Vec<u8> = vec![u8x];
let sequence_number = x;
let m = GossipsubMessage {
source,
data,
sequence_number,
topics,
};
m
}
#[test]
/// Test that the message cache can be created.
fn test_new_cache() {
let default_id = |message: &GossipsubMessage| {
// default message id is: source + sequence number
let mut source_string = message.source.to_base58();
source_string.push_str(&message.sequence_number.to_string());
MessageId(source_string)
};
let x: usize = 3;
let mc = MessageCache::new(x, 5, default_id);
assert_eq!(mc.gossip, x);
}
#[test]
/// Test you can put one message and get one.
fn test_put_get_one() {
let mut mc = MessageCache::new_default(10, 15);
let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
let topic2_hash = Topic::new("topic2".into()).no_hash().clone();
let m = gen_testm(10, vec![topic1_hash, topic2_hash]);
mc.put(m.clone());
assert!(mc.history[0].len() == 1);
let fetched = mc.get(&(mc.msg_id)(&m));
assert_eq!(fetched.is_none(), false);
assert_eq!(fetched.is_some(), true);
// Make sure it is the same fetched message
match fetched {
Some(x) => assert_eq!(*x, m),
_ => assert!(false),
}
}
#[test]
/// Test attempting to 'get' with a wrong id.
fn test_get_wrong() {
let mut mc = MessageCache::new_default(10, 15);
let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
let topic2_hash = Topic::new("topic2".into()).no_hash().clone();
let m = gen_testm(10, vec![topic1_hash, topic2_hash]);
mc.put(m.clone());
// Try to get an incorrect ID
let wrong_id = MessageId(String::from("wrongid"));
let fetched = mc.get(&wrong_id);
assert_eq!(fetched.is_none(), true);
}
#[test]
/// Test attempting to 'get' from an empty message cache.
fn test_get_empty() {
let mc = MessageCache::new_default(10, 15);
// Try to get an incorrect ID
let wrong_string = MessageId(String::from("imempty"));
let fetched = mc.get(&wrong_string);
assert_eq!(fetched.is_none(), true);
}
#[test]
/// Test adding a message with no topics.
fn test_no_topic_put() {
let mut mc = MessageCache::new_default(3, 5);
// Build the message
let m = gen_testm(1, vec![]);
mc.put(m.clone());
let fetched = mc.get(&(mc.msg_id)(&m));
// Make sure it is the same fetched message
match fetched {
Some(x) => assert_eq!(*x, m),
_ => assert!(false),
}
}
#[test]
/// Test shift mechanism.
fn test_shift() {
let mut mc = MessageCache::new_default(1, 5);
let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
let topic2_hash = Topic::new("topic2".into()).no_hash().clone();
// Build the message
for i in 0..10 {
let m = gen_testm(i, vec![topic1_hash.clone(), topic2_hash.clone()]);
mc.put(m.clone());
}
mc.shift();
// Ensure the shift occurred
assert!(mc.history[0].len() == 0);
assert!(mc.history[1].len() == 10);
// Make sure no messages deleted
assert!(mc.msgs.len() == 10);
}
#[test]
/// Test Shift with no additions.
fn test_empty_shift() {
let mut mc = MessageCache::new_default(1, 5);
let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
let topic2_hash = Topic::new("topic2".into()).no_hash().clone();
// Build the message
for i in 0..10 {
let m = gen_testm(i, vec![topic1_hash.clone(), topic2_hash.clone()]);
mc.put(m.clone());
}
mc.shift();
// Ensure the shift occurred
assert!(mc.history[0].len() == 0);
assert!(mc.history[1].len() == 10);
mc.shift();
assert!(mc.history[2].len() == 10);
assert!(mc.history[1].len() == 0);
assert!(mc.history[0].len() == 0);
}
#[test]
/// Test shift to see if the last history messages are removed.
fn test_remove_last_from_shift() {
let mut mc = MessageCache::new_default(4, 5);
let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
let topic2_hash = Topic::new("topic2".into()).no_hash().clone();
// Build the message
for i in 0..10 {
let m = gen_testm(i, vec![topic1_hash.clone(), topic2_hash.clone()]);
mc.put(m.clone());
}
// Shift right until deleting messages
mc.shift();
mc.shift();
mc.shift();
mc.shift();
assert_eq!(mc.history[mc.history.len() - 1].len(), 10);
// Shift and delete the messages
mc.shift();
assert_eq!(mc.history[mc.history.len() - 1].len(), 0);
assert_eq!(mc.history[0].len(), 0);
assert_eq!(mc.msgs.len(), 0);
}
}

@@ -0,0 +1,399 @@
// Copyright 2020 Sigma Prime Pty Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::behaviour::GossipsubRpc;
use crate::rpc_proto;
use crate::topic::TopicHash;
use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes;
use bytes::BytesMut;
use futures::future;
use futures::prelude::*;
use futures_codec::{Decoder, Encoder, Framed};
use libp2p_core::{InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo};
use prost::Message as ProtobufMessage;
use std::{borrow::Cow, io, iter, pin::Pin};
use unsigned_varint::codec;
/// Implementation of `InboundUpgrade` and `OutboundUpgrade` for the Gossipsub protocol.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
protocol_id: Cow<'static, [u8]>,
max_transmit_size: usize,
}
impl Default for ProtocolConfig {
fn default() -> Self {
Self {
protocol_id: Cow::Borrowed(b"/meshsub/1.0.0"),
max_transmit_size: 2048,
}
}
}
impl ProtocolConfig {
/// Builds a new `ProtocolConfig`.
/// Sets the maximum gossip transmission size.
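///
/// For example, the default protocol id could be configured with a larger frame limit
/// (a sketch; the transmission size shown here is illustrative only):
///
/// ```ignore
/// let config = ProtocolConfig::new(b"/meshsub/1.0.0".to_vec(), 1_048_576);
/// ```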
pub fn new(
protocol_id: impl Into<Cow<'static, [u8]>>,
max_transmit_size: usize,
) -> ProtocolConfig {
ProtocolConfig {
protocol_id: protocol_id.into(),
max_transmit_size,
}
}
}
impl UpgradeInfo for ProtocolConfig {
type Info = Cow<'static, [u8]>;
type InfoIter = iter::Once<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
iter::once(self.protocol_id.clone())
}
}
impl<TSocket> InboundUpgrade<TSocket> for ProtocolConfig
where
TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = Framed<TSocket, GossipsubCodec>;
type Error = io::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
let mut length_codec = codec::UviBytes::default();
length_codec.set_max_len(self.max_transmit_size);
Box::pin(future::ok(Framed::new(
socket,
GossipsubCodec { length_codec },
)))
}
}
impl<TSocket> OutboundUpgrade<TSocket> for ProtocolConfig
where
TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static,
{
type Output = Framed<TSocket, GossipsubCodec>;
type Error = io::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
let mut length_codec = codec::UviBytes::default();
length_codec.set_max_len(self.max_transmit_size);
Box::pin(future::ok(Framed::new(
socket,
GossipsubCodec { length_codec },
)))
}
}
/* Gossip codec for the framing */
pub struct GossipsubCodec {
/// Codec to encode/decode the Unsigned varint length prefix of the frames.
length_codec: codec::UviBytes,
}
impl Encoder for GossipsubCodec {
type Item = GossipsubRpc;
type Error = io::Error;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
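// Convert each message, subscription and control action into its protobuf representation,
// then write the resulting length-prefixed `rpc_proto::Rpc` into `dst`.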
// messages
let publish = item
.messages
.into_iter()
.map(|message| rpc_proto::Message {
from: Some(message.source.into_bytes()),
data: Some(message.data),
seqno: Some(message.sequence_number.to_be_bytes().to_vec()),
topic_ids: message
.topics
.into_iter()
.map(TopicHash::into_string)
.collect(),
})
.collect::<Vec<_>>();
// subscriptions
let subscriptions = item
.subscriptions
.into_iter()
.map(|sub| rpc_proto::rpc::SubOpts {
subscribe: Some(sub.action == GossipsubSubscriptionAction::Subscribe),
topic_id: Some(sub.topic_hash.into_string()),
})
.collect::<Vec<_>>();
// control messages
let mut control = rpc_proto::ControlMessage {
ihave: Vec::new(),
iwant: Vec::new(),
graft: Vec::new(),
prune: Vec::new(),
};
let empty_control_msg = item.control_msgs.is_empty();
for action in item.control_msgs {
match action {
// collect all ihave messages
GossipsubControlAction::IHave {
topic_hash,
message_ids,
} => {
let rpc_ihave = rpc_proto::ControlIHave {
topic_id: Some(topic_hash.into_string()),
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
};
control.ihave.push(rpc_ihave);
}
GossipsubControlAction::IWant { message_ids } => {
let rpc_iwant = rpc_proto::ControlIWant {
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
};
control.iwant.push(rpc_iwant);
}
GossipsubControlAction::Graft { topic_hash } => {
let rpc_graft = rpc_proto::ControlGraft {
topic_id: Some(topic_hash.into_string()),
};
control.graft.push(rpc_graft);
}
GossipsubControlAction::Prune { topic_hash } => {
let rpc_prune = rpc_proto::ControlPrune {
topic_id: Some(topic_hash.into_string()),
};
control.prune.push(rpc_prune);
}
}
}
let rpc = rpc_proto::Rpc {
subscriptions,
publish,
control: if empty_control_msg {
None
} else {
Some(control)
},
};
let mut buf = Vec::with_capacity(rpc.encoded_len());
rpc.encode(&mut buf)
.expect("Buffer has sufficient capacity");
// length prefix the protobuf message, ensuring the max limit is not hit
self.length_codec.encode(Bytes::from(buf), dst)
}
}
impl Decoder for GossipsubCodec {
type Item = GossipsubRpc;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
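// Strip the unsigned-varint length prefix; `None` indicates that more bytes are required.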
let packet = match self.length_codec.decode(src)? {
Some(p) => p,
None => return Ok(None),
};
let rpc = rpc_proto::Rpc::decode(&packet[..])?;
let mut messages = Vec::with_capacity(rpc.publish.len());
for publish in rpc.publish.into_iter() {
// ensure the sequence number is a u64
let seq_no = publish.seqno.ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidData,
"sequence number was not provided",
)
})?;
if seq_no.len() != 8 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"sequence number has an incorrect size",
));
}
messages.push(GossipsubMessage {
source: PeerId::from_bytes(publish.from.unwrap_or_default())
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid Peer Id"))?,
data: publish.data.unwrap_or_default(),
sequence_number: BigEndian::read_u64(&seq_no),
topics: publish
.topic_ids
.into_iter()
.map(TopicHash::from_raw)
.collect(),
});
}
let mut control_msgs = Vec::new();
if let Some(rpc_control) = rpc.control {
// Collect the gossipsub control messages
let ihave_msgs: Vec<GossipsubControlAction> = rpc_control
.ihave
.into_iter()
.map(|ihave| GossipsubControlAction::IHave {
topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()),
message_ids: ihave
.message_ids
.into_iter()
.map(MessageId)
.collect::<Vec<_>>(),
})
.collect();
let iwant_msgs: Vec<GossipsubControlAction> = rpc_control
.iwant
.into_iter()
.map(|iwant| GossipsubControlAction::IWant {
message_ids: iwant
.message_ids
.into_iter()
.map(MessageId)
.collect::<Vec<_>>(),
})
.collect();
let graft_msgs: Vec<GossipsubControlAction> = rpc_control
.graft
.into_iter()
.map(|graft| GossipsubControlAction::Graft {
topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()),
})
.collect();
let prune_msgs: Vec<GossipsubControlAction> = rpc_control
.prune
.into_iter()
.map(|prune| GossipsubControlAction::Prune {
topic_hash: TopicHash::from_raw(prune.topic_id.unwrap_or_default()),
})
.collect();
control_msgs.extend(ihave_msgs);
control_msgs.extend(iwant_msgs);
control_msgs.extend(graft_msgs);
control_msgs.extend(prune_msgs);
}
Ok(Some(GossipsubRpc {
messages,
subscriptions: rpc
.subscriptions
.into_iter()
.map(|sub| GossipsubSubscription {
action: if Some(true) == sub.subscribe {
GossipsubSubscriptionAction::Subscribe
} else {
GossipsubSubscriptionAction::Unsubscribe
},
topic_hash: TopicHash::from_raw(sub.topic_id.unwrap_or_default()),
})
.collect(),
control_msgs,
}))
}
}
/// A type for gossipsub message ids.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct MessageId(pub String);
impl std::fmt::Display for MessageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl Into<String> for MessageId {
fn into(self) -> String {
self.0.into()
}
}
/// A message received by the gossipsub system.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GossipsubMessage {
/// Id of the peer that published this message.
pub source: PeerId,
/// Content of the message. Its meaning is out of scope of this library.
pub data: Vec<u8>,
/// A random sequence number.
pub sequence_number: u64,
/// List of topics this message belongs to.
///
/// Each message can belong to multiple topics at once.
pub topics: Vec<TopicHash>,
}
/// A subscription received by the gossipsub system.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GossipsubSubscription {
/// Action to perform.
pub action: GossipsubSubscriptionAction,
/// The topic from which to subscribe or unsubscribe.
pub topic_hash: TopicHash,
}
/// Action that a subscription wants to perform.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum GossipsubSubscriptionAction {
/// The remote wants to subscribe to the given topic.
Subscribe,
/// The remote wants to unsubscribe from the given topic.
Unsubscribe,
}
/// A Control message received by the gossipsub system.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum GossipsubControlAction {
/// Node broadcasts known messages per topic - IHave control message.
IHave {
/// The topic of the messages.
topic_hash: TopicHash,
/// A list of known message ids (peer_id + sequence_number) as a string.
message_ids: Vec<MessageId>,
},
/// The node requests specific message ids (peer_id + sequence_number) - IWant control message.
IWant {
/// A list of known message ids (peer_id + sequence_number) as a string.
message_ids: Vec<MessageId>,
},
/// The node has been added to the mesh - Graft control message.
Graft {
/// The mesh topic the peer should be added to.
topic_hash: TopicHash,
},
/// The node has been removed from the mesh - Prune control message.
Prune {
/// The mesh topic the peer should be removed from.
topic_hash: TopicHash,
},
}

@@ -0,0 +1,75 @@
syntax = "proto2";
package gossipsub.pb;
message RPC {
repeated SubOpts subscriptions = 1;
repeated Message publish = 2;
message SubOpts {
optional bool subscribe = 1; // subscribe or unsubscribe
optional string topic_id = 2;
}
optional ControlMessage control = 3;
}
message Message {
optional bytes from = 1;
optional bytes data = 2;
optional bytes seqno = 3;
repeated string topic_ids = 4;
}
message ControlMessage {
repeated ControlIHave ihave = 1;
repeated ControlIWant iwant = 2;
repeated ControlGraft graft = 3;
repeated ControlPrune prune = 4;
}
message ControlIHave {
optional string topic_id = 1;
repeated string message_ids = 2;
}
message ControlIWant {
repeated string message_ids = 1;
}
message ControlGraft {
optional string topic_id = 1;
}
message ControlPrune {
optional string topic_id = 1;
}
// topicID = hash(topicDescriptor); (not the topic.name)
message TopicDescriptor {
optional string name = 1;
optional AuthOpts auth = 2;
optional EncOpts enc = 3;
message AuthOpts {
optional AuthMode mode = 1;
repeated bytes keys = 2; // root keys to trust
enum AuthMode {
NONE = 0; // no authentication, anyone can publish
KEY = 1; // only messages signed by keys in the topic descriptor are accepted
WOT = 2; // web of trust, certificates can allow publisher set to grow
}
}
message EncOpts {
optional EncMode mode = 1;
repeated bytes key_hashes = 2; // the hashes of the shared keys used (salted)
enum EncMode {
NONE = 0; // no encryption, anyone can read
SHAREDKEY = 1; // messages are encrypted with shared key
WOT = 2; // web of trust, certificates can allow publisher set to grow
}
}
}

@@ -0,0 +1,93 @@
// Copyright 2020 Sigma Prime Pty Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::rpc_proto;
use base64::encode;
use prost::Message;
use sha2::{Digest, Sha256};
use std::fmt;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TopicHash {
/// The topic hash. Stored as a string to align with the protobuf API.
hash: String,
}
impl TopicHash {
pub fn from_raw(hash: impl Into<String>) -> TopicHash {
TopicHash { hash: hash.into() }
}
pub fn into_string(self) -> String {
self.hash
}
pub fn as_str(&self) -> &str {
&self.hash
}
}
/// A gossipsub topic.
#[derive(Debug, Clone)]
pub struct Topic {
topic: String,
}
impl Topic {
pub fn new(topic: String) -> Self {
Topic { topic }
}
/// Creates a `TopicHash` by SHA256 hashing the topic then base64 encoding the
/// hash.
pub fn sha256_hash(&self) -> TopicHash {
let topic_descriptor = rpc_proto::TopicDescriptor {
name: Some(self.topic.clone()),
auth: None,
enc: None,
};
let mut bytes = Vec::with_capacity(topic_descriptor.encoded_len());
topic_descriptor
.encode(&mut bytes)
.expect("buffer is large enough");
let hash = encode(Sha256::digest(&bytes).as_slice());
TopicHash { hash }
}
/// Creates a `TopicHash` as a raw string.
pub fn no_hash(&self) -> TopicHash {
TopicHash {
hash: self.topic.clone(),
}
}
}
impl fmt::Display for Topic {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.topic)
}
}
impl fmt::Display for TopicHash {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.hash)
}
}

@@ -0,0 +1,224 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::prelude::*;
use log::debug;
use quickcheck::{QuickCheck, TestResult};
use rand::{random, seq::SliceRandom, SeedableRng};
use std::{
io::Error,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use libp2p_core::{
identity,
multiaddr::Protocol,
muxing::StreamMuxerBox,
nodes::Substream,
transport::{boxed::Boxed, MemoryTransport},
upgrade, Multiaddr, PeerId, Transport,
};
use libp2p_gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, Topic};
use libp2p_plaintext::PlainText2Config;
use libp2p_swarm::Swarm;
use libp2p_yamux as yamux;
type TestSwarm =
Swarm<Boxed<(PeerId, StreamMuxerBox), Error>, Gossipsub<Substream<StreamMuxerBox>>>;
struct Graph {
pub nodes: Vec<(Multiaddr, TestSwarm)>,
}
impl Future for Graph {
type Output = (Multiaddr, GossipsubEvent);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
for (addr, node) in &mut self.nodes {
match node.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => return Poll::Ready((addr.clone(), event)),
Poll::Ready(None) => panic!("unexpected None when polling nodes"),
Poll::Pending => {}
}
}
Poll::Pending
}
}
impl Graph {
fn new_connected(num_nodes: usize, seed: u64) -> Graph {
if num_nodes == 0 {
panic!("expecting at least one node");
}
let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
let mut not_connected_nodes = std::iter::once(())
.cycle()
.take(num_nodes)
.map(|_| build_node())
.collect::<Vec<(Multiaddr, TestSwarm)>>();
let mut connected_nodes = vec![not_connected_nodes.pop().unwrap()];
while !not_connected_nodes.is_empty() {
connected_nodes.shuffle(&mut rng);
not_connected_nodes.shuffle(&mut rng);
let mut next = not_connected_nodes.pop().unwrap();
let connected_addr = &connected_nodes[0].0;
// Memory transport cannot handle addresses with the `/p2p` suffix.
let mut connected_addr_no_p2p = connected_addr.clone();
let p2p_suffix_connected = connected_addr_no_p2p.pop();
debug!(
"Connect: {} -> {}",
next.0.clone().pop().unwrap(),
p2p_suffix_connected.unwrap()
);
Swarm::dial_addr(&mut next.1, connected_addr_no_p2p).unwrap();
connected_nodes.push(next);
}
Graph {
nodes: connected_nodes,
}
}
/// Polls the graph and passes each event into the provided FnMut until it returns `true`.
fn wait_for<F>(self, mut f: F) -> Self
where
F: FnMut(GossipsubEvent) -> bool,
{
// The future below should return self. Given that it is a FnMut and not a FnOnce, one needs
// to wrap `self` in an Option, leaving a `None` behind after the final `Poll::Ready`.
let mut this = Some(self);
let fut = futures::future::poll_fn(move |cx| match &mut this {
Some(graph) => loop {
match graph.poll_unpin(cx) {
Poll::Ready((_addr, ev)) => {
if f(ev) {
return Poll::Ready(this.take().unwrap());
}
}
Poll::Pending => return Poll::Pending,
}
},
None => panic!("future called after final return"),
});
let fut = async_std::future::timeout(Duration::from_secs(10), fut);
futures::executor::block_on(fut).unwrap()
}
}
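/// Builds a gossipsub node listening on a fresh memory transport address, returning the
/// address (with its `/p2p` suffix) and the node's swarm.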
fn build_node() -> (Multiaddr, TestSwarm) {
let key = identity::Keypair::generate_ed25519();
let public_key = key.public();
let transport = MemoryTransport::default()
.upgrade(upgrade::Version::V1)
.authenticate(PlainText2Config {
local_public_key: public_key.clone(),
})
.multiplex(yamux::Config::default())
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))
.map_err(|e| panic!("Failed to create transport: {:?}", e))
.boxed();
let peer_id = public_key.clone().into_peer_id();
let behaviour = Gossipsub::new(peer_id.clone(), GossipsubConfig::default());
let mut swarm = Swarm::new(transport, behaviour, peer_id);
let port = 1 + random::<u64>();
let mut addr: Multiaddr = Protocol::Memory(port).into();
Swarm::listen_on(&mut swarm, addr.clone()).unwrap();
addr = addr.with(libp2p_core::multiaddr::Protocol::P2p(
public_key.into_peer_id().into(),
));
(addr, swarm)
}
#[test]
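/// Tests that a single published message propagates to every node in a randomly connected graph.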
fn multi_hop_propagation() {
let _ = env_logger::try_init();
fn prop(num_nodes: usize, seed: u64) -> TestResult {
if num_nodes < 2 || num_nodes > 100 {
return TestResult::discard();
}
debug!("number nodes: {:?}, seed: {:?}", num_nodes, seed);
let mut graph = Graph::new_connected(num_nodes, seed);
let number_nodes = graph.nodes.len();
// Subscribe each node to the same topic.
let topic = Topic::new("test-net".into());
for (_addr, node) in &mut graph.nodes {
node.subscribe(topic.clone());
}
// Wait for all nodes to be subscribed.
let mut subscribed = 0;
graph = graph.wait_for(move |ev| {
if let GossipsubEvent::Subscribed { .. } = ev {
subscribed += 1;
if subscribed == (number_nodes - 1) * 2 {
return true;
}
}
false
});
// Publish a single message.
graph.nodes[0].1.publish(&topic, vec![1, 2, 3]);
// Wait for all nodes to receive the published message.
let mut received_msgs = 0;
graph.wait_for(move |ev| {
if let GossipsubEvent::Message(..) = ev {
received_msgs += 1;
if received_msgs == number_nodes - 1 {
return true;
}
}
false
});
TestResult::passed()
}
QuickCheck::new()
.max_tests(10)
.quickcheck(prop as fn(usize, u64) -> TestResult)
}

@@ -174,6 +174,8 @@ pub use libp2p_kad as kad;
#[doc(inline)]
pub use libp2p_floodsub as floodsub;
#[doc(inline)]
pub use libp2p_gossipsub as gossipsub;
#[doc(inline)]
pub use libp2p_mplex as mplex;
#[cfg(not(any(target_os = "emscripten", target_os = "unknown")))]
#[doc(inline)]