refactor(gossipsub): don't store messages within Arc (#3243)

Currently, we store messages to be sent to the `ConnectionHandler` in an `Arc`. However, we never actually clone these messages as we can see with this patch, hence we remove this wrapping.

Related: https://github.com/libp2p/rust-libp2p/pull/3242
This commit is contained in:
Thomas Eizinger
2022-12-20 11:59:25 +11:00
committed by GitHub
parent de61a74d33
commit 88fa8e66b8
3 changed files with 108 additions and 118 deletions

View File

@ -25,7 +25,6 @@ use std::{
collections::{BTreeSet, HashMap}, collections::{BTreeSet, HashMap},
fmt, fmt,
net::IpAddr, net::IpAddr,
sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
time::Duration, time::Duration,
}; };
@ -201,9 +200,6 @@ impl From<MessageAuthenticity> for PublishConfig {
} }
} }
type GossipsubNetworkBehaviourAction =
NetworkBehaviourAction<GossipsubEvent, GossipsubHandler, Arc<GossipsubHandlerIn>>;
/// Network behaviour that handles the gossipsub protocol. /// Network behaviour that handles the gossipsub protocol.
/// ///
/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`GossipsubConfig`] instance. If /// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`GossipsubConfig`] instance. If
@ -223,7 +219,7 @@ pub struct Gossipsub<
config: GossipsubConfig, config: GossipsubConfig,
/// Events that need to be yielded to the outside when polling. /// Events that need to be yielded to the outside when polling.
events: VecDeque<GossipsubNetworkBehaviourAction>, events: VecDeque<NetworkBehaviourAction<GossipsubEvent, GossipsubHandler>>,
/// Pools non-urgent control messages between heartbeats. /// Pools non-urgent control messages between heartbeats.
control_pool: HashMap<PeerId, Vec<GossipsubControlAction>>, control_pool: HashMap<PeerId, Vec<GossipsubControlAction>>,
@ -2903,7 +2899,7 @@ where
self.events self.events
.push_back(NetworkBehaviourAction::NotifyHandler { .push_back(NetworkBehaviourAction::NotifyHandler {
peer_id, peer_id,
event: Arc::new(GossipsubHandlerIn::Message(message)), event: GossipsubHandlerIn::Message(message),
handler: NotifyHandler::Any, handler: NotifyHandler::Any,
}) })
} }
@ -3163,7 +3159,7 @@ where
self.events self.events
.push_back(NetworkBehaviourAction::NotifyHandler { .push_back(NetworkBehaviourAction::NotifyHandler {
peer_id, peer_id,
event: Arc::new(GossipsubHandlerIn::JoinedMesh), event: GossipsubHandlerIn::JoinedMesh,
handler: NotifyHandler::One(connections.connections[0]), handler: NotifyHandler::One(connections.connections[0]),
}); });
break; break;
@ -3449,10 +3445,7 @@ where
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> { ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(event) = self.events.pop_front() { if let Some(event) = self.events.pop_front() {
return Poll::Ready(event.map_in(|e: Arc<GossipsubHandlerIn>| { return Poll::Ready(event);
// clone send event reference if others references are present
Arc::try_unwrap(e).unwrap_or_else(|e| (*e).clone())
}));
} }
// update scores // update scores
@ -3499,7 +3492,7 @@ fn peer_added_to_mesh(
new_topics: Vec<&TopicHash>, new_topics: Vec<&TopicHash>,
mesh: &HashMap<TopicHash, BTreeSet<PeerId>>, mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
known_topics: Option<&BTreeSet<TopicHash>>, known_topics: Option<&BTreeSet<TopicHash>>,
events: &mut VecDeque<GossipsubNetworkBehaviourAction>, events: &mut VecDeque<NetworkBehaviourAction<GossipsubEvent, GossipsubHandler>>,
connections: &HashMap<PeerId, PeerConnections>, connections: &HashMap<PeerId, PeerConnections>,
) { ) {
// Ensure there is an active connection // Ensure there is an active connection
@ -3527,7 +3520,7 @@ fn peer_added_to_mesh(
// This is the first mesh the peer has joined, inform the handler // This is the first mesh the peer has joined, inform the handler
events.push_back(NetworkBehaviourAction::NotifyHandler { events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id, peer_id,
event: Arc::new(GossipsubHandlerIn::JoinedMesh), event: GossipsubHandlerIn::JoinedMesh,
handler: NotifyHandler::One(connection_id), handler: NotifyHandler::One(connection_id),
}); });
} }
@ -3540,7 +3533,7 @@ fn peer_removed_from_mesh(
old_topic: &TopicHash, old_topic: &TopicHash,
mesh: &HashMap<TopicHash, BTreeSet<PeerId>>, mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
known_topics: Option<&BTreeSet<TopicHash>>, known_topics: Option<&BTreeSet<TopicHash>>,
events: &mut VecDeque<GossipsubNetworkBehaviourAction>, events: &mut VecDeque<NetworkBehaviourAction<GossipsubEvent, GossipsubHandler>>,
connections: &HashMap<PeerId, PeerConnections>, connections: &HashMap<PeerId, PeerConnections>,
) { ) {
// Ensure there is an active connection // Ensure there is an active connection
@ -3566,7 +3559,7 @@ fn peer_removed_from_mesh(
// The peer is not in any other mesh, inform the handler // The peer is not in any other mesh, inform the handler
events.push_back(NetworkBehaviourAction::NotifyHandler { events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id, peer_id,
event: Arc::new(GossipsubHandlerIn::LeftMesh), event: GossipsubHandlerIn::LeftMesh,
handler: NotifyHandler::One(*connection_id), handler: NotifyHandler::One(*connection_id),
}); });
} }

