mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-05-04 07:02:16 +00:00
* Create gossipsub crate - basic template, borrowed from floodsub
* Add a GossipsubConfig struct and set up basic structures in the Gossipsub struct
* Begin implementation of join. Adds get_random_peers helper function and adds tests
* Implement gossipsub leave()
* Update publishMany to incorporate gossipsub mesh and fanout logic
* Use the gossipsub mesh for determining peer subscription
* Remove subscribed_topics field from the Gossipsub struct
* Rename GossipsubConfig to ProtocolConfig
* Implement the gossipsub control messages in the Codec's Encode/Decode and modify GossipsubRpc
* Modify GossipsubActions to enums for succinctness
* Modify the memcache to store Gossipsub messages
* Implement control message handling
* Update control message handling to handle multiple messages
* Handle received gossipsub messages using pre-built handlers
* Remove excess connected peer hashmap
* Add extra peer mapping and consistent topic naming
* Implement heartbeat, emit_gossip and send_graft_prune
* Group logic in forwarding messages; add messages to memcache
* Add heartbeat timer and move location of helper function
* Add gossipsub to the libp2p workspace, make layer structs public
* Add logging to gossipsub
  - Adds the log crate and implements logging macros
  - Specifies versions for external crates
* Add example chat for debugging purposes
* Implement #868 for gossipsub
* Add rust documentation to gossipsub crate
  - Adds basic documentation, overview and examples to the gossipsub crate
* Re-introduce the initial heartbeat time config; also adds the inject_connected test
* Add subscribe tests
  - Modifies `handle_received_subscriptions` to take a reference of subscriptions
  - Adds `test_subscribe`
  - Adds `test_handle_received_subscriptions`
  - Adds tests for the filter in `get_random_peers`
* Add bug fixes and further testing for gossipsub
  - Corrects the tuple use of topic_hashes
  - Corrects JOIN logic around fanout and adding peers to the mesh
  - Adds test_unsubscribe
  - Adds test_join
* Rename GossipsubMessage::msg_id -> id
* Add bug fix for handling disconnected peers
* Implement (partially) #889 for Gossipsub
* handle_iwant event count tests
* handle_ihave event count tests
* Move layer.rs tests into separate file
* Implement clippy suggestions for gossipsub
* Modify control message tests for specific types
* Implement builder pattern for GossipsubConfig, as suggested by @twittner
* Package version updates as suggested by @twittner
* Correct line lengths in gossipsub
* Correct braces found by @twittner
* Implement @twittner's suggestions
  - Uses `HashSet` where applicable
  - Updates `FnvHashMap` to standard `HashMap`
  - Uses `min` function in code simplification
* Add NodeList struct to clarify topic_peers
* Cleaner handling of message list (Co-Authored-By: AgeManning <Age@AgeManning.com>)
* Cleaner handling of added peers (Co-Authored-By: AgeManning <Age@AgeManning.com>)
* handle_prune peer removed test
* Basic grafting tests
* Multiple topic grafting test
* Convert &vec to slice (Co-Authored-By: AgeManning <Age@AgeManning.com>)
* Convert to lazy insert (Co-Authored-By: AgeManning <Age@AgeManning.com>)
* Cleaner topic handling (Co-Authored-By: AgeManning <Age@AgeManning.com>)
* Control pool piggybacking, using `HashMap::drain()` in control_pool_flush
* Add Debug derives to gossipsub and correct tests
* Changes from PR review; all tests passing, but some still need to be reconsidered (test reform)
* Implement Arc for GossipsubRpc events
* Remove support for floodsub nodes
* Reconnect to disconnected peers to mitigate timeout
* Use ReadOne/WriteOne with configurable max gossip sizes
* Remove length delimitation from RPC encoding
* Prevent peer duplication in mesh
* Allow the oneshot handler's inactivity_timeout to be configurable
* Correct peer duplication in mesh bug
* Remove auto-reconnect to allow for user-level disconnects
* Single long-lived inbound/outbound streams to match the go implementation
* Allow gossipsub topics to be optionally hashable
* Improve gossipsub stream handling
  - Corrects the handler's keep alive
  - Corrects the chat example
  - Instantly adds peers to the mesh on subscription if the mesh is low
* Allow message validation in gossipsub
* Replace Cuckoofilter with LRUCache - the false positive rate was unacceptable for rejecting messages
* Rename configuration parameter and correct logic
* Remove peer from fanout on disconnection
* Add publish and fanout tests
* Apply @mxinden suggestions
* Resend message if outbound stream negotiated; downgrades log warnings
* Implement further reviewer suggestions
  - Created associated functions to avoid unnecessary cloning
  - Messages are rejected if their sequence numbers are not u64
  - `GossipsubConfigBuilder` has the same defaults as `GossipsubConfig`
  - Miscellaneous typos
* Add MessageId type and remove unnecessary comments
* Add a return value to the propagate_message function
* Add user-customised gossipsub message ids
* Add the message id to GossipsubEvent
* Implement Debug for GossipsubConfig
* protocols/gossipsub: Add basic smoke test that:
  1. Builds a fully connected graph of size N.
  2. Subscribes each node to the same topic.
  3. Publishes a single message.
  4. Waits for all nodes to receive the above message.
  N and the structure of the graph are reproducibly randomized via Quickcheck.
* Corrections pointed out by @mxinden
* Add option to remove source id publishing
* protocols/gossipsub/tests/smoke: Remove unused variable
* Merge latest master
* protocols/gossipsub: Move to stable futures
* examples/gossipsub-chat.rs: Move to stable futures
* protocols/gossipsub/src/behaviour/tests: Update to stable futures
* protocols/gossipsub/tests: Update to stable futures
* protocols/gossipsub: Log substream errors
* protocols/gossipsub: Log outbound substream errors
* Remove rust-fmt formatting
* Shift to prost for protobuf compiling
* Use wasm_timer for wasm compatibility

Co-authored-by: Grant Wuerker <gwuerker@gmail.com>
Co-authored-by: Toralf Wittner <tw@dtex.org>
Co-authored-by: Pawan Dhananjay <pawandhananjay@gmail.com>
Co-authored-by: Max Inden <mail@max-inden.de>
Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
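The builder pattern mentioned in the log ("Implement builder pattern for GossipsubConfig") can be sketched as follows. The `Config`/`ConfigBuilder` names, fields, and defaults below are illustrative assumptions, not the crate's actual API; the key property from the log is that the builder starts from the same defaults as the config itself:

```rust
// Illustrative sketch of the builder pattern described in the commit log.
// Field names and defaults are hypothetical, not the crate's real API.
#[derive(Debug, Clone)]
struct Config {
    history_length: usize, // how many history windows of messages to keep
    history_gossip: usize, // how many of those windows to gossip about
}

struct ConfigBuilder {
    config: Config,
}

impl ConfigBuilder {
    // The builder starts from the same defaults as the config itself.
    fn new() -> Self {
        ConfigBuilder {
            config: Config {
                history_length: 5,
                history_gossip: 3,
            },
        }
    }

    // Each setter consumes and returns the builder, allowing chaining.
    fn history_length(mut self, n: usize) -> Self {
        self.config.history_length = n;
        self
    }

    fn history_gossip(mut self, n: usize) -> Self {
        self.config.history_gossip = n;
        self
    }

    fn build(self) -> Config {
        self.config
    }
}

fn main() {
    // Override one field; the other keeps its default.
    let config = ConfigBuilder::new().history_length(10).build();
    println!("{:?}", config);
}
```

This keeps the config struct's fields private to the module while still letting callers override individual settings without a combinatorial set of constructors.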
315 lines
9.6 KiB
Rust
// Copyright 2020 Sigma Prime Pty Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::protocol::{GossipsubMessage, MessageId};
use crate::topic::TopicHash;
use std::collections::HashMap;

/// CacheEntry stored in the history.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CacheEntry {
    mid: MessageId,
    topics: Vec<TopicHash>,
}

/// MessageCache struct holding a history of messages.
#[derive(Clone)]
pub struct MessageCache {
    /// All cached messages, indexed by their id.
    msgs: HashMap<MessageId, GossipsubMessage>,
    /// Windows of cache entries; new entries land in `history[0]` and are
    /// rotated towards the end by `shift`.
    history: Vec<Vec<CacheEntry>>,
    /// The number of most recent history windows advertised in gossip.
    gossip: usize,
    /// Function used to compute a message's id.
    msg_id: fn(&GossipsubMessage) -> MessageId,
}

/// Implementation of the MessageCache.
impl MessageCache {
    pub fn new(
        gossip: usize,
        history_capacity: usize,
        msg_id: fn(&GossipsubMessage) -> MessageId,
    ) -> MessageCache {
        MessageCache {
            gossip,
            msgs: HashMap::default(),
            history: vec![Vec::new(); history_capacity],
            msg_id,
        }
    }

    /// Creates a `MessageCache` with a default message id function.
    #[allow(dead_code)]
    pub fn new_default(gossip: usize, history_capacity: usize) -> MessageCache {
        let default_id = |message: &GossipsubMessage| {
            // The default message id is the source peer id (base58)
            // concatenated with the sequence number.
            let mut source_string = message.source.to_base58();
            source_string.push_str(&message.sequence_number.to_string());
            MessageId(source_string)
        };
        MessageCache {
            gossip,
            msgs: HashMap::default(),
            history: vec![Vec::new(); history_capacity],
            msg_id: default_id,
        }
    }

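The default id scheme above (base58-encoded source peer id concatenated with the sequence number) can be illustrated standalone. In this sketch a plain string stands in for the base58 `PeerId`, since `libp2p_core` is not available here:

```rust
// Sketch of the default message-id scheme used by `new_default` above:
// id = base58 source string + sequence number. A plain &str stands in
// for the base58-encoded PeerId.
fn default_message_id(source_base58: &str, sequence_number: u64) -> String {
    let mut id = source_base58.to_string();
    id.push_str(&sequence_number.to_string());
    id
}

fn main() {
    // Two messages from the same source with different sequence numbers
    // get distinct ids.
    let a = default_message_id("QmPeer", 1);
    let b = default_message_id("QmPeer", 2);
    assert_ne!(a, b);
    println!("{} {}", a, b);
}
```

Note that this scheme is only collision-free if sources use monotonically increasing sequence numbers, which is why the commit log also mentions user-customised message ids.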
    /// Put a message into the memory cache.
    pub fn put(&mut self, msg: GossipsubMessage) {
        let message_id = (self.msg_id)(&msg);
        let cache_entry = CacheEntry {
            mid: message_id.clone(),
            topics: msg.topics.clone(),
        };

        self.msgs.insert(message_id, msg);

        self.history[0].push(cache_entry);
    }

    /// Get a message with `message_id`.
    pub fn get(&self, message_id: &MessageId) -> Option<&GossipsubMessage> {
        self.msgs.get(message_id)
    }

    /// Get a list of message ids for a given topic, drawn from the most
    /// recent `gossip` history windows.
    pub fn get_gossip_ids(&self, topic: &TopicHash) -> Vec<MessageId> {
        self.history[..self.gossip]
            .iter()
            .fold(vec![], |mut current_entries, entries| {
                // Search for entries with the desired topic.
                let mut found_entries: Vec<MessageId> = entries
                    .iter()
                    .filter_map(|entry| {
                        if entry.topics.iter().any(|t| t == topic) {
                            Some(entry.mid.clone())
                        } else {
                            None
                        }
                    })
                    .collect();

                // Accumulate the list.
                current_entries.append(&mut found_entries);
                current_entries
            })
    }

    /// Shift the history array down one and delete messages associated with
    /// the last entry.
    pub fn shift(&mut self) {
        for entry in self.history.pop().expect("history is always > 1") {
            self.msgs.remove(&entry.mid);
        }

        // Insert an empty vec in position 0.
        self.history.insert(0, Vec::new());
    }
}

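The put/get/shift mechanics above can be demonstrated with a minimal, self-contained sketch. Plain `String` ids and payloads stand in for `MessageId`/`GossipsubMessage` so the example runs without the crate's types; the windowing logic mirrors the cache above:

```rust
use std::collections::HashMap;

// Minimal sketch of the history-window cache above: `history` is a list of
// windows, new entries land in window 0, and `shift` rotates the windows,
// deleting messages that fall off the end.
struct MiniCache {
    msgs: HashMap<String, String>, // id -> payload
    history: Vec<Vec<String>>,     // windows of message ids
    gossip: usize,                 // how many windows gossip_ids scans
}

impl MiniCache {
    fn new(gossip: usize, history_capacity: usize) -> MiniCache {
        MiniCache {
            msgs: HashMap::new(),
            history: vec![Vec::new(); history_capacity],
            gossip,
        }
    }

    // New messages always enter the most recent window.
    fn put(&mut self, id: &str, payload: &str) {
        self.msgs.insert(id.to_string(), payload.to_string());
        self.history[0].push(id.to_string());
    }

    fn get(&self, id: &str) -> Option<&String> {
        self.msgs.get(id)
    }

    // Ids in the `gossip` most recent windows, i.e. what would be
    // advertised to peers in IHAVE control messages.
    fn gossip_ids(&self) -> Vec<String> {
        self.history[..self.gossip].iter().flatten().cloned().collect()
    }

    // Rotate: drop the oldest window and its messages, open a fresh window 0.
    fn shift(&mut self) {
        for id in self.history.pop().expect("capacity >= 1") {
            self.msgs.remove(&id);
        }
        self.history.insert(0, Vec::new());
    }
}

fn main() {
    let mut mc = MiniCache::new(1, 2);
    mc.put("m1", "hello");
    mc.shift(); // m1 moves to window 1, outside the gossip window
    assert!(mc.get("m1").is_some()); // still cached
    assert!(mc.gossip_ids().is_empty()); // but no longer gossiped
    mc.shift(); // m1 falls off the end and is deleted
    assert!(mc.get("m1").is_none());
}
```

This illustrates why `gossip` and the history capacity are separate parameters: a message stops being advertised once it leaves the first `gossip` windows, but remains retrievable (e.g. for answering IWANT requests) until it leaves the history entirely.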
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Topic, TopicHash};
    use libp2p_core::PeerId;

    fn gen_testm(x: u64, topics: Vec<TopicHash>) -> GossipsubMessage {
        let source = PeerId::random();
        let data: Vec<u8> = vec![x as u8];
        let sequence_number = x;

        GossipsubMessage {
            source,
            data,
            sequence_number,
            topics,
        }
    }

    /// Test that the message cache can be created.
    #[test]
    fn test_new_cache() {
        let default_id = |message: &GossipsubMessage| {
            // default message id is: source + sequence number
            let mut source_string = message.source.to_base58();
            source_string.push_str(&message.sequence_number.to_string());
            MessageId(source_string)
        };
        let gossip_size: usize = 3;
        let mc = MessageCache::new(gossip_size, 5, default_id);

        assert_eq!(mc.gossip, gossip_size);
    }

    /// Test that one message can be put in and fetched back.
    #[test]
    fn test_put_get_one() {
        let mut mc = MessageCache::new_default(10, 15);

        let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
        let topic2_hash = Topic::new("topic2".into()).no_hash().clone();

        let m = gen_testm(10, vec![topic1_hash, topic2_hash]);

        mc.put(m.clone());

        assert_eq!(mc.history[0].len(), 1);

        let fetched = mc.get(&(mc.msg_id)(&m));
        assert!(fetched.is_some());

        // Make sure it is the same message that was put in.
        assert_eq!(fetched, Some(&m));
    }

    /// Test attempting to `get` with a wrong id.
    #[test]
    fn test_get_wrong() {
        let mut mc = MessageCache::new_default(10, 15);

        let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
        let topic2_hash = Topic::new("topic2".into()).no_hash().clone();

        let m = gen_testm(10, vec![topic1_hash, topic2_hash]);

        mc.put(m.clone());

        // Try to get an incorrect id.
        let wrong_id = MessageId(String::from("wrongid"));
        let fetched = mc.get(&wrong_id);
        assert!(fetched.is_none());
    }

    /// Test attempting to `get` from an empty message cache.
    #[test]
    fn test_get_empty() {
        let mc = MessageCache::new_default(10, 15);

        // Try to get an id from an empty cache.
        let wrong_string = MessageId(String::from("imempty"));
        let fetched = mc.get(&wrong_string);
        assert!(fetched.is_none());
    }

    /// Test adding a message with no topics.
    #[test]
    fn test_no_topic_put() {
        let mut mc = MessageCache::new_default(3, 5);

        // Build the message.
        let m = gen_testm(1, vec![]);
        mc.put(m.clone());

        let fetched = mc.get(&(mc.msg_id)(&m));

        // Make sure it is the same message that was put in.
        assert_eq!(fetched, Some(&m));
    }

    /// Test the shift mechanism.
    #[test]
    fn test_shift() {
        let mut mc = MessageCache::new_default(1, 5);

        let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
        let topic2_hash = Topic::new("topic2".into()).no_hash().clone();

        // Build and insert the messages.
        for i in 0..10 {
            let m = gen_testm(i, vec![topic1_hash.clone(), topic2_hash.clone()]);
            mc.put(m);
        }

        mc.shift();

        // Ensure the shift occurred.
        assert_eq!(mc.history[0].len(), 0);
        assert_eq!(mc.history[1].len(), 10);

        // Make sure no messages were deleted.
        assert_eq!(mc.msgs.len(), 10);
    }

    /// Test shift with no additions.
    #[test]
    fn test_empty_shift() {
        let mut mc = MessageCache::new_default(1, 5);

        let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
        let topic2_hash = Topic::new("topic2".into()).no_hash().clone();

        // Build and insert the messages.
        for i in 0..10 {
            let m = gen_testm(i, vec![topic1_hash.clone(), topic2_hash.clone()]);
            mc.put(m);
        }

        mc.shift();

        // Ensure the shift occurred.
        assert_eq!(mc.history[0].len(), 0);
        assert_eq!(mc.history[1].len(), 10);

        mc.shift();

        assert_eq!(mc.history[2].len(), 10);
        assert_eq!(mc.history[1].len(), 0);
        assert_eq!(mc.history[0].len(), 0);
    }

    /// Test that shifting deletes messages from the last history window.
    #[test]
    fn test_remove_last_from_shift() {
        let mut mc = MessageCache::new_default(4, 5);

        let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
        let topic2_hash = Topic::new("topic2".into()).no_hash().clone();

        // Build and insert the messages.
        for i in 0..10 {
            let m = gen_testm(i, vec![topic1_hash.clone(), topic2_hash.clone()]);
            mc.put(m);
        }

        // Shift until the messages reach the last window.
        mc.shift();
        mc.shift();
        mc.shift();
        mc.shift();

        assert_eq!(mc.history[mc.history.len() - 1].len(), 10);

        // One more shift deletes the messages.
        mc.shift();
        assert_eq!(mc.history[mc.history.len() - 1].len(), 0);
        assert_eq!(mc.history[0].len(), 0);
        assert_eq!(mc.msgs.len(), 0);
    }
}