diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 5d1b69ec..08031373 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -25,7 +25,6 @@ use std::{ collections::{BTreeSet, HashMap}, fmt, net::IpAddr, - sync::Arc, task::{Context, Poll}, time::Duration, }; @@ -201,9 +200,6 @@ impl From for PublishConfig { } } -type GossipsubNetworkBehaviourAction = - NetworkBehaviourAction>; - /// Network behaviour that handles the gossipsub protocol. /// /// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`GossipsubConfig`] instance. If @@ -223,7 +219,7 @@ pub struct Gossipsub< config: GossipsubConfig, /// Events that need to be yielded to the outside when polling. - events: VecDeque, + events: VecDeque>, /// Pools non-urgent control messages between heartbeats. control_pool: HashMap>, @@ -2903,7 +2899,7 @@ where self.events .push_back(NetworkBehaviourAction::NotifyHandler { peer_id, - event: Arc::new(GossipsubHandlerIn::Message(message)), + event: GossipsubHandlerIn::Message(message), handler: NotifyHandler::Any, }) } @@ -3163,7 +3159,7 @@ where self.events .push_back(NetworkBehaviourAction::NotifyHandler { peer_id, - event: Arc::new(GossipsubHandlerIn::JoinedMesh), + event: GossipsubHandlerIn::JoinedMesh, handler: NotifyHandler::One(connections.connections[0]), }); break; @@ -3449,10 +3445,7 @@ where _: &mut impl PollParameters, ) -> Poll> { if let Some(event) = self.events.pop_front() { - return Poll::Ready(event.map_in(|e: Arc| { - // clone send event reference if others references are present - Arc::try_unwrap(e).unwrap_or_else(|e| (*e).clone()) - })); + return Poll::Ready(event); } // update scores @@ -3499,7 +3492,7 @@ fn peer_added_to_mesh( new_topics: Vec<&TopicHash>, mesh: &HashMap>, known_topics: Option<&BTreeSet>, - events: &mut VecDeque, + events: &mut VecDeque>, connections: &HashMap, ) { // Ensure there is an active connection @@ -3527,7 +3520,7 @@ fn peer_added_to_mesh( // This is the first mesh the peer has joined, inform the handler events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id, - event: Arc::new(GossipsubHandlerIn::JoinedMesh), + event: GossipsubHandlerIn::JoinedMesh, handler: NotifyHandler::One(connection_id), }); } @@ -3540,7 +3533,7 @@ fn peer_removed_from_mesh( old_topic: &TopicHash, mesh: &HashMap>, known_topics: Option<&BTreeSet>, - events: &mut VecDeque, + events: &mut VecDeque>, connections: &HashMap, ) { // Ensure there is an active connection @@ -3566,7 +3559,7 @@ fn peer_removed_from_mesh( // The peer is not in any other mesh, inform the handler events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id, - event: Arc::new(GossipsubHandlerIn::LeftMesh), + event: GossipsubHandlerIn::LeftMesh, handler: NotifyHandler::One(*connection_id), }); } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 42c28061..f5fd8a3d 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -375,17 +375,17 @@ fn test_subscribe() { .events .iter() .fold(vec![], |mut collected_subscriptions, e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => match **event { - GossipsubHandlerIn::Message(ref message) => { - for s in &message.subscriptions { - if let Some(true) = s.subscribe { - collected_subscriptions.push(s.clone()) - }; - } - collected_subscriptions + NetworkBehaviourAction::NotifyHandler { + event: GossipsubHandlerIn::Message(ref message), + .. + } => { + for s in &message.subscriptions { + if let Some(true) = s.subscribe { + collected_subscriptions.push(s.clone()) + }; } - _ => collected_subscriptions, - }, + collected_subscriptions + } _ => collected_subscriptions, }); @@ -443,17 +443,17 @@ fn test_unsubscribe() { .events .iter() .fold(vec![], |mut collected_subscriptions, e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => match **event { - GossipsubHandlerIn::Message(ref message) => { - for s in &message.subscriptions { - if let Some(true) = s.subscribe { - collected_subscriptions.push(s.clone()) - }; - } - collected_subscriptions + NetworkBehaviourAction::NotifyHandler { + event: GossipsubHandlerIn::Message(ref message), + .. + } => { + for s in &message.subscriptions { + if let Some(true) = s.subscribe { + collected_subscriptions.push(s.clone()) + }; } - _ => collected_subscriptions, - }, + collected_subscriptions + } _ => collected_subscriptions, }); @@ -630,16 +630,16 @@ fn test_publish_without_flood_publishing() { .events .iter() .fold(vec![], |mut collected_publish, e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => match **event { - GossipsubHandlerIn::Message(ref message) => { - let event = proto_to_message(message); - for s in &event.messages { - collected_publish.push(s.clone()); - } - collected_publish + NetworkBehaviourAction::NotifyHandler { + event: GossipsubHandlerIn::Message(ref message), + .. + } => { + let event = proto_to_message(message); + for s in &event.messages { + collected_publish.push(s.clone()); } - _ => collected_publish, - }, + collected_publish + } _ => collected_publish, }); @@ -720,16 +720,16 @@ fn test_fanout() { .events .iter() .fold(vec![], |mut collected_publish, e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => match **event { - GossipsubHandlerIn::Message(ref message) => { - let event = proto_to_message(message); - for s in &event.messages { - collected_publish.push(s.clone()); - } - collected_publish + NetworkBehaviourAction::NotifyHandler { + event: GossipsubHandlerIn::Message(ref message), + .. + } => { + let event = proto_to_message(message); + for s in &event.messages { + collected_publish.push(s.clone()); } - _ => collected_publish, - }, + collected_publish + } _ => collected_publish, }); @@ -773,26 +773,25 @@ fn test_inject_connected() { .events .iter() .filter(|e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { - !m.subscriptions.is_empty() - } else { - false - } - } + NetworkBehaviourAction::NotifyHandler { + event: GossipsubHandlerIn::Message(ref m), + .. + } => !m.subscriptions.is_empty(), _ => false, }) .collect(); // check that there are two subscriptions sent to each peer for sevent in send_events.clone() { - if let NetworkBehaviourAction::NotifyHandler { event, .. } = sevent { - if let GossipsubHandlerIn::Message(ref m) = **event { - assert!( - m.subscriptions.len() == 2, - "There should be two subscriptions sent to each peer (1 for each topic)." - ); - } + if let NetworkBehaviourAction::NotifyHandler { + event: GossipsubHandlerIn::Message(ref m), + .. + } = sevent + { + assert!( + m.subscriptions.len() == 2, + "There should be two subscriptions sent to each peer (1 for each topic)." + ); }; } @@ -1018,7 +1017,7 @@ fn test_handle_iwant_msg_cached() { .iter() .fold(vec![], |mut collected_messages, e| match e { NetworkBehaviourAction::NotifyHandler { event, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { + if let GossipsubHandlerIn::Message(ref m) = event { let event = proto_to_message(m); for c in &event.messages { collected_messages.push(c.clone()) @@ -1075,17 +1074,16 @@ fn test_handle_iwant_msg_cached_shifted() { // is the message is being sent? let message_exists = gs.events.iter().any(|e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { - let event = proto_to_message(m); - event - .messages - .iter() - .map(|msg| gs.data_transform.inbound_transform(msg.clone()).unwrap()) - .any(|msg| gs.config.message_id(&msg) == msg_id) - } else { - false - } + NetworkBehaviourAction::NotifyHandler { + event: GossipsubHandlerIn::Message(ref m), + .. + } => { + let event = proto_to_message(m); + event + .messages + .iter() + .map(|msg| gs.data_transform.inbound_transform(msg.clone()).unwrap()) + .any(|msg| gs.config.message_id(&msg) == msg_id) } _ => false, }); @@ -1317,17 +1315,17 @@ fn count_control_msgs( + gs.events .iter() .map(|e| match e { - NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { - let event = proto_to_message(m); - event - .control_msgs - .iter() - .filter(|m| filter(peer_id, m)) - .count() - } else { - 0 - } + NetworkBehaviourAction::NotifyHandler { + peer_id, + event: GossipsubHandlerIn::Message(ref m), + .. + } => { + let event = proto_to_message(m); + event + .control_msgs + .iter() + .filter(|m| filter(peer_id, m)) + .count() } _ => 0, }) @@ -1540,19 +1538,19 @@ fn do_forward_messages_to_explicit_peers() { gs.events .iter() .filter(|e| match e { - NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { - let event = proto_to_message(m); - peer_id == &peers[0] - && event - .messages - .iter() - .filter(|m| m.data == message.data) - .count() - > 0 - } else { - false - } + NetworkBehaviourAction::NotifyHandler { + peer_id, + event: GossipsubHandlerIn::Message(ref m), + .. + } => { + let event = proto_to_message(m); + peer_id == &peers[0] + && event + .messages + .iter() + .filter(|m| m.data == message.data) + .count() + > 0 } _ => false, }) @@ -2107,7 +2105,7 @@ fn test_flood_publish() { .iter() .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { event, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { + if let GossipsubHandlerIn::Message(ref m) = event { let event = proto_to_message(m); for s in &event.messages { collected_publish.push(s.clone()); @@ -2668,7 +2666,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { .iter() .fold(vec![], |mut collected_messages, e| match e { NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { + if let GossipsubHandlerIn::Message(ref m) = event { let event = proto_to_message(m); for c in &event.messages { collected_messages.push((*peer_id, c.clone())) @@ -2816,7 +2814,7 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { .iter() .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { + if let GossipsubHandlerIn::Message(ref m) = event { let event = proto_to_message(m); for s in &event.messages { collected_publish.push((*peer_id, s.clone())); @@ -2873,7 +2871,7 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() { .iter() .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { + if let GossipsubHandlerIn::Message(ref m) = event { let event = proto_to_message(m); for s in &event.messages { collected_publish.push((*peer_id, s.clone())); @@ -4407,13 +4405,12 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { gs.events .iter() .map(|e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { - let event = proto_to_message(m); - event.messages.len() - } else { - 0 - } + NetworkBehaviourAction::NotifyHandler { + event: GossipsubHandlerIn::Message(ref m), + .. + } => { + let event = proto_to_message(m); + event.messages.len() } _ => 0, }) @@ -4816,7 +4813,7 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { if peer_id == &p1 || peer_id == &p2 { - if let GossipsubHandlerIn::Message(ref m) = **event { + if let GossipsubHandlerIn::Message(ref m) = event { let event = proto_to_message(m); for s in &event.messages { collected_publish.push(s.clone()); @@ -4873,7 +4870,7 @@ fn test_do_not_use_floodsub_in_fanout() { .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { if peer_id == &p1 || peer_id == &p2 { - if let GossipsubHandlerIn::Message(ref m) = **event { + if let GossipsubHandlerIn::Message(ref m) = event { let event = proto_to_message(m); for s in &event.messages { collected_publish.push(s.clone()); @@ -5187,7 +5184,7 @@ fn test_subscribe_and_graft_with_negative_score() { let messages_to_p1 = gs2.events.drain(..).filter_map(|e| match e { NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { if peer_id == p1 { - if let GossipsubHandlerIn::Message(m) = Arc::try_unwrap(event).unwrap() { + if let GossipsubHandlerIn::Message(m) = event { Some(m) } else { None diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 68bcf912..8fd563c3 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -64,7 +64,7 @@ pub enum HandlerEvent { } /// A message sent from the behaviour to the handler. -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum GossipsubHandlerIn { /// A gossipsub message to send. Message(crate::rpc_proto::Rpc),