Age Manning 37c7d73b11 Gossipsub Protocol (#898)
* Create gossipsub crate - Basic template, borrowed from floodsub

* Add a GossipsubConfig struct and set up basic structures in the Gossipsub struct

* Begin implementation of join. Adds get_random_peers helper function and adds tests

* Implements gossipsub leave()

* Update publishMany to incorporate gossipsub mesh and fanout logic

* Use the gossipsub mesh for determining peer subscription

* Remove subscribed_topics field from the Gossipsub struct

* Rename gossipsubconfig to ProtocolConfig

* Implement the gossipsub control messages in the Codec's Encode/Decode and modify GossipsubRpc

* Modify GossipsubActions to enums for succinctness.

* Modify the memcache to store Gossipsub messages

* Implement control message handling.

* Update control message handling to handle multiple messages.

* Handle received gossipsub messages using pre-built handlers.

* Remove excess connected peer hashmap

* Add extra peer mapping and consistent topic naming.

* Implement heartbeat, emit_gossip and send_graft_prune.

* Group logic in forwarding messages. Add messages to memcache.

* Add heartbeat timer and move location of helper function.

* Add gossipsub to the libp2p workspace and make layer structs public

* Add logging to gossipsub

- Adds the log crate and implements logging macros
- Specifies versions for external crates

* Add example chat for debugging purposes

* Implement #868 for gossipsub.

* Add rust documentation to gossipsub crate.

- Adds basic documentation, overview and examples to the gossipsub
crate.

* Re-introduce the initial heartbeat time config.

This commit also adds the inject_connected test.

* Add subscribe tests.

- Modifies `handle_received_subscriptions` to take a reference of
subscriptions
- Adds `test_subscribe`
- Adds `test_handle_received_subscriptions`
- Adds tests for the filter in `get_random_peers`

* Add Bug fixes and further testing for gossipsub.

- Corrects the tuple use of topic_hashes
- Corrects JOIN logic around fanout and adding peers to the mesh
- Adds test_unsubscribe
- Adds test_join

* Rename GossipsubMessage::msg_id -> id

* Add bug fix for handling disconnected peers.

* Implements (partially) #889 for Gossipsub.

* handle_iwant event count tests

* handle_ihave event count tests

* Move layer.rs tests into separate file.

* Implement clippy suggestions for gossipsub.

* Modify control message tests for specific types.

* Implement builder pattern for GossipsubConfig.

As suggested by @twittner - The builder pattern for building
GossipsubConfig struct is implemented.

* Package version updates as suggested by @twittner.

* Correct line lengths in gossipsub.

* Correct braces in  found by @twittner.

* Implement @twittner's suggestions.

- Uses `HashSet` where applicable
- Update `FnvHashMap` to standard `HashMap`
- Uses `min` function in code simplification.

* Add NodeList struct to clarify topic_peers.

* Cleaner handling of messagelist

Co-Authored-By: AgeManning <Age@AgeManning.com>

* Cleaner handling of added peers.

Co-Authored-By: AgeManning <Age@AgeManning.com>

* handle_prune peer removed test

* basic grafting tests

* multiple topic grafting test

* Convert &vec to slice.

Co-Authored-By: AgeManning <Age@AgeManning.com>

* Convert to lazy insert.

Co-Authored-By: AgeManning <Age@AgeManning.com>

* Cleaner topic handling.

Co-Authored-By: AgeManning <Age@AgeManning.com>

* control pool piggybacking

using HashMap.drain() in control_pool_flush

going to squash this

* Add Debug derives to gossipsub and correct tests.

* changes from PR

squash this

all tests passing, but still some that need to be reconsidered

test reform

* Implements Arc for GossipsubRpc events

* Remove support for floodsub nodes

* Reconnect to disconnected peers to mitigate timeout

* Use ReadOne WriteOne with configurable max gossip sizes

* Remove length delimitation from RPC encoding

* Prevent peer duplication in mesh

* Allow oneshot handler's inactivity_timeout to be configurable

* Correct peer duplication in mesh bug

* Remove auto-reconnect to allow for user-level disconnects

* Single long-lived inbound/outbound streams to match go implementation

* Allow gossipsub topics to be optionally hashable

* Improves gossipsub stream handling

- Corrects the handler's keep alive.
- Correct the chat example.
- Instantly add peers to the mesh on subscription if the mesh is low.

* Allows message validation in gossipsub

* Replaces Cuckoofilter with LRUCache

The false positive rate was unacceptable for rejecting messages.

* Renames configuration parameter and corrects logic

* Removes peer from fanout on disconnection

* Add publish and fanout tests

* Apply @mxinden suggestions

* Resend message if outbound stream negotiated

- Downgrades log warnings

* Implement further reviewer suggestions

- Created associated functions to avoid unnecessary cloning
- Messages are rejected if their sequence numbers are not u64
- `GossipsubConfigBuilder` has the same defaults as `GossipsubConfig`
- Miscellaneous typos

* Add MessageId type and remove unnecessary comments

* Add a return value to propagate_message function

* Adds user-customised gossipsub message ids

* Adds the message id to GossipsubEvent

* Implement Debug for GossipsubConfig

* protocols/gossipsub: Add basic smoke test

Implement a basic smoke test that:

1. Builds a fully connected graph of size N.

2. Subscribes each node to the same topic.

3. Publishes a single message.

4. Waits for all nodes to receive the above message.

N and the structure of the graph are reproducibly randomized via
Quickcheck.

* Corrections pointed out by @mxinden

* Add option to remove source id publishing

* protocols/gossipsub/tests/smoke: Remove unused variable

* Merge latest master

* protocols/gossipsub: Move to stable futures

* examples/gossipsub-chat.rs: Move to stable futures

* protocols/gossipsub/src/behaviour/tests: Update to stable futures

* protocols/gossipsub/tests: Update to stable futures

* protocols/gossipsub: Log substream errors

* protocols/gossipsub: Log outbound substream errors

* Remove rust-fmt formatting

* Shift to prost for protobuf compiling

* Use wasm_timer for wasm compatibility

Co-authored-by: Grant Wuerker <gwuerker@gmail.com>
Co-authored-by: Toralf Wittner <tw@dtex.org>
Co-authored-by: Pawan Dhananjay <pawandhananjay@gmail.com>
Co-authored-by: Max Inden <mail@max-inden.de>
Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
2020-01-24 16:16:02 +01:00

// Copyright 2020 Sigma Prime Pty Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

extern crate fnv;

use crate::protocol::{GossipsubMessage, MessageId};
use crate::topic::TopicHash;
use std::collections::HashMap;

/// CacheEntry stored in the history.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CacheEntry {
    mid: MessageId,
    topics: Vec<TopicHash>,
}

/// MessageCache struct holding history of messages.
#[derive(Clone)]
pub struct MessageCache {
    msgs: HashMap<MessageId, GossipsubMessage>,
    history: Vec<Vec<CacheEntry>>,
    gossip: usize,
    msg_id: fn(&GossipsubMessage) -> MessageId,
}

/// Implementation of the MessageCache.
impl MessageCache {
    pub fn new(
        gossip: usize,
        history_capacity: usize,
        msg_id: fn(&GossipsubMessage) -> MessageId,
    ) -> MessageCache {
        MessageCache {
            gossip,
            msgs: HashMap::default(),
            history: vec![Vec::new(); history_capacity],
            msg_id,
        }
    }

    /// Creates a `MessageCache` with a default message id function.
    #[allow(dead_code)]
    pub fn new_default(gossip: usize, history_capacity: usize) -> MessageCache {
        let default_id = |message: &GossipsubMessage| {
            // default message id is: source + sequence number
            let mut source_string = message.source.to_base58();
            source_string.push_str(&message.sequence_number.to_string());
            MessageId(source_string)
        };
        MessageCache {
            gossip,
            msgs: HashMap::default(),
            history: vec![Vec::new(); history_capacity],
            msg_id: default_id,
        }
    }

    /// Puts a message into the memory cache and records it in the newest
    /// history slot.
    pub fn put(&mut self, msg: GossipsubMessage) {
        let message_id = (self.msg_id)(&msg);
        let cache_entry = CacheEntry {
            mid: message_id.clone(),
            topics: msg.topics.clone(),
        };
        self.msgs.insert(message_id, msg);
        self.history[0].push(cache_entry);
    }

    /// Gets the message with `message_id`, if it is still cached.
    pub fn get(&self, message_id: &MessageId) -> Option<&GossipsubMessage> {
        self.msgs.get(message_id)
    }

    /// Gets the ids of all messages for a given topic that lie within the
    /// first `gossip` history slots.
    pub fn get_gossip_ids(&self, topic: &TopicHash) -> Vec<MessageId> {
        self.history[..self.gossip]
            .iter()
            .fold(vec![], |mut current_entries, entries| {
                // search for entries with desired topic
                let mut found_entries: Vec<MessageId> = entries
                    .iter()
                    .filter_map(|entry| {
                        if entry.topics.iter().any(|t| t == topic) {
                            Some(entry.mid.clone())
                        } else {
                            None
                        }
                    })
                    .collect();

                // generate the list
                current_entries.append(&mut found_entries);
                current_entries
            })
    }

    /// Shifts the history array down one slot and deletes the messages
    /// associated with the last (oldest) slot.
    pub fn shift(&mut self) {
        for entry in self.history.pop().expect("history is always > 1") {
            self.msgs.remove(&entry.mid);
        }

        // Insert an empty vec in position 0
        self.history.insert(0, Vec::new());
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Topic, TopicHash};
    use libp2p_core::PeerId;

    /// Builds a test message whose payload and sequence number derive from `x`.
    fn gen_testm(x: u64, topics: Vec<TopicHash>) -> GossipsubMessage {
        let u8x: u8 = x as u8;
        let source = PeerId::random();
        let data: Vec<u8> = vec![u8x];
        let sequence_number = x;

        GossipsubMessage {
            source,
            data,
            sequence_number,
            topics,
        }
    }

    #[test]
    /// Test that the message cache can be created.
    fn test_new_cache() {
        let default_id = |message: &GossipsubMessage| {
            // default message id is: source + sequence number
            let mut source_string = message.source.to_base58();
            source_string.push_str(&message.sequence_number.to_string());
            MessageId(source_string)
        };
        let x: usize = 3;
        let mc = MessageCache::new(x, 5, default_id);

        assert_eq!(mc.gossip, x);
    }

    #[test]
    /// Test you can put one message and get one.
    fn test_put_get_one() {
        let mut mc = MessageCache::new_default(10, 15);

        let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
        let topic2_hash = Topic::new("topic2".into()).no_hash().clone();

        let m = gen_testm(10, vec![topic1_hash, topic2_hash]);

        mc.put(m.clone());

        assert_eq!(mc.history[0].len(), 1);

        let fetched = mc.get(&(mc.msg_id)(&m));

        // Make sure it is the same fetched message
        match fetched {
            Some(x) => assert_eq!(*x, m),
            None => panic!("expected the message to be in the cache"),
        }
    }

    #[test]
    /// Test attempting to 'get' with a wrong id.
    fn test_get_wrong() {
        let mut mc = MessageCache::new_default(10, 15);

        let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
        let topic2_hash = Topic::new("topic2".into()).no_hash().clone();

        let m = gen_testm(10, vec![topic1_hash, topic2_hash]);

        mc.put(m.clone());

        // Try to get an incorrect ID
        let wrong_id = MessageId(String::from("wrongid"));
        let fetched = mc.get(&wrong_id);
        assert!(fetched.is_none());
    }

    #[test]
    /// Test attempting to 'get' empty message cache.
    fn test_get_empty() {
        let mc = MessageCache::new_default(10, 15);

        // Try to get an ID from a cache that holds no messages
        let wrong_string = MessageId(String::from("imempty"));
        let fetched = mc.get(&wrong_string);
        assert!(fetched.is_none());
    }

    #[test]
    /// Test adding a message with no topics.
    fn test_no_topic_put() {
        let mut mc = MessageCache::new_default(3, 5);

        // Build the message
        let m = gen_testm(1, vec![]);
        mc.put(m.clone());

        let fetched = mc.get(&(mc.msg_id)(&m));

        // Make sure it is the same fetched message
        match fetched {
            Some(x) => assert_eq!(*x, m),
            None => panic!("expected the message to be in the cache"),
        }
    }

    #[test]
    /// Test shift mechanism.
    fn test_shift() {
        let mut mc = MessageCache::new_default(1, 5);

        let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
        let topic2_hash = Topic::new("topic2".into()).no_hash().clone();

        // Build the messages
        for i in 0..10 {
            let m = gen_testm(i, vec![topic1_hash.clone(), topic2_hash.clone()]);
            mc.put(m.clone());
        }

        mc.shift();

        // Ensure the shift occurred
        assert_eq!(mc.history[0].len(), 0);
        assert_eq!(mc.history[1].len(), 10);

        // Make sure no messages deleted
        assert_eq!(mc.msgs.len(), 10);
    }

    #[test]
    /// Test that a second shift, with no new messages added, moves the entries one slot further.
    fn test_empty_shift() {
        let mut mc = MessageCache::new_default(1, 5);

        let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
        let topic2_hash = Topic::new("topic2".into()).no_hash().clone();

        // Build the messages
        for i in 0..10 {
            let m = gen_testm(i, vec![topic1_hash.clone(), topic2_hash.clone()]);
            mc.put(m.clone());
        }

        mc.shift();

        // Ensure the shift occurred
        assert_eq!(mc.history[0].len(), 0);
        assert_eq!(mc.history[1].len(), 10);

        mc.shift();

        assert_eq!(mc.history[2].len(), 10);
        assert_eq!(mc.history[1].len(), 0);
        assert_eq!(mc.history[0].len(), 0);
    }

    #[test]
    /// Test shift to see if the last history messages are removed.
    fn test_remove_last_from_shift() {
        let mut mc = MessageCache::new_default(4, 5);

        let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
        let topic2_hash = Topic::new("topic2".into()).no_hash().clone();

        // Build the messages
        for i in 0..10 {
            let m = gen_testm(i, vec![topic1_hash.clone(), topic2_hash.clone()]);
            mc.put(m.clone());
        }

        // Shift until the messages sit in the last history slot
        mc.shift();
        mc.shift();
        mc.shift();
        mc.shift();

        assert_eq!(mc.history[mc.history.len() - 1].len(), 10);

        // Shift and delete the messages
        mc.shift();
        assert_eq!(mc.history[mc.history.len() - 1].len(), 0);
        assert_eq!(mc.history[0].len(), 0);
        assert_eq!(mc.msgs.len(), 0);
    }
}
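
The sliding-window behaviour of the cache (put into slot 0, gossip only the first `gossip` slots, evict on shift out of the last slot) can be tried outside the crate with a minimal sketch. It is not the crate's code: `Message` and `WindowCache` are hypothetical stand-ins that use plain `String`s in place of `GossipsubMessage`, `MessageId` and `TopicHash`, and the id is assumed to be the source string followed by the sequence number, mirroring the default id function above.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `GossipsubMessage` (assumption, not the crate type).
#[derive(Debug, Clone, PartialEq)]
struct Message {
    source: String,
    sequence_number: u64,
    topics: Vec<String>,
}

/// Hypothetical stand-in for `MessageCache`, keyed by plain string ids.
struct WindowCache {
    msgs: HashMap<String, Message>,
    /// Slot 0 is the newest; the last slot is the next to be evicted.
    history: Vec<Vec<(String, Vec<String>)>>,
    /// Number of leading history slots that are advertised via gossip.
    gossip: usize,
}

impl WindowCache {
    fn new(gossip: usize, history_capacity: usize) -> Self {
        // The sketch rejects configurations that would make indexing panic later.
        assert!(history_capacity > 0 && gossip <= history_capacity);
        WindowCache {
            msgs: HashMap::new(),
            history: vec![Vec::new(); history_capacity],
            gossip,
        }
    }

    /// Assumed id scheme: source followed by sequence number.
    fn id(msg: &Message) -> String {
        format!("{}{}", msg.source, msg.sequence_number)
    }

    fn put(&mut self, msg: Message) {
        let id = Self::id(&msg);
        self.history[0].push((id.clone(), msg.topics.clone()));
        self.msgs.insert(id, msg);
    }

    fn get(&self, id: &str) -> Option<&Message> {
        self.msgs.get(id)
    }

    /// Ids of messages on `topic` that lie within the gossip window.
    fn get_gossip_ids(&self, topic: &str) -> Vec<String> {
        self.history[..self.gossip]
            .iter()
            .flatten()
            .filter(|(_, topics)| topics.iter().any(|t| t == topic))
            .map(|(id, _)| id.clone())
            .collect()
    }

    /// Drops the oldest slot (and its messages) and opens a fresh slot 0.
    fn shift(&mut self) {
        if let Some(expired) = self.history.pop() {
            for (id, _) in expired {
                self.msgs.remove(&id);
            }
        }
        self.history.insert(0, Vec::new());
    }
}

fn main() {
    // Three history slots, of which the first two are gossiped.
    let mut cache = WindowCache::new(2, 3);
    cache.put(Message {
        source: "peerA".into(),
        sequence_number: 1,
        topics: vec!["topic1".into()],
    });

    cache.shift(); // message now in slot 1: still gossiped
    assert_eq!(cache.get_gossip_ids("topic1"), vec!["peerA1".to_string()]);

    cache.shift(); // slot 2: no longer gossiped, but still retrievable
    assert!(cache.get_gossip_ids("topic1").is_empty());
    assert!(cache.get("peerA1").is_some());

    cache.shift(); // shifted out of the last slot: evicted
    assert!(cache.get("peerA1").is_none());
}
```

Unlike the cache above, the sketch checks `gossip <= history_capacity` at construction and tolerates an empty history in `shift`. Those are choices made for the sketch only, not behaviour of the crate.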