refactor(gossipsub): revise symbol naming to follow conventions (#3303)

Changes regarding the  #2217
This commit is contained in:
StemCll
2023-01-27 05:44:04 +01:00
committed by GitHub
parent e2b3c1190a
commit ab59af4d46
18 changed files with 639 additions and 633 deletions

View File

@ -47,11 +47,6 @@
use async_std::io;
use futures::{prelude::*, select};
use libp2p::gossipsub::MessageId;
use libp2p::gossipsub::{
Gossipsub, GossipsubEvent, GossipsubMessage, IdentTopic as Topic, MessageAuthenticity,
ValidationMode,
};
use libp2p::{
gossipsub, identity, mdns, swarm::NetworkBehaviour, swarm::SwarmEvent, PeerId, Swarm,
};
@ -73,31 +68,34 @@ async fn main() -> Result<(), Box<dyn Error>> {
// We create a custom network behaviour that combines Gossipsub and Mdns.
#[derive(NetworkBehaviour)]
struct MyBehaviour {
gossipsub: Gossipsub,
gossipsub: gossipsub::Behaviour,
mdns: mdns::async_io::Behaviour,
}
// To content-address message, we can take the hash of message and use it as an ID.
let message_id_fn = |message: &GossipsubMessage| {
let message_id_fn = |message: &gossipsub::Message| {
let mut s = DefaultHasher::new();
message.data.hash(&mut s);
MessageId::from(s.finish().to_string())
gossipsub::MessageId::from(s.finish().to_string())
};
// Set a custom gossipsub configuration
let gossipsub_config = gossipsub::GossipsubConfigBuilder::default()
let gossipsub_config = gossipsub::ConfigBuilder::default()
.heartbeat_interval(Duration::from_secs(10)) // This is set to aid debugging by not cluttering the log space
.validation_mode(ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing)
.validation_mode(gossipsub::ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing)
.message_id_fn(message_id_fn) // content-address messages. No two messages of the same content will be propagated.
.build()
.expect("Valid config");
// build a gossipsub network behaviour
let mut gossipsub = Gossipsub::new(MessageAuthenticity::Signed(local_key), gossipsub_config)
let mut gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(local_key),
gossipsub_config,
)
.expect("Correct configuration");
// Create a Gossipsub topic
let topic = Topic::new("test-net");
let topic = gossipsub::IdentTopic::new("test-net");
// subscribes to our topic
gossipsub.subscribe(&topic)?;
@ -140,7 +138,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
}
},
SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(GossipsubEvent::Message {
SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message {
propagation_source: peer_id,
message_id: id,
message,

View File

@ -36,8 +36,7 @@ use either::Either;
use futures::{prelude::*, select};
use libp2p::{
core::{muxing::StreamMuxerBox, transport, transport::upgrade::Version},
gossipsub::{self, Gossipsub, GossipsubConfigBuilder, GossipsubEvent, MessageAuthenticity},
identify, identity,
gossipsub, identify, identity,
multiaddr::Protocol,
noise, ping,
pnet::{PnetConfig, PreSharedKey},
@ -150,19 +149,19 @@ async fn main() -> Result<(), Box<dyn Error>> {
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "MyBehaviourEvent")]
struct MyBehaviour {
gossipsub: Gossipsub,
gossipsub: gossipsub::Behaviour,
identify: identify::Behaviour,
ping: ping::Behaviour,
}
enum MyBehaviourEvent {
Gossipsub(GossipsubEvent),
Gossipsub(gossipsub::Event),
Identify(identify::Event),
Ping(ping::Event),
}
impl From<GossipsubEvent> for MyBehaviourEvent {
fn from(event: GossipsubEvent) -> Self {
impl From<gossipsub::Event> for MyBehaviourEvent {
fn from(event: gossipsub::Event) -> Self {
MyBehaviourEvent::Gossipsub(event)
}
}
@ -181,13 +180,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Create a Swarm to manage peers and events
let mut swarm = {
let gossipsub_config = GossipsubConfigBuilder::default()
let gossipsub_config = gossipsub::ConfigBuilder::default()
.max_transmit_size(262144)
.build()
.expect("valid config");
let mut behaviour = MyBehaviour {
gossipsub: Gossipsub::new(
MessageAuthenticity::Signed(local_key.clone()),
gossipsub: gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(local_key.clone()),
gossipsub_config,
)
.expect("Valid configuration"),
@ -236,7 +235,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
SwarmEvent::Behaviour(MyBehaviourEvent::Identify(event)) => {
println!("identify: {event:?}");
}
SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(GossipsubEvent::Message {
SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message {
propagation_source: peer_id,
message_id: id,
message,

View File

@ -36,9 +36,9 @@ impl Metrics {
}
}
impl super::Recorder<libp2p_gossipsub::GossipsubEvent> for Metrics {
fn record(&self, event: &libp2p_gossipsub::GossipsubEvent) {
if let libp2p_gossipsub::GossipsubEvent::Message { .. } = event {
impl super::Recorder<libp2p_gossipsub::Event> for Metrics {
fn record(&self, event: &libp2p_gossipsub::Event) {
if let libp2p_gossipsub::Event::Message { .. } = event {
self.messages.inc();
}
}

View File

@ -108,8 +108,8 @@ impl Recorder<libp2p_dcutr::Event> for Metrics {
#[cfg(feature = "gossipsub")]
#[cfg(not(target_os = "unknown"))]
impl Recorder<libp2p_gossipsub::GossipsubEvent> for Metrics {
fn record(&self, event: &libp2p_gossipsub::GossipsubEvent) {
impl Recorder<libp2p_gossipsub::Event> for Metrics {
fn record(&self, event: &libp2p_gossipsub::Event) {
self.gossipsub.record(event)
}
}

View File

@ -8,8 +8,16 @@
- Initialize `ProtocolConfig` via `GossipsubConfig`. See [PR 3381].
- Rename types as per [discussion 2174].
`Gossipsub` has been renamed to `Behaviour`.
The `Gossipsub` prefix has been removed from various types like `GossipsubConfig` or `GossipsubMessage`.
It is preferred to import the gossipsub protocol as a module (`use libp2p::gossipsub;`), and refer to its types via `gossipsub::`.
For example: `gossipsub::Behaviour` or `gossipsub::RawMessage`. See [PR 3303].
[PR 3207]: https://github.com/libp2p/rust-libp2p/pull/3207/
[PR 3303]: https://github.com/libp2p/rust-libp2p/pull/3303/
[PR 3381]: https://github.com/libp2p/rust-libp2p/pull/3381/
[discussion 2174]: https://github.com/libp2p/rust-libp2p/discussions/2174
# 0.43.0

View File

@ -32,7 +32,7 @@ use std::{
use futures::StreamExt;
use log::{debug, error, trace, warn};
use prometheus_client::registry::Registry;
use prost::Message;
use prost::Message as _;
use rand::{seq::SliceRandom, thread_rng};
use libp2p_core::{
@ -47,10 +47,10 @@ use libp2p_swarm::{
use wasm_timer::Instant;
use crate::backoff::BackoffStorage;
use crate::config::{GossipsubConfig, ValidationMode};
use crate::config::{Config, ValidationMode};
use crate::error::{PublishError, SubscriptionError, ValidationError};
use crate::gossip_promises::GossipPromises;
use crate::handler::{GossipsubHandler, GossipsubHandlerIn, HandlerEvent};
use crate::handler::{Handler, HandlerEvent, HandlerIn};
use crate::mcache::MessageCache;
use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty};
use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
@ -60,10 +60,10 @@ use crate::time_cache::{DuplicateCache, TimeCache};
use crate::topic::{Hasher, Topic, TopicHash};
use crate::transform::{DataTransform, IdentityTransform};
use crate::types::{
FastMessageId, GossipsubControlAction, GossipsubMessage, GossipsubSubscription,
GossipsubSubscriptionAction, MessageAcceptance, MessageId, PeerInfo, RawGossipsubMessage,
ControlAction, FastMessageId, Message, MessageAcceptance, MessageId, PeerInfo, RawMessage,
Subscription, SubscriptionAction,
};
use crate::types::{GossipsubRpc, PeerConnections, PeerKind};
use crate::types::{PeerConnections, PeerKind, Rpc};
use crate::{rpc_proto, TopicScoreParams};
use std::{cmp::Ordering::Equal, fmt::Debug};
use wasm_timer::Interval;
@ -76,7 +76,7 @@ mod tests;
/// Without signing, a number of privacy preserving modes can be selected.
///
/// NOTE: The default validation settings are to require signatures. The [`ValidationMode`]
/// should be updated in the [`GossipsubConfig`] to allow for unsigned messages.
/// should be updated in the [`Config`] to allow for unsigned messages.
#[derive(Clone)]
pub enum MessageAuthenticity {
/// Message signing is enabled. The author will be the owner of the key and the sequence number
@ -97,7 +97,7 @@ pub enum MessageAuthenticity {
/// The author of the message and the sequence numbers are excluded from the message.
///
/// NOTE: Excluding these fields may make these messages invalid by other nodes who
/// enforce validation of these fields. See [`ValidationMode`] in the [`GossipsubConfig`]
/// enforce validation of these fields. See [`ValidationMode`] in the [`Config`]
/// for how to customise this for rust-libp2p gossipsub. A custom `message_id`
/// function will need to be set to prevent all messages from a peer being filtered
/// as duplicates.
@ -117,7 +117,7 @@ impl MessageAuthenticity {
/// Event that can be emitted by the gossipsub behaviour.
#[derive(Debug)]
pub enum GossipsubEvent {
pub enum Event {
/// A message has been received.
Message {
/// The peer that forwarded us this message.
@ -126,7 +126,7 @@ pub enum GossipsubEvent {
/// validating a message (if required).
message_id: MessageId,
/// The decompressed message itself.
message: GossipsubMessage,
message: Message,
},
/// A remote subscribed to a topic.
Subscribed {
@ -201,7 +201,7 @@ impl From<MessageAuthenticity> for PublishConfig {
/// Network behaviour that handles the gossipsub protocol.
///
/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`GossipsubConfig`] instance. If
/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`Config`] instance. If
/// message signing is disabled, the [`ValidationMode`] in the config should be adjusted to an
/// appropriate level to accept unsigned messages.
///
@ -210,15 +210,15 @@ impl From<MessageAuthenticity> for PublishConfig {
///
/// The TopicSubscriptionFilter allows applications to implement specific filters on topics to
/// prevent unwanted messages being propagated and evaluated.
pub struct Gossipsub<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
/// Configuration providing gossipsub performance parameters.
config: GossipsubConfig,
config: Config,
/// Events that need to be yielded to the outside when polling.
events: VecDeque<NetworkBehaviourAction<GossipsubEvent, GossipsubHandler>>,
events: VecDeque<NetworkBehaviourAction<Event, Handler>>,
/// Pools non-urgent control messages between heartbeats.
control_pool: HashMap<PeerId, Vec<GossipsubControlAction>>,
control_pool: HashMap<PeerId, Vec<ControlAction>>,
/// Information used for publishing messages.
publish_config: PublishConfig,
@ -310,17 +310,14 @@ pub struct Gossipsub<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
metrics: Option<Metrics>,
}
impl<D, F> Gossipsub<D, F>
impl<D, F> Behaviour<D, F>
where
D: DataTransform + Default,
F: TopicSubscriptionFilter + Default,
{
/// Creates a [`Gossipsub`] struct given a set of parameters specified via a
/// [`GossipsubConfig`]. This has no subscription filter and uses no compression.
pub fn new(
privacy: MessageAuthenticity,
config: GossipsubConfig,
) -> Result<Self, &'static str> {
/// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
/// [`Config`]. This has no subscription filter and uses no compression.
pub fn new(privacy: MessageAuthenticity, config: Config) -> Result<Self, &'static str> {
Self::new_with_subscription_filter_and_transform(
privacy,
config,
@ -330,12 +327,12 @@ where
)
}
/// Creates a [`Gossipsub`] struct given a set of parameters specified via a
/// [`GossipsubConfig`]. This has no subscription filter and uses no compression.
/// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
/// [`Config`]. This has no subscription filter and uses no compression.
/// Metrics can be evaluated by passing a reference to a [`Registry`].
pub fn new_with_metrics(
privacy: MessageAuthenticity,
config: GossipsubConfig,
config: Config,
metrics_registry: &mut Registry,
metrics_config: MetricsConfig,
) -> Result<Self, &'static str> {
@ -349,16 +346,16 @@ where
}
}
impl<D, F> Gossipsub<D, F>
impl<D, F> Behaviour<D, F>
where
D: DataTransform + Default,
F: TopicSubscriptionFilter,
{
/// Creates a [`Gossipsub`] struct given a set of parameters specified via a
/// [`GossipsubConfig`] and a custom subscription filter.
/// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
/// [`Config`] and a custom subscription filter.
pub fn new_with_subscription_filter(
privacy: MessageAuthenticity,
config: GossipsubConfig,
config: Config,
metrics: Option<(&mut Registry, MetricsConfig)>,
subscription_filter: F,
) -> Result<Self, &'static str> {
@ -372,16 +369,16 @@ where
}
}
impl<D, F> Gossipsub<D, F>
impl<D, F> Behaviour<D, F>
where
D: DataTransform,
F: TopicSubscriptionFilter + Default,
{
/// Creates a [`Gossipsub`] struct given a set of parameters specified via a
/// [`GossipsubConfig`] and a custom data transform.
/// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
/// [`Config`] and a custom data transform.
pub fn new_with_transform(
privacy: MessageAuthenticity,
config: GossipsubConfig,
config: Config,
metrics: Option<(&mut Registry, MetricsConfig)>,
data_transform: D,
) -> Result<Self, &'static str> {
@ -395,16 +392,16 @@ where
}
}
impl<D, F> Gossipsub<D, F>
impl<D, F> Behaviour<D, F>
where
D: DataTransform,
F: TopicSubscriptionFilter,
{
/// Creates a [`Gossipsub`] struct given a set of parameters specified via a
/// [`GossipsubConfig`] and a custom subscription filter and data transform.
/// Creates a Gossipsub [`Behaviour`] struct given a set of parameters specified via a
/// [`Config`] and a custom subscription filter and data transform.
pub fn new_with_subscription_filter_and_transform(
privacy: MessageAuthenticity,
config: GossipsubConfig,
config: Config,
metrics: Option<(&mut Registry, MetricsConfig)>,
subscription_filter: F,
data_transform: D,
@ -415,7 +412,7 @@ where
// were received locally.
validate_config(&privacy, config.validation_mode())?;
Ok(Gossipsub {
Ok(Behaviour {
metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)),
events: VecDeque::new(),
control_pool: HashMap::new(),
@ -455,7 +452,7 @@ where
}
}
impl<D, F> Gossipsub<D, F>
impl<D, F> Behaviour<D, F>
where
D: DataTransform + Send + 'static,
F: TopicSubscriptionFilter + Send + 'static,
@ -516,11 +513,11 @@ where
// send subscription request to all peers
let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
if !peer_list.is_empty() {
let event = GossipsubRpc {
let event = Rpc {
messages: Vec::new(),
subscriptions: vec![GossipsubSubscription {
subscriptions: vec![Subscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
action: SubscriptionAction::Subscribe,
}],
control_msgs: Vec::new(),
}
@ -556,11 +553,11 @@ where
// announce to all peers
let peer_list = self.peer_topics.keys().cloned().collect::<Vec<_>>();
if !peer_list.is_empty() {
let event = GossipsubRpc {
let event = Rpc {
messages: Vec::new(),
subscriptions: vec![GossipsubSubscription {
subscriptions: vec![Subscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Unsubscribe,
action: SubscriptionAction::Unsubscribe,
}],
control_msgs: Vec::new(),
}
@ -597,14 +594,14 @@ where
let raw_message = self.build_raw_message(topic, transformed_data)?;
// calculate the message id from the un-transformed data
let msg_id = self.config.message_id(&GossipsubMessage {
let msg_id = self.config.message_id(&Message {
source: raw_message.source,
data, // the uncompressed form
sequence_number: raw_message.sequence_number,
topic: raw_message.topic.clone(),
});
let event = GossipsubRpc {
let event = Rpc {
subscriptions: Vec::new(),
messages: vec![raw_message.clone()],
control_msgs: Vec::new(),
@ -742,7 +739,7 @@ where
Ok(msg_id)
}
/// This function should be called when [`GossipsubConfig::validate_messages()`] is `true` after
/// This function should be called when [`Config::validate_messages()`] is `true` after
/// the message got validated by the caller. Messages are stored in the ['Memcache'] and
/// validation is expected to be fast enough that the messages should still exist in the cache.
/// There are three possible validation outcomes and the outcome is given in acceptance.
@ -1011,7 +1008,7 @@ where
Self::control_pool_add(
&mut self.control_pool,
peer_id,
GossipsubControlAction::Graft {
ControlAction::Graft {
topic_hash: topic_hash.clone(),
},
);
@ -1042,7 +1039,7 @@ where
peer: &PeerId,
do_px: bool,
on_unsubscribe: bool,
) -> GossipsubControlAction {
) -> ControlAction {
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.prune(peer, topic_hash.clone());
}
@ -1053,7 +1050,7 @@ where
}
Some(PeerKind::Gossipsub) => {
// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
return GossipsubControlAction::Prune {
return ControlAction::Prune {
topic_hash: topic_hash.clone(),
peers: Vec::new(),
backoff: None,
@ -1090,7 +1087,7 @@ where
// update backoff
self.backoffs.update_backoff(topic_hash, peer, backoff);
GossipsubControlAction::Prune {
ControlAction::Prune {
topic_hash: topic_hash.clone(),
peers,
backoff: Some(backoff.as_secs()),
@ -1285,7 +1282,7 @@ where
Self::control_pool_add(
&mut self.control_pool,
*peer_id,
GossipsubControlAction::IWant {
ControlAction::IWant {
message_ids: iwant_ids_vec,
},
);
@ -1335,7 +1332,7 @@ where
.map(|message| message.topic.clone())
.collect::<HashSet<TopicHash>>();
let message = GossipsubRpc {
let message = Rpc {
subscriptions: Vec::new(),
messages: message_list,
control_msgs: Vec::new(),
@ -1510,7 +1507,7 @@ where
if let Err(e) = self.send_message(
*peer_id,
GossipsubRpc {
Rpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: prune_messages,
@ -1648,7 +1645,7 @@ where
fn message_is_valid(
&mut self,
msg_id: &MessageId,
raw_message: &mut RawGossipsubMessage,
raw_message: &mut RawMessage,
propagation_source: &PeerId,
) -> bool {
debug!(
@ -1719,12 +1716,12 @@ where
true
}
/// Handles a newly received [`RawGossipsubMessage`].
/// Handles a newly received [`RawMessage`].
///
/// Forwards the message to all peers in the mesh.
fn handle_received_message(
&mut self,
mut raw_message: RawGossipsubMessage,
mut raw_message: RawMessage,
propagation_source: &PeerId,
) {
// Record the received metric
@ -1821,13 +1818,12 @@ where
// Dispatch the message to the user if we are subscribed to any of the topics
if self.mesh.contains_key(&message.topic) {
debug!("Sending received message to user");
self.events.push_back(NetworkBehaviourAction::GenerateEvent(
GossipsubEvent::Message {
self.events
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Message {
propagation_source: *propagation_source,
message_id: msg_id.clone(),
message,
},
));
}));
} else {
debug!(
"Received message on a topic we are not subscribed to: {:?}",
@ -1857,7 +1853,7 @@ where
fn handle_invalid_message(
&mut self,
propagation_source: &PeerId,
raw_message: &RawGossipsubMessage,
raw_message: &RawMessage,
reject_reason: RejectReason,
) {
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
@ -1891,7 +1887,7 @@ where
/// Handles received subscriptions.
fn handle_received_subscriptions(
&mut self,
subscriptions: &[GossipsubSubscription],
subscriptions: &[Subscription],
propagation_source: &PeerId,
) {
debug!(
@ -1943,7 +1939,7 @@ where
.or_insert_with(Default::default);
match subscription.action {
GossipsubSubscriptionAction::Subscribe => {
SubscriptionAction::Subscribe => {
if peer_list.insert(*propagation_source) {
debug!(
"SUBSCRIPTION: Adding gossip peer: {} to topic: {:?}",
@ -2000,13 +1996,13 @@ where
}
// generates a subscription event to be polled
application_event.push(NetworkBehaviourAction::GenerateEvent(
GossipsubEvent::Subscribed {
Event::Subscribed {
peer_id: *propagation_source,
topic: topic_hash.clone(),
},
));
}
GossipsubSubscriptionAction::Unsubscribe => {
SubscriptionAction::Unsubscribe => {
if peer_list.remove(propagation_source) {
debug!(
"SUBSCRIPTION: Removing gossip peer: {} from topic: {:?}",
@ -2020,7 +2016,7 @@ where
unsubscribed_peers.push((*propagation_source, topic_hash.clone()));
// generate an unsubscribe event to be polled
application_event.push(NetworkBehaviourAction::GenerateEvent(
GossipsubEvent::Unsubscribed {
Event::Unsubscribed {
peer_id: *propagation_source,
topic: topic_hash.clone(),
},
@ -2057,12 +2053,12 @@ where
&& self
.send_message(
*propagation_source,
GossipsubRpc {
Rpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: topics_to_graft
.into_iter()
.map(|topic_hash| GossipsubControlAction::Graft { topic_hash })
.map(|topic_hash| ControlAction::Graft { topic_hash })
.collect(),
}
.into_protobuf(),
@ -2557,7 +2553,7 @@ where
Self::control_pool_add(
&mut self.control_pool,
peer,
GossipsubControlAction::IHave {
ControlAction::IHave {
topic_hash: topic_hash.clone(),
message_ids: peer_message_ids,
},
@ -2593,9 +2589,9 @@ where
&self.connected_peers,
);
}
let mut control_msgs: Vec<GossipsubControlAction> = topics
let mut control_msgs: Vec<ControlAction> = topics
.iter()
.map(|topic_hash| GossipsubControlAction::Graft {
.map(|topic_hash| ControlAction::Graft {
topic_hash: topic_hash.clone(),
})
.collect();
@ -2626,7 +2622,7 @@ where
if self
.send_message(
peer,
GossipsubRpc {
Rpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs,
@ -2666,7 +2662,7 @@ where
if self
.send_message(
*peer,
GossipsubRpc {
Rpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: remaining_prunes,
@ -2686,7 +2682,7 @@ where
fn forward_msg(
&mut self,
msg_id: &MessageId,
message: RawGossipsubMessage,
message: RawMessage,
propagation_source: Option<&PeerId>,
originating_peers: HashSet<PeerId>,
) -> Result<bool, PublishError> {
@ -2733,7 +2729,7 @@ where
// forward the message to peers
if !recipient_peers.is_empty() {
let event = GossipsubRpc {
let event = Rpc {
subscriptions: Vec::new(),
messages: vec![message.clone()],
control_msgs: Vec::new(),
@ -2755,12 +2751,12 @@ where
}
}
/// Constructs a [`RawGossipsubMessage`] performing message signing if required.
/// Constructs a [`RawMessage`] performing message signing if required.
pub(crate) fn build_raw_message(
&self,
topic: TopicHash,
data: Vec<u8>,
) -> Result<RawGossipsubMessage, PublishError> {
) -> Result<RawMessage, PublishError> {
match &self.publish_config {
PublishConfig::Signing {
ref keypair,
@ -2791,7 +2787,7 @@ where
Some(keypair.sign(&signature_bytes)?)
};
Ok(RawGossipsubMessage {
Ok(RawMessage {
source: Some(*author),
data,
// To be interoperable with the go-implementation this is treated as a 64-bit
@ -2804,7 +2800,7 @@ where
})
}
PublishConfig::Author(peer_id) => {
Ok(RawGossipsubMessage {
Ok(RawMessage {
source: Some(*peer_id),
data,
// To be interoperable with the go-implementation this is treated as a 64-bit
@ -2817,7 +2813,7 @@ where
})
}
PublishConfig::RandomAuthor => {
Ok(RawGossipsubMessage {
Ok(RawMessage {
source: Some(PeerId::random()),
data,
// To be interoperable with the go-implementation this is treated as a 64-bit
@ -2830,7 +2826,7 @@ where
})
}
PublishConfig::Anonymous => {
Ok(RawGossipsubMessage {
Ok(RawMessage {
source: None,
data,
// To be interoperable with the go-implementation this is treated as a 64-bit
@ -2847,9 +2843,9 @@ where
// adds a control action to control_pool
fn control_pool_add(
control_pool: &mut HashMap<PeerId, Vec<GossipsubControlAction>>,
control_pool: &mut HashMap<PeerId, Vec<ControlAction>>,
peer: PeerId,
control: GossipsubControlAction,
control: ControlAction,
) {
control_pool
.entry(peer)
@ -2863,7 +2859,7 @@ where
if self
.send_message(
peer,
GossipsubRpc {
Rpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: controls,
@ -2880,7 +2876,7 @@ where
self.pending_iwant_msgs.clear();
}
/// Send a GossipsubRpc message to a peer. This will wrap the message in an arc if it
/// Send a [`Rpc`] message to a peer. This will wrap the message in an arc if it
/// is not already an arc.
fn send_message(
&mut self,
@ -2897,7 +2893,7 @@ where
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
event: GossipsubHandlerIn::Message(message),
event: HandlerIn::Message(message),
handler: NotifyHandler::Any,
})
}
@ -3080,9 +3076,9 @@ where
// We need to send our subscriptions to the newly-connected node.
let mut subscriptions = vec![];
for topic_hash in self.mesh.keys() {
subscriptions.push(GossipsubSubscription {
subscriptions.push(Subscription {
topic_hash: topic_hash.clone(),
action: GossipsubSubscriptionAction::Subscribe,
action: SubscriptionAction::Subscribe,
});
}
@ -3091,7 +3087,7 @@ where
if self
.send_message(
peer_id,
GossipsubRpc {
Rpc {
messages: Vec::new(),
subscriptions,
control_msgs: Vec::new(),
@ -3157,7 +3153,7 @@ where
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
event: GossipsubHandlerIn::JoinedMesh,
event: HandlerIn::JoinedMesh,
handler: NotifyHandler::One(connections.connections[0]),
});
break;
@ -3289,16 +3285,16 @@ fn get_ip_addr(addr: &Multiaddr) -> Option<IpAddr> {
})
}
impl<C, F> NetworkBehaviour for Gossipsub<C, F>
impl<C, F> NetworkBehaviour for Behaviour<C, F>
where
C: Send + 'static + DataTransform,
F: Send + 'static + TopicSubscriptionFilter,
{
type ConnectionHandler = GossipsubHandler;
type OutEvent = GossipsubEvent;
type ConnectionHandler = Handler;
type OutEvent = Event;
fn new_handler(&mut self) -> Self::ConnectionHandler {
GossipsubHandler::new(
Handler::new(
ProtocolConfig::new(&self.config),
self.config.idle_timeout(),
)
@ -3324,7 +3320,7 @@ where
propagation_source
);
self.events.push_back(NetworkBehaviourAction::GenerateEvent(
GossipsubEvent::GossipsubNotSupported {
Event::GossipsubNotSupported {
peer_id: propagation_source,
},
));
@ -3401,17 +3397,17 @@ where
let mut prune_msgs = vec![];
for control_msg in rpc.control_msgs {
match control_msg {
GossipsubControlAction::IHave {
ControlAction::IHave {
topic_hash,
message_ids,
} => {
ihave_msgs.push((topic_hash, message_ids));
}
GossipsubControlAction::IWant { message_ids } => {
ControlAction::IWant { message_ids } => {
self.handle_iwant(&propagation_source, message_ids)
}
GossipsubControlAction::Graft { topic_hash } => graft_msgs.push(topic_hash),
GossipsubControlAction::Prune {
ControlAction::Graft { topic_hash } => graft_msgs.push(topic_hash),
ControlAction::Prune {
topic_hash,
peers,
backoff,
@ -3484,7 +3480,7 @@ fn peer_added_to_mesh(
new_topics: Vec<&TopicHash>,
mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
known_topics: Option<&BTreeSet<TopicHash>>,
events: &mut VecDeque<NetworkBehaviourAction<GossipsubEvent, GossipsubHandler>>,
events: &mut VecDeque<NetworkBehaviourAction<Event, Handler>>,
connections: &HashMap<PeerId, PeerConnections>,
) {
// Ensure there is an active connection
@ -3512,7 +3508,7 @@ fn peer_added_to_mesh(
// This is the first mesh the peer has joined, inform the handler
events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
event: GossipsubHandlerIn::JoinedMesh,
event: HandlerIn::JoinedMesh,
handler: NotifyHandler::One(connection_id),
});
}
@ -3525,7 +3521,7 @@ fn peer_removed_from_mesh(
old_topic: &TopicHash,
mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
known_topics: Option<&BTreeSet<TopicHash>>,
events: &mut VecDeque<NetworkBehaviourAction<GossipsubEvent, GossipsubHandler>>,
events: &mut VecDeque<NetworkBehaviourAction<Event, Handler>>,
connections: &HashMap<PeerId, PeerConnections>,
) {
// Ensure there is an active connection
@ -3551,7 +3547,7 @@ fn peer_removed_from_mesh(
// The peer is not in any other mesh, inform the handler
events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
event: GossipsubHandlerIn::LeftMesh,
event: HandlerIn::LeftMesh,
handler: NotifyHandler::One(*connection_id),
});
}
@ -3641,9 +3637,9 @@ fn validate_config(
Ok(())
}
impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Gossipsub<C, F> {
impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Behaviour<C, F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Gossipsub")
f.debug_struct("Behaviour")
.field("config", &self.config)
.field("events", &self.events.len())
.field("control_pool", &self.control_pool)
@ -3681,16 +3677,16 @@ mod local_test {
use asynchronous_codec::Encoder;
use quickcheck::*;
fn empty_rpc() -> GossipsubRpc {
GossipsubRpc {
fn empty_rpc() -> Rpc {
Rpc {
subscriptions: Vec::new(),
messages: Vec::new(),
control_msgs: Vec::new(),
}
}
fn test_message() -> RawGossipsubMessage {
RawGossipsubMessage {
fn test_message() -> RawMessage {
RawMessage {
source: Some(PeerId::random()),
data: vec![0; 100],
sequence_number: None,
@ -3701,21 +3697,21 @@ mod local_test {
}
}
fn test_subscription() -> GossipsubSubscription {
GossipsubSubscription {
action: GossipsubSubscriptionAction::Subscribe,
fn test_subscription() -> Subscription {
Subscription {
action: SubscriptionAction::Subscribe,
topic_hash: IdentTopic::new("TestTopic").hash(),
}
}
fn test_control() -> GossipsubControlAction {
GossipsubControlAction::IHave {
fn test_control() -> ControlAction {
ControlAction::IHave {
topic_hash: IdentTopic::new("TestTopic").hash(),
message_ids: vec![MessageId(vec![12u8]); 5],
}
}
impl Arbitrary for GossipsubRpc {
impl Arbitrary for Rpc {
fn arbitrary(g: &mut Gen) -> Self {
let mut rpc = empty_rpc();
@ -3736,12 +3732,12 @@ mod local_test {
/// Tests RPC message fragmentation
fn test_message_fragmentation_deterministic() {
let max_transmit_size = 500;
let config = crate::GossipsubConfigBuilder::default()
let config = crate::config::ConfigBuilder::default()
.max_transmit_size(max_transmit_size)
.validation_mode(ValidationMode::Permissive)
.build()
.unwrap();
let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap();
let gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, config).unwrap();
// Message under the limit should be fine.
let mut rpc = empty_rpc();
@ -3782,14 +3778,14 @@ mod local_test {
#[test]
fn test_message_fragmentation() {
fn prop(rpc: GossipsubRpc) {
fn prop(rpc: Rpc) {
let max_transmit_size = 500;
let config = crate::GossipsubConfigBuilder::default()
let config = crate::config::ConfigBuilder::default()
.max_transmit_size(max_transmit_size)
.validation_mode(ValidationMode::Permissive)
.build()
.unwrap();
let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap();
let gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, config).unwrap();
let mut length_codec = unsigned_varint::codec::UviBytes::default();
length_codec.set_max_len(max_transmit_size);

File diff suppressed because it is too large Load Diff

View File

@ -24,7 +24,7 @@ use std::time::Duration;
use libp2p_core::PeerId;
use crate::types::{FastMessageId, GossipsubMessage, MessageId, RawGossipsubMessage};
use crate::types::{FastMessageId, Message, MessageId, RawMessage};
/// The types of message validation that can be employed by gossipsub.
#[derive(Debug, Clone)]
@ -51,16 +51,16 @@ pub enum ValidationMode {
/// Selector for custom Protocol Id
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum GossipsubVersion {
pub enum Version {
V1_0,
V1_1,
}
/// Configuration parameters that define the performance of the gossipsub network.
#[derive(Clone)]
pub struct GossipsubConfig {
pub struct Config {
protocol_id: Cow<'static, str>,
custom_id_version: Option<GossipsubVersion>,
custom_id_version: Option<Version>,
history_length: usize,
history_gossip: usize,
mesh_n: usize,
@ -78,9 +78,8 @@ pub struct GossipsubConfig {
duplicate_cache_time: Duration,
validate_messages: bool,
validation_mode: ValidationMode,
message_id_fn: Arc<dyn Fn(&GossipsubMessage) -> MessageId + Send + Sync + 'static>,
fast_message_id_fn:
Option<Arc<dyn Fn(&RawGossipsubMessage) -> FastMessageId + Send + Sync + 'static>>,
message_id_fn: Arc<dyn Fn(&Message) -> MessageId + Send + Sync + 'static>,
fast_message_id_fn: Option<Arc<dyn Fn(&RawMessage) -> FastMessageId + Send + Sync + 'static>>,
allow_self_origin: bool,
do_px: bool,
prune_peers: usize,
@ -101,22 +100,22 @@ pub struct GossipsubConfig {
published_message_ids_cache_time: Duration,
}
impl GossipsubConfig {
impl Config {
// All the getters
/// The protocol id to negotiate this protocol. By default, the resulting protocol id has the form
/// `/<prefix>/<supported-versions>`, but can optionally be changed to a literal form by providing some GossipsubVersion as custom_id_version.
/// `/<prefix>/<supported-versions>`, but can optionally be changed to a literal form by providing some Version as custom_id_version.
/// As gossipsub supports version 1.0 and 1.1, there are two suffixes supported for the resulting protocol id.
///
/// Calling `GossipsubConfigBuilder::protocol_id_prefix` will set a new prefix and retain the prefix logic.
/// Calling `GossipsubConfigBuilder::protocol_id` will set a custom `protocol_id` and disable the prefix logic.
/// Calling [`ConfigBuilder::protocol_id_prefix`] will set a new prefix and retain the prefix logic.
/// Calling [`ConfigBuilder::protocol_id`] will set a custom `protocol_id` and disable the prefix logic.
///
/// The default prefix is `meshsub`, giving the supported protocol ids: `/meshsub/1.1.0` and `/meshsub/1.0.0`, negotiated in that order.
pub fn protocol_id(&self) -> &Cow<'static, str> {
&self.protocol_id
}
pub fn custom_id_version(&self) -> &Option<GossipsubVersion> {
pub fn custom_id_version(&self) -> &Option<Version> {
&self.custom_id_version
}
@ -217,7 +216,7 @@ impl GossipsubConfig {
/// When set to `true`, prevents automatic forwarding of all received messages. This setting
/// allows a user to validate the messages before propagating them to their peers. If set to
/// true, the user must manually call [`crate::Gossipsub::report_message_validation_result()`]
/// true, the user must manually call [`crate::Behaviour::report_message_validation_result()`]
/// on the behaviour to forward message once validated (default is `false`).
/// The default is `false`.
pub fn validate_messages(&self) -> bool {
@ -236,21 +235,21 @@ impl GossipsubConfig {
/// addressing, where this function may be set to `hash(message)`. This would prevent messages
/// of the same content from being duplicated.
///
/// The function takes a [`GossipsubMessage`] as input and outputs a String to be interpreted as
/// The function takes a [`Message`] as input and outputs a String to be interpreted as
/// the message id.
pub fn message_id(&self, message: &GossipsubMessage) -> MessageId {
pub fn message_id(&self, message: &Message) -> MessageId {
(self.message_id_fn)(message)
}
/// A user-defined optional function that computes fast ids from raw messages. This can be used
/// to avoid possibly expensive transformations from [`RawGossipsubMessage`] to
/// [`GossipsubMessage`] for duplicates. Two semantically different messages must always
/// to avoid possibly expensive transformations from [`RawMessage`] to
/// [`Message`] for duplicates. Two semantically different messages must always
/// have different fast message ids, but it is allowed that two semantically identical messages
/// have different fast message ids as long as the message_id_fn produces the same id for them.
///
/// The function takes a [`RawGossipsubMessage`] as input and outputs a String to be
/// The function takes a [`RawMessage`] as input and outputs a String to be
/// interpreted as the fast message id. Default is None.
pub fn fast_message_id(&self, message: &RawGossipsubMessage) -> Option<FastMessageId> {
pub fn fast_message_id(&self, message: &RawMessage) -> Option<FastMessageId> {
self.fast_message_id_fn
.as_ref()
.map(|fast_message_id_fn| fast_message_id_fn(message))
@ -391,24 +390,24 @@ impl GossipsubConfig {
}
}
impl Default for GossipsubConfig {
impl Default for Config {
fn default() -> Self {
// use ConfigBuilder to also validate defaults
GossipsubConfigBuilder::default()
ConfigBuilder::default()
.build()
.expect("Default config parameters should be valid parameters")
}
}
/// The builder struct for constructing a gossipsub configuration.
pub struct GossipsubConfigBuilder {
config: GossipsubConfig,
pub struct ConfigBuilder {
config: Config,
}
impl Default for GossipsubConfigBuilder {
impl Default for ConfigBuilder {
fn default() -> Self {
GossipsubConfigBuilder {
config: GossipsubConfig {
ConfigBuilder {
config: Config {
protocol_id: Cow::Borrowed("meshsub"),
custom_id_version: None,
history_length: 5,
@ -466,13 +465,13 @@ impl Default for GossipsubConfigBuilder {
}
}
impl From<GossipsubConfig> for GossipsubConfigBuilder {
fn from(config: GossipsubConfig) -> Self {
GossipsubConfigBuilder { config }
impl From<Config> for ConfigBuilder {
fn from(config: Config) -> Self {
ConfigBuilder { config }
}
}
impl GossipsubConfigBuilder {
impl ConfigBuilder {
/// The protocol id prefix to negotiate this protocol (default is `/meshsub/1.0.0`).
pub fn protocol_id_prefix(
&mut self,
@ -487,7 +486,7 @@ impl GossipsubConfigBuilder {
pub fn protocol_id(
&mut self,
protocol_id: impl Into<Cow<'static, str>>,
custom_id_version: GossipsubVersion,
custom_id_version: Version,
) -> &mut Self {
self.config.custom_id_version = Some(custom_id_version);
self.config.protocol_id = protocol_id.into();
@ -600,7 +599,7 @@ impl GossipsubConfigBuilder {
/// When set, prevents automatic forwarding of all received messages. This setting
/// allows a user to validate the messages before propagating them to their peers. If set,
/// the user must manually call [`crate::Gossipsub::report_message_validation_result()`] on the
/// the user must manually call [`crate::Behaviour::report_message_validation_result()`] on the
/// behaviour to forward a message once validated.
pub fn validate_messages(&mut self) -> &mut Self {
self.config.validate_messages = true;
@ -620,27 +619,27 @@ impl GossipsubConfigBuilder {
/// addressing, where this function may be set to `hash(message)`. This would prevent messages
/// of the same content from being duplicated.
///
/// The function takes a [`GossipsubMessage`] as input and outputs a String to be
/// The function takes a [`Message`] as input and outputs a String to be
/// interpreted as the message id.
pub fn message_id_fn<F>(&mut self, id_fn: F) -> &mut Self
where
F: Fn(&GossipsubMessage) -> MessageId + Send + Sync + 'static,
F: Fn(&Message) -> MessageId + Send + Sync + 'static,
{
self.config.message_id_fn = Arc::new(id_fn);
self
}
/// A user-defined optional function that computes fast ids from raw messages. This can be used
/// to avoid possibly expensive transformations from [`RawGossipsubMessage`] to
/// [`GossipsubMessage`] for duplicates. Two semantically different messages must always
/// to avoid possibly expensive transformations from [`RawMessage`] to
/// [`Message`] for duplicates. Two semantically different messages must always
/// have different fast message ids, but it is allowed that two semantically identical messages
/// have different fast message ids as long as the message_id_fn produces the same id for them.
///
/// The function takes a [`RawGossipsubMessage`] as input and outputs a String to be interpreted
/// The function takes a [`Message`] as input and outputs a String to be interpreted
/// as the fast message id. Default is None.
pub fn fast_message_id_fn<F>(&mut self, fast_id_fn: F) -> &mut Self
where
F: Fn(&RawGossipsubMessage) -> FastMessageId + Send + Sync + 'static,
F: Fn(&RawMessage) -> FastMessageId + Send + Sync + 'static,
{
self.config.fast_message_id_fn = Some(Arc::new(fast_id_fn));
self
@ -801,8 +800,8 @@ impl GossipsubConfigBuilder {
self
}
/// Constructs a [`GossipsubConfig`] from the given configuration and validates the settings.
pub fn build(&self) -> Result<GossipsubConfig, &'static str> {
/// Constructs a [`Config`] from the given configuration and validates the settings.
pub fn build(&self) -> Result<Config, &'static str> {
// check all constraints on config
if self.config.max_transmit_size < 100 {
@ -838,7 +837,7 @@ impl GossipsubConfigBuilder {
}
}
impl std::fmt::Debug for GossipsubConfig {
impl std::fmt::Debug for Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut builder = f.debug_struct("GossipsubConfig");
let _ = builder.field("protocol_id", &self.protocol_id);
@ -895,7 +894,7 @@ mod test {
#[test]
fn create_thing() {
let builder: GossipsubConfig = GossipsubConfigBuilder::default()
let builder: Config = ConfigBuilder::default()
.protocol_id_prefix("purple")
.build()
.unwrap();
@ -903,8 +902,8 @@ mod test {
dbg!(builder);
}
fn get_gossipsub_message() -> GossipsubMessage {
GossipsubMessage {
fn get_gossipsub_message() -> Message {
Message {
source: None,
data: vec![12, 34, 56],
sequence_number: None,
@ -918,7 +917,7 @@ mod test {
])
}
fn message_id_plain_function(message: &GossipsubMessage) -> MessageId {
fn message_id_plain_function(message: &Message) -> MessageId {
let mut s = DefaultHasher::new();
message.data.hash(&mut s);
let mut v = s.finish().to_string();
@ -928,7 +927,7 @@ mod test {
#[test]
fn create_config_with_message_id_as_plain_function() {
let builder: GossipsubConfig = GossipsubConfigBuilder::default()
let builder: Config = ConfigBuilder::default()
.protocol_id_prefix("purple")
.message_id_fn(message_id_plain_function)
.build()
@ -940,7 +939,7 @@ mod test {
#[test]
fn create_config_with_message_id_as_closure() {
let closure = |message: &GossipsubMessage| {
let closure = |message: &Message| {
let mut s = DefaultHasher::new();
message.data.hash(&mut s);
let mut v = s.finish().to_string();
@ -948,7 +947,7 @@ mod test {
MessageId::from(v)
};
let builder: GossipsubConfig = GossipsubConfigBuilder::default()
let builder: Config = ConfigBuilder::default()
.protocol_id_prefix("purple")
.message_id_fn(closure)
.build()
@ -961,7 +960,7 @@ mod test {
#[test]
fn create_config_with_message_id_as_closure_with_variable_capture() {
let captured: char = 'e';
let closure = move |message: &GossipsubMessage| {
let closure = move |message: &Message| {
let mut s = DefaultHasher::new();
message.data.hash(&mut s);
let mut v = s.finish().to_string();
@ -969,7 +968,7 @@ mod test {
MessageId::from(v)
};
let builder: GossipsubConfig = GossipsubConfigBuilder::default()
let builder: Config = ConfigBuilder::default()
.protocol_id_prefix("purple")
.message_id_fn(closure)
.build()
@ -981,7 +980,7 @@ mod test {
#[test]
fn create_config_with_protocol_id_prefix() {
let builder: GossipsubConfig = GossipsubConfigBuilder::default()
let builder: Config = ConfigBuilder::default()
.protocol_id_prefix("purple")
.validation_mode(ValidationMode::Anonymous)
.message_id_fn(message_id_plain_function)
@ -1005,15 +1004,15 @@ mod test {
#[test]
fn create_config_with_custom_protocol_id() {
let builder: GossipsubConfig = GossipsubConfigBuilder::default()
.protocol_id("purple", GossipsubVersion::V1_0)
let builder: Config = ConfigBuilder::default()
.protocol_id("purple", Version::V1_0)
.validation_mode(ValidationMode::Anonymous)
.message_id_fn(message_id_plain_function)
.build()
.unwrap();
assert_eq!(builder.protocol_id(), "purple");
assert_eq!(builder.custom_id_version(), &Some(GossipsubVersion::V1_0));
assert_eq!(builder.custom_id_version(), &Some(Version::V1_0));
let protocol_config = ProtocolConfig::new(&builder);
let protocol_ids = protocol_config.protocol_info();

View File

@ -86,9 +86,15 @@ impl From<SigningError> for PublishError {
}
}
#[deprecated(
since = "0.44.0",
note = "Use re-exports that omit `Gossipsub` prefix, i.e. `libp2p::gossipsub::HandlerError"
)]
pub type GossipsubHandlerError = HandlerError;
/// Errors that can occur in the protocols handler.
#[derive(Debug, Error)]
pub enum GossipsubHandlerError {
pub enum HandlerError {
#[error("The maximum number of inbound substreams created has been exceeded.")]
MaxInboundSubstreams,
#[error("The maximum number of outbound substreams created has been exceeded.")]
@ -134,9 +140,9 @@ impl std::fmt::Display for ValidationError {
impl std::error::Error for ValidationError {}
impl From<std::io::Error> for GossipsubHandlerError {
fn from(error: std::io::Error) -> GossipsubHandlerError {
GossipsubHandlerError::Codec(prost_codec::Error::from(error))
impl From<std::io::Error> for HandlerError {
fn from(error: std::io::Error) -> HandlerError {
HandlerError::Codec(prost_codec::Error::from(error))
}
}

View File

@ -18,9 +18,9 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::error::{GossipsubHandlerError, ValidationError};
use crate::error::{HandlerError, ValidationError};
use crate::protocol::{GossipsubCodec, ProtocolConfig};
use crate::types::{GossipsubRpc, PeerKind, RawGossipsubMessage};
use crate::types::{PeerKind, RawMessage, Rpc};
use asynchronous_codec::Framed;
use futures::prelude::*;
use futures::StreamExt;
@ -53,10 +53,10 @@ pub enum HandlerEvent {
/// any) that were received.
Message {
/// The GossipsubRPC message excluding any invalid messages.
rpc: GossipsubRpc,
rpc: Rpc,
/// Any invalid messages that were received in the RPC, along with the associated
/// validation error.
invalid_messages: Vec<(RawGossipsubMessage, ValidationError)>,
invalid_messages: Vec<(RawMessage, ValidationError)>,
},
/// An inbound or outbound substream has been established with the peer and this informs over
/// which protocol. This message only occurs once per connection.
@ -65,7 +65,7 @@ pub enum HandlerEvent {
/// A message sent from the behaviour to the handler.
#[derive(Debug)]
pub enum GossipsubHandlerIn {
pub enum HandlerIn {
/// A gossipsub message to send.
Message(crate::rpc_proto::Rpc),
/// The peer has joined the mesh.
@ -82,7 +82,7 @@ pub enum GossipsubHandlerIn {
const MAX_SUBSTREAM_CREATION: usize = 5;
/// Protocol Handler that manages a single long-lived substream with a peer.
pub struct GossipsubHandler {
pub struct Handler {
/// Upgrade configuration for the gossipsub protocol.
listen_protocol: SubstreamProtocol<ProtocolConfig, ()>,
@ -124,7 +124,7 @@ pub struct GossipsubHandler {
idle_timeout: Duration,
/// Collection of errors from attempting an upgrade.
upgrade_errors: VecDeque<ConnectionHandlerUpgrErr<GossipsubHandlerError>>,
upgrade_errors: VecDeque<ConnectionHandlerUpgrErr<HandlerError>>,
/// Flag determining whether to maintain the connection to the peer.
keep_alive: KeepAlive,
@ -161,10 +161,10 @@ enum OutboundSubstreamState {
Poisoned,
}
impl GossipsubHandler {
/// Builds a new [`GossipsubHandler`].
impl Handler {
/// Builds a new [`Handler`].
pub fn new(protocol_config: ProtocolConfig, idle_timeout: Duration) -> Self {
GossipsubHandler {
Handler {
listen_protocol: SubstreamProtocol::new(protocol_config, ()),
inbound_substream: None,
outbound_substream: None,
@ -245,10 +245,10 @@ impl GossipsubHandler {
}
}
impl ConnectionHandler for GossipsubHandler {
type InEvent = GossipsubHandlerIn;
impl ConnectionHandler for Handler {
type InEvent = HandlerIn;
type OutEvent = HandlerEvent;
type Error = GossipsubHandlerError;
type Error = HandlerError;
type InboundOpenInfo = ();
type InboundProtocol = ProtocolConfig;
type OutboundOpenInfo = crate::rpc_proto::Rpc;
@ -258,17 +258,17 @@ impl ConnectionHandler for GossipsubHandler {
self.listen_protocol.clone()
}
fn on_behaviour_event(&mut self, message: GossipsubHandlerIn) {
fn on_behaviour_event(&mut self, message: HandlerIn) {
if !self.protocol_unsupported {
match message {
GossipsubHandlerIn::Message(m) => self.send_queue.push(m),
HandlerIn::Message(m) => self.send_queue.push(m),
// If we have joined the mesh, keep the connection alive.
GossipsubHandlerIn::JoinedMesh => {
HandlerIn::JoinedMesh => {
self.in_mesh = true;
self.keep_alive = KeepAlive::Yes;
}
// If we have left the mesh, start the idle timer.
GossipsubHandlerIn::LeftMesh => {
HandlerIn::LeftMesh => {
self.in_mesh = false;
self.keep_alive = KeepAlive::Until(Instant::now() + self.idle_timeout);
}
@ -296,7 +296,7 @@ impl ConnectionHandler for GossipsubHandler {
let reported_error = match error {
// Timeout errors get mapped to NegotiationTimeout and we close the connection.
ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => {
Some(GossipsubHandlerError::NegotiationTimeout)
Some(HandlerError::NegotiationTimeout)
}
// There was an error post negotiation, close the connection.
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => Some(e),
@ -319,7 +319,7 @@ impl ConnectionHandler for GossipsubHandler {
}
}
NegotiationError::ProtocolError(e) => {
Some(GossipsubHandlerError::NegotiationProtocolError(e))
Some(HandlerError::NegotiationProtocolError(e))
}
}
}
@ -343,7 +343,7 @@ impl ConnectionHandler for GossipsubHandler {
if self.inbound_substreams_created > MAX_SUBSTREAM_CREATION {
// Too many inbound substreams have been created, end the connection.
return Poll::Ready(ConnectionHandlerEvent::Close(
GossipsubHandlerError::MaxInboundSubstreams,
HandlerError::MaxInboundSubstreams,
));
}
@ -354,7 +354,7 @@ impl ConnectionHandler for GossipsubHandler {
{
if self.outbound_substreams_created >= MAX_SUBSTREAM_CREATION {
return Poll::Ready(ConnectionHandlerEvent::Close(
GossipsubHandlerError::MaxOutboundSubstreams,
HandlerError::MaxOutboundSubstreams,
));
}
let message = self.send_queue.remove(0);
@ -384,7 +384,7 @@ impl ConnectionHandler for GossipsubHandler {
}
Poll::Ready(Some(Err(error))) => {
match error {
GossipsubHandlerError::MaxTransmissionSize => {
HandlerError::MaxTransmissionSize => {
warn!("Message exceeded the maximum transmission size");
self.inbound_substream =
Some(InboundSubstreamState::WaitingInput(substream));
@ -471,7 +471,7 @@ impl ConnectionHandler for GossipsubHandler {
self.outbound_substream =
Some(OutboundSubstreamState::PendingFlush(substream))
}
Err(GossipsubHandlerError::MaxTransmissionSize) => {
Err(HandlerError::MaxTransmissionSize) => {
error!("Message exceeded the maximum transmission size and was not sent.");
self.outbound_substream =
Some(OutboundSubstreamState::WaitingOutput(substream));

View File

@ -67,31 +67,31 @@
//!
//! # Using Gossipsub
//!
//! ## GossipsubConfig
//! ## Gossipsub Config
//!
//! The [`GossipsubConfig`] struct specifies various network performance/tuning configuration
//! The [`Config`] struct specifies various network performance/tuning configuration
//! parameters. Specifically it specifies:
//!
//! [`GossipsubConfig`]: struct.Config.html
//! [`Config`]: struct.Config.html
//!
//! This struct implements the [`Default`] trait and can be initialised via
//! [`GossipsubConfig::default()`].
//! [`Config::default()`].
//!
//!
//! ## Gossipsub
//! ## Behaviour
//!
//! The [`Gossipsub`] struct implements the [`libp2p_swarm::NetworkBehaviour`] trait allowing it to
//! The [`Behaviour`] struct implements the [`libp2p_swarm::NetworkBehaviour`] trait allowing it to
//! act as the routing behaviour in a [`libp2p_swarm::Swarm`]. This struct requires an instance of
//! [`libp2p_core::PeerId`] and [`GossipsubConfig`].
//! [`libp2p_core::PeerId`] and [`Config`].
//!
//! [`Gossipsub`]: struct.Gossipsub.html
//! [`Behaviour`]: struct.Behaviour.html
//! ## Example
//!
//! An example of initialising a gossipsub compatible swarm:
//!
//! ```
//! use libp2p_gossipsub::GossipsubEvent;
//! use libp2p_gossipsub::Event;
//! use libp2p_core::{identity::Keypair,transport::{Transport, MemoryTransport}, Multiaddr};
//! use libp2p_gossipsub::MessageAuthenticity;
//! let local_key = Keypair::generate_ed25519();
@ -115,10 +115,10 @@
//! // Create a Swarm to manage peers and events
//! let mut swarm = {
//! // set default parameters for gossipsub
//! let gossipsub_config = libp2p_gossipsub::GossipsubConfig::default();
//! let gossipsub_config = libp2p_gossipsub::Config::default();
//! // build a gossipsub network behaviour
//! let mut gossipsub: libp2p_gossipsub::Gossipsub =
//! libp2p_gossipsub::Gossipsub::new(message_authenticity, gossipsub_config).unwrap();
//! let mut gossipsub: libp2p_gossipsub::Behaviour =
//! libp2p_gossipsub::Behaviour::new(message_authenticity, gossipsub_config).unwrap();
//! // subscribe to the topic
//! gossipsub.subscribe(&topic);
//! // create the swarm (use an executor in a real example)
@ -156,18 +156,64 @@ mod types;
mod rpc_proto;
pub use self::behaviour::{Gossipsub, GossipsubEvent, MessageAuthenticity};
pub use self::transform::{DataTransform, IdentityTransform};
pub use self::config::{GossipsubConfig, GossipsubConfigBuilder, GossipsubVersion, ValidationMode};
pub use self::behaviour::{Behaviour, Event, MessageAuthenticity};
pub use self::config::{Config, ConfigBuilder, ValidationMode, Version};
pub use self::error::{HandlerError, PublishError, SubscriptionError, ValidationError};
pub use self::peer_score::{
score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds,
TopicScoreParams,
};
pub use self::topic::{Hasher, Topic, TopicHash};
pub use self::types::{
FastMessageId, GossipsubMessage, GossipsubRpc, MessageAcceptance, MessageId,
RawGossipsubMessage,
};
pub use self::transform::{DataTransform, IdentityTransform};
pub use self::types::{FastMessageId, Message, MessageAcceptance, MessageId, RawMessage, Rpc};
#[deprecated(
since = "0.44.0",
note = "Use `Behaviour` instead of `Gossipsub` for Network Behaviour, i.e. `libp2p::gossipsub::Behaviour"
)]
pub type Gossipsub = Behaviour;
#[deprecated(
since = "0.44.0",
note = "Use re-exports that omit `Gossipsub` prefix, i.e. `libp2p::gossipsub::Event"
)]
pub type GossipsubEvent = Event;
#[deprecated(
since = "0.44.0",
note = "Use re-exports that omit `Gossipsub` prefix, i.e. `libp2p::gossipsub::Config"
)]
pub type GossipsubConfig = Config;
#[deprecated(
since = "0.44.0",
note = "Use re-exports that omit `Gossipsub` prefix, i.e. `libp2p::gossipsub::Message"
)]
pub type GossipsubMessage = Message;
#[deprecated(
since = "0.44.0",
note = "Use re-exports that omit `Gossipsub` prefix, i.e. `libp2p::gossipsub::Rpc"
)]
pub type GossipsubRpc = Rpc;
#[deprecated(
since = "0.44.0",
note = "Use re-exports that omit `Gossipsub` infix, i.e. `libp2p::gossipsub::RawMessage"
)]
pub type RawGossipsubMessage = RawMessage;
#[deprecated(
since = "0.44.0",
note = "Use re-exports that omit `Gossipsub` prefix, i.e. `libp2p::gossipsub::ConfigBuilder"
)]
pub type GossipsubConfigBuilder = ConfigBuilder;
#[deprecated(
since = "0.44.0",
note = "Use re-exports that omit `Gossipsub` prefix, i.e. `libp2p::gossipsub::Version"
)]
pub type GossipsubVersion = Version;
pub type IdentTopic = Topic<self::topic::IdentityHash>;
pub type Sha256Topic = Topic<self::topic::Sha256Hash>;

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.
use crate::topic::TopicHash;
use crate::types::{MessageId, RawGossipsubMessage};
use crate::types::{MessageId, RawMessage};
use libp2p_core::PeerId;
use log::{debug, trace};
use std::collections::hash_map::Entry;
@ -39,7 +39,7 @@ pub struct CacheEntry {
/// MessageCache struct holding history of messages.
#[derive(Clone)]
pub struct MessageCache {
msgs: HashMap<MessageId, (RawGossipsubMessage, HashSet<PeerId>)>,
msgs: HashMap<MessageId, (RawMessage, HashSet<PeerId>)>,
/// For every message and peer the number of times this peer asked for the message
iwant_counts: HashMap<MessageId, HashMap<PeerId, u32>>,
history: Vec<Vec<CacheEntry>>,
@ -73,7 +73,7 @@ impl MessageCache {
/// Put a message into the memory cache.
///
/// Returns true if the message didn't already exist in the cache.
pub fn put(&mut self, message_id: &MessageId, msg: RawGossipsubMessage) -> bool {
pub fn put(&mut self, message_id: &MessageId, msg: RawMessage) -> bool {
match self.msgs.entry(message_id.clone()) {
Entry::Occupied(_) => {
// Don't add duplicate entries to the cache.
@ -108,7 +108,7 @@ impl MessageCache {
/// Get a message with `message_id`
#[cfg(test)]
pub fn get(&self, message_id: &MessageId) -> Option<&RawGossipsubMessage> {
pub fn get(&self, message_id: &MessageId) -> Option<&RawMessage> {
self.msgs.get(message_id).map(|(message, _)| message)
}
@ -118,7 +118,7 @@ impl MessageCache {
&mut self,
message_id: &MessageId,
peer: &PeerId,
) -> Option<(&RawGossipsubMessage, u32)> {
) -> Option<(&RawMessage, u32)> {
let iwant_counts = &mut self.iwant_counts;
self.msgs.get(message_id).and_then(|(message, _)| {
if !message.validated {
@ -140,10 +140,7 @@ impl MessageCache {
/// Gets a message with [`MessageId`] and tags it as validated.
/// This function also returns the known peers that have sent us this message. This is used to
/// prevent us sending redundant messages to peers who have already propagated it.
pub fn validate(
&mut self,
message_id: &MessageId,
) -> Option<(&RawGossipsubMessage, HashSet<PeerId>)> {
pub fn validate(&mut self, message_id: &MessageId) -> Option<(&RawMessage, HashSet<PeerId>)> {
self.msgs.get_mut(message_id).map(|(message, known_peers)| {
message.validated = true;
// Clear the known peers list (after a message is validated, it is forwarded and we no
@ -207,10 +204,7 @@ impl MessageCache {
}
/// Removes a message from the cache and returns it if existent
pub fn remove(
&mut self,
message_id: &MessageId,
) -> Option<(RawGossipsubMessage, HashSet<PeerId>)> {
pub fn remove(&mut self, message_id: &MessageId) -> Option<(RawMessage, HashSet<PeerId>)> {
//We only remove the message from msgs and iwant_count and keep the message_id in the
// history vector. Zhe id in the history vector will simply be ignored on popping.
@ -222,12 +216,12 @@ impl MessageCache {
#[cfg(test)]
mod tests {
use super::*;
use crate::types::RawGossipsubMessage;
use crate::types::RawMessage;
use crate::{IdentTopic as Topic, TopicHash};
use libp2p_core::PeerId;
fn gen_testm(x: u64, topic: TopicHash) -> (MessageId, RawGossipsubMessage) {
let default_id = |message: &RawGossipsubMessage| {
fn gen_testm(x: u64, topic: TopicHash) -> (MessageId, RawMessage) {
let default_id = |message: &RawMessage| {
// default message id is: source + sequence number
let mut source_string = message.source.as_ref().unwrap().to_base58();
source_string.push_str(&message.sequence_number.unwrap().to_string());
@ -238,7 +232,7 @@ mod tests {
let data: Vec<u8> = vec![u8x];
let sequence_number = Some(x);
let m = RawGossipsubMessage {
let m = RawMessage {
source,
data,
sequence_number,

View File

@ -21,8 +21,8 @@
/// A collection of unit tests mostly ported from the go implementation.
use super::*;
use crate::types::RawGossipsubMessage;
use crate::{GossipsubMessage, IdentTopic as Topic};
use crate::types::RawMessage;
use crate::{IdentTopic as Topic, Message};
// estimates a value within variance
fn within_variance(value: f64, expected: f64, variance: f64) -> bool {
@ -33,8 +33,8 @@ fn within_variance(value: f64, expected: f64, variance: f64) -> bool {
}
// generates a random gossipsub message with sequence number i
fn make_test_message(seq: u64) -> (MessageId, RawGossipsubMessage) {
let raw_message = RawGossipsubMessage {
fn make_test_message(seq: u64) -> (MessageId, RawMessage) {
let raw_message = RawMessage {
source: Some(PeerId::random()),
data: vec![12, 34, 56],
sequence_number: Some(seq),
@ -44,7 +44,7 @@ fn make_test_message(seq: u64) -> (MessageId, RawGossipsubMessage) {
validated: true,
};
let message = GossipsubMessage {
let message = Message {
source: raw_message.source,
data: raw_message.data.clone(),
sequence_number: raw_message.sequence_number,
@ -55,7 +55,7 @@ fn make_test_message(seq: u64) -> (MessageId, RawGossipsubMessage) {
(id, raw_message)
}
fn default_message_id() -> fn(&GossipsubMessage) -> MessageId {
fn default_message_id() -> fn(&Message) -> MessageId {
|message| {
// default message id is: source + sequence number
// NOTE: If either the peer_id or source is not provided, we set to 0;

View File

@ -18,15 +18,14 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::config::{GossipsubVersion, ValidationMode};
use crate::error::{GossipsubHandlerError, ValidationError};
use crate::config::{ValidationMode, Version};
use crate::error::{HandlerError, ValidationError};
use crate::handler::HandlerEvent;
use crate::topic::TopicHash;
use crate::types::{
GossipsubControlAction, GossipsubRpc, GossipsubSubscription, GossipsubSubscriptionAction,
MessageId, PeerInfo, PeerKind, RawGossipsubMessage,
ControlAction, MessageId, PeerInfo, PeerKind, RawMessage, Rpc, Subscription, SubscriptionAction,
};
use crate::{rpc_proto, GossipsubConfig};
use crate::{rpc_proto, Config};
use asynchronous_codec::{Decoder, Encoder, Framed};
use byteorder::{BigEndian, ByteOrder};
use bytes::BytesMut;
@ -36,7 +35,7 @@ use libp2p_core::{
identity::PublicKey, InboundUpgrade, OutboundUpgrade, PeerId, ProtocolName, UpgradeInfo,
};
use log::{debug, warn};
use prost::Message as ProtobufMessage;
use prost::Message as _;
use std::pin::Pin;
use unsigned_varint::codec;
@ -57,15 +56,15 @@ impl ProtocolConfig {
/// Builds a new [`ProtocolConfig`].
///
/// Sets the maximum gossip transmission size.
pub fn new(gossipsub_config: &GossipsubConfig) -> ProtocolConfig {
pub fn new(gossipsub_config: &Config) -> ProtocolConfig {
let protocol_ids = match gossipsub_config.custom_id_version() {
Some(v) => match v {
GossipsubVersion::V1_0 => vec![ProtocolId::new(
Version::V1_0 => vec![ProtocolId::new(
gossipsub_config.protocol_id(),
PeerKind::Gossipsub,
false,
)],
GossipsubVersion::V1_1 => vec![ProtocolId::new(
Version::V1_1 => vec![ProtocolId::new(
gossipsub_config.protocol_id(),
PeerKind::Gossipsubv1_1,
false,
@ -149,7 +148,7 @@ where
TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
type Error = GossipsubHandlerError;
type Error = HandlerError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
fn upgrade_inbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
@ -170,7 +169,7 @@ where
TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static,
{
type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
type Error = GossipsubHandlerError;
type Error = HandlerError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
fn upgrade_outbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
@ -271,22 +270,18 @@ impl GossipsubCodec {
impl Encoder for GossipsubCodec {
type Item = rpc_proto::Rpc;
type Error = GossipsubHandlerError;
type Error = HandlerError;
fn encode(
&mut self,
item: Self::Item,
dst: &mut BytesMut,
) -> Result<(), GossipsubHandlerError> {
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), HandlerError> {
Ok(self.codec.encode(item, dst)?)
}
}
impl Decoder for GossipsubCodec {
type Item = HandlerEvent;
type Error = GossipsubHandlerError;
type Error = HandlerError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, GossipsubHandlerError> {
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, HandlerError> {
let rpc = match self.codec.decode(src)? {
Some(p) => p,
None => return Ok(None),
@ -341,7 +336,7 @@ impl Decoder for GossipsubCodec {
// If the initial validation logic failed, add the message to invalid messages and
// continue processing the others.
if let Some(validation_error) = invalid_kind.take() {
let message = RawGossipsubMessage {
let message = RawMessage {
source: None, // don't bother inform the application
data: message.data.unwrap_or_default(),
sequence_number: None, // don't inform the application
@ -361,7 +356,7 @@ impl Decoder for GossipsubCodec {
// Build the invalid message (ignoring further validation of sequence number
// and source)
let message = RawGossipsubMessage {
let message = RawMessage {
source: None, // don't bother inform the application
data: message.data.unwrap_or_default(),
sequence_number: None, // don't inform the application
@ -386,7 +381,7 @@ impl Decoder for GossipsubCodec {
seq_no,
seq_no.len()
);
let message = RawGossipsubMessage {
let message = RawMessage {
source: None, // don't bother inform the application
data: message.data.unwrap_or_default(),
sequence_number: None, // don't inform the application
@ -405,7 +400,7 @@ impl Decoder for GossipsubCodec {
} else {
// sequence number was not present
debug!("Sequence number not present but expected");
let message = RawGossipsubMessage {
let message = RawMessage {
source: None, // don't bother inform the application
data: message.data.unwrap_or_default(),
sequence_number: None, // don't inform the application
@ -431,7 +426,7 @@ impl Decoder for GossipsubCodec {
Err(_) => {
// invalid peer id, add to invalid messages
debug!("Message source has an invalid PeerId");
let message = RawGossipsubMessage {
let message = RawMessage {
source: None, // don't bother inform the application
data: message.data.unwrap_or_default(),
sequence_number,
@ -455,7 +450,7 @@ impl Decoder for GossipsubCodec {
};
// This message has passed all validation, add it to the validated messages.
messages.push(RawGossipsubMessage {
messages.push(RawMessage {
source,
data: message.data.unwrap_or_default(),
sequence_number,
@ -470,10 +465,10 @@ impl Decoder for GossipsubCodec {
if let Some(rpc_control) = rpc.control {
// Collect the gossipsub control messages
let ihave_msgs: Vec<GossipsubControlAction> = rpc_control
let ihave_msgs: Vec<ControlAction> = rpc_control
.ihave
.into_iter()
.map(|ihave| GossipsubControlAction::IHave {
.map(|ihave| ControlAction::IHave {
topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()),
message_ids: ihave
.message_ids
@ -483,10 +478,10 @@ impl Decoder for GossipsubCodec {
})
.collect();
let iwant_msgs: Vec<GossipsubControlAction> = rpc_control
let iwant_msgs: Vec<ControlAction> = rpc_control
.iwant
.into_iter()
.map(|iwant| GossipsubControlAction::IWant {
.map(|iwant| ControlAction::IWant {
message_ids: iwant
.message_ids
.into_iter()
@ -495,10 +490,10 @@ impl Decoder for GossipsubCodec {
})
.collect();
let graft_msgs: Vec<GossipsubControlAction> = rpc_control
let graft_msgs: Vec<ControlAction> = rpc_control
.graft
.into_iter()
.map(|graft| GossipsubControlAction::Graft {
.map(|graft| ControlAction::Graft {
topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()),
})
.collect();
@ -523,7 +518,7 @@ impl Decoder for GossipsubCodec {
.collect::<Vec<PeerInfo>>();
let topic_hash = TopicHash::from_raw(prune.topic_id.unwrap_or_default());
prune_msgs.push(GossipsubControlAction::Prune {
prune_msgs.push(ControlAction::Prune {
topic_hash,
peers,
backoff: prune.backoff,
@ -537,16 +532,16 @@ impl Decoder for GossipsubCodec {
}
Ok(Some(HandlerEvent::Message {
rpc: GossipsubRpc {
rpc: Rpc {
messages,
subscriptions: rpc
.subscriptions
.into_iter()
.map(|sub| GossipsubSubscription {
.map(|sub| Subscription {
action: if Some(true) == sub.subscribe {
GossipsubSubscriptionAction::Subscribe
SubscriptionAction::Subscribe
} else {
GossipsubSubscriptionAction::Unsubscribe
SubscriptionAction::Unsubscribe
},
topic_hash: TopicHash::from_raw(sub.topic_id.unwrap_or_default()),
})
@ -561,23 +556,23 @@ impl Decoder for GossipsubCodec {
#[cfg(test)]
mod tests {
use super::*;
use crate::config::GossipsubConfig;
use crate::Gossipsub;
use crate::config::Config;
use crate::Behaviour;
use crate::IdentTopic as Topic;
use libp2p_core::identity::Keypair;
use quickcheck::*;
#[derive(Clone, Debug)]
struct Message(RawGossipsubMessage);
struct Message(RawMessage);
impl Arbitrary for Message {
fn arbitrary(g: &mut Gen) -> Self {
let keypair = TestKeypair::arbitrary(g);
// generate an arbitrary GossipsubMessage using the behaviour signing functionality
let config = GossipsubConfig::default();
let gs: Gossipsub =
Gossipsub::new(crate::MessageAuthenticity::Signed(keypair.0), config).unwrap();
let config = Config::default();
let gs: Behaviour =
Behaviour::new(crate::MessageAuthenticity::Signed(keypair.0), config).unwrap();
let data = (0..g.gen_range(10..10024u32))
.map(|_| u8::arbitrary(g))
.collect::<Vec<_>>();
@ -636,7 +631,7 @@ mod tests {
fn prop(message: Message) {
let message = message.0;
let rpc = GossipsubRpc {
let rpc = Rpc {
messages: vec![message],
subscriptions: vec![],
control_msgs: vec![],

View File

@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::types::GossipsubSubscription;
use crate::types::Subscription;
use crate::TopicHash;
use log::debug;
use std::collections::{BTreeSet, HashMap, HashSet};
@ -32,10 +32,10 @@ pub trait TopicSubscriptionFilter {
/// [`Self::filter_incoming_subscription_set`] on the filtered set.
fn filter_incoming_subscriptions<'a>(
&mut self,
subscriptions: &'a [GossipsubSubscription],
subscriptions: &'a [Subscription],
currently_subscribed_topics: &BTreeSet<TopicHash>,
) -> Result<HashSet<&'a GossipsubSubscription>, String> {
let mut filtered_subscriptions: HashMap<TopicHash, &GossipsubSubscription> = HashMap::new();
) -> Result<HashSet<&'a Subscription>, String> {
let mut filtered_subscriptions: HashMap<TopicHash, &Subscription> = HashMap::new();
for subscription in subscriptions {
use std::collections::hash_map::Entry::*;
match filtered_subscriptions.entry(subscription.topic_hash.clone()) {
@ -59,9 +59,9 @@ pub trait TopicSubscriptionFilter {
/// By default this filters the elements based on [`Self::allow_incoming_subscription`].
fn filter_incoming_subscription_set<'a>(
&mut self,
mut subscriptions: HashSet<&'a GossipsubSubscription>,
mut subscriptions: HashSet<&'a Subscription>,
_currently_subscribed_topics: &BTreeSet<TopicHash>,
) -> Result<HashSet<&'a GossipsubSubscription>, String> {
) -> Result<HashSet<&'a Subscription>, String> {
subscriptions.retain(|s| {
if self.allow_incoming_subscription(s) {
true
@ -78,7 +78,7 @@ pub trait TopicSubscriptionFilter {
/// whether to filter out a subscription or not.
/// By default this uses can_subscribe to decide the same for incoming subscriptions as for
/// outgoing ones.
fn allow_incoming_subscription(&mut self, subscription: &GossipsubSubscription) -> bool {
fn allow_incoming_subscription(&mut self, subscription: &Subscription) -> bool {
self.can_subscribe(&subscription.topic_hash)
}
}
@ -119,9 +119,9 @@ impl<T: TopicSubscriptionFilter> TopicSubscriptionFilter for MaxCountSubscriptio
fn filter_incoming_subscriptions<'a>(
&mut self,
subscriptions: &'a [GossipsubSubscription],
subscriptions: &'a [Subscription],
currently_subscribed_topics: &BTreeSet<TopicHash>,
) -> Result<HashSet<&'a GossipsubSubscription>, String> {
) -> Result<HashSet<&'a Subscription>, String> {
if subscriptions.len() > self.max_subscriptions_per_request {
return Err("too many subscriptions per request".into());
}
@ -129,7 +129,7 @@ impl<T: TopicSubscriptionFilter> TopicSubscriptionFilter for MaxCountSubscriptio
.filter
.filter_incoming_subscriptions(subscriptions, currently_subscribed_topics)?;
use crate::types::GossipsubSubscriptionAction::*;
use crate::types::SubscriptionAction::*;
let mut unsubscribed = 0;
let mut new_subscribed = 0;
@ -176,9 +176,9 @@ where
fn filter_incoming_subscription_set<'a>(
&mut self,
subscriptions: HashSet<&'a GossipsubSubscription>,
subscriptions: HashSet<&'a Subscription>,
currently_subscribed_topics: &BTreeSet<TopicHash>,
) -> Result<HashSet<&'a GossipsubSubscription>, String> {
) -> Result<HashSet<&'a Subscription>, String> {
let intermediate = self
.filter1
.filter_incoming_subscription_set(subscriptions, currently_subscribed_topics)?;
@ -217,8 +217,8 @@ pub mod regex {
#[cfg(test)]
mod test {
use super::*;
use crate::types::GossipsubSubscription;
use crate::types::GossipsubSubscriptionAction::*;
use crate::types::Subscription;
use crate::types::SubscriptionAction::*;
#[test]
fn test_regex_subscription_filter() {
@ -230,15 +230,15 @@ pub mod regex {
let old = Default::default();
let subscriptions = vec![
GossipsubSubscription {
Subscription {
action: Subscribe,
topic_hash: t1,
},
GossipsubSubscription {
Subscription {
action: Subscribe,
topic_hash: t2,
},
GossipsubSubscription {
Subscription {
action: Subscribe,
topic_hash: t3,
},
@ -255,7 +255,7 @@ pub mod regex {
#[cfg(test)]
mod test {
use super::*;
use crate::types::GossipsubSubscriptionAction::*;
use crate::types::SubscriptionAction::*;
use std::iter::FromIterator;
#[test]
@ -267,23 +267,23 @@ mod test {
let old = BTreeSet::from_iter(vec![t1.clone()].into_iter());
let subscriptions = vec![
GossipsubSubscription {
Subscription {
action: Unsubscribe,
topic_hash: t1.clone(),
},
GossipsubSubscription {
Subscription {
action: Unsubscribe,
topic_hash: t2.clone(),
},
GossipsubSubscription {
Subscription {
action: Subscribe,
topic_hash: t2,
},
GossipsubSubscription {
Subscription {
action: Subscribe,
topic_hash: t1.clone(),
},
GossipsubSubscription {
Subscription {
action: Unsubscribe,
topic_hash: t1,
},
@ -304,11 +304,11 @@ mod test {
let old = Default::default();
let subscriptions = vec![
GossipsubSubscription {
Subscription {
action: Subscribe,
topic_hash: t1,
},
GossipsubSubscription {
Subscription {
action: Subscribe,
topic_hash: t2,
},
@ -333,15 +333,15 @@ mod test {
let old = Default::default();
let subscriptions = vec![
GossipsubSubscription {
Subscription {
action: Subscribe,
topic_hash: t1.clone(),
},
GossipsubSubscription {
Subscription {
action: Unsubscribe,
topic_hash: t1.clone(),
},
GossipsubSubscription {
Subscription {
action: Subscribe,
topic_hash: t1,
},
@ -366,11 +366,11 @@ mod test {
let old = t[0..2].iter().cloned().collect();
let subscriptions = vec![
GossipsubSubscription {
Subscription {
action: Subscribe,
topic_hash: t[2].clone(),
},
GossipsubSubscription {
Subscription {
action: Subscribe,
topic_hash: t[3].clone(),
},
@ -395,23 +395,23 @@ mod test {
let old = t[0..2].iter().cloned().collect();
let subscriptions = vec![
GossipsubSubscription {
Subscription {
action: Subscribe,
topic_hash: t[4].clone(),
},
GossipsubSubscription {
Subscription {
action: Subscribe,
topic_hash: t[2].clone(),
},
GossipsubSubscription {
Subscription {
action: Subscribe,
topic_hash: t[3].clone(),
},
GossipsubSubscription {
Subscription {
action: Unsubscribe,
topic_hash: t[0].clone(),
},
GossipsubSubscription {
Subscription {
action: Unsubscribe,
topic_hash: t[1].clone(),
},
@ -432,11 +432,11 @@ mod test {
let old = Default::default();
let subscriptions = vec![
GossipsubSubscription {
Subscription {
action: Subscribe,
topic_hash: t1,
},
GossipsubSubscription {
Subscription {
action: Subscribe,
topic_hash: t2,
},

View File

@ -25,25 +25,22 @@
//! algorithms that can be topic-specific. Once the raw data is transformed the message-id is then
//! calculated, allowing for applications to employ message-id functions post compression.
use crate::{GossipsubMessage, RawGossipsubMessage, TopicHash};
use crate::{Message, RawMessage, TopicHash};
/// A general trait of transforming a [`RawGossipsubMessage`] into a [`GossipsubMessage`]. The
/// [`RawGossipsubMessage`] is obtained from the wire and the [`GossipsubMessage`] is used to
/// A general trait of transforming a [`RawMessage`] into a [`Message`]. The
/// [`RawMessage`] is obtained from the wire and the [`Message`] is used to
/// calculate the [`crate::MessageId`] of the message and is what is sent to the application.
///
/// The inbound/outbound transforms must be inverses. Applying the inbound transform and then the
/// outbound transform MUST leave the underlying data un-modified.
///
/// By default, this is the identity transform for all fields in [`GossipsubMessage`].
/// By default, this is the identity transform for all fields in [`Message`].
pub trait DataTransform {
/// Takes a [`RawGossipsubMessage`] received and converts it to a [`GossipsubMessage`].
fn inbound_transform(
&self,
raw_message: RawGossipsubMessage,
) -> Result<GossipsubMessage, std::io::Error>;
/// Takes a [`RawMessage`] received and converts it to a [`Message`].
fn inbound_transform(&self, raw_message: RawMessage) -> Result<Message, std::io::Error>;
/// Takes the data to be published (a topic and associated data) transforms the data. The
/// transformed data will then be used to create a [`crate::RawGossipsubMessage`] to be sent to peers.
/// transformed data will then be used to create a [`crate::RawMessage`] to be sent to peers.
fn outbound_transform(
&self,
topic: &TopicHash,
@ -56,11 +53,8 @@ pub trait DataTransform {
pub struct IdentityTransform;
impl DataTransform for IdentityTransform {
fn inbound_transform(
&self,
raw_message: RawGossipsubMessage,
) -> Result<GossipsubMessage, std::io::Error> {
Ok(GossipsubMessage {
fn inbound_transform(&self, raw_message: RawMessage) -> Result<Message, std::io::Error> {
Ok(Message {
source: raw_message.source,
data: raw_message.data,
sequence_number: raw_message.sequence_number,

View File

@ -24,7 +24,7 @@ use crate::TopicHash;
use libp2p_core::PeerId;
use libp2p_swarm::ConnectionId;
use prometheus_client::encoding::EncodeLabelValue;
use prost::Message;
use prost::Message as _;
use std::fmt;
use std::fmt::Debug;
@ -110,7 +110,7 @@ pub enum PeerKind {
/// A message received by the gossipsub system and stored locally in caches..
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct RawGossipsubMessage {
pub struct RawMessage {
/// Id of the peer that published this message.
pub source: Option<PeerId>,
@ -133,7 +133,7 @@ pub struct RawGossipsubMessage {
pub validated: bool,
}
impl RawGossipsubMessage {
impl RawMessage {
/// Calculates the encoded length of this message (used for calculating metrics).
pub fn raw_protobuf_len(&self) -> usize {
let message = rpc_proto::Message {
@ -148,10 +148,10 @@ impl RawGossipsubMessage {
}
}
/// The message sent to the user after a [`RawGossipsubMessage`] has been transformed by a
/// The message sent to the user after a [`RawMessage`] has been transformed by a
/// [`crate::DataTransform`].
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct GossipsubMessage {
pub struct Message {
/// Id of the peer that published this message.
pub source: Option<PeerId>,
@ -165,9 +165,9 @@ pub struct GossipsubMessage {
pub topic: TopicHash,
}
impl fmt::Debug for GossipsubMessage {
impl fmt::Debug for Message {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("GossipsubMessage")
f.debug_struct("Message")
.field(
"data",
&format_args!("{:<20}", &hex_fmt::HexFmt(&self.data)),
@ -181,16 +181,16 @@ impl fmt::Debug for GossipsubMessage {
/// A subscription received by the gossipsub system.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GossipsubSubscription {
pub struct Subscription {
/// Action to perform.
pub action: GossipsubSubscriptionAction,
pub action: SubscriptionAction,
/// The topic from which to subscribe or unsubscribe.
pub topic_hash: TopicHash,
}
/// Action that a subscription wants to perform.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum GossipsubSubscriptionAction {
pub enum SubscriptionAction {
/// The remote wants to subscribe to the given topic.
Subscribe,
/// The remote wants to unsubscribe from the given topic.
@ -207,7 +207,7 @@ pub struct PeerInfo {
/// A Control message received by the gossipsub system.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum GossipsubControlAction {
pub enum ControlAction {
/// Node broadcasts known messages per topic - IHave control message.
IHave {
/// The topic of the messages.
@ -238,16 +238,16 @@ pub enum GossipsubControlAction {
/// An RPC received/sent.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct GossipsubRpc {
pub struct Rpc {
/// List of messages that were part of this RPC query.
pub messages: Vec<RawGossipsubMessage>,
pub messages: Vec<RawMessage>,
/// List of subscriptions.
pub subscriptions: Vec<GossipsubSubscription>,
pub subscriptions: Vec<Subscription>,
/// List of Gossipsub control messages.
pub control_msgs: Vec<GossipsubControlAction>,
pub control_msgs: Vec<ControlAction>,
}
impl GossipsubRpc {
impl Rpc {
/// Converts the GossipsubRPC into its protobuf format.
// A convenience function to avoid explicitly specifying types.
pub fn into_protobuf(self) -> rpc_proto::Rpc {
@ -255,9 +255,9 @@ impl GossipsubRpc {
}
}
impl From<GossipsubRpc> for rpc_proto::Rpc {
impl From<Rpc> for rpc_proto::Rpc {
/// Converts the RPC into protobuf format.
fn from(rpc: GossipsubRpc) -> Self {
fn from(rpc: Rpc) -> Self {
// Messages
let mut publish = Vec::new();
@ -279,7 +279,7 @@ impl From<GossipsubRpc> for rpc_proto::Rpc {
.subscriptions
.into_iter()
.map(|sub| rpc_proto::rpc::SubOpts {
subscribe: Some(sub.action == GossipsubSubscriptionAction::Subscribe),
subscribe: Some(sub.action == SubscriptionAction::Subscribe),
topic_id: Some(sub.topic_hash.into_string()),
})
.collect::<Vec<_>>();
@ -297,7 +297,7 @@ impl From<GossipsubRpc> for rpc_proto::Rpc {
for action in rpc.control_msgs {
match action {
// collect all ihave messages
GossipsubControlAction::IHave {
ControlAction::IHave {
topic_hash,
message_ids,
} => {
@ -307,19 +307,19 @@ impl From<GossipsubRpc> for rpc_proto::Rpc {
};
control.ihave.push(rpc_ihave);
}
GossipsubControlAction::IWant { message_ids } => {
ControlAction::IWant { message_ids } => {
let rpc_iwant = rpc_proto::ControlIWant {
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
};
control.iwant.push(rpc_iwant);
}
GossipsubControlAction::Graft { topic_hash } => {
ControlAction::Graft { topic_hash } => {
let rpc_graft = rpc_proto::ControlGraft {
topic_id: Some(topic_hash.into_string()),
};
control.graft.push(rpc_graft);
}
GossipsubControlAction::Prune {
ControlAction::Prune {
topic_hash,
peers,
backoff,
@ -353,7 +353,7 @@ impl From<GossipsubRpc> for rpc_proto::Rpc {
}
}
impl fmt::Debug for GossipsubRpc {
impl fmt::Debug for Rpc {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut b = f.debug_struct("GossipsubRpc");
if !self.messages.is_empty() {

View File

@ -32,20 +32,17 @@ use futures::StreamExt;
use libp2p_core::{
identity, multiaddr::Protocol, transport::MemoryTransport, upgrade, Multiaddr, Transport,
};
use libp2p_gossipsub::{
Gossipsub, GossipsubConfigBuilder, GossipsubEvent, IdentTopic as Topic, MessageAuthenticity,
ValidationMode,
};
use libp2p_gossipsub as gossipsub;
use libp2p_plaintext::PlainText2Config;
use libp2p_swarm::{Swarm, SwarmEvent};
use libp2p_yamux as yamux;
struct Graph {
pub nodes: Vec<(Multiaddr, Swarm<Gossipsub>)>,
pub nodes: Vec<(Multiaddr, Swarm<gossipsub::Behaviour>)>,
}
impl Future for Graph {
type Output = (Multiaddr, GossipsubEvent);
type Output = (Multiaddr, gossipsub::Event);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
for (addr, node) in &mut self.nodes {
@ -77,7 +74,7 @@ impl Graph {
.cycle()
.take(num_nodes)
.map(|_| build_node())
.collect::<Vec<(Multiaddr, Swarm<Gossipsub>)>>();
.collect::<Vec<(Multiaddr, Swarm<gossipsub::Behaviour>)>>();
let mut connected_nodes = vec![not_connected_nodes.pop().unwrap()];
@ -112,7 +109,7 @@ impl Graph {
/// `true`.
///
/// Returns [`true`] on success and [`false`] on timeout.
fn wait_for<F: FnMut(&GossipsubEvent) -> bool>(&mut self, mut f: F) -> bool {
fn wait_for<F: FnMut(&gossipsub::Event) -> bool>(&mut self, mut f: F) -> bool {
let fut = futures::future::poll_fn(move |cx| match self.poll_unpin(cx) {
Poll::Ready((_addr, ev)) if f(&ev) => Poll::Ready(()),
_ => Poll::Pending,
@ -143,7 +140,7 @@ impl Graph {
}
}
fn build_node() -> (Multiaddr, Swarm<Gossipsub>) {
fn build_node() -> (Multiaddr, Swarm<gossipsub::Behaviour>) {
let key = identity::Keypair::generate_ed25519();
let public_key = key.public();
@ -162,15 +159,16 @@ fn build_node() -> (Multiaddr, Swarm<Gossipsub>) {
// reduce the default values of the heartbeat, so that all nodes will receive gossip in a
// timely fashion.
let config = GossipsubConfigBuilder::default()
let config = gossipsub::ConfigBuilder::default()
.heartbeat_initial_delay(Duration::from_millis(100))
.heartbeat_interval(Duration::from_millis(200))
.history_length(10)
.history_gossip(10)
.validation_mode(ValidationMode::Permissive)
.validation_mode(gossipsub::ValidationMode::Permissive)
.build()
.unwrap();
let behaviour = Gossipsub::new(MessageAuthenticity::Author(peer_id), config).unwrap();
let behaviour =
gossipsub::Behaviour::new(gossipsub::MessageAuthenticity::Author(peer_id), config).unwrap();
let mut swarm = Swarm::without_executor(transport, behaviour, peer_id);
let port = 1 + random::<u64>();
@ -197,7 +195,7 @@ fn multi_hop_propagation() {
let number_nodes = graph.nodes.len();
// Subscribe each node to the same topic.
let topic = Topic::new("test-net");
let topic = gossipsub::IdentTopic::new("test-net");
for (_addr, node) in &mut graph.nodes {
node.behaviour_mut().subscribe(&topic).unwrap();
}
@ -205,7 +203,7 @@ fn multi_hop_propagation() {
// Wait for all nodes to be subscribed.
let mut subscribed = 0;
let all_subscribed = graph.wait_for(move |ev| {
if let GossipsubEvent::Subscribed { .. } = ev {
if let gossipsub::Event::Subscribed { .. } = ev {
subscribed += 1;
if subscribed == (number_nodes - 1) * 2 {
return true;
@ -234,7 +232,7 @@ fn multi_hop_propagation() {
// Wait for all nodes to receive the published message.
let mut received_msgs = 0;
let all_received = graph.wait_for(move |ev| {
if let GossipsubEvent::Message { .. } = ev {
if let gossipsub::Event::Message { .. } = ev {
received_msgs += 1;
if received_msgs == number_nodes - 1 {
return true;