protocols/gossipsub: Add message signing and verification configuration (#1583)

This adds optional message signing and verification to the gossipsub protocol as
per the libp2p specifications.

In addition this commit:

- Removes the LruCache received cache and simply uses the memcache in it's
  place.

- Send subscriptions to all peers

- Prevent invalid messages from being gossiped

- Send grafts when subscriptions are added to the mesh

Co-authored-by: Max Inden <mail@max-inden.de>
Co-authored-by: Rüdiger Klaehn <rklaehn@protonmail.com>
Co-authored-by: Rüdiger Klaehn <rklaehn@gmail.com>
This commit is contained in:
Age Manning
2020-08-03 18:13:43 +10:00
committed by GitHub
parent d0ad105433
commit d8ad7bddf5
14 changed files with 1358 additions and 568 deletions

View File

@ -50,15 +50,15 @@ use async_std::{io, task};
use env_logger::{Builder, Env}; use env_logger::{Builder, Env};
use futures::prelude::*; use futures::prelude::*;
use libp2p::gossipsub::protocol::MessageId; use libp2p::gossipsub::protocol::MessageId;
use libp2p::gossipsub::{GossipsubEvent, GossipsubMessage, Topic}; use libp2p::gossipsub::{GossipsubEvent, GossipsubMessage, MessageAuthenticity, Topic};
use libp2p::{ use libp2p::{gossipsub, identity, PeerId};
gossipsub, identity,
PeerId,
};
use std::collections::hash_map::DefaultHasher; use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::time::Duration; use std::time::Duration;
use std::{error::Error, task::{Context, Poll}}; use std::{
error::Error,
task::{Context, Poll},
};
fn main() -> Result<(), Box<dyn Error>> { fn main() -> Result<(), Box<dyn Error>> {
Builder::from_env(Env::default().default_filter_or("info")).init(); Builder::from_env(Env::default().default_filter_or("info")).init();
@ -69,7 +69,7 @@ fn main() -> Result<(), Box<dyn Error>> {
println!("Local peer id: {:?}", local_peer_id); println!("Local peer id: {:?}", local_peer_id);
// Set up an encrypted TCP Transport over the Mplex and Yamux protocols // Set up an encrypted TCP Transport over the Mplex and Yamux protocols
let transport = libp2p::build_development_transport(local_key)?; let transport = libp2p::build_development_transport(local_key.clone())?;
// Create a Gossipsub topic // Create a Gossipsub topic
let topic = Topic::new("test-net".into()); let topic = Topic::new("test-net".into());
@ -83,7 +83,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let message_id_fn = |message: &GossipsubMessage| { let message_id_fn = |message: &GossipsubMessage| {
let mut s = DefaultHasher::new(); let mut s = DefaultHasher::new();
message.data.hash(&mut s); message.data.hash(&mut s);
MessageId(s.finish().to_string()) MessageId::from(s.finish().to_string())
}; };
// set custom gossipsub // set custom gossipsub
@ -93,7 +93,8 @@ fn main() -> Result<(), Box<dyn Error>> {
//same content will be propagated. //same content will be propagated.
.build(); .build();
// build a gossipsub network behaviour // build a gossipsub network behaviour
let mut gossipsub = gossipsub::Gossipsub::new(local_peer_id.clone(), gossipsub_config); let mut gossipsub =
gossipsub::Gossipsub::new(MessageAuthenticity::Signed(local_key), gossipsub_config);
gossipsub.subscribe(topic.clone()); gossipsub.subscribe(topic.clone());
libp2p::Swarm::new(transport, gossipsub, local_peer_id) libp2p::Swarm::new(transport, gossipsub, local_peer_id)
}; };
@ -120,11 +121,13 @@ fn main() -> Result<(), Box<dyn Error>> {
let mut listening = false; let mut listening = false;
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop { loop {
match stdin.try_poll_next_unpin(cx)? { if let Err(e) = match stdin.try_poll_next_unpin(cx)? {
Poll::Ready(Some(line)) => swarm.publish(&topic, line.as_bytes()), Poll::Ready(Some(line)) => swarm.publish(&topic, line.as_bytes()),
Poll::Ready(None) => panic!("Stdin closed"), Poll::Ready(None) => panic!("Stdin closed"),
Poll::Pending => break, Poll::Pending => break,
}; } {
println!("Publish error: {:?}", e);
}
} }
loop { loop {

View File

@ -35,7 +35,7 @@ use async_std::{io, task};
use futures::{future, prelude::*}; use futures::{future, prelude::*};
use libp2p::{ use libp2p::{
core::{either::EitherTransport, transport::upgrade::Version, StreamMuxer}, core::{either::EitherTransport, transport::upgrade::Version, StreamMuxer},
gossipsub::{self, Gossipsub, GossipsubConfigBuilder, GossipsubEvent}, gossipsub::{self, Gossipsub, GossipsubConfigBuilder, GossipsubEvent, MessageAuthenticity},
identify::{Identify, IdentifyEvent}, identify::{Identify, IdentifyEvent},
identity, identity,
multiaddr::Protocol, multiaddr::Protocol,
@ -178,18 +178,14 @@ fn main() -> Result<(), Box<dyn Error>> {
ping: Ping, ping: Ping,
} }
impl NetworkBehaviourEventProcess<IdentifyEvent> impl NetworkBehaviourEventProcess<IdentifyEvent> for MyBehaviour {
for MyBehaviour
{
// Called when `identify` produces an event. // Called when `identify` produces an event.
fn inject_event(&mut self, event: IdentifyEvent) { fn inject_event(&mut self, event: IdentifyEvent) {
println!("identify: {:?}", event); println!("identify: {:?}", event);
} }
} }
impl NetworkBehaviourEventProcess<GossipsubEvent> impl NetworkBehaviourEventProcess<GossipsubEvent> for MyBehaviour {
for MyBehaviour
{
// Called when `gossipsub` produces an event. // Called when `gossipsub` produces an event.
fn inject_event(&mut self, event: GossipsubEvent) { fn inject_event(&mut self, event: GossipsubEvent) {
match event { match event {
@ -204,9 +200,7 @@ fn main() -> Result<(), Box<dyn Error>> {
} }
} }
impl NetworkBehaviourEventProcess<PingEvent> impl NetworkBehaviourEventProcess<PingEvent> for MyBehaviour {
for MyBehaviour
{
// Called when `ping` produces an event. // Called when `ping` produces an event.
fn inject_event(&mut self, event: PingEvent) { fn inject_event(&mut self, event: PingEvent) {
use ping::handler::{PingFailure, PingSuccess}; use ping::handler::{PingFailure, PingSuccess};
@ -245,11 +239,11 @@ fn main() -> Result<(), Box<dyn Error>> {
// Create a Swarm to manage peers and events // Create a Swarm to manage peers and events
let mut swarm = { let mut swarm = {
let gossipsub_config = GossipsubConfigBuilder::default() let gossipsub_config = GossipsubConfigBuilder::new()
.max_transmit_size(262144) .max_transmit_size(262144)
.build(); .build();
let mut behaviour = MyBehaviour { let mut behaviour = MyBehaviour {
gossipsub: Gossipsub::new(local_peer_id.clone(), gossipsub_config), gossipsub: Gossipsub::new(MessageAuthenticity::Signed(local_key.clone()), gossipsub_config),
identify: Identify::new( identify: Identify::new(
"/ipfs/0.1.0".into(), "/ipfs/0.1.0".into(),
"rust-ipfs-example".into(), "rust-ipfs-example".into(),
@ -280,12 +274,14 @@ fn main() -> Result<(), Box<dyn Error>> {
let mut listening = false; let mut listening = false;
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop { loop {
match stdin.try_poll_next_unpin(cx)? { if let Err(e) = match stdin.try_poll_next_unpin(cx)? {
Poll::Ready(Some(line)) => { Poll::Ready(Some(line)) => {
swarm.gossipsub.publish(&gossipsub_topic, line.as_bytes()); swarm.gossipsub.publish(&gossipsub_topic, line.as_bytes())
} }
Poll::Ready(None) => panic!("Stdin closed"), Poll::Ready(None) => panic!("Stdin closed"),
Poll::Pending => break, Poll::Pending => break,
} {
println!("Publish error: {:?}", e);
} }
} }
loop { loop {

View File

@ -23,10 +23,10 @@ unsigned-varint = { version = "0.4.0", features = ["futures-codec"] }
log = "0.4.8" log = "0.4.8"
sha2 = "0.8.1" sha2 = "0.8.1"
base64 = "0.11.0" base64 = "0.11.0"
lru = "0.4.3"
smallvec = "1.1.0" smallvec = "1.1.0"
prost = "0.6.1" prost = "0.6.1"
hex_fmt = "0.3.0" hex_fmt = "0.3.0"
lru_time_cache = "0.10.0"
[dev-dependencies] [dev-dependencies]
async-std = "1.6.2" async-std = "1.6.2"
@ -34,6 +34,7 @@ env_logger = "0.7.1"
libp2p-plaintext = { path = "../plaintext" } libp2p-plaintext = { path = "../plaintext" }
libp2p-yamux = { path = "../../muxers/yamux" } libp2p-yamux = { path = "../../muxers/yamux" }
quickcheck = "0.9.2" quickcheck = "0.9.2"
hex = "0.4.2"
[build-dependencies] [build-dependencies]
prost-build = "0.6" prost-build = "0.6"

File diff suppressed because it is too large Load Diff

View File

@ -33,10 +33,11 @@ mod tests {
topics: Vec<String>, topics: Vec<String>,
to_subscribe: bool, to_subscribe: bool,
) -> (Gossipsub, Vec<PeerId>, Vec<TopicHash>) { ) -> (Gossipsub, Vec<PeerId>, Vec<TopicHash>) {
// generate a default GossipsubConfig let keypair = libp2p_core::identity::Keypair::generate_secp256k1();
// generate a default GossipsubConfig with signing
let gs_config = GossipsubConfig::default(); let gs_config = GossipsubConfig::default();
// create a gossipsub struct // create a gossipsub struct
let mut gs: Gossipsub = Gossipsub::new(PeerId::random(), gs_config); let mut gs: Gossipsub = Gossipsub::new(MessageAuthenticity::Signed(keypair), gs_config);
let mut topic_hashes = vec![]; let mut topic_hashes = vec![];
@ -53,10 +54,7 @@ mod tests {
for _ in 0..peer_no { for _ in 0..peer_no {
let peer = PeerId::random(); let peer = PeerId::random();
peers.push(peer.clone()); peers.push(peer.clone());
<Gossipsub as NetworkBehaviour>::inject_connected( <Gossipsub as NetworkBehaviour>::inject_connected(&mut gs, &peer);
&mut gs,
&peer,
);
if to_subscribe { if to_subscribe {
gs.handle_received_subscriptions( gs.handle_received_subscriptions(
&topic_hashes &topic_hashes
@ -230,21 +228,23 @@ mod tests {
"Should have added 6 nodes to the mesh" "Should have added 6 nodes to the mesh"
); );
// there should be mesh_n GRAFT messages. fn collect_grafts(
let graft_messages = mut collected_grafts: Vec<GossipsubControlAction>,
gs.control_pool (_, controls): (&PeerId, &Vec<GossipsubControlAction>),
.iter() ) -> Vec<GossipsubControlAction> {
.fold(vec![], |mut collected_grafts, (_, controls)| { for c in controls.iter() {
for c in controls.iter() { match c {
match c { GossipsubControlAction::Graft { topic_hash: _ } => {
GossipsubControlAction::Graft { topic_hash: _ } => { collected_grafts.push(c.clone())
collected_grafts.push(c.clone())
}
_ => {}
}
} }
collected_grafts _ => {}
}); }
}
collected_grafts
}
// there should be mesh_n GRAFT messages.
let graft_messages = gs.control_pool.iter().fold(vec![], collect_grafts);
assert_eq!( assert_eq!(
graft_messages.len(), graft_messages.len(),
@ -254,11 +254,12 @@ mod tests {
// verify fanout nodes // verify fanout nodes
// add 3 random peers to the fanout[topic1] // add 3 random peers to the fanout[topic1]
gs.fanout.insert(topic_hashes[1].clone(), vec![]); gs.fanout
let new_peers = vec![]; .insert(topic_hashes[1].clone(), Default::default());
let new_peers: Vec<PeerId> = vec![];
for _ in 0..3 { for _ in 0..3 {
let fanout_peers = gs.fanout.get_mut(&topic_hashes[1]).unwrap(); let fanout_peers = gs.fanout.get_mut(&topic_hashes[1]).unwrap();
fanout_peers.push(PeerId::random()); fanout_peers.insert(PeerId::random());
} }
// subscribe to topic1 // subscribe to topic1
@ -272,26 +273,13 @@ mod tests {
let mesh_peers = gs.mesh.get(&topic_hashes[1]).unwrap(); let mesh_peers = gs.mesh.get(&topic_hashes[1]).unwrap();
for new_peer in new_peers { for new_peer in new_peers {
assert!( assert!(
mesh_peers.contains(new_peer), mesh_peers.contains(&new_peer),
"Fanout peer should be included in the mesh" "Fanout peer should be included in the mesh"
); );
} }
// there should now be 12 graft messages to be sent // there should now be 12 graft messages to be sent
let graft_messages = let graft_messages = gs.control_pool.iter().fold(vec![], collect_grafts);
gs.control_pool
.iter()
.fold(vec![], |mut collected_grafts, (_, controls)| {
for c in controls.iter() {
match c {
GossipsubControlAction::Graft { topic_hash: _ } => {
collected_grafts.push(c.clone())
}
_ => {}
}
}
collected_grafts
});
assert!( assert!(
graft_messages.len() == 12, graft_messages.len() == 12,
@ -315,9 +303,17 @@ mod tests {
"Subscribe should add a new entry to the mesh[topic] hashmap" "Subscribe should add a new entry to the mesh[topic] hashmap"
); );
// all peers should be subscribed to the topic
assert_eq!(
gs.topic_peers.get(&topic_hashes[0]).map(|p| p.len()),
Some(20),
"Peers should be subscribed to the topic"
);
// publish on topic // publish on topic
let publish_data = vec![0; 42]; let publish_data = vec![0; 42];
gs.publish(&Topic::new(publish_topic), publish_data); gs.publish(&Topic::new(publish_topic), publish_data)
.unwrap();
// Collect all publish messages // Collect all publish messages
let publishes = gs let publishes = gs
@ -336,8 +332,10 @@ mod tests {
let msg_id = let msg_id =
(gs.config.message_id_fn)(&publishes.first().expect("Should contain > 0 entries")); (gs.config.message_id_fn)(&publishes.first().expect("Should contain > 0 entries"));
assert!( let config = GossipsubConfig::default();
publishes.len() == 20, assert_eq!(
publishes.len(),
config.mesh_n_low,
"Should send a publish message to all known peers" "Should send a publish message to all known peers"
); );
@ -345,10 +343,6 @@ mod tests {
gs.mcache.get(&msg_id).is_some(), gs.mcache.get(&msg_id).is_some(),
"Message cache should contain published message" "Message cache should contain published message"
); );
assert!(
gs.received.get(&msg_id).is_some(),
"Received cache should contain published message"
);
} }
/// Test local node publish to unsubscribed topic /// Test local node publish to unsubscribed topic
@ -374,7 +368,8 @@ mod tests {
// Publish on unsubscribed topic // Publish on unsubscribed topic
let publish_data = vec![0; 42]; let publish_data = vec![0; 42];
gs.publish(&Topic::new(fanout_topic.clone()), publish_data); gs.publish(&Topic::new(fanout_topic.clone()), publish_data)
.unwrap();
assert_eq!( assert_eq!(
gs.fanout gs.fanout
@ -412,10 +407,6 @@ mod tests {
gs.mcache.get(&msg_id).is_some(), gs.mcache.get(&msg_id).is_some(),
"Message cache should contain published message" "Message cache should contain published message"
); );
assert!(
gs.received.get(&msg_id).is_some(),
"Received cache should contain published message"
);
} }
#[test] #[test]
@ -433,7 +424,9 @@ mod tests {
.events .events
.iter() .iter()
.filter(|e| match e { .filter(|e| match e {
NetworkBehaviourAction::NotifyHandler { .. } => true, NetworkBehaviourAction::NotifyHandler { event, .. } => {
!event.subscriptions.is_empty()
}
_ => false, _ => false,
}) })
.collect(); .collect();
@ -461,7 +454,7 @@ mod tests {
for peer in peers { for peer in peers {
let known_topics = gs.peer_topics.get(&peer).unwrap(); let known_topics = gs.peer_topics.get(&peer).unwrap();
assert!( assert!(
known_topics == &topic_hashes, known_topics == &topic_hashes.iter().cloned().collect(),
"The topics for each node should all topics" "The topics for each node should all topics"
); );
} }
@ -508,12 +501,12 @@ mod tests {
let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone(); let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone();
assert!( assert!(
peer_topics == topic_hashes[..3].to_vec(), peer_topics == topic_hashes.iter().take(3).cloned().collect(),
"First peer should be subscribed to three topics" "First peer should be subscribed to three topics"
); );
let peer_topics = gs.peer_topics.get(&peers[1]).unwrap().clone(); let peer_topics = gs.peer_topics.get(&peers[1]).unwrap().clone();
assert!( assert!(
peer_topics == topic_hashes[..3].to_vec(), peer_topics == topic_hashes.iter().take(3).cloned().collect(),
"Second peer should be subscribed to three topics" "Second peer should be subscribed to three topics"
); );
@ -525,7 +518,7 @@ mod tests {
for topic_hash in topic_hashes[..3].iter() { for topic_hash in topic_hashes[..3].iter() {
let topic_peers = gs.topic_peers.get(topic_hash).unwrap().clone(); let topic_peers = gs.topic_peers.get(topic_hash).unwrap().clone();
assert!( assert!(
topic_peers == peers[..2].to_vec(), topic_peers == peers[..2].into_iter().cloned().collect(),
"Two peers should be added to the first three topics" "Two peers should be added to the first three topics"
); );
} }
@ -542,13 +535,13 @@ mod tests {
let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone(); let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone();
assert!( assert!(
peer_topics == topic_hashes[1..3].to_vec(), peer_topics == topic_hashes[1..3].into_iter().cloned().collect(),
"Peer should be subscribed to two topics" "Peer should be subscribed to two topics"
); );
let topic_peers = gs.topic_peers.get(&topic_hashes[0]).unwrap().clone(); // only gossipsub at the moment let topic_peers = gs.topic_peers.get(&topic_hashes[0]).unwrap().clone(); // only gossipsub at the moment
assert!( assert!(
topic_peers == peers[1..2].to_vec(), topic_peers == peers[1..2].into_iter().cloned().collect(),
"Only the second peers should be in the first topic" "Only the second peers should be in the first topic"
); );
} }
@ -557,9 +550,10 @@ mod tests {
/// Test Gossipsub.get_random_peers() function /// Test Gossipsub.get_random_peers() function
fn test_get_random_peers() { fn test_get_random_peers() {
// generate a default GossipsubConfig // generate a default GossipsubConfig
let gs_config = GossipsubConfig::default(); let mut gs_config = GossipsubConfig::default();
gs_config.validation_mode = ValidationMode::Anonymous;
// create a gossipsub struct // create a gossipsub struct
let mut gs: Gossipsub = Gossipsub::new(PeerId::random(), gs_config); let mut gs: Gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, gs_config);
// create a topic and fill it with some peers // create a topic and fill it with some peers
let topic_hash = Topic::new("Test".into()).no_hash().clone(); let topic_hash = Topic::new("Test".into()).no_hash().clone();
@ -568,30 +562,31 @@ mod tests {
peers.push(PeerId::random()) peers.push(PeerId::random())
} }
gs.topic_peers.insert(topic_hash.clone(), peers.clone()); gs.topic_peers
.insert(topic_hash.clone(), peers.iter().cloned().collect());
let random_peers = let random_peers = Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 5, |_| true);
Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 5, |_| true); assert_eq!(random_peers.len(), 5, "Expected 5 peers to be returned");
assert!(random_peers.len() == 5, "Expected 5 peers to be returned"); let random_peers = Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 30, |_| true);
let random_peers =
Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 30, |_| true);
assert!(random_peers.len() == 20, "Expected 20 peers to be returned"); assert!(random_peers.len() == 20, "Expected 20 peers to be returned");
assert!(random_peers == peers, "Expected no shuffling"); assert!(
let random_peers = random_peers == peers.iter().cloned().collect(),
Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 20, |_| true); "Expected no shuffling"
);
let random_peers = Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 20, |_| true);
assert!(random_peers.len() == 20, "Expected 20 peers to be returned"); assert!(random_peers.len() == 20, "Expected 20 peers to be returned");
assert!(random_peers == peers, "Expected no shuffling"); assert!(
let random_peers = random_peers == peers.iter().cloned().collect(),
Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 0, |_| true); "Expected no shuffling"
);
let random_peers = Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 0, |_| true);
assert!(random_peers.len() == 0, "Expected 0 peers to be returned"); assert!(random_peers.len() == 0, "Expected 0 peers to be returned");
// test the filter // test the filter
let random_peers = let random_peers = Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 5, |_| false);
Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 5, |_| false);
assert!(random_peers.len() == 0, "Expected 0 peers to be returned"); assert!(random_peers.len() == 0, "Expected 0 peers to be returned");
let random_peers = let random_peers = Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 10, {
Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 10, { |peer| peers.contains(peer)
|peer| peers.contains(peer) });
});
assert!(random_peers.len() == 10, "Expected 10 peers to be returned"); assert!(random_peers.len() == 10, "Expected 10 peers to be returned");
} }
@ -603,10 +598,13 @@ mod tests {
let id = gs.config.message_id_fn; let id = gs.config.message_id_fn;
let message = GossipsubMessage { let message = GossipsubMessage {
source: peers[11].clone(), source: Some(peers[11].clone()),
data: vec![1, 2, 3, 4], data: vec![1, 2, 3, 4],
sequence_number: 1u64, sequence_number: Some(1u64),
topics: Vec::new(), topics: Vec::new(),
signature: None,
key: None,
validated: true,
}; };
let msg_id = id(&message); let msg_id = id(&message);
gs.mcache.put(message.clone()); gs.mcache.put(message.clone());
@ -642,10 +640,13 @@ mod tests {
// perform 10 memshifts and check that it leaves the cache // perform 10 memshifts and check that it leaves the cache
for shift in 1..10 { for shift in 1..10 {
let message = GossipsubMessage { let message = GossipsubMessage {
source: peers[11].clone(), source: Some(peers[11].clone()),
data: vec![1, 2, 3, 4], data: vec![1, 2, 3, 4],
sequence_number: shift, sequence_number: Some(shift),
topics: Vec::new(), topics: Vec::new(),
signature: None,
key: None,
validated: true,
}; };
let msg_id = id(&message); let msg_id = id(&message);
gs.mcache.put(message.clone()); gs.mcache.put(message.clone());
@ -683,7 +684,7 @@ mod tests {
let (mut gs, peers, _) = build_and_inject_nodes(20, Vec::new(), true); let (mut gs, peers, _) = build_and_inject_nodes(20, Vec::new(), true);
let events_before = gs.events.len(); let events_before = gs.events.len();
gs.handle_iwant(&peers[7], vec![MessageId(String::from("unknown id"))]); gs.handle_iwant(&peers[7], vec![MessageId::new(b"unknown id")]);
let events_after = gs.events.len(); let events_after = gs.events.len();
assert_eq!( assert_eq!(
@ -700,10 +701,7 @@ mod tests {
gs.handle_ihave( gs.handle_ihave(
&peers[7], &peers[7],
vec![( vec![(topic_hashes[0].clone(), vec![MessageId::new(b"unknown id")])],
topic_hashes[0].clone(),
vec![MessageId(String::from("unknown id"))],
)],
); );
// check that we sent an IWANT request for `unknown id` // check that we sent an IWANT request for `unknown id`
@ -711,7 +709,7 @@ mod tests {
Some(controls) => controls.iter().any(|c| match c { Some(controls) => controls.iter().any(|c| match c {
GossipsubControlAction::IWant { message_ids } => message_ids GossipsubControlAction::IWant { message_ids } => message_ids
.iter() .iter()
.any(|m| *m.0 == String::from("unknown id")), .any(|m| *m == MessageId::new(b"unknown id")),
_ => false, _ => false,
}), }),
_ => false, _ => false,
@ -730,8 +728,7 @@ mod tests {
let (mut gs, peers, topic_hashes) = let (mut gs, peers, topic_hashes) =
build_and_inject_nodes(20, vec![String::from("topic1")], true); build_and_inject_nodes(20, vec![String::from("topic1")], true);
let msg_id = MessageId(String::from("known id")); let msg_id = MessageId::new(b"known id");
gs.received.put(msg_id.clone(), ());
let events_before = gs.events.len(); let events_before = gs.events.len();
gs.handle_ihave(&peers[7], vec![(topic_hashes[0].clone(), vec![msg_id])]); gs.handle_ihave(&peers[7], vec![(topic_hashes[0].clone(), vec![msg_id])]);
@ -754,7 +751,7 @@ mod tests {
&peers[7], &peers[7],
vec![( vec![(
TopicHash::from_raw(String::from("unsubscribed topic")), TopicHash::from_raw(String::from("unsubscribed topic")),
vec![MessageId(String::from("irrelevant id"))], vec![MessageId::new(b"irrelevant id")],
)], )],
); );
let events_after = gs.events.len(); let events_after = gs.events.len();
@ -793,7 +790,7 @@ mod tests {
); );
assert!( assert!(
gs.mesh.get(&topic_hashes[0]).unwrap().contains(&peers[7]), !gs.mesh.get(&topic_hashes[0]).unwrap().contains(&peers[7]),
"Expected peer to have been added to mesh" "Expected peer to have been added to mesh"
); );
} }
@ -836,7 +833,8 @@ mod tests {
build_and_inject_nodes(20, vec![String::from("topic1")], true); build_and_inject_nodes(20, vec![String::from("topic1")], true);
// insert peer into our mesh for 'topic1' // insert peer into our mesh for 'topic1'
gs.mesh.insert(topic_hashes[0].clone(), peers.clone()); gs.mesh
.insert(topic_hashes[0].clone(), peers.iter().cloned().collect());
assert!( assert!(
gs.mesh.get(&topic_hashes[0]).unwrap().contains(&peers[7]), gs.mesh.get(&topic_hashes[0]).unwrap().contains(&peers[7]),
"Expected peer to be in mesh" "Expected peer to be in mesh"
@ -848,4 +846,53 @@ mod tests {
"Expected peer to be removed from mesh" "Expected peer to be removed from mesh"
); );
} }
#[test]
// Tests the mesh maintenance addition
fn test_mesh_addition() {
let config = GossipsubConfig::default();
// Adds mesh_low peers and PRUNE 2 giving us a deficit.
let (mut gs, peers, topics) =
build_and_inject_nodes(config.mesh_n + 1, vec!["test".into()], true);
let to_remove_peers = config.mesh_n + 1 - config.mesh_n_low - 1;
for index in 0..to_remove_peers {
gs.handle_prune(&peers[index], topics.clone());
}
// Verify the pruned peers are removed from the mesh.
assert_eq!(
gs.mesh.get(&topics[0]).unwrap().len(),
config.mesh_n_low - 1
);
// run a heartbeat
gs.heartbeat();
// Peers should be added to reach mesh_n
assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), config.mesh_n);
}
#[test]
// Tests the mesh maintenance subtraction
fn test_mesh_subtraction() {
let config = GossipsubConfig::default();
// Adds mesh_low peers and PRUNE 2 giving us a deficit.
let (mut gs, peers, topics) =
build_and_inject_nodes(config.mesh_n_high + 10, vec!["test".into()], true);
// graft all the peers
for peer in peers {
gs.handle_graft(&peer, topics.clone());
}
// run a heartbeat
gs.heartbeat();
// Peers should be removed to reach mesh_n
assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), config.mesh_n);
}
} }

View File

@ -19,12 +19,31 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::protocol::{GossipsubMessage, MessageId}; use crate::protocol::{GossipsubMessage, MessageId};
use libp2p_core::PeerId;
use std::borrow::Cow; use std::borrow::Cow;
use std::time::Duration; use std::time::Duration;
/// If the `no_source_id` flag is set, the IDENTITY_SOURCE value is used as the source of the /// The types of message validation that can be employed by gossipsub.
/// packet. #[derive(Debug, Clone)]
pub const IDENTITY_SOURCE: [u8; 3] = [0, 1, 0]; pub enum ValidationMode {
/// This is the default setting. This requires the message author to be a valid `PeerId` and to
/// be present as well as the sequence number. All messages must have valid signatures.
///
/// NOTE: This setting will reject messages from nodes using `PrivacyMode::Anonymous` and
/// all messages that do not have signatures.
Strict,
/// This setting permits messages that have no author, sequence number or signature. If any of
/// these fields exist in the message these are validated.
Permissive,
/// This setting requires the author, sequence number and signature fields of a message to be
/// empty. Any message that contains these fields is considered invalid.
Anonymous,
/// This setting does not check the author, sequence number or signature fields of incoming
/// messages. If these fields contain data, they are simply ignored.
///
/// NOTE: This setting will consider messages with invalid signatures as valid messages.
None,
}
/// Configuration parameters that define the performance of the gossipsub network. /// Configuration parameters that define the performance of the gossipsub network.
#[derive(Clone)] #[derive(Clone)]
@ -42,7 +61,7 @@ pub struct GossipsubConfig {
/// Target number of peers for the mesh network (D in the spec, default is 6). /// Target number of peers for the mesh network (D in the spec, default is 6).
pub mesh_n: usize, pub mesh_n: usize,
/// Minimum number of peers in mesh network before adding more (D_lo in the spec, default is 4). /// Minimum number of peers in mesh network before adding more (D_lo in the spec, default is 5).
pub mesh_n_low: usize, pub mesh_n_low: usize,
/// Maximum number of peers in mesh network before removing some (D_high in the spec, default /// Maximum number of peers in mesh network before removing some (D_high in the spec, default
@ -64,17 +83,26 @@ pub struct GossipsubConfig {
/// The maximum byte size for each gossip (default is 2048 bytes). /// The maximum byte size for each gossip (default is 2048 bytes).
pub max_transmit_size: usize, pub max_transmit_size: usize,
/// Duplicates are prevented by storing message id's of known messages in an LRU time cache.
/// This settings sets the time period that messages are stored in the cache. Duplicates can be
/// received if duplicate messages are sent at a time greater than this setting apart. The
/// default is 1 minute.
pub duplicate_cache_time: Duration,
/// Flag determining if gossipsub topics are hashed or sent as plain strings (default is false). /// Flag determining if gossipsub topics are hashed or sent as plain strings (default is false).
pub hash_topics: bool, pub hash_topics: bool,
/// When set, all published messages will have a 0 source `PeerId` (default is false).
pub no_source_id: bool,
/// When set to `true`, prevents automatic forwarding of all received messages. This setting /// When set to `true`, prevents automatic forwarding of all received messages. This setting
/// allows a user to validate the messages before propagating them to their peers. If set to /// allows a user to validate the messages before propagating them to their peers. If set to
/// true, the user must manually call `propagate_message()` on the behaviour to forward message /// true, the user must manually call `validate_message()` on the behaviour to forward message
/// once validated (default is false). /// once validated (default is `false`). Furthermore, the application may optionally call
pub manual_propagation: bool, /// `invalidate_message()` on the behaviour to remove the message from the memcache. The
/// default is false.
pub validate_messages: bool,
/// Determines the level of validation used when receiving messages. See [`ValidationMode`]
/// for the available types. The default is ValidationMode::Strict.
pub validation_mode: ValidationMode,
/// A user-defined function allowing the user to specify the message id of a gossipsub message. /// A user-defined function allowing the user to specify the message id of a gossipsub message.
/// The default value is to concatenate the source peer id with a sequence number. Setting this /// The default value is to concatenate the source peer id with a sequence number. Setting this
@ -94,26 +122,35 @@ impl Default for GossipsubConfig {
history_length: 5, history_length: 5,
history_gossip: 3, history_gossip: 3,
mesh_n: 6, mesh_n: 6,
mesh_n_low: 4, mesh_n_low: 5,
mesh_n_high: 12, mesh_n_high: 12,
gossip_lazy: 6, // default to mesh_n gossip_lazy: 6, // default to mesh_n
heartbeat_initial_delay: Duration::from_secs(5), heartbeat_initial_delay: Duration::from_secs(5),
heartbeat_interval: Duration::from_secs(1), heartbeat_interval: Duration::from_secs(1),
fanout_ttl: Duration::from_secs(60), fanout_ttl: Duration::from_secs(60),
max_transmit_size: 2048, max_transmit_size: 2048,
duplicate_cache_time: Duration::from_secs(60),
hash_topics: false, // default compatibility with floodsub hash_topics: false, // default compatibility with floodsub
no_source_id: false, validate_messages: false,
manual_propagation: false, validation_mode: ValidationMode::Strict,
message_id_fn: |message| { message_id_fn: |message| {
// default message id is: source + sequence number // default message id is: source + sequence number
let mut source_string = message.source.to_base58(); // NOTE: If either the peer_id or source is not provided, we set to 0;
source_string.push_str(&message.sequence_number.to_string()); let mut source_string = if let Some(peer_id) = message.source.as_ref() {
MessageId(source_string) peer_id.to_base58()
} else {
PeerId::from_bytes(vec![0, 1, 0])
.expect("Valid peer id")
.to_base58()
};
source_string.push_str(&message.sequence_number.unwrap_or_default().to_string());
MessageId::from(source_string)
}, },
} }
} }
} }
/// The builder struct for constructing a gossipsub configuration.
pub struct GossipsubConfigBuilder { pub struct GossipsubConfigBuilder {
config: GossipsubConfig, config: GossipsubConfig,
} }
@ -129,14 +166,18 @@ impl Default for GossipsubConfigBuilder {
impl GossipsubConfigBuilder { impl GossipsubConfigBuilder {
// set default values // set default values
pub fn new() -> GossipsubConfigBuilder { pub fn new() -> GossipsubConfigBuilder {
GossipsubConfigBuilder::default() GossipsubConfigBuilder {
config: GossipsubConfig::default(),
}
} }
/// The protocol id to negotiate this protocol (default is `/meshsub/1.0.0`).
pub fn protocol_id(&mut self, protocol_id: impl Into<Cow<'static, [u8]>>) -> &mut Self { pub fn protocol_id(&mut self, protocol_id: impl Into<Cow<'static, [u8]>>) -> &mut Self {
self.config.protocol_id = protocol_id.into(); self.config.protocol_id = protocol_id.into();
self self
} }
/// Number of heartbeats to keep in the `memcache` (default is 5).
pub fn history_length(&mut self, history_length: usize) -> &mut Self { pub fn history_length(&mut self, history_length: usize) -> &mut Self {
assert!( assert!(
history_length >= self.config.history_gossip, history_length >= self.config.history_gossip,
@ -146,6 +187,7 @@ impl GossipsubConfigBuilder {
self self
} }
/// Number of past heartbeats to gossip about (default is 3).
pub fn history_gossip(&mut self, history_gossip: usize) -> &mut Self { pub fn history_gossip(&mut self, history_gossip: usize) -> &mut Self {
assert!( assert!(
self.config.history_length >= history_gossip, self.config.history_length >= history_gossip,
@ -155,6 +197,7 @@ impl GossipsubConfigBuilder {
self self
} }
/// Target number of peers for the mesh network (D in the spec, default is 6).
pub fn mesh_n(&mut self, mesh_n: usize) -> &mut Self { pub fn mesh_n(&mut self, mesh_n: usize) -> &mut Self {
assert!( assert!(
self.config.mesh_n_low <= mesh_n && mesh_n <= self.config.mesh_n_high, self.config.mesh_n_low <= mesh_n && mesh_n <= self.config.mesh_n_high,
@ -164,6 +207,7 @@ impl GossipsubConfigBuilder {
self self
} }
/// Minimum number of peers in mesh network before adding more (D_lo in the spec, default is 4).
pub fn mesh_n_low(&mut self, mesh_n_low: usize) -> &mut Self { pub fn mesh_n_low(&mut self, mesh_n_low: usize) -> &mut Self {
assert!( assert!(
mesh_n_low <= self.config.mesh_n && self.config.mesh_n <= self.config.mesh_n_high, mesh_n_low <= self.config.mesh_n && self.config.mesh_n <= self.config.mesh_n_high,
@ -173,6 +217,8 @@ impl GossipsubConfigBuilder {
self self
} }
/// Maximum number of peers in mesh network before removing some (D_high in the spec, default
/// is 12).
pub fn mesh_n_high(&mut self, mesh_n_high: usize) -> &mut Self { pub fn mesh_n_high(&mut self, mesh_n_high: usize) -> &mut Self {
assert!( assert!(
self.config.mesh_n_low <= self.config.mesh_n && self.config.mesh_n <= mesh_n_high, self.config.mesh_n_low <= self.config.mesh_n && self.config.mesh_n <= mesh_n_high,
@ -182,48 +228,81 @@ impl GossipsubConfigBuilder {
self self
} }
/// Number of peers to emit gossip to during a heartbeat (D_lazy in the spec, default is 6).
pub fn gossip_lazy(&mut self, gossip_lazy: usize) -> &mut Self { pub fn gossip_lazy(&mut self, gossip_lazy: usize) -> &mut Self {
self.config.gossip_lazy = gossip_lazy; self.config.gossip_lazy = gossip_lazy;
self self
} }
/// Initial delay in each heartbeat (default is 5 seconds).
pub fn heartbeat_initial_delay(&mut self, heartbeat_initial_delay: Duration) -> &mut Self { pub fn heartbeat_initial_delay(&mut self, heartbeat_initial_delay: Duration) -> &mut Self {
self.config.heartbeat_initial_delay = heartbeat_initial_delay; self.config.heartbeat_initial_delay = heartbeat_initial_delay;
self self
} }
/// Time between each heartbeat (default is 1 second).
pub fn heartbeat_interval(&mut self, heartbeat_interval: Duration) -> &mut Self { pub fn heartbeat_interval(&mut self, heartbeat_interval: Duration) -> &mut Self {
self.config.heartbeat_interval = heartbeat_interval; self.config.heartbeat_interval = heartbeat_interval;
self self
} }
/// Time to live for fanout peers (default is 60 seconds).
pub fn fanout_ttl(&mut self, fanout_ttl: Duration) -> &mut Self { pub fn fanout_ttl(&mut self, fanout_ttl: Duration) -> &mut Self {
self.config.fanout_ttl = fanout_ttl; self.config.fanout_ttl = fanout_ttl;
self self
} }
/// The maximum byte size for each gossip (default is 2048 bytes).
pub fn max_transmit_size(&mut self, max_transmit_size: usize) -> &mut Self { pub fn max_transmit_size(&mut self, max_transmit_size: usize) -> &mut Self {
self.config.max_transmit_size = max_transmit_size; self.config.max_transmit_size = max_transmit_size;
self self
} }
/// Duplicates are prevented by storing message id's of known messages in an LRU time cache.
/// This settings sets the time period that messages are stored in the cache. Duplicates can be
/// received if duplicate messages are sent at a time greater than this setting apart. The
/// default is 1 minute.
pub fn duplicate_cache_time(&mut self, cache_size: Duration) -> &mut Self {
self.config.duplicate_cache_time = cache_size;
self
}
/// When set, gossipsub topics are hashed instead of being sent as plain strings.
pub fn hash_topics(&mut self) -> &mut Self { pub fn hash_topics(&mut self) -> &mut Self {
self.config.hash_topics = true; self.config.hash_topics = true;
self self
} }
pub fn no_source_id(&mut self) -> &mut Self { /// When set, prevents automatic forwarding of all received messages. This setting
self.config.no_source_id = true; /// allows a user to validate the messages before propagating them to their peers. If set,
/// the user must manually call `validate_message()` on the behaviour to forward a message
/// once validated.
pub fn validate_messages(&mut self) -> &mut Self {
self.config.validate_messages = true;
self self
} }
pub fn manual_propagation(&mut self) -> &mut Self { /// Determines the level of validation used when receiving messages. See [`ValidationMode`]
self.config.manual_propagation = true; /// for the available types. The default is ValidationMode::Strict.
pub fn validation_mode(&mut self, validation_mode: ValidationMode) -> &mut Self {
self.config.validation_mode = validation_mode;
self self
} }
/// A user-defined function allowing the user to specify the message id of a gossipsub message.
/// The default value is to concatenate the source peer id with a sequence number. Setting this
/// parameter allows the user to address packets arbitrarily. One example is content based
/// addressing, where this function may be set to `hash(message)`. This would prevent messages
/// of the same content from being duplicated.
///
/// The function takes a `GossipsubMessage` as input and outputs a String to be interpreted as
/// the message id.
pub fn message_id_fn(&mut self, id_fn: fn(&GossipsubMessage) -> MessageId) -> &mut Self { pub fn message_id_fn(&mut self, id_fn: fn(&GossipsubMessage) -> MessageId) -> &mut Self {
self.config.message_id_fn = id_fn; self.config.message_id_fn = id_fn;
self self
} }
/// Constructs a `GossipsubConfig` from the given configuration.
pub fn build(&self) -> GossipsubConfig { pub fn build(&self) -> GossipsubConfig {
self.config.clone() self.config.clone()
} }
@ -247,9 +326,9 @@ impl std::fmt::Debug for GossipsubConfig {
let _ = builder.field("heartbeat_interval", &self.heartbeat_interval); let _ = builder.field("heartbeat_interval", &self.heartbeat_interval);
let _ = builder.field("fanout_ttl", &self.fanout_ttl); let _ = builder.field("fanout_ttl", &self.fanout_ttl);
let _ = builder.field("max_transmit_size", &self.max_transmit_size); let _ = builder.field("max_transmit_size", &self.max_transmit_size);
let _ = builder.field("duplicate_cache_time", &self.duplicate_cache_time);
let _ = builder.field("hash_topics", &self.hash_topics); let _ = builder.field("hash_topics", &self.hash_topics);
let _ = builder.field("no_source_id", &self.no_source_id); let _ = builder.field("validate_messages", &self.validate_messages);
let _ = builder.field("manual_propagation", &self.manual_propagation);
builder.finish() builder.finish()
} }
} }

View File

@ -0,0 +1,40 @@
// Copyright 2020 Sigma Prime Pty Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Error types that can result from gossipsub.
use libp2p_core::identity::error::SigningError;
/// Error associated with publishing a gossipsub message.
#[derive(Debug)]
pub enum PublishError {
/// This message has already been published.
Duplicate,
/// An error occurred whilst signing the message.
SigningError(SigningError),
/// There were no peers to send this message to.
InsufficientPeers,
}
impl From<SigningError> for PublishError {
fn from(error: SigningError) -> Self {
PublishError::SigningError(error)
}
}

View File

@ -19,6 +19,7 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::behaviour::GossipsubRpc; use crate::behaviour::GossipsubRpc;
use crate::config::ValidationMode;
use crate::protocol::{GossipsubCodec, ProtocolConfig}; use crate::protocol::{GossipsubCodec, ProtocolConfig};
use futures::prelude::*; use futures::prelude::*;
use futures_codec::Framed; use futures_codec::Framed;
@ -50,6 +51,10 @@ pub struct GossipsubHandler {
/// Queue of values that we want to send to the remote. /// Queue of values that we want to send to the remote.
send_queue: SmallVec<[GossipsubRpc; 16]>, send_queue: SmallVec<[GossipsubRpc; 16]>,
/// Flag indicating that an outbound substream is being established to prevent duplicate
/// requests.
outbound_substream_establishing: bool,
/// Flag determining whether to maintain the connection to the peer. /// Flag determining whether to maintain the connection to the peer.
keep_alive: KeepAlive, keep_alive: KeepAlive,
} }
@ -80,26 +85,20 @@ enum OutboundSubstreamState {
impl GossipsubHandler { impl GossipsubHandler {
/// Builds a new `GossipsubHandler`. /// Builds a new `GossipsubHandler`.
pub fn new(protocol_id: impl Into<Cow<'static, [u8]>>, max_transmit_size: usize) -> Self { pub fn new(
protocol_id: impl Into<Cow<'static, [u8]>>,
max_transmit_size: usize,
validation_mode: ValidationMode,
) -> Self {
GossipsubHandler { GossipsubHandler {
listen_protocol: SubstreamProtocol::new(ProtocolConfig::new( listen_protocol: SubstreamProtocol::new(ProtocolConfig::new(
protocol_id, protocol_id,
max_transmit_size, max_transmit_size,
validation_mode,
)), )),
inbound_substream: None, inbound_substream: None,
outbound_substream: None, outbound_substream: None,
send_queue: SmallVec::new(), outbound_substream_establishing: false,
keep_alive: KeepAlive::Yes,
}
}
}
impl Default for GossipsubHandler {
fn default() -> Self {
GossipsubHandler {
listen_protocol: SubstreamProtocol::new(ProtocolConfig::default()),
inbound_substream: None,
outbound_substream: None,
send_queue: SmallVec::new(), send_queue: SmallVec::new(),
keep_alive: KeepAlive::Yes, keep_alive: KeepAlive::Yes,
} }
@ -132,6 +131,7 @@ impl ProtocolsHandler for GossipsubHandler {
substream: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output, substream: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
message: Self::OutboundOpenInfo, message: Self::OutboundOpenInfo,
) { ) {
self.outbound_substream_establishing = false;
// Should never establish a new outbound substream if one already exists. // Should never establish a new outbound substream if one already exists.
// If this happens, an outbound message is not sent. // If this happens, an outbound message is not sent.
if self.outbound_substream.is_some() { if self.outbound_substream.is_some() {
@ -154,6 +154,7 @@ impl ProtocolsHandler for GossipsubHandler {
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error, <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
>, >,
) { ) {
self.outbound_substream_establishing = false;
// Ignore upgrade errors for now. // Ignore upgrade errors for now.
// If a peer doesn't support this protocol, this will just ignore them, but not disconnect // If a peer doesn't support this protocol, this will just ignore them, but not disconnect
// them. // them.
@ -175,9 +176,13 @@ impl ProtocolsHandler for GossipsubHandler {
>, >,
> { > {
// determine if we need to create the stream // determine if we need to create the stream
if !self.send_queue.is_empty() && self.outbound_substream.is_none() { if !self.send_queue.is_empty()
&& self.outbound_substream.is_none()
&& !self.outbound_substream_establishing
{
let message = self.send_queue.remove(0); let message = self.send_queue.remove(0);
self.send_queue.shrink_to_fit(); self.send_queue.shrink_to_fit();
self.outbound_substream_establishing = true;
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: self.listen_protocol.clone(), protocol: self.listen_protocol.clone(),
info: message, info: message,
@ -198,9 +203,21 @@ impl ProtocolsHandler for GossipsubHandler {
return Poll::Ready(ProtocolsHandlerEvent::Custom(message)); return Poll::Ready(ProtocolsHandlerEvent::Custom(message));
} }
Poll::Ready(Some(Err(e))) => { Poll::Ready(Some(Err(e))) => {
debug!("Inbound substream error while awaiting input: {:?}", e); match e.kind() {
self.inbound_substream = std::io::ErrorKind::InvalidData => {
Some(InboundSubstreamState::Closing(substream)); // Invalid message, ignore it and reset to waiting
warn!("Invalid message received. Error: {}", e);
self.inbound_substream =
Some(InboundSubstreamState::WaitingInput(substream));
}
_ => {
// More serious errors, close this side of the stream. If the
// peer is still around, they will re-establish their
// connection
self.inbound_substream =
Some(InboundSubstreamState::Closing(substream));
}
}
} }
// peer closed the stream // peer closed the stream
Poll::Ready(None) => { Poll::Ready(None) => {
@ -242,7 +259,7 @@ impl ProtocolsHandler for GossipsubHandler {
break; break;
} }
Some(InboundSubstreamState::Poisoned) => { Some(InboundSubstreamState::Poisoned) => {
panic!("Error occurred during inbound stream processing") unreachable!("Error occurred during inbound stream processing")
} }
} }
} }
@ -338,7 +355,7 @@ impl ProtocolsHandler for GossipsubHandler {
break; break;
} }
Some(OutboundSubstreamState::Poisoned) => { Some(OutboundSubstreamState::Poisoned) => {
panic!("Error occurred during outbound stream processing") unreachable!("Error occurred during outbound stream processing")
} }
} }
} }

View File

@ -135,6 +135,7 @@
//! println!("Listening on {:?}", addr); //! println!("Listening on {:?}", addr);
//! ``` //! ```
pub mod error;
pub mod protocol; pub mod protocol;
mod behaviour; mod behaviour;
@ -147,7 +148,7 @@ mod rpc_proto {
include!(concat!(env!("OUT_DIR"), "/gossipsub.pb.rs")); include!(concat!(env!("OUT_DIR"), "/gossipsub.pb.rs"));
} }
pub use self::behaviour::{Gossipsub, GossipsubEvent, GossipsubRpc}; pub use self::behaviour::{Gossipsub, GossipsubEvent, GossipsubRpc, MessageAuthenticity};
pub use self::config::{GossipsubConfig, GossipsubConfigBuilder}; pub use self::config::{GossipsubConfig, GossipsubConfigBuilder, ValidationMode};
pub use self::protocol::{GossipsubMessage, MessageId}; pub use self::protocol::{GossipsubMessage, MessageId};
pub use self::topic::{Topic, TopicHash}; pub use self::topic::{Topic, TopicHash};

View File

@ -65,34 +65,22 @@ impl MessageCache {
} }
} }
/// Creates a `MessageCache` with a default message id function. /// Put a message into the memory cache.
#[allow(dead_code)] ///
pub fn new_default(gossip: usize, history_capacity: usize) -> MessageCache { /// Returns the message if it already exists.
let default_id = |message: &GossipsubMessage| { pub fn put(&mut self, msg: GossipsubMessage) -> Option<GossipsubMessage> {
// default message id is: source + sequence number
let mut source_string = message.source.to_base58();
source_string.push_str(&message.sequence_number.to_string());
MessageId(source_string)
};
MessageCache {
gossip,
msgs: HashMap::default(),
history: vec![Vec::new(); history_capacity],
msg_id: default_id,
}
}
/// Put a message into the memory cache
pub fn put(&mut self, msg: GossipsubMessage) {
let message_id = (self.msg_id)(&msg); let message_id = (self.msg_id)(&msg);
let cache_entry = CacheEntry { let cache_entry = CacheEntry {
mid: message_id.clone(), mid: message_id.clone(),
topics: msg.topics.clone(), topics: msg.topics.clone(),
}; };
self.msgs.insert(message_id, msg); let seen_message = self.msgs.insert(message_id, msg);
if seen_message.is_none() {
self.history[0].push(cache_entry); // Don't add duplicate entries to the cache.
self.history[0].push(cache_entry);
}
seen_message
} }
/// Get a message with `message_id` /// Get a message with `message_id`
@ -100,6 +88,14 @@ impl MessageCache {
self.msgs.get(message_id) self.msgs.get(message_id)
} }
/// Gets and validates a message with `message_id`.
pub fn validate(&mut self, message_id: &MessageId) -> Option<&GossipsubMessage> {
self.msgs.get_mut(message_id).map(|message| {
message.validated = true;
&*message
})
}
/// Get a list of GossipIds for a given topic /// Get a list of GossipIds for a given topic
pub fn get_gossip_ids(&self, topic: &TopicHash) -> Vec<MessageId> { pub fn get_gossip_ids(&self, topic: &TopicHash) -> Vec<MessageId> {
self.history[..self.gossip] self.history[..self.gossip]
@ -110,7 +106,13 @@ impl MessageCache {
.iter() .iter()
.filter_map(|entry| { .filter_map(|entry| {
if entry.topics.iter().any(|t| t == topic) { if entry.topics.iter().any(|t| t == topic) {
Some(entry.mid.clone()) let mid = &entry.mid;
// Only gossip validated messages
if let Some(true) = self.msgs.get(mid).map(|msg| msg.validated) {
Some(mid.clone())
} else {
None
}
} else { } else {
None None
} }
@ -143,30 +145,38 @@ mod tests {
fn gen_testm(x: u64, topics: Vec<TopicHash>) -> GossipsubMessage { fn gen_testm(x: u64, topics: Vec<TopicHash>) -> GossipsubMessage {
let u8x: u8 = x as u8; let u8x: u8 = x as u8;
let source = PeerId::random(); let source = Some(PeerId::random());
let data: Vec<u8> = vec![u8x]; let data: Vec<u8> = vec![u8x];
let sequence_number = x; let sequence_number = Some(x);
let m = GossipsubMessage { let m = GossipsubMessage {
source, source,
data, data,
sequence_number, sequence_number,
topics, topics,
signature: None,
key: None,
validated: true,
}; };
m m
} }
fn new_cache(gossip_size: usize, history: usize) -> MessageCache {
let default_id = |message: &GossipsubMessage| {
// default message id is: source + sequence number
let mut source_string = message.source.as_ref().unwrap().to_base58();
source_string.push_str(&message.sequence_number.unwrap().to_string());
MessageId::from(source_string)
};
MessageCache::new(gossip_size, history, default_id)
}
#[test] #[test]
/// Test that the message cache can be created. /// Test that the message cache can be created.
fn test_new_cache() { fn test_new_cache() {
let default_id = |message: &GossipsubMessage| {
// default message id is: source + sequence number
let mut source_string = message.source.to_base58();
source_string.push_str(&message.sequence_number.to_string());
MessageId(source_string)
};
let x: usize = 3; let x: usize = 3;
let mc = MessageCache::new(x, 5, default_id); let mc = new_cache(x, 5);
assert_eq!(mc.gossip, x); assert_eq!(mc.gossip, x);
} }
@ -174,7 +184,7 @@ mod tests {
#[test] #[test]
/// Test you can put one message and get one. /// Test you can put one message and get one.
fn test_put_get_one() { fn test_put_get_one() {
let mut mc = MessageCache::new_default(10, 15); let mut mc = new_cache(10, 15);
let topic1_hash = Topic::new("topic1".into()).no_hash().clone(); let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
let topic2_hash = Topic::new("topic2".into()).no_hash().clone(); let topic2_hash = Topic::new("topic2".into()).no_hash().clone();
@ -200,7 +210,7 @@ mod tests {
#[test] #[test]
/// Test attempting to 'get' with a wrong id. /// Test attempting to 'get' with a wrong id.
fn test_get_wrong() { fn test_get_wrong() {
let mut mc = MessageCache::new_default(10, 15); let mut mc = new_cache(10, 15);
let topic1_hash = Topic::new("topic1".into()).no_hash().clone(); let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
let topic2_hash = Topic::new("topic2".into()).no_hash().clone(); let topic2_hash = Topic::new("topic2".into()).no_hash().clone();
@ -210,7 +220,7 @@ mod tests {
mc.put(m.clone()); mc.put(m.clone());
// Try to get an incorrect ID // Try to get an incorrect ID
let wrong_id = MessageId(String::from("wrongid")); let wrong_id = MessageId::new(b"wrongid");
let fetched = mc.get(&wrong_id); let fetched = mc.get(&wrong_id);
assert_eq!(fetched.is_none(), true); assert_eq!(fetched.is_none(), true);
} }
@ -218,10 +228,10 @@ mod tests {
#[test] #[test]
/// Test attempting to 'get' empty message cache. /// Test attempting to 'get' empty message cache.
fn test_get_empty() { fn test_get_empty() {
let mc = MessageCache::new_default(10, 15); let mc = new_cache(10, 15);
// Try to get an incorrect ID // Try to get an incorrect ID
let wrong_string = MessageId(String::from("imempty")); let wrong_string = MessageId::new(b"imempty");
let fetched = mc.get(&wrong_string); let fetched = mc.get(&wrong_string);
assert_eq!(fetched.is_none(), true); assert_eq!(fetched.is_none(), true);
} }
@ -229,7 +239,7 @@ mod tests {
#[test] #[test]
/// Test adding a message with no topics. /// Test adding a message with no topics.
fn test_no_topic_put() { fn test_no_topic_put() {
let mut mc = MessageCache::new_default(3, 5); let mut mc = new_cache(3, 5);
// Build the message // Build the message
let m = gen_testm(1, vec![]); let m = gen_testm(1, vec![]);
@ -247,7 +257,7 @@ mod tests {
#[test] #[test]
/// Test shift mechanism. /// Test shift mechanism.
fn test_shift() { fn test_shift() {
let mut mc = MessageCache::new_default(1, 5); let mut mc = new_cache(1, 5);
let topic1_hash = Topic::new("topic1".into()).no_hash().clone(); let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
let topic2_hash = Topic::new("topic2".into()).no_hash().clone(); let topic2_hash = Topic::new("topic2".into()).no_hash().clone();
@ -271,7 +281,7 @@ mod tests {
#[test] #[test]
/// Test Shift with no additions. /// Test Shift with no additions.
fn test_empty_shift() { fn test_empty_shift() {
let mut mc = MessageCache::new_default(1, 5); let mut mc = new_cache(1, 5);
let topic1_hash = Topic::new("topic1".into()).no_hash().clone(); let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
let topic2_hash = Topic::new("topic2".into()).no_hash().clone(); let topic2_hash = Topic::new("topic2".into()).no_hash().clone();
@ -297,7 +307,7 @@ mod tests {
#[test] #[test]
/// Test shift to see if the last history messages are removed. /// Test shift to see if the last history messages are removed.
fn test_remove_last_from_shift() { fn test_remove_last_from_shift() {
let mut mc = MessageCache::new_default(4, 5); let mut mc = new_cache(4, 5);
let topic1_hash = Topic::new("topic1".into()).no_hash().clone(); let topic1_hash = Topic::new("topic1".into()).no_hash().clone();
let topic2_hash = Topic::new("topic2".into()).no_hash().clone(); let topic2_hash = Topic::new("topic2".into()).no_hash().clone();

View File

@ -19,6 +19,7 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::behaviour::GossipsubRpc; use crate::behaviour::GossipsubRpc;
use crate::config::ValidationMode;
use crate::rpc_proto; use crate::rpc_proto;
use crate::topic::TopicHash; use crate::topic::TopicHash;
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
@ -27,25 +28,23 @@ use bytes::BytesMut;
use futures::future; use futures::future;
use futures::prelude::*; use futures::prelude::*;
use futures_codec::{Decoder, Encoder, Framed}; use futures_codec::{Decoder, Encoder, Framed};
use libp2p_core::{InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo}; use libp2p_core::{identity::PublicKey, InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo};
use log::{debug, warn};
use prost::Message as ProtobufMessage; use prost::Message as ProtobufMessage;
use std::{borrow::Cow, fmt, io, iter, pin::Pin}; use std::{borrow::Cow, fmt, io, iter, pin::Pin};
use unsigned_varint::codec; use unsigned_varint::codec;
/// Implementation of the `ConnectionUpgrade` for the Gossipsub protocol. pub const SIGNING_PREFIX: &'static [u8] = b"libp2p-pubsub:";
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
protocol_id: Cow<'static, [u8]>,
max_transmit_size: usize,
}
impl Default for ProtocolConfig { /// Implementation of the `ConnectionUpgrade` for the Gossipsub protocol.
fn default() -> Self { #[derive(Clone)]
Self { pub struct ProtocolConfig {
protocol_id: Cow::Borrowed(b"/meshsub/1.0.0"), /// The gossipsub protocol id to listen on.
max_transmit_size: 2048, protocol_id: Cow<'static, [u8]>,
} /// The maximum transmit size for a packet.
} max_transmit_size: usize,
/// Determines the level of validation to be done on incoming messages.
validation_mode: ValidationMode,
} }
impl ProtocolConfig { impl ProtocolConfig {
@ -54,10 +53,12 @@ impl ProtocolConfig {
pub fn new( pub fn new(
protocol_id: impl Into<Cow<'static, [u8]>>, protocol_id: impl Into<Cow<'static, [u8]>>,
max_transmit_size: usize, max_transmit_size: usize,
validation_mode: ValidationMode,
) -> ProtocolConfig { ) -> ProtocolConfig {
ProtocolConfig { ProtocolConfig {
protocol_id: protocol_id.into(), protocol_id: protocol_id.into(),
max_transmit_size, max_transmit_size,
validation_mode,
} }
} }
} }
@ -84,7 +85,7 @@ where
length_codec.set_max_len(self.max_transmit_size); length_codec.set_max_len(self.max_transmit_size);
Box::pin(future::ok(Framed::new( Box::pin(future::ok(Framed::new(
socket, socket,
GossipsubCodec { length_codec }, GossipsubCodec::new(length_codec, self.validation_mode),
))) )))
} }
} }
@ -102,7 +103,7 @@ where
length_codec.set_max_len(self.max_transmit_size); length_codec.set_max_len(self.max_transmit_size);
Box::pin(future::ok(Framed::new( Box::pin(future::ok(Framed::new(
socket, socket,
GossipsubCodec { length_codec }, GossipsubCodec::new(length_codec, self.validation_mode),
))) )))
} }
} }
@ -112,6 +113,81 @@ where
pub struct GossipsubCodec { pub struct GossipsubCodec {
/// Codec to encode/decode the Unsigned varint length prefix of the frames. /// Codec to encode/decode the Unsigned varint length prefix of the frames.
length_codec: codec::UviBytes, length_codec: codec::UviBytes,
/// Determines the level of validation performed on incoming messages.
validation_mode: ValidationMode,
}
impl GossipsubCodec {
pub fn new(length_codec: codec::UviBytes, validation_mode: ValidationMode) -> Self {
GossipsubCodec {
length_codec,
validation_mode,
}
}
/// Verifies a gossipsub message. This returns either a success or failure. All errors
/// are logged, which prevents error handling in the codec and handler. We simply drop invalid
/// messages and log warnings, rather than propagating errors through the codec.
fn verify_signature(message: &rpc_proto::Message) -> bool {
let from = match message.from.as_ref() {
Some(v) => v,
None => {
debug!("Signature verification failed: No source id given");
return false;
}
};
let source = match PeerId::from_bytes(from.clone()) {
Ok(v) => v,
Err(_) => {
debug!("Signature verification failed: Invalid Peer Id");
return false;
}
};
let signature = match message.signature.as_ref() {
Some(v) => v,
None => {
debug!("Signature verification failed: No signature provided");
return false;
}
};
// If there is a key value in the protobuf, use that key otherwise the key must be
// obtained from the inlined source peer_id.
let public_key = match message
.key
.as_ref()
.map(|key| PublicKey::from_protobuf_encoding(&key))
{
Some(Ok(key)) => key,
_ => match PublicKey::from_protobuf_encoding(&source.as_bytes()[2..]) {
Ok(v) => v,
Err(_) => {
warn!("Signature verification failed: No valid public key supplied");
return false;
}
},
};
// The key must match the peer_id
if source != public_key.clone().into_peer_id() {
warn!("Signature verification failed: Public key doesn't match source peer id");
return false;
}
// Construct the signature bytes
let mut message_sig = message.clone();
message_sig.signature = None;
message_sig.key = None;
let mut buf = Vec::with_capacity(message_sig.encoded_len());
message_sig
.encode(&mut buf)
.expect("Buffer has sufficient capacity");
let mut signature_bytes = SIGNING_PREFIX.to_vec();
signature_bytes.extend_from_slice(&buf);
public_key.verify(&signature_bytes, signature)
}
} }
impl Encoder for GossipsubCodec { impl Encoder for GossipsubCodec {
@ -119,21 +195,20 @@ impl Encoder for GossipsubCodec {
type Error = io::Error; type Error = io::Error;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
// messages // Messages
let publish = item let mut publish = Vec::new();
.messages
.into_iter() for message in item.messages.into_iter() {
.map(|message| rpc_proto::Message { let message = rpc_proto::Message {
from: Some(message.source.into_bytes()), from: message.source.map(|m| m.into_bytes()),
data: Some(message.data), data: Some(message.data),
seqno: Some(message.sequence_number.to_be_bytes().to_vec()), seqno: message.sequence_number.map(|s| s.to_be_bytes().to_vec()),
topic_ids: message topic_ids: message.topics.into_iter().map(TopicHash::into).collect(),
.topics signature: message.signature,
.into_iter() key: message.key,
.map(TopicHash::into_string) };
.collect(), publish.push(message);
}) }
.collect::<Vec<_>>();
// subscriptions // subscriptions
let subscriptions = item let subscriptions = item
@ -141,7 +216,7 @@ impl Encoder for GossipsubCodec {
.into_iter() .into_iter()
.map(|sub| rpc_proto::rpc::SubOpts { .map(|sub| rpc_proto::rpc::SubOpts {
subscribe: Some(sub.action == GossipsubSubscriptionAction::Subscribe), subscribe: Some(sub.action == GossipsubSubscriptionAction::Subscribe),
topic_id: Some(sub.topic_hash.into_string()), topic_id: Some(sub.topic_hash.into()),
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -163,7 +238,7 @@ impl Encoder for GossipsubCodec {
message_ids, message_ids,
} => { } => {
let rpc_ihave = rpc_proto::ControlIHave { let rpc_ihave = rpc_proto::ControlIHave {
topic_id: Some(topic_hash.into_string()), topic_id: Some(topic_hash.into()),
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(), message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
}; };
control.ihave.push(rpc_ihave); control.ihave.push(rpc_ihave);
@ -176,13 +251,13 @@ impl Encoder for GossipsubCodec {
} }
GossipsubControlAction::Graft { topic_hash } => { GossipsubControlAction::Graft { topic_hash } => {
let rpc_graft = rpc_proto::ControlGraft { let rpc_graft = rpc_proto::ControlGraft {
topic_id: Some(topic_hash.into_string()), topic_id: Some(topic_hash.into()),
}; };
control.graft.push(rpc_graft); control.graft.push(rpc_graft);
} }
GossipsubControlAction::Prune { topic_hash } => { GossipsubControlAction::Prune { topic_hash } => {
let rpc_prune = rpc_proto::ControlPrune { let rpc_prune = rpc_proto::ControlPrune {
topic_id: Some(topic_hash.into_string()), topic_id: Some(topic_hash.into()),
}; };
control.prune.push(rpc_prune); control.prune.push(rpc_prune);
} }
@ -222,30 +297,101 @@ impl Decoder for GossipsubCodec {
let rpc = rpc_proto::Rpc::decode(&packet[..])?; let rpc = rpc_proto::Rpc::decode(&packet[..])?;
let mut messages = Vec::with_capacity(rpc.publish.len()); let mut messages = Vec::with_capacity(rpc.publish.len());
for publish in rpc.publish.into_iter() { for message in rpc.publish.into_iter() {
// ensure the sequence number is a u64 let mut verify_signature = false;
let seq_no = publish.seqno.ok_or_else(|| { let mut verify_sequence_no = false;
io::Error::new( let mut verify_source = false;
io::ErrorKind::InvalidData,
"sequence number was not provided", match self.validation_mode {
) ValidationMode::Strict => {
})?; // Validate everything
if seq_no.len() != 8 { verify_signature = true;
return Err(io::Error::new( verify_sequence_no = true;
io::ErrorKind::InvalidData, verify_source = true;
"sequence number has an incorrect size", }
)); ValidationMode::Permissive => {
// If the fields exist, validate them
if message.signature.is_some() {
verify_signature = true;
}
if message.seqno.is_some() {
verify_sequence_no = true;
}
if message.from.is_some() {
verify_source = true;
}
}
ValidationMode::Anonymous => {
if message.signature.is_some() {
warn!("Message dropped. Signature field was non-empty and anonymous validation mode is set");
return Ok(None);
}
if message.seqno.is_some() {
warn!("Message dropped. Sequence number was non-empty and anonymous validation mode is set");
return Ok(None);
}
if message.from.is_some() {
warn!("Message dropped. Message source was non-empty and anonymous validation mode is set");
return Ok(None);
}
}
ValidationMode::None => {}
} }
// verify message signatures if required
if verify_signature {
// If a single message is unsigned, we will drop all of them
// Most implementations should not have a list of mixed signed/not-signed messages in a single RPC
// NOTE: Invalid messages are simply dropped with a warning log. We don't throw an
// error to avoid extra logic to deal with these errors in the handler.
if !GossipsubCodec::verify_signature(&message) {
warn!("Message dropped. Invalid signature");
// Drop the message
return Ok(None);
}
}
// ensure the sequence number is a u64
let sequence_number = if verify_sequence_no {
let seq_no = message.seqno.ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidData,
"sequence number was not provided",
)
})?;
if seq_no.len() != 8 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"sequence number has an incorrect size",
));
}
Some(BigEndian::read_u64(&seq_no))
} else {
None
};
let source = if verify_source {
Some(
PeerId::from_bytes(message.from.unwrap_or_default()).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "Invalid Peer Id")
})?,
)
} else {
None
};
messages.push(GossipsubMessage { messages.push(GossipsubMessage {
source: PeerId::from_bytes(publish.from.unwrap_or_default()) source,
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid Peer Id"))?, data: message.data.unwrap_or_default(),
data: publish.data.unwrap_or_default(), sequence_number,
sequence_number: BigEndian::read_u64(&seq_no), topics: message
topics: publish
.topic_ids .topic_ids
.into_iter() .into_iter()
.map(TopicHash::from_raw) .map(TopicHash::from_raw)
.collect(), .collect(),
signature: message.signature,
key: message.key,
validated: false,
}); });
} }
@ -261,7 +407,7 @@ impl Decoder for GossipsubCodec {
message_ids: ihave message_ids: ihave
.message_ids .message_ids
.into_iter() .into_iter()
.map(|x| MessageId(x)) .map(MessageId::from)
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
}) })
.collect(); .collect();
@ -273,7 +419,7 @@ impl Decoder for GossipsubCodec {
message_ids: iwant message_ids: iwant
.message_ids .message_ids
.into_iter() .into_iter()
.map(|x| MessageId(x)) .map(MessageId::from)
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
}) })
.collect(); .collect();
@ -320,18 +466,30 @@ impl Decoder for GossipsubCodec {
} }
/// A type for gossipsub message ids. /// A type for gossipsub message ids.
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct MessageId(pub String); pub struct MessageId(Vec<u8>);
impl std::fmt::Display for MessageId { impl MessageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { pub fn new(value: &[u8]) -> Self {
write!(f, "{}", self.0) Self(value.to_vec())
} }
} }
impl Into<String> for MessageId { impl<T: Into<Vec<u8>>> From<T> for MessageId {
fn into(self) -> String { fn from(value: T) -> Self {
self.0.into() Self(value.into())
}
}
impl std::fmt::Display for MessageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", hex_fmt::HexFmt(&self.0))
}
}
impl std::fmt::Debug for MessageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "MessageId({})", hex_fmt::HexFmt(&self.0))
} }
} }
@ -339,18 +497,27 @@ impl Into<String> for MessageId {
#[derive(Clone, PartialEq, Eq, Hash)] #[derive(Clone, PartialEq, Eq, Hash)]
pub struct GossipsubMessage { pub struct GossipsubMessage {
/// Id of the peer that published this message. /// Id of the peer that published this message.
pub source: PeerId, pub source: Option<PeerId>,
/// Content of the message. Its meaning is out of scope of this library. /// Content of the message. Its meaning is out of scope of this library.
pub data: Vec<u8>, pub data: Vec<u8>,
/// A random sequence number. /// A random sequence number.
pub sequence_number: u64, pub sequence_number: Option<u64>,
/// List of topics this message belongs to. /// List of topics this message belongs to.
/// ///
/// Each message can belong to multiple topics at once. /// Each message can belong to multiple topics at once.
pub topics: Vec<TopicHash>, pub topics: Vec<TopicHash>,
/// The signature of the message if it's signed.
pub signature: Option<Vec<u8>>,
/// The public key of the message if it is signed and the source `PeerId` cannot be inlined.
pub key: Option<Vec<u8>>,
/// Flag indicating if this message has been validated by the application or not.
pub validated: bool,
} }
impl fmt::Debug for GossipsubMessage { impl fmt::Debug for GossipsubMessage {
@ -408,3 +575,96 @@ pub enum GossipsubControlAction {
topic_hash: TopicHash, topic_hash: TopicHash,
}, },
} }
#[cfg(test)]
mod tests {
use super::*;
use crate::topic::Topic;
use crate::{Gossipsub, GossipsubConfig};
use libp2p_core::identity::Keypair;
use quickcheck::*;
use rand::Rng;
#[derive(Clone, Debug)]
struct Message(GossipsubMessage);
impl Arbitrary for Message {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
let keypair = TestKeypair::arbitrary(g);
// generate an arbitrary GossipsubMessage using the behaviour signing functionality
let config = GossipsubConfig::default();
let gs = Gossipsub::new(
crate::MessageAuthenticity::Signed(keypair.0.clone()),
config,
);
let data = (0..g.gen_range(1, 1024)).map(|_| g.gen()).collect();
let topics = Vec::arbitrary(g)
.into_iter()
.map(|id: TopicId| id.0)
.collect();
Message(gs.build_message(topics, data).unwrap())
}
}
#[derive(Clone, Debug)]
struct TopicId(TopicHash);
impl Arbitrary for TopicId {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
TopicId(
Topic::new((0..g.gen_range(0, 1024)).map(|_| g.gen::<char>()).collect())
.sha256_hash(),
)
}
}
#[derive(Clone)]
struct TestKeypair(Keypair);
impl Arbitrary for TestKeypair {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
let keypair = if g.gen() {
// Small enough to be inlined.
Keypair::generate_secp256k1()
} else {
// Too large to be inlined.
let mut rsa_key = hex::decode("308204bd020100300d06092a864886f70d0101010500048204a7308204a30201000282010100ef930f41a71288b643c1cbecbf5f72ab53992249e2b00835bf07390b6745419f3848cbcc5b030faa127bc88cdcda1c1d6f3ff699f0524c15ab9d2c9d8015f5d4bd09881069aad4e9f91b8b0d2964d215cdbbae83ddd31a7622a8228acee07079f6e501aea95508fa26c6122816ef7b00ac526d422bd12aed347c37fff6c1c307f3ba57bb28a7f28609e0bdcc839da4eedca39f5d2fa855ba4b0f9c763e9764937db929a1839054642175312a3de2d3405c9d27bdf6505ef471ce85c5e015eee85bf7874b3d512f715de58d0794fd8afe021c197fbd385bb88a930342fac8da31c27166e2edab00fa55dc1c3814448ba38363077f4e8fe2bdea1c081f85f1aa6f02030100010282010028ff427a1aac1a470e7b4879601a6656193d3857ea79f33db74df61e14730e92bf9ffd78200efb0c40937c3356cbe049cd32e5f15be5c96d5febcaa9bd3484d7fded76a25062d282a3856a1b3b7d2c525cdd8434beae147628e21adf241dd64198d5819f310d033743915ba40ea0b6acdbd0533022ad6daa1ff42de51885f9e8bab2306c6ef1181902d1cd7709006eba1ab0587842b724e0519f295c24f6d848907f772ae9a0953fc931f4af16a07df450fb8bfa94572562437056613647818c238a6ff3f606cffa0533e4b8755da33418dfbc64a85110b1a036623c947400a536bb8df65e5ebe46f2dfd0cfc86e7aeeddd7574c253e8fbf755562b3669525d902818100f9fff30c6677b78dd31ec7a634361438457e80be7a7faf390903067ea8355faa78a1204a82b6e99cb7d9058d23c1ecf6cfe4a900137a00cecc0113fd68c5931602980267ea9a95d182d48ba0a6b4d5dd32fdac685cb2e5d8b42509b2eb59c9579ea6a67ccc7547427e2bd1fb1f23b0ccb4dd6ba7d206c8dd93253d70a451701302818100f5530dfef678d73ce6a401ae47043af10a2e3f224c71ae933035ecd68ccbc4df52d72bc6ca2b17e8faf3e548b483a2506c0369ab80df3b137b54d53fac98f95547c2bc245b416e650ce617e0d29db36066f1335a9ba02ad3e0edf9dc3d58fd835835042663edebce81803972696c789012847cb1f854ab2ac0a1bd3867ac7fb502818029c53010d456105f2bf52a9a8482bca2224a5eac74bf3cc1a4d5d291fafcdffd15a6a6448cce8efdd661f6617ca5fc37c8c885cc3374e109ac6049bcbf72b37eabf44602a2da2d4a1237fd145c863e6d75059976de762d9d258c42b0984e2a2befa01c95217c3ee9c736ff209c355466ff99375194eff943bc402ea1d172a1ed02818027175bf493bbbfb8719c12b47d967bf9eac061c90a5b5711172e9095c38bb8cc493c063abffe4bea110b0a2f22ac9311b3947ba31b7ef6bfecf8209eebd6d86c316a2366bbafda7279b2b47d5bb24b6202254f249205dcad347b574433f6593733b806f84316276c1990a016ce1bbdbe5f650325acc7791aefe515ecc60063bd02818100b6a2077f4adcf15a17092d9c4a346d6022ac48f3861b73cf714f84c440a07419a7ce75a73b9cbff4597c53c128bf81e87b272d70428a272d99f90cd9b9ea1033298e108f919c6477400145a102df3fb5601ffc4588203cf710002517bfa24e6ad32f4d09c6b1a995fa28a3104131bedd9072f3b4fb4a5c2056232643d310453f").unwrap();
Keypair::rsa_from_pkcs8(&mut rsa_key).unwrap()
};
TestKeypair(keypair)
}
}
impl std::fmt::Debug for TestKeypair {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TestKeypair")
.field("public", &self.0.public())
.finish()
}
}
#[test]
fn encode_decode() {
fn prop(message: Message) {
let message = message.0;
let rpc = GossipsubRpc {
messages: vec![message],
subscriptions: vec![],
control_msgs: vec![],
};
let mut codec = GossipsubCodec::new(codec::UviBytes::default(), ValidationMode::Strict);
let mut buf = BytesMut::new();
codec.encode(rpc.clone(), &mut buf).unwrap();
let mut decoded_rpc = codec.decode(&mut buf).unwrap().unwrap();
// mark as validated as its a published message
decoded_rpc.messages[0].validated = true;
assert_eq!(rpc, decoded_rpc);
}
QuickCheck::new().quickcheck(prop as fn(_) -> _)
}
}

View File

@ -19,6 +19,8 @@ message Message {
optional bytes data = 2; optional bytes data = 2;
optional bytes seqno = 3; optional bytes seqno = 3;
repeated string topic_ids = 4; repeated string topic_ids = 4;
optional bytes signature = 5;
optional bytes key = 6;
} }
message ControlMessage { message ControlMessage {
@ -30,11 +32,11 @@ message ControlMessage {
message ControlIHave { message ControlIHave {
optional string topic_id = 1; optional string topic_id = 1;
repeated string message_ids = 2; repeated bytes message_ids = 2;
} }
message ControlIWant { message ControlIWant {
repeated string message_ids= 1; repeated bytes message_ids= 1;
} }
message ControlGraft { message ControlGraft {

View File

@ -24,7 +24,7 @@ use prost::Message;
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::fmt; use std::fmt;
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct TopicHash { pub struct TopicHash {
/// The topic hash. Stored as a string to align with the protobuf API. /// The topic hash. Stored as a string to align with the protobuf API.
hash: String, hash: String,
@ -35,17 +35,13 @@ impl TopicHash {
TopicHash { hash: hash.into() } TopicHash { hash: hash.into() }
} }
pub fn into_string(self) -> String {
self.hash
}
pub fn as_str(&self) -> &str { pub fn as_str(&self) -> &str {
&self.hash &self.hash
} }
} }
/// A gossipsub topic. /// A gossipsub topic.
#[derive(Debug, Clone)] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Topic { pub struct Topic {
topic: String, topic: String,
} }
@ -80,6 +76,12 @@ impl Topic {
} }
} }
impl Into<String> for TopicHash {
fn into(self) -> String {
self.hash
}
}
impl fmt::Display for Topic { impl fmt::Display for Topic {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.topic) write!(f, "{}", self.topic)

View File

@ -30,15 +30,12 @@ use std::{
}; };
use libp2p_core::{ use libp2p_core::{
Multiaddr, identity, multiaddr::Protocol, muxing::StreamMuxerBox, transport::MemoryTransport, upgrade,
Transport, Multiaddr, Transport,
identity, };
multiaddr::Protocol, use libp2p_gossipsub::{
muxing::StreamMuxerBox, Gossipsub, GossipsubConfigBuilder, GossipsubEvent, MessageAuthenticity, Topic, ValidationMode,
transport::MemoryTransport,
upgrade,
}; };
use libp2p_gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, Topic};
use libp2p_plaintext::PlainText2Config; use libp2p_plaintext::PlainText2Config;
use libp2p_swarm::Swarm; use libp2p_swarm::Swarm;
use libp2p_yamux as yamux; use libp2p_yamux as yamux;
@ -133,6 +130,25 @@ impl Graph {
futures::executor::block_on(fut).unwrap() futures::executor::block_on(fut).unwrap()
} }
/// Polls the graph until Poll::Pending is obtained, completing the underlying polls.
fn drain_poll(self) -> Self {
// The future below should return self. Given that it is a FnMut and not a FnOnce, one needs
// to wrap `self` in an Option, leaving a `None` behind after the final `Poll::Ready`.
let mut this = Some(self);
let fut = futures::future::poll_fn(move |cx| match &mut this {
Some(graph) => loop {
match graph.poll_unpin(cx) {
Poll::Ready(_) => {}
Poll::Pending => return Poll::Ready(this.take().unwrap()),
}
},
None => panic!("future called after final return"),
});
let fut = async_std::future::timeout(Duration::from_secs(10), fut);
futures::executor::block_on(fut).unwrap()
}
} }
fn build_node() -> (Multiaddr, Swarm<Gossipsub>) { fn build_node() -> (Multiaddr, Swarm<Gossipsub>) {
@ -150,7 +166,20 @@ fn build_node() -> (Multiaddr, Swarm<Gossipsub>) {
.boxed(); .boxed();
let peer_id = public_key.clone().into_peer_id(); let peer_id = public_key.clone().into_peer_id();
let behaviour = Gossipsub::new(peer_id.clone(), GossipsubConfig::default());
// NOTE: The graph of created nodes can be disconnected from the mesh point of view as nodes
// can reach their d_lo value and not add other nodes to their mesh. To speed up this test, we
// reduce the default values of the heartbeat, so that all nodes will receive gossip in a
// timely fashion.
let config = GossipsubConfigBuilder::new()
.heartbeat_initial_delay(Duration::from_millis(100))
.heartbeat_interval(Duration::from_millis(200))
.history_length(10)
.history_gossip(10)
.validation_mode(ValidationMode::Permissive)
.build();
let behaviour = Gossipsub::new(MessageAuthenticity::Author(peer_id.clone()), config);
let mut swarm = Swarm::new(transport, behaviour, peer_id); let mut swarm = Swarm::new(transport, behaviour, peer_id);
let port = 1 + random::<u64>(); let port = 1 + random::<u64>();
@ -168,14 +197,14 @@ fn build_node() -> (Multiaddr, Swarm<Gossipsub>) {
fn multi_hop_propagation() { fn multi_hop_propagation() {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
fn prop(num_nodes: usize, seed: u64) -> TestResult { fn prop(num_nodes: u8, seed: u64) -> TestResult {
if num_nodes < 2 || num_nodes > 100 { if num_nodes < 2 || num_nodes > 100 {
return TestResult::discard(); return TestResult::discard();
} }
debug!("number nodes: {:?}, seed: {:?}", num_nodes, seed); debug!("number nodes: {:?}, seed: {:?}", num_nodes, seed);
let mut graph = Graph::new_connected(num_nodes, seed); let mut graph = Graph::new_connected(num_nodes as usize, seed);
let number_nodes = graph.nodes.len(); let number_nodes = graph.nodes.len();
// Subscribe each node to the same topic. // Subscribe each node to the same topic.
@ -197,8 +226,12 @@ fn multi_hop_propagation() {
false false
}); });
// It can happen that the publish occurs before all grafts have completed causing this test
// to fail. We drain all the poll messages before publishing.
graph = graph.drain_poll();
// Publish a single message. // Publish a single message.
graph.nodes[0].1.publish(&topic, vec![1, 2, 3]); graph.nodes[0].1.publish(&topic, vec![1, 2, 3]).unwrap();
// Wait for all nodes to receive the published message. // Wait for all nodes to receive the published message.
let mut received_msgs = 0; let mut received_msgs = 0;
@ -218,5 +251,5 @@ fn multi_hop_propagation() {
QuickCheck::new() QuickCheck::new()
.max_tests(10) .max_tests(10)
.quickcheck(prop as fn(usize, u64) -> TestResult) .quickcheck(prop as fn(u8, u64) -> TestResult)
} }