// Copyright 2020 Sigma Prime Pty Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), // to deal in the Software without restriction, including without limitation // the rights to use, copy, modify, merge, publish, distribute, sublicense, // and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. use crate::topic::TopicHash; use crate::types::{MessageId, RawGossipsubMessage}; use libp2p_core::PeerId; use log::{debug, trace}; use std::collections::hash_map::Entry; use std::fmt::Debug; use std::{ collections::{HashMap, HashSet}, fmt, }; /// CacheEntry stored in the history. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct CacheEntry { mid: MessageId, topic: TopicHash, } /// MessageCache struct holding history of messages. #[derive(Clone)] pub struct MessageCache { msgs: HashMap)>, /// For every message and peer the number of times this peer asked for the message iwant_counts: HashMap>, history: Vec>, /// The number of indices in the cache history used for gossiping. That means that a message /// won't get gossiped anymore when shift got called `gossip` many times after inserting the /// message in the cache. gossip: usize, } impl fmt::Debug for MessageCache { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("MessageCache") .field("msgs", &self.msgs) .field("history", &self.history) .field("gossip", &self.gossip) .finish() } } /// Implementation of the MessageCache. impl MessageCache { pub fn new(gossip: usize, history_capacity: usize) -> Self { MessageCache { gossip, msgs: HashMap::default(), iwant_counts: HashMap::default(), history: vec![Vec::new(); history_capacity], } } /// Put a message into the memory cache. /// /// Returns true if the message didn't already exist in the cache. pub fn put(&mut self, message_id: &MessageId, msg: RawGossipsubMessage) -> bool { match self.msgs.entry(message_id.clone()) { Entry::Occupied(_) => { // Don't add duplicate entries to the cache. false } Entry::Vacant(entry) => { let cache_entry = CacheEntry { mid: message_id.clone(), topic: msg.topic.clone(), }; entry.insert((msg, HashSet::default())); self.history[0].push(cache_entry); trace!("Put message {:?} in mcache", message_id); true } } } /// Keeps track of peers we know have received the message to prevent forwarding to said peers. pub fn observe_duplicate(&mut self, message_id: &MessageId, source: &PeerId) { if let Some((message, originating_peers)) = self.msgs.get_mut(message_id) { // if the message is already validated, we don't need to store extra peers sending us // duplicates as the message has already been forwarded if message.validated { return; } originating_peers.insert(*source); } } /// Get a message with `message_id` #[cfg(test)] pub fn get(&self, message_id: &MessageId) -> Option<&RawGossipsubMessage> { self.msgs.get(message_id).map(|(message, _)| message) } /// Increases the iwant count for the given message by one and returns the message together /// with the iwant if the message exists. pub fn get_with_iwant_counts( &mut self, message_id: &MessageId, peer: &PeerId, ) -> Option<(&RawGossipsubMessage, u32)> { let iwant_counts = &mut self.iwant_counts; self.msgs.get(message_id).and_then(|(message, _)| { if !message.validated { None } else { Some((message, { let count = iwant_counts .entry(message_id.clone()) .or_default() .entry(*peer) .or_default(); *count += 1; *count })) } }) } /// Gets a message with [`MessageId`] and tags it as validated. /// This function also returns the known peers that have sent us this message. This is used to /// prevent us sending redundant messages to peers who have already propagated it. pub fn validate( &mut self, message_id: &MessageId, ) -> Option<(&RawGossipsubMessage, HashSet)> { self.msgs.get_mut(message_id).map(|(message, known_peers)| { message.validated = true; // Clear the known peers list (after a message is validated, it is forwarded and we no // longer need to store the originating peers). let originating_peers = std::mem::replace(known_peers, HashSet::new()); (&*message, originating_peers) }) } /// Get a list of [`MessageId`]s for a given topic. pub fn get_gossip_message_ids(&self, topic: &TopicHash) -> Vec { self.history[..self.gossip] .iter() .fold(vec![], |mut current_entries, entries| { // search for entries with desired topic let mut found_entries: Vec = entries .iter() .filter_map(|entry| { if &entry.topic == topic { let mid = &entry.mid; // Only gossip validated messages if let Some(true) = self.msgs.get(mid).map(|(msg, _)| msg.validated) { Some(mid.clone()) } else { None } } else { None } }) .collect(); // generate the list current_entries.append(&mut found_entries); current_entries }) } /// Shift the history array down one and delete messages associated with the /// last entry. pub fn shift(&mut self) { for entry in self.history.pop().expect("history is always > 1") { if let Some((msg, _)) = self.msgs.remove(&entry.mid) { if !msg.validated { // If GossipsubConfig::validate_messages is true, the implementing // application has to ensure that Gossipsub::validate_message gets called for // each received message within the cache timeout time." debug!( "The message with id {} got removed from the cache without being validated.", &entry.mid ); } } trace!("Remove message from the cache: {}", &entry.mid); self.iwant_counts.remove(&entry.mid); } // Insert an empty vec in position 0 self.history.insert(0, Vec::new()); } /// Removes a message from the cache and returns it if existent pub fn remove( &mut self, message_id: &MessageId, ) -> Option<(RawGossipsubMessage, HashSet)> { //We only remove the message from msgs and iwant_count and keep the message_id in the // history vector. Zhe id in the history vector will simply be ignored on popping. self.iwant_counts.remove(message_id); self.msgs.remove(message_id) } } #[cfg(test)] mod tests { use super::*; use crate::types::RawGossipsubMessage; use crate::{IdentTopic as Topic, TopicHash}; use libp2p_core::PeerId; fn gen_testm(x: u64, topic: TopicHash) -> (MessageId, RawGossipsubMessage) { let default_id = |message: &RawGossipsubMessage| { // default message id is: source + sequence number let mut source_string = message.source.as_ref().unwrap().to_base58(); source_string.push_str(&message.sequence_number.unwrap().to_string()); MessageId::from(source_string) }; let u8x: u8 = x as u8; let source = Some(PeerId::random()); let data: Vec = vec![u8x]; let sequence_number = Some(x); let m = RawGossipsubMessage { source, data, sequence_number, topic, signature: None, key: None, validated: false, }; let id = default_id(&m); (id, m) } fn new_cache(gossip_size: usize, history: usize) -> MessageCache { MessageCache::new(gossip_size, history) } #[test] /// Test that the message cache can be created. fn test_new_cache() { let x: usize = 3; let mc = new_cache(x, 5); assert_eq!(mc.gossip, x); } #[test] /// Test you can put one message and get one. fn test_put_get_one() { let mut mc = new_cache(10, 15); let topic1_hash = Topic::new("topic1").hash().clone(); let (id, m) = gen_testm(10, topic1_hash); mc.put(&id, m.clone()); assert!(mc.history[0].len() == 1); let fetched = mc.get(&id); assert_eq!(fetched.is_none(), false); assert_eq!(fetched.is_some(), true); // Make sure it is the same fetched message match fetched { Some(x) => assert_eq!(*x, m), _ => assert!(false), } } #[test] /// Test attempting to 'get' with a wrong id. fn test_get_wrong() { let mut mc = new_cache(10, 15); let topic1_hash = Topic::new("topic1").hash().clone(); let (id, m) = gen_testm(10, topic1_hash); mc.put(&id, m.clone()); // Try to get an incorrect ID let wrong_id = MessageId::new(b"wrongid"); let fetched = mc.get(&wrong_id); assert_eq!(fetched.is_none(), true); } #[test] /// Test attempting to 'get' empty message cache. fn test_get_empty() { let mc = new_cache(10, 15); // Try to get an incorrect ID let wrong_string = MessageId::new(b"imempty"); let fetched = mc.get(&wrong_string); assert_eq!(fetched.is_none(), true); } #[test] /// Test shift mechanism. fn test_shift() { let mut mc = new_cache(1, 5); let topic1_hash = Topic::new("topic1").hash().clone(); // Build the message for i in 0..10 { let (id, m) = gen_testm(i, topic1_hash.clone()); mc.put(&id, m.clone()); } mc.shift(); // Ensure the shift occurred assert!(mc.history[0].len() == 0); assert!(mc.history[1].len() == 10); // Make sure no messages deleted assert!(mc.msgs.len() == 10); } #[test] /// Test Shift with no additions. fn test_empty_shift() { let mut mc = new_cache(1, 5); let topic1_hash = Topic::new("topic1").hash().clone(); // Build the message for i in 0..10 { let (id, m) = gen_testm(i, topic1_hash.clone()); mc.put(&id, m.clone()); } mc.shift(); // Ensure the shift occurred assert!(mc.history[0].len() == 0); assert!(mc.history[1].len() == 10); mc.shift(); assert!(mc.history[2].len() == 10); assert!(mc.history[1].len() == 0); assert!(mc.history[0].len() == 0); } #[test] /// Test shift to see if the last history messages are removed. fn test_remove_last_from_shift() { let mut mc = new_cache(4, 5); let topic1_hash = Topic::new("topic1").hash().clone(); // Build the message for i in 0..10 { let (id, m) = gen_testm(i, topic1_hash.clone()); mc.put(&id, m.clone()); } // Shift right until deleting messages mc.shift(); mc.shift(); mc.shift(); mc.shift(); assert_eq!(mc.history[mc.history.len() - 1].len(), 10); // Shift and delete the messages mc.shift(); assert_eq!(mc.history[mc.history.len() - 1].len(), 0); assert_eq!(mc.history[0].len(), 0); assert_eq!(mc.msgs.len(), 0); } }