View File

@ -375,17 +375,17 @@ fn test_subscribe() {
.events .events
.iter() .iter()
.fold(vec![], |mut collected_subscriptions, e| match e { .fold(vec![], |mut collected_subscriptions, e| match e {
NetworkBehaviourAction::NotifyHandler { event, .. } => match **event { NetworkBehaviourAction::NotifyHandler {
GossipsubHandlerIn::Message(ref message) => { event: GossipsubHandlerIn::Message(ref message),
for s in &message.subscriptions { ..
if let Some(true) = s.subscribe { } => {
collected_subscriptions.push(s.clone()) for s in &message.subscriptions {
}; if let Some(true) = s.subscribe {
} collected_subscriptions.push(s.clone())
collected_subscriptions };
} }
_ => collected_subscriptions, collected_subscriptions
}, }
_ => collected_subscriptions, _ => collected_subscriptions,
}); });
@ -443,17 +443,17 @@ fn test_unsubscribe() {
.events .events
.iter() .iter()
.fold(vec![], |mut collected_subscriptions, e| match e { .fold(vec![], |mut collected_subscriptions, e| match e {
NetworkBehaviourAction::NotifyHandler { event, .. } => match **event { NetworkBehaviourAction::NotifyHandler {
GossipsubHandlerIn::Message(ref message) => { event: GossipsubHandlerIn::Message(ref message),
for s in &message.subscriptions { ..
if let Some(true) = s.subscribe { } => {
collected_subscriptions.push(s.clone()) for s in &message.subscriptions {
}; if let Some(true) = s.subscribe {
} collected_subscriptions.push(s.clone())
collected_subscriptions };
} }
_ => collected_subscriptions, collected_subscriptions
}, }
_ => collected_subscriptions, _ => collected_subscriptions,
}); });
@ -630,16 +630,16 @@ fn test_publish_without_flood_publishing() {
.events .events
.iter() .iter()
.fold(vec![], |mut collected_publish, e| match e { .fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::NotifyHandler { event, .. } => match **event { NetworkBehaviourAction::NotifyHandler {
GossipsubHandlerIn::Message(ref message) => { event: GossipsubHandlerIn::Message(ref message),
let event = proto_to_message(message); ..
for s in &event.messages { } => {
collected_publish.push(s.clone()); let event = proto_to_message(message);
} for s in &event.messages {
collected_publish collected_publish.push(s.clone());
} }
_ => collected_publish, collected_publish
}, }
_ => collected_publish, _ => collected_publish,
}); });
@ -720,16 +720,16 @@ fn test_fanout() {
.events .events
.iter() .iter()
.fold(vec![], |mut collected_publish, e| match e { .fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::NotifyHandler { event, .. } => match **event { NetworkBehaviourAction::NotifyHandler {
GossipsubHandlerIn::Message(ref message) => { event: GossipsubHandlerIn::Message(ref message),
let event = proto_to_message(message); ..
for s in &event.messages { } => {
collected_publish.push(s.clone()); let event = proto_to_message(message);
} for s in &event.messages {
collected_publish collected_publish.push(s.clone());
} }
_ => collected_publish, collected_publish
}, }
_ => collected_publish, _ => collected_publish,
}); });
@ -773,26 +773,25 @@ fn test_inject_connected() {
.events .events
.iter() .iter()
.filter(|e| match e { .filter(|e| match e {
NetworkBehaviourAction::NotifyHandler { event, .. } => { NetworkBehaviourAction::NotifyHandler {
if let GossipsubHandlerIn::Message(ref m) = **event { event: GossipsubHandlerIn::Message(ref m),
!m.subscriptions.is_empty() ..
} else { } => !m.subscriptions.is_empty(),
false
}
}
_ => false, _ => false,
}) })
.collect(); .collect();
// check that there are two subscriptions sent to each peer // check that there are two subscriptions sent to each peer
for sevent in send_events.clone() { for sevent in send_events.clone() {
if let NetworkBehaviourAction::NotifyHandler { event, .. } = sevent { if let NetworkBehaviourAction::NotifyHandler {
if let GossipsubHandlerIn::Message(ref m) = **event { event: GossipsubHandlerIn::Message(ref m),
assert!( ..
m.subscriptions.len() == 2, } = sevent
"There should be two subscriptions sent to each peer (1 for each topic)." {
); assert!(
} m.subscriptions.len() == 2,
"There should be two subscriptions sent to each peer (1 for each topic)."
);
}; };
} }
@ -1018,7 +1017,7 @@ fn test_handle_iwant_msg_cached() {
.iter() .iter()
.fold(vec![], |mut collected_messages, e| match e { .fold(vec![], |mut collected_messages, e| match e {
NetworkBehaviourAction::NotifyHandler { event, .. } => { NetworkBehaviourAction::NotifyHandler { event, .. } => {
if let GossipsubHandlerIn::Message(ref m) = **event { if let GossipsubHandlerIn::Message(ref m) = event {
let event = proto_to_message(m); let event = proto_to_message(m);
for c in &event.messages { for c in &event.messages {
collected_messages.push(c.clone()) collected_messages.push(c.clone())
@ -1075,17 +1074,16 @@ fn test_handle_iwant_msg_cached_shifted() {
// is the message is being sent? // is the message is being sent?
let message_exists = gs.events.iter().any(|e| match e { let message_exists = gs.events.iter().any(|e| match e {
NetworkBehaviourAction::NotifyHandler { event, .. } => { NetworkBehaviourAction::NotifyHandler {
if let GossipsubHandlerIn::Message(ref m) = **event { event: GossipsubHandlerIn::Message(ref m),
let event = proto_to_message(m); ..
event } => {
.messages let event = proto_to_message(m);
.iter() event
.map(|msg| gs.data_transform.inbound_transform(msg.clone()).unwrap()) .messages
.any(|msg| gs.config.message_id(&msg) == msg_id) .iter()
} else { .map(|msg| gs.data_transform.inbound_transform(msg.clone()).unwrap())
false .any(|msg| gs.config.message_id(&msg) == msg_id)
}
} }
_ => false, _ => false,
}); });
@ -1317,17 +1315,17 @@ fn count_control_msgs<D: DataTransform, F: TopicSubscriptionFilter>(
+ gs.events + gs.events
.iter() .iter()
.map(|e| match e { .map(|e| match e {
NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { NetworkBehaviourAction::NotifyHandler {
if let GossipsubHandlerIn::Message(ref m) = **event { peer_id,
let event = proto_to_message(m); event: GossipsubHandlerIn::Message(ref m),
event ..
.control_msgs } => {
.iter() let event = proto_to_message(m);
.filter(|m| filter(peer_id, m)) event
.count() .control_msgs
} else { .iter()
0 .filter(|m| filter(peer_id, m))
} .count()
} }
_ => 0, _ => 0,
}) })
@ -1540,19 +1538,19 @@ fn do_forward_messages_to_explicit_peers() {
gs.events gs.events
.iter() .iter()
.filter(|e| match e { .filter(|e| match e {
NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { NetworkBehaviourAction::NotifyHandler {
if let GossipsubHandlerIn::Message(ref m) = **event { peer_id,
let event = proto_to_message(m); event: GossipsubHandlerIn::Message(ref m),
peer_id == &peers[0] ..
&& event } => {
.messages let event = proto_to_message(m);
.iter() peer_id == &peers[0]
.filter(|m| m.data == message.data) && event
.count() .messages
> 0 .iter()
} else { .filter(|m| m.data == message.data)
false .count()
} > 0
} }
_ => false, _ => false,
}) })
@ -2107,7 +2105,7 @@ fn test_flood_publish() {
.iter() .iter()
.fold(vec![], |mut collected_publish, e| match e { .fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::NotifyHandler { event, .. } => { NetworkBehaviourAction::NotifyHandler { event, .. } => {
if let GossipsubHandlerIn::Message(ref m) = **event { if let GossipsubHandlerIn::Message(ref m) = event {
let event = proto_to_message(m); let event = proto_to_message(m);
for s in &event.messages { for s in &event.messages {
collected_publish.push(s.clone()); collected_publish.push(s.clone());
@ -2668,7 +2666,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() {
.iter() .iter()
.fold(vec![], |mut collected_messages, e| match e { .fold(vec![], |mut collected_messages, e| match e {
NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => {
if let GossipsubHandlerIn::Message(ref m) = **event { if let GossipsubHandlerIn::Message(ref m) = event {
let event = proto_to_message(m); let event = proto_to_message(m);
for c in &event.messages { for c in &event.messages {
collected_messages.push((*peer_id, c.clone())) collected_messages.push((*peer_id, c.clone()))
@ -2816,7 +2814,7 @@ fn test_do_not_publish_to_peer_below_publish_threshold() {
.iter() .iter()
.fold(vec![], |mut collected_publish, e| match e { .fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => {
if let GossipsubHandlerIn::Message(ref m) = **event { if let GossipsubHandlerIn::Message(ref m) = event {
let event = proto_to_message(m); let event = proto_to_message(m);
for s in &event.messages { for s in &event.messages {
collected_publish.push((*peer_id, s.clone())); collected_publish.push((*peer_id, s.clone()));
@ -2873,7 +2871,7 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() {
.iter() .iter()
.fold(vec![], |mut collected_publish, e| match e { .fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => {
if let GossipsubHandlerIn::Message(ref m) = **event { if let GossipsubHandlerIn::Message(ref m) = event {
let event = proto_to_message(m); let event = proto_to_message(m);
for s in &event.messages { for s in &event.messages {
collected_publish.push((*peer_id, s.clone())); collected_publish.push((*peer_id, s.clone()));
@ -4407,13 +4405,12 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() {
gs.events gs.events
.iter() .iter()
.map(|e| match e { .map(|e| match e {
NetworkBehaviourAction::NotifyHandler { event, .. } => { NetworkBehaviourAction::NotifyHandler {
if let GossipsubHandlerIn::Message(ref m) = **event { event: GossipsubHandlerIn::Message(ref m),
let event = proto_to_message(m); ..
event.messages.len() } => {
} else { let event = proto_to_message(m);
0 event.messages.len()
}
} }
_ => 0, _ => 0,
}) })
@ -4816,7 +4813,7 @@ fn test_publish_to_floodsub_peers_without_flood_publish() {
.fold(vec![], |mut collected_publish, e| match e { .fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => {
if peer_id == &p1 || peer_id == &p2 { if peer_id == &p1 || peer_id == &p2 {
if let GossipsubHandlerIn::Message(ref m) = **event { if let GossipsubHandlerIn::Message(ref m) = event {
let event = proto_to_message(m); let event = proto_to_message(m);
for s in &event.messages { for s in &event.messages {
collected_publish.push(s.clone()); collected_publish.push(s.clone());
@ -4873,7 +4870,7 @@ fn test_do_not_use_floodsub_in_fanout() {
.fold(vec![], |mut collected_publish, e| match e { .fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => {
if peer_id == &p1 || peer_id == &p2 { if peer_id == &p1 || peer_id == &p2 {
if let GossipsubHandlerIn::Message(ref m) = **event { if let GossipsubHandlerIn::Message(ref m) = event {
let event = proto_to_message(m); let event = proto_to_message(m);
for s in &event.messages { for s in &event.messages {
collected_publish.push(s.clone()); collected_publish.push(s.clone());
@ -5187,7 +5184,7 @@ fn test_subscribe_and_graft_with_negative_score() {
let messages_to_p1 = gs2.events.drain(..).filter_map(|e| match e { let messages_to_p1 = gs2.events.drain(..).filter_map(|e| match e {
NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => {
if peer_id == p1 { if peer_id == p1 {
if let GossipsubHandlerIn::Message(m) = Arc::try_unwrap(event).unwrap() { if let GossipsubHandlerIn::Message(m) = event {
Some(m) Some(m)
} else { } else {
None None

View File

@ -64,7 +64,7 @@ pub enum HandlerEvent {
} }
/// A message sent from the behaviour to the handler. /// A message sent from the behaviour to the handler.
#[derive(Debug, Clone)] #[derive(Debug)]
pub enum GossipsubHandlerIn { pub enum GossipsubHandlerIn {
/// A gossipsub message to send. /// A gossipsub message to send.
Message(crate::rpc_proto::Rpc), Message(crate::rpc_proto::Rpc),