diff --git a/docs/coding-guidelines.md b/docs/coding-guidelines.md index 935e26f6..aef8dd69 100644 --- a/docs/coding-guidelines.md +++ b/docs/coding-guidelines.md @@ -310,7 +310,7 @@ request without having to guess. When accepting a **command** that eventually results in a response through an event require that command to contain a unique ID, which is later on contained in the asynchronous response event. One -such example is the `Swarm` accepting a `NetworkBehaviourAction::Dial` from the `NetworkBehaviour`. +such example is the `Swarm` accepting a `ToSwarm::Dial` from the `NetworkBehaviour`. ``` rust struct Command { diff --git a/misc/allow-block-list/src/lib.rs b/misc/allow-block-list/src/lib.rs index 16c47898..d43e2e89 100644 --- a/misc/allow-block-list/src/lib.rs +++ b/misc/allow-block-list/src/lib.rs @@ -65,7 +65,7 @@ use libp2p_core::{Endpoint, Multiaddr}; use libp2p_identity::PeerId; use libp2p_swarm::{ dummy, CloseConnection, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, - NetworkBehaviourAction, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, + PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use std::collections::{HashSet, VecDeque}; use std::fmt; @@ -261,9 +261,9 @@ where &mut self, cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { if let Some(peer) = self.close_connections.pop_front() { - return Poll::Ready(NetworkBehaviourAction::CloseConnection { + return Poll::Ready(ToSwarm::CloseConnection { peer_id: peer, connection: CloseConnection::All, }); diff --git a/misc/connection-limits/src/lib.rs b/misc/connection-limits/src/lib.rs index 1f6927b3..6161fca6 100644 --- a/misc/connection-limits/src/lib.rs +++ b/misc/connection-limits/src/lib.rs @@ -22,7 +22,7 @@ use libp2p_core::{Endpoint, Multiaddr}; use libp2p_identity::PeerId; use libp2p_swarm::{ dummy, ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, - NetworkBehaviourAction, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, + PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use std::collections::{HashMap, HashSet}; use std::fmt; @@ -355,7 +355,7 @@ impl NetworkBehaviour for Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { Poll::Pending } } diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index 47f15b8d..85b3fa2e 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -39,7 +39,7 @@ use libp2p_swarm::{ ExpiredListenAddr, FromSwarm, }, ConnectionDenied, ConnectionId, ExternalAddresses, ListenAddresses, NetworkBehaviour, - NetworkBehaviourAction, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, + PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use std::{ collections::{HashMap, VecDeque}, @@ -208,9 +208,7 @@ pub struct Behaviour { last_probe: Option, - pending_actions: VecDeque< - NetworkBehaviourAction<::OutEvent, THandlerInEvent>, - >, + pending_actions: VecDeque::OutEvent, THandlerInEvent>>, probe_id: ProbeId, @@ -336,9 +334,7 @@ impl Behaviour { } => { if let Some(event) = self.as_server().on_outbound_connection(&peer, address) { self.pending_actions - .push_back(NetworkBehaviourAction::GenerateEvent(Event::InboundProbe( - event, - ))); + .push_back(ToSwarm::GenerateEvent(Event::InboundProbe(event))); } } ConnectedPoint::Dialer { @@ -399,9 +395,7 @@ impl Behaviour { })); if let Some(event) = self.as_server().on_outbound_dial_error(peer_id, error) { self.pending_actions - .push_back(NetworkBehaviourAction::GenerateEvent(Event::InboundProbe( - event, - ))); + .push_back(ToSwarm::GenerateEvent(Event::InboundProbe(event))); } } @@ -441,7 +435,7 @@ impl NetworkBehaviour for Behaviour { } match self.inner.poll(cx, params) { - Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { + Poll::Ready(ToSwarm::GenerateEvent(event)) => { let actions = match event { request_response::Event::Message { message: request_response::Message::Response { .. }, @@ -474,9 +468,7 @@ impl NetworkBehaviour for Behaviour { match self.as_client().poll_auto_probe(cx) { Poll::Ready(event) => { self.pending_actions - .push_back(NetworkBehaviourAction::GenerateEvent(Event::OutboundProbe( - event, - ))); + .push_back(ToSwarm::GenerateEvent(Event::OutboundProbe(event))); continue; } Poll::Pending => {} @@ -601,8 +593,7 @@ impl NetworkBehaviour for Behaviour { } } -type Action = - NetworkBehaviourAction<::OutEvent, THandlerInEvent>; +type Action = ToSwarm<::OutEvent, THandlerInEvent>; // Trait implemented for `AsClient` and `AsServer` to handle events from the inner [`request_response::Behaviour`] Protocol. trait HandleInnerEvent { diff --git a/protocols/autonat/src/behaviour/as_client.rs b/protocols/autonat/src/behaviour/as_client.rs index 2ab66f8b..d544e839 100644 --- a/protocols/autonat/src/behaviour/as_client.rs +++ b/protocols/autonat/src/behaviour/as_client.rs @@ -31,8 +31,7 @@ use libp2p_core::Multiaddr; use libp2p_identity::PeerId; use libp2p_request_response::{self as request_response, OutboundFailure, RequestId}; use libp2p_swarm::{ - AddressScore, ConnectionId, ExternalAddresses, ListenAddresses, NetworkBehaviourAction, - PollParameters, + AddressScore, ConnectionId, ExternalAddresses, ListenAddresses, PollParameters, ToSwarm, }; use rand::{seq::SliceRandom, thread_rng}; use std::{ @@ -143,17 +142,13 @@ impl<'a> HandleInnerEvent for AsClient<'a> { let mut actions = VecDeque::with_capacity(3); - actions.push_back(NetworkBehaviourAction::GenerateEvent(Event::OutboundProbe( - event, - ))); + actions.push_back(ToSwarm::GenerateEvent(Event::OutboundProbe(event))); if let Some(old) = self.handle_reported_status(response.result.clone().into()) { - actions.push_back(NetworkBehaviourAction::GenerateEvent( - Event::StatusChanged { - old, - new: self.nat_status.clone(), - }, - )); + actions.push_back(ToSwarm::GenerateEvent(Event::StatusChanged { + old, + new: self.nat_status.clone(), + })); } if let Ok(address) = response.result { @@ -165,7 +160,7 @@ impl<'a> HandleInnerEvent for AsClient<'a> { .find_map(|r| (r.addr == address).then_some(r.score)) .unwrap_or(AddressScore::Finite(0)); if let AddressScore::Finite(finite_score) = score { - actions.push_back(NetworkBehaviourAction::ReportObservedAddr { + actions.push_back(ToSwarm::ReportObservedAddr { address, score: AddressScore::Finite(finite_score + 1), }); @@ -191,7 +186,7 @@ impl<'a> HandleInnerEvent for AsClient<'a> { self.schedule_probe.reset(Duration::ZERO); - VecDeque::from([NetworkBehaviourAction::GenerateEvent(Event::OutboundProbe( + VecDeque::from([ToSwarm::GenerateEvent(Event::OutboundProbe( OutboundProbeEvent::Error { probe_id, peer: Some(peer), diff --git a/protocols/autonat/src/behaviour/as_server.rs b/protocols/autonat/src/behaviour/as_server.rs index 69f0474e..822b0552 100644 --- a/protocols/autonat/src/behaviour/as_server.rs +++ b/protocols/autonat/src/behaviour/as_server.rs @@ -30,7 +30,7 @@ use libp2p_request_response::{ }; use libp2p_swarm::{ dial_opts::{DialOpts, PeerCondition}, - ConnectionId, DialError, NetworkBehaviourAction, PollParameters, + ConnectionId, DialError, PollParameters, ToSwarm, }; use std::{ collections::{HashMap, HashSet, VecDeque}, @@ -124,14 +124,14 @@ impl<'a> HandleInnerEvent for AsServer<'a> { self.throttled_clients.push((peer, Instant::now())); VecDeque::from([ - NetworkBehaviourAction::GenerateEvent(Event::InboundProbe( + ToSwarm::GenerateEvent(Event::InboundProbe( InboundProbeEvent::Request { probe_id, peer, addresses: addrs.clone(), }, )), - NetworkBehaviourAction::Dial { + ToSwarm::Dial { opts: DialOpts::peer_id(peer) .condition(PeerCondition::Always) .override_dial_concurrency_factor( @@ -155,13 +155,13 @@ impl<'a> HandleInnerEvent for AsServer<'a> { }; let _ = self.inner.send_response(channel, response); - VecDeque::from([NetworkBehaviourAction::GenerateEvent( - Event::InboundProbe(InboundProbeEvent::Error { + VecDeque::from([ToSwarm::GenerateEvent(Event::InboundProbe( + InboundProbeEvent::Error { probe_id, peer, error: InboundProbeError::Response(error), - }), - )]) + }, + ))]) } } } @@ -183,7 +183,7 @@ impl<'a> HandleInnerEvent for AsServer<'a> { _ => self.probe_id.next(), }; - VecDeque::from([NetworkBehaviourAction::GenerateEvent(Event::InboundProbe( + VecDeque::from([ToSwarm::GenerateEvent(Event::InboundProbe( InboundProbeEvent::Error { probe_id, peer, diff --git a/protocols/dcutr/src/behaviour_impl.rs b/protocols/dcutr/src/behaviour_impl.rs index 3b2f287c..c6701ad8 100644 --- a/protocols/dcutr/src/behaviour_impl.rs +++ b/protocols/dcutr/src/behaviour_impl.rs @@ -30,8 +30,8 @@ use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailu use libp2p_swarm::dial_opts::{self, DialOpts}; use libp2p_swarm::{dummy, ConnectionDenied, ConnectionId, THandler, THandlerOutEvent}; use libp2p_swarm::{ - ConnectionHandlerUpgrErr, ExternalAddresses, NetworkBehaviour, NetworkBehaviourAction, - NotifyHandler, PollParameters, THandlerInEvent, + ConnectionHandlerUpgrErr, ExternalAddresses, NetworkBehaviour, NotifyHandler, PollParameters, + THandlerInEvent, ToSwarm, }; use std::collections::{HashMap, HashSet, VecDeque}; use std::task::{Context, Poll}; @@ -70,9 +70,7 @@ pub enum Error { pub struct Behaviour { /// Queue of actions to return when polled. - queued_events: VecDeque< - NetworkBehaviourAction>>, - >, + queued_events: VecDeque>>>, /// All direct (non-relayed) connections. direct_connections: HashMap>, @@ -130,7 +128,7 @@ impl Behaviour { // // https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol self.queued_events.extend([ - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(connection_id), event: Either::Left(handler::relayed::Command::Connect { @@ -138,15 +136,13 @@ impl Behaviour { attempt: 1, }), }, - NetworkBehaviourAction::GenerateEvent( - Event::InitiatedDirectConnectionUpgrade { - remote_peer_id: peer_id, - local_relayed_addr: match connected_point { - ConnectedPoint::Listener { local_addr, .. } => local_addr.clone(), - ConnectedPoint::Dialer { .. } => unreachable!("Due to outer if."), - }, + ToSwarm::GenerateEvent(Event::InitiatedDirectConnectionUpgrade { + remote_peer_id: peer_id, + local_relayed_addr: match connected_point { + ConnectedPoint::Listener { local_addr, .. } => local_addr.clone(), + ConnectedPoint::Dialer { .. } => unreachable!("Due to outer if."), }, - ), + }), ]); } } else { @@ -190,23 +186,22 @@ impl Behaviour { }; if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS { - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - handler: NotifyHandler::One(relayed_connection_id), - peer_id, - event: Either::Left(handler::relayed::Command::Connect { - attempt: attempt + 1, - obs_addrs: self.observed_addreses(), - }), - }) + self.queued_events.push_back(ToSwarm::NotifyHandler { + handler: NotifyHandler::One(relayed_connection_id), + peer_id, + event: Either::Left(handler::relayed::Command::Connect { + attempt: attempt + 1, + obs_addrs: self.observed_addreses(), + }), + }) } else { self.queued_events.extend([ - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(relayed_connection_id), event: Either::Left(handler::relayed::Command::UpgradeFinishedDontKeepAlive), }, - NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { + ToSwarm::GenerateEvent(Event::DirectConnectionUpgradeFailed { remote_peer_id: peer_id, error: Error::Dial, }), @@ -341,7 +336,7 @@ impl NetworkBehaviour for Behaviour { remote_addr, }) => { self.queued_events.extend([ - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { handler: NotifyHandler::One(relayed_connection_id), peer_id: event_source, event: Either::Left(handler::relayed::Command::AcceptInboundConnect { @@ -349,22 +344,19 @@ impl NetworkBehaviour for Behaviour { obs_addrs: self.observed_addreses(), }), }, - NetworkBehaviourAction::GenerateEvent( - Event::RemoteInitiatedDirectConnectionUpgrade { - remote_peer_id: event_source, - remote_relayed_addr: remote_addr, - }, - ), + ToSwarm::GenerateEvent(Event::RemoteInitiatedDirectConnectionUpgrade { + remote_peer_id: event_source, + remote_relayed_addr: remote_addr, + }), ]); } Either::Left(handler::relayed::Event::InboundNegotiationFailed { error }) => { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::DirectConnectionUpgradeFailed { - remote_peer_id: event_source, - error: Error::Handler(error), - }, - )); + self.queued_events.push_back(ToSwarm::GenerateEvent( + Event::DirectConnectionUpgradeFailed { + remote_peer_id: event_source, + error: Error::Handler(error), + }, + )); } Either::Left(handler::relayed::Event::InboundConnectNegotiated(remote_addrs)) => { let opts = DialOpts::peer_id(event_source) @@ -376,17 +368,15 @@ impl NetworkBehaviour for Behaviour { self.direct_to_relayed_connections .insert(maybe_direct_connection_id, relayed_connection_id); - self.queued_events - .push_back(NetworkBehaviourAction::Dial { opts }); + self.queued_events.push_back(ToSwarm::Dial { opts }); } Either::Left(handler::relayed::Event::OutboundNegotiationFailed { error }) => { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::DirectConnectionUpgradeFailed { - remote_peer_id: event_source, - error: Error::Handler(error), - }, - )); + self.queued_events.push_back(ToSwarm::GenerateEvent( + Event::DirectConnectionUpgradeFailed { + remote_peer_id: event_source, + error: Error::Handler(error), + }, + )); } Either::Left(handler::relayed::Event::OutboundConnectNegotiated { remote_addrs }) => { let opts = DialOpts::peer_id(event_source) @@ -403,23 +393,20 @@ impl NetworkBehaviour for Behaviour { .outgoing_direct_connection_attempts .entry((relayed_connection_id, event_source)) .or_default() += 1; - self.queued_events - .push_back(NetworkBehaviourAction::Dial { opts }); + self.queued_events.push_back(ToSwarm::Dial { opts }); } Either::Right(Either::Left(handler::direct::Event::DirectConnectionEstablished)) => { self.queued_events.extend([ - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { peer_id: event_source, handler: NotifyHandler::One(relayed_connection_id), event: Either::Left( handler::relayed::Command::UpgradeFinishedDontKeepAlive, ), }, - NetworkBehaviourAction::GenerateEvent( - Event::DirectConnectionUpgradeSucceeded { - remote_peer_id: event_source, - }, - ), + ToSwarm::GenerateEvent(Event::DirectConnectionUpgradeSucceeded { + remote_peer_id: event_source, + }), ]); } Either::Right(Either::Right(never)) => void::unreachable(never), @@ -430,7 +417,7 @@ impl NetworkBehaviour for Behaviour { &mut self, _cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { if let Some(event) = self.queued_events.pop_front() { return Poll::Ready(event); } diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index ff29f27b..a3673a13 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -30,8 +30,8 @@ use libp2p_core::{Endpoint, Multiaddr}; use libp2p_identity::PeerId; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm}; use libp2p_swarm::{ - dial_opts::DialOpts, ConnectionDenied, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, - NotifyHandler, OneShotHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, + dial_opts::DialOpts, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, + OneShotHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use log::warn; use smallvec::SmallVec; @@ -42,7 +42,7 @@ use std::{collections::VecDeque, iter}; /// Network behaviour that handles the floodsub protocol. pub struct Floodsub { /// Events that need to be yielded to the outside when polling. - events: VecDeque>, + events: VecDeque>, config: FloodsubConfig, @@ -87,23 +87,22 @@ impl Floodsub { // Send our topics to this node if we're already connected to it. if self.connected_peers.contains_key(&peer_id) { for topic in self.subscribed_topics.iter().cloned() { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler: NotifyHandler::Any, - event: FloodsubRpc { - messages: Vec::new(), - subscriptions: vec![FloodsubSubscription { - topic, - action: FloodsubSubscriptionAction::Subscribe, - }], - }, - }); + self.events.push_back(ToSwarm::NotifyHandler { + peer_id, + handler: NotifyHandler::Any, + event: FloodsubRpc { + messages: Vec::new(), + subscriptions: vec![FloodsubSubscription { + topic, + action: FloodsubSubscriptionAction::Subscribe, + }], + }, + }); } } if self.target_peers.insert(peer_id) { - self.events.push_back(NetworkBehaviourAction::Dial { + self.events.push_back(ToSwarm::Dial { opts: DialOpts::peer_id(peer_id).build(), }); } @@ -124,18 +123,17 @@ impl Floodsub { } for peer in self.connected_peers.keys() { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: *peer, - handler: NotifyHandler::Any, - event: FloodsubRpc { - messages: Vec::new(), - subscriptions: vec![FloodsubSubscription { - topic: topic.clone(), - action: FloodsubSubscriptionAction::Subscribe, - }], - }, - }); + self.events.push_back(ToSwarm::NotifyHandler { + peer_id: *peer, + handler: NotifyHandler::Any, + event: FloodsubRpc { + messages: Vec::new(), + subscriptions: vec![FloodsubSubscription { + topic: topic.clone(), + action: FloodsubSubscriptionAction::Subscribe, + }], + }, + }); } self.subscribed_topics.push(topic); @@ -156,18 +154,17 @@ impl Floodsub { self.subscribed_topics.remove(pos); for peer in self.connected_peers.keys() { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: *peer, - handler: NotifyHandler::Any, - event: FloodsubRpc { - messages: Vec::new(), - subscriptions: vec![FloodsubSubscription { - topic: topic.clone(), - action: FloodsubSubscriptionAction::Unsubscribe, - }], - }, - }); + self.events.push_back(ToSwarm::NotifyHandler { + peer_id: *peer, + handler: NotifyHandler::Any, + event: FloodsubRpc { + messages: Vec::new(), + subscriptions: vec![FloodsubSubscription { + topic: topic.clone(), + action: FloodsubSubscriptionAction::Unsubscribe, + }], + }, + }); } true @@ -233,9 +230,10 @@ impl Floodsub { ); } if self.config.subscribe_local_messages { - self.events.push_back(NetworkBehaviourAction::GenerateEvent( - FloodsubEvent::Message(message.clone()), - )); + self.events + .push_back(ToSwarm::GenerateEvent(FloodsubEvent::Message( + message.clone(), + ))); } } // Don't publish the message if we have to check subscriptions @@ -259,15 +257,14 @@ impl Floodsub { continue; } - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: *peer_id, - handler: NotifyHandler::Any, - event: FloodsubRpc { - subscriptions: Vec::new(), - messages: vec![message.clone()], - }, - }); + self.events.push_back(ToSwarm::NotifyHandler { + peer_id: *peer_id, + handler: NotifyHandler::Any, + event: FloodsubRpc { + subscriptions: Vec::new(), + messages: vec![message.clone()], + }, + }); } } @@ -287,18 +284,17 @@ impl Floodsub { // We need to send our subscriptions to the newly-connected node. if self.target_peers.contains(&peer_id) { for topic in self.subscribed_topics.iter().cloned() { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler: NotifyHandler::Any, - event: FloodsubRpc { - messages: Vec::new(), - subscriptions: vec![FloodsubSubscription { - topic, - action: FloodsubSubscriptionAction::Subscribe, - }], - }, - }); + self.events.push_back(ToSwarm::NotifyHandler { + peer_id, + handler: NotifyHandler::Any, + event: FloodsubRpc { + messages: Vec::new(), + subscriptions: vec![FloodsubSubscription { + topic, + action: FloodsubSubscriptionAction::Subscribe, + }], + }, + }); } } @@ -324,7 +320,7 @@ impl Floodsub { // We can be disconnected by the remote in case of inactivity for example, so we always // try to reconnect. if self.target_peers.contains(&peer_id) { - self.events.push_back(NetworkBehaviourAction::Dial { + self.events.push_back(ToSwarm::Dial { opts: DialOpts::peer_id(peer_id).build(), }); } @@ -377,12 +373,11 @@ impl NetworkBehaviour for Floodsub { if !remote_peer_topics.contains(&subscription.topic) { remote_peer_topics.push(subscription.topic.clone()); } - self.events.push_back(NetworkBehaviourAction::GenerateEvent( - FloodsubEvent::Subscribed { + self.events + .push_back(ToSwarm::GenerateEvent(FloodsubEvent::Subscribed { peer_id: propagation_source, topic: subscription.topic, - }, - )); + })); } FloodsubSubscriptionAction::Unsubscribe => { if let Some(pos) = remote_peer_topics @@ -391,12 +386,11 @@ impl NetworkBehaviour for Floodsub { { remote_peer_topics.remove(pos); } - self.events.push_back(NetworkBehaviourAction::GenerateEvent( - FloodsubEvent::Unsubscribed { + self.events + .push_back(ToSwarm::GenerateEvent(FloodsubEvent::Unsubscribed { peer_id: propagation_source, topic: subscription.topic, - }, - )); + })); } } } @@ -427,8 +421,7 @@ impl NetworkBehaviour for Floodsub { .any(|t| message.topics.iter().any(|u| t == u)) { let event = FloodsubEvent::Message(message.clone()); - self.events - .push_back(NetworkBehaviourAction::GenerateEvent(event)); + self.events.push_back(ToSwarm::GenerateEvent(event)); } // Propagate the message to everyone else who is subscribed to any of the topics. @@ -465,12 +458,11 @@ impl NetworkBehaviour for Floodsub { } for (peer_id, rpc) in rpcs_to_dispatch { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler: NotifyHandler::Any, - event: rpc, - }); + self.events.push_back(ToSwarm::NotifyHandler { + peer_id, + handler: NotifyHandler::Any, + event: rpc, + }); } } @@ -478,7 +470,7 @@ impl NetworkBehaviour for Floodsub { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); } diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 13ef81b3..4b358afd 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -40,8 +40,8 @@ use libp2p_identity::PeerId; use libp2p_swarm::{ behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm}, dial_opts::DialOpts, - ConnectionDenied, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, - PollParameters, THandler, THandlerInEvent, THandlerOutEvent, + ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, PollParameters, THandler, + THandlerInEvent, THandlerOutEvent, ToSwarm, }; use wasm_timer::Instant; @@ -218,7 +218,7 @@ pub struct Behaviour { config: Config, /// Events that need to be yielded to the outside when polling. - events: VecDeque>, + events: VecDeque>, /// Pools non-urgent control messages between heartbeats. control_pool: HashMap>, @@ -1133,7 +1133,7 @@ where if !self.peer_topics.contains_key(peer_id) { // Connect to peer debug!("Connecting to explicit peer {:?}", peer_id); - self.events.push_back(NetworkBehaviourAction::Dial { + self.events.push_back(ToSwarm::Dial { opts: DialOpts::peer_id(*peer_id).build(), }); } @@ -1632,7 +1632,7 @@ where self.px_peers.insert(peer_id); // dial peer - self.events.push_back(NetworkBehaviourAction::Dial { + self.events.push_back(ToSwarm::Dial { opts: DialOpts::peer_id(peer_id).build(), }); } @@ -1818,7 +1818,7 @@ where if self.mesh.contains_key(&message.topic) { debug!("Sending received message to user"); self.events - .push_back(NetworkBehaviourAction::GenerateEvent(Event::Message { + .push_back(ToSwarm::GenerateEvent(Event::Message { propagation_source: *propagation_source, message_id: msg_id.clone(), message, @@ -1994,12 +1994,10 @@ where } } // generates a subscription event to be polled - application_event.push(NetworkBehaviourAction::GenerateEvent( - Event::Subscribed { - peer_id: *propagation_source, - topic: topic_hash.clone(), - }, - )); + application_event.push(ToSwarm::GenerateEvent(Event::Subscribed { + peer_id: *propagation_source, + topic: topic_hash.clone(), + })); } SubscriptionAction::Unsubscribe => { if peer_list.remove(propagation_source) { @@ -2014,12 +2012,10 @@ where subscribed_topics.remove(topic_hash); unsubscribed_peers.push((*propagation_source, topic_hash.clone())); // generate an unsubscribe event to be polled - application_event.push(NetworkBehaviourAction::GenerateEvent( - Event::Unsubscribed { - peer_id: *propagation_source, - topic: topic_hash.clone(), - }, - )); + application_event.push(ToSwarm::GenerateEvent(Event::Unsubscribed { + peer_id: *propagation_source, + topic: topic_hash.clone(), + })); } } @@ -2890,12 +2886,11 @@ where let messages = self.fragment_message(message)?; for message in messages { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id, - event: HandlerIn::Message(message), - handler: NotifyHandler::Any, - }) + self.events.push_back(ToSwarm::NotifyHandler { + peer_id, + event: HandlerIn::Message(message), + handler: NotifyHandler::Any, + }) } Ok(()) } @@ -3150,12 +3145,11 @@ where for topic in topics { if let Some(mesh_peers) = self.mesh.get(topic) { if mesh_peers.contains(&peer_id) { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id, - event: HandlerIn::JoinedMesh, - handler: NotifyHandler::One(connections.connections[0]), - }); + self.events.push_back(ToSwarm::NotifyHandler { + peer_id, + event: HandlerIn::JoinedMesh, + handler: NotifyHandler::One(connections.connections[0]), + }); break; } } @@ -3338,11 +3332,10 @@ where "Peer does not support gossipsub protocols. {}", propagation_source ); - self.events.push_back(NetworkBehaviourAction::GenerateEvent( - Event::GossipsubNotSupported { + self.events + .push_back(ToSwarm::GenerateEvent(Event::GossipsubNotSupported { peer_id: propagation_source, - }, - )); + })); } else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) { // Only change the value if the old value is Floodsub (the default set in // `NetworkBehaviour::on_event` with FromSwarm::ConnectionEstablished). @@ -3450,7 +3443,7 @@ where &mut self, cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); } @@ -3499,7 +3492,7 @@ fn peer_added_to_mesh( new_topics: Vec<&TopicHash>, mesh: &HashMap>, known_topics: Option<&BTreeSet>, - events: &mut VecDeque>, + events: &mut VecDeque>, connections: &HashMap, ) { // Ensure there is an active connection @@ -3525,7 +3518,7 @@ fn peer_added_to_mesh( } } // This is the first mesh the peer has joined, inform the handler - events.push_back(NetworkBehaviourAction::NotifyHandler { + events.push_back(ToSwarm::NotifyHandler { peer_id, event: HandlerIn::JoinedMesh, handler: NotifyHandler::One(connection_id), @@ -3540,7 +3533,7 @@ fn peer_removed_from_mesh( old_topic: &TopicHash, mesh: &HashMap>, known_topics: Option<&BTreeSet>, - events: &mut VecDeque>, + events: &mut VecDeque>, connections: &HashMap, ) { // Ensure there is an active connection @@ -3564,7 +3557,7 @@ fn peer_removed_from_mesh( } } // The peer is not in any other mesh, inform the handler - events.push_back(NetworkBehaviourAction::NotifyHandler { + events.push_back(ToSwarm::NotifyHandler { peer_id, event: HandlerIn::LeftMesh, handler: NotifyHandler::One(*connection_id), diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index b3734525..bafceafe 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -412,7 +412,7 @@ fn test_subscribe() { .events .iter() .fold(vec![], |mut collected_subscriptions, e| match e { - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { event: HandlerIn::Message(ref message), .. } => { @@ -480,7 +480,7 @@ fn test_unsubscribe() { .events .iter() .fold(vec![], |mut collected_subscriptions, e| match e { - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { event: HandlerIn::Message(ref message), .. } => { @@ -668,7 +668,7 @@ fn test_publish_without_flood_publishing() { .events .iter() .fold(vec![], |mut collected_publish, e| match e { - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { event: HandlerIn::Message(ref message), .. } => { @@ -758,7 +758,7 @@ fn test_fanout() { .events .iter() .fold(vec![], |mut collected_publish, e| match e { - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { event: HandlerIn::Message(ref message), .. } => { @@ -811,7 +811,7 @@ fn test_inject_connected() { .events .iter() .filter(|e| match e { - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { event: HandlerIn::Message(ref m), .. } => !m.subscriptions.is_empty(), @@ -821,7 +821,7 @@ fn test_inject_connected() { // check that there are two subscriptions sent to each peer for sevent in send_events.clone() { - if let NetworkBehaviourAction::NotifyHandler { + if let ToSwarm::NotifyHandler { event: HandlerIn::Message(ref m), .. } = sevent @@ -1054,7 +1054,7 @@ fn test_handle_iwant_msg_cached() { .events .iter() .fold(vec![], |mut collected_messages, e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => { + ToSwarm::NotifyHandler { event, .. } => { if let HandlerIn::Message(ref m) = event { let event = proto_to_message(m); for c in &event.messages { @@ -1112,7 +1112,7 @@ fn test_handle_iwant_msg_cached_shifted() { // is the message is being sent? let message_exists = gs.events.iter().any(|e| match e { - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { event: HandlerIn::Message(ref m), .. } => { @@ -1353,7 +1353,7 @@ fn count_control_msgs( + gs.events .iter() .map(|e| match e { - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { peer_id, event: HandlerIn::Message(ref m), .. @@ -1394,7 +1394,7 @@ fn test_explicit_peer_gets_connected() { .events .iter() .filter(|e| match e { - NetworkBehaviourAction::Dial { opts } => opts.get_peer_id() == Some(peer), + ToSwarm::Dial { opts } => opts.get_peer_id() == Some(peer), _ => false, }) .count(); @@ -1435,7 +1435,7 @@ fn test_explicit_peer_reconnects() { gs.events .iter() .filter(|e| match e { - NetworkBehaviourAction::Dial { opts } => opts.get_peer_id() == Some(*peer), + ToSwarm::Dial { opts } => opts.get_peer_id() == Some(*peer), _ => false, }) .count(), @@ -1450,7 +1450,7 @@ fn test_explicit_peer_reconnects() { gs.events .iter() .filter(|e| match e { - NetworkBehaviourAction::Dial { opts } => opts.get_peer_id() == Some(*peer), + ToSwarm::Dial { opts } => opts.get_peer_id() == Some(*peer), _ => false, }) .count() @@ -1574,7 +1574,7 @@ fn do_forward_messages_to_explicit_peers() { gs.events .iter() .filter(|e| match e { - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { peer_id, event: HandlerIn::Message(ref m), .. @@ -1830,7 +1830,7 @@ fn test_connect_to_px_peers_on_handle_prune() { .events .iter() .filter_map(|e| match e { - NetworkBehaviourAction::Dial { opts } => opts.get_peer_id(), + ToSwarm::Dial { opts } => opts.get_peer_id(), _ => None, }) .collect(); @@ -2122,7 +2122,7 @@ fn test_flood_publish() { .events .iter() .fold(vec![], |mut collected_publish, e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => { + ToSwarm::NotifyHandler { event, .. } => { if let HandlerIn::Message(ref m) = event { let event = proto_to_message(m); for s in &event.messages { @@ -2488,7 +2488,7 @@ fn test_ignore_px_from_negative_scored_peer() { assert_eq!( gs.events .iter() - .filter(|e| matches!(e, NetworkBehaviourAction::Dial { .. })) + .filter(|e| matches!(e, ToSwarm::Dial { .. })) .count(), 0 ); @@ -2683,7 +2683,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { .events .iter() .fold(vec![], |mut collected_messages, e| match e { - NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { + ToSwarm::NotifyHandler { event, peer_id, .. } => { if let HandlerIn::Message(ref m) = event { let event = proto_to_message(m); for c in &event.messages { @@ -2831,7 +2831,7 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { .events .iter() .fold(vec![], |mut collected_publish, e| match e { - NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { + ToSwarm::NotifyHandler { event, peer_id, .. } => { if let HandlerIn::Message(ref m) = event { let event = proto_to_message(m); for s in &event.messages { @@ -2888,7 +2888,7 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() { .events .iter() .fold(vec![], |mut collected_publish, e| match e { - NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { + ToSwarm::NotifyHandler { event, peer_id, .. } => { if let HandlerIn::Message(ref m) = event { let event = proto_to_message(m); for s in &event.messages { @@ -3012,7 +3012,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { assert_eq!(gs.events.len(), 1); assert!(matches!( gs.events[0], - NetworkBehaviourAction::GenerateEvent(Event::Subscribed { .. }) + ToSwarm::GenerateEvent(Event::Subscribed { .. }) )); let control_action = ControlAction::IHave { @@ -3078,7 +3078,7 @@ fn test_ignore_px_from_peers_below_accept_px_threshold() { assert_eq!( gs.events .iter() - .filter(|e| matches!(e, NetworkBehaviourAction::Dial { .. })) + .filter(|e| matches!(e, ToSwarm::Dial { .. })) .count(), 0 ); @@ -3100,7 +3100,7 @@ fn test_ignore_px_from_peers_below_accept_px_threshold() { assert!( gs.events .iter() - .filter(|e| matches!(e, NetworkBehaviourAction::Dial { .. })) + .filter(|e| matches!(e, ToSwarm::Dial { .. })) .count() > 0 ); @@ -4413,7 +4413,7 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { gs.events .iter() .map(|e| match e { - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { event: HandlerIn::Message(ref m), .. } => { @@ -4819,7 +4819,7 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { .events .iter() .fold(vec![], |mut collected_publish, e| match e { - NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { + ToSwarm::NotifyHandler { peer_id, event, .. } => { if peer_id == &p1 || peer_id == &p2 { if let HandlerIn::Message(ref m) = event { let event = proto_to_message(m); @@ -4876,7 +4876,7 @@ fn test_do_not_use_floodsub_in_fanout() { .events .iter() .fold(vec![], |mut collected_publish, e| match e { - NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { + ToSwarm::NotifyHandler { peer_id, event, .. } => { if peer_id == &p1 || peer_id == &p2 { if let HandlerIn::Message(ref m) = event { let event = proto_to_message(m); @@ -5190,7 +5190,7 @@ fn test_subscribe_and_graft_with_negative_score() { let forward_messages_to_p1 = |gs1: &mut Behaviour<_, _>, gs2: &mut Behaviour<_, _>| { //collect messages to p1 let messages_to_p1 = gs2.events.drain(..).filter_map(|e| match e { - NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { + ToSwarm::NotifyHandler { peer_id, event, .. } => { if peer_id == p1 { if let HandlerIn::Message(m) = event { Some(m) diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index 88c75001..8ba50e2d 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -26,8 +26,8 @@ use libp2p_identity::PublicKey; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p_swarm::{ dial_opts::DialOpts, AddressScore, ConnectionDenied, ConnectionHandlerUpgrErr, DialError, - ExternalAddresses, ListenAddresses, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, - PollParameters, THandlerInEvent, + ExternalAddresses, ListenAddresses, NetworkBehaviour, NotifyHandler, PollParameters, + THandlerInEvent, ToSwarm, }; use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent}; use lru::LruCache; @@ -44,7 +44,7 @@ use std::{ /// about them, and answers identify queries from other nodes. /// /// All external addresses of the local node supposedly observed by remotes -/// are reported via [`NetworkBehaviourAction::ReportObservedAddr`] with a +/// are reported via [`ToSwarm::ReportObservedAddr`] with a /// [score](AddressScore) of `1`. pub struct Behaviour { config: Config, @@ -55,7 +55,7 @@ pub struct Behaviour { /// with current information about the local peer. requests: Vec, /// Pending events to be emitted when polled. - events: VecDeque>, + events: VecDeque>, /// The addresses of all peers that we have discovered. discovered_peers: PeerCache, @@ -200,7 +200,7 @@ impl Behaviour { if !self.requests.contains(&request) { self.requests.push(request); - self.events.push_back(NetworkBehaviourAction::Dial { + self.events.push_back(ToSwarm::Dial { opts: DialOpts::peer_id(p).build(), }); } @@ -293,27 +293,19 @@ impl NetworkBehaviour for Behaviour { let observed = info.observed_addr.clone(); self.events - .push_back(NetworkBehaviourAction::GenerateEvent(Event::Received { - peer_id, - info, - })); - self.events - .push_back(NetworkBehaviourAction::ReportObservedAddr { - address: observed, - score: AddressScore::Finite(1), - }); + .push_back(ToSwarm::GenerateEvent(Event::Received { peer_id, info })); + self.events.push_back(ToSwarm::ReportObservedAddr { + address: observed, + score: AddressScore::Finite(1), + }); } handler::Event::Identification(peer) => { self.events - .push_back(NetworkBehaviourAction::GenerateEvent(Event::Sent { - peer_id: peer, - })); + .push_back(ToSwarm::GenerateEvent(Event::Sent { peer_id: peer })); } handler::Event::IdentificationPushed => { self.events - .push_back(NetworkBehaviourAction::GenerateEvent(Event::Pushed { - peer_id, - })); + .push_back(ToSwarm::GenerateEvent(Event::Pushed { peer_id })); } handler::Event::Identify => { self.requests.push(Request { @@ -323,10 +315,7 @@ impl NetworkBehaviour for Behaviour { } handler::Event::IdentificationError(error) => { self.events - .push_back(NetworkBehaviourAction::GenerateEvent(Event::Error { - peer_id, - error, - })); + .push_back(ToSwarm::GenerateEvent(Event::Error { peer_id, error })); } } } @@ -335,7 +324,7 @@ impl NetworkBehaviour for Behaviour { &mut self, _cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); } @@ -345,7 +334,7 @@ impl NetworkBehaviour for Behaviour { Some(Request { peer_id, protocol: Protocol::Push, - }) => Poll::Ready(NetworkBehaviourAction::NotifyHandler { + }) => Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::Any, event: InEvent { @@ -362,7 +351,7 @@ impl NetworkBehaviour for Behaviour { Some(Request { peer_id, protocol: Protocol::Identify(connection_id), - }) => Poll::Ready(NetworkBehaviourAction::NotifyHandler { + }) => Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(connection_id), event: InEvent { diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index cd17cc1b..8d1ee716 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -47,8 +47,8 @@ use libp2p_swarm::behaviour::{ use libp2p_swarm::{ dial_opts::{self, DialOpts}, ConnectionDenied, ConnectionId, DialError, ExternalAddresses, ListenAddresses, - NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, THandler, - THandlerInEvent, THandlerOutEvent, + NetworkBehaviour, NotifyHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, + ToSwarm, }; use log::{debug, info, warn}; use smallvec::SmallVec; @@ -103,7 +103,7 @@ pub struct Kademlia { connection_idle_timeout: Duration, /// Queued events to return when the behaviour is being polled. - queued_events: VecDeque>>, + queued_events: VecDeque>>, listen_addresses: ListenAddresses, @@ -522,20 +522,19 @@ where match self.kbuckets.entry(&key) { kbucket::Entry::Present(mut entry, _) => { if entry.value().insert(address) { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::RoutingUpdated { - peer: *peer, - is_new_peer: false, - addresses: entry.value().clone(), - old_peer: None, - bucket_range: self - .kbuckets - .bucket(&key) - .map(|b| b.range()) - .expect("Not kbucket::Entry::SelfEntry."), - }, - )) + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::RoutingUpdated { + peer: *peer, + is_new_peer: false, + addresses: entry.value().clone(), + old_peer: None, + bucket_range: self + .kbuckets + .bucket(&key) + .map(|b| b.range()) + .expect("Not kbucket::Entry::SelfEntry."), + }, + )) } RoutingUpdate::Success } @@ -552,20 +551,19 @@ where }; match entry.insert(addresses.clone(), status) { kbucket::InsertResult::Inserted => { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::RoutingUpdated { - peer: *peer, - is_new_peer: true, - addresses, - old_peer: None, - bucket_range: self - .kbuckets - .bucket(&key) - .map(|b| b.range()) - .expect("Not kbucket::Entry::SelfEntry."), - }, - )); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::RoutingUpdated { + peer: *peer, + is_new_peer: true, + addresses, + old_peer: None, + bucket_range: self + .kbuckets + .bucket(&key) + .map(|b| b.range()) + .expect("Not kbucket::Entry::SelfEntry."), + }, + )); RoutingUpdate::Success } kbucket::InsertResult::Full => { @@ -573,7 +571,7 @@ where RoutingUpdate::Failed } kbucket::InsertResult::Pending { disconnected } => { - self.queued_events.push_back(NetworkBehaviourAction::Dial { + self.queued_events.push_back(ToSwarm::Dial { opts: DialOpts::peer_id(disconnected.into_preimage()).build(), }); RoutingUpdate::Pending @@ -727,15 +725,14 @@ where let stats = QueryStats::empty(); if let Some(record) = record { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::OutboundQueryProgressed { - id, - result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))), - step, - stats, - }, - )); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::OutboundQueryProgressed { + id, + result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))), + step, + stats, + }, + )); } id @@ -975,18 +972,17 @@ where let stats = QueryStats::empty(); if !providers.is_empty() { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::OutboundQueryProgressed { - id, - result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { - key, - providers, - })), - step, - stats, - }, - )); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::OutboundQueryProgressed { + id, + result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { + key, + providers, + })), + step, + stats, + }, + )); } id } @@ -1134,20 +1130,19 @@ where } if let Some(address) = address { if entry.value().insert(address) { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::RoutingUpdated { - peer, - is_new_peer: false, - addresses: entry.value().clone(), - old_peer: None, - bucket_range: self - .kbuckets - .bucket(&key) - .map(|b| b.range()) - .expect("Not kbucket::Entry::SelfEntry."), - }, - )) + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::RoutingUpdated { + peer, + is_new_peer: false, + addresses: entry.value().clone(), + old_peer: None, + bucket_range: self + .kbuckets + .bucket(&key) + .map(|b| b.range()) + .expect("Not kbucket::Entry::SelfEntry."), + }, + )) } } } @@ -1168,16 +1163,14 @@ where } match (address, self.kbucket_inserts) { (None, _) => { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::UnroutablePeer { peer }, - )); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::UnroutablePeer { peer }, + )); } (Some(a), KademliaBucketInserts::Manual) => { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::RoutablePeer { peer, address: a }, - )); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::RoutablePeer { peer, address: a }, + )); } (Some(a), KademliaBucketInserts::OnConnected) => { let addresses = Addresses::new(a); @@ -1194,25 +1187,20 @@ where .map(|b| b.range()) .expect("Not kbucket::Entry::SelfEntry."), }; - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent(event)); + self.queued_events.push_back(ToSwarm::GenerateEvent(event)); } kbucket::InsertResult::Full => { debug!("Bucket full. Peer not added to routing table: {}", peer); let address = addresses.first().clone(); - self.queued_events.push_back( - NetworkBehaviourAction::GenerateEvent( - KademliaEvent::RoutablePeer { peer, address }, - ), - ); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::RoutablePeer { peer, address }, + )); } kbucket::InsertResult::Pending { disconnected } => { let address = addresses.first().clone(); - self.queued_events.push_back( - NetworkBehaviourAction::GenerateEvent( - KademliaEvent::PendingRoutablePeer { peer, address }, - ), - ); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::PendingRoutablePeer { peer, address }, + )); // `disconnected` might already be in the process of re-connecting. // In other words `disconnected` might have already re-connected but @@ -1221,7 +1209,7 @@ where // // Only try dialing peer if not currently connected. if !self.connected_peers.contains(disconnected.preimage()) { - self.queued_events.push_back(NetworkBehaviourAction::Dial { + self.queued_events.push_back(ToSwarm::Dial { opts: DialOpts::peer_id(disconnected.into_preimage()) .build(), }) @@ -1624,16 +1612,15 @@ where // If the (alleged) publisher is the local node, do nothing. The record of // the original publisher should never change as a result of replication // and the publisher is always assumed to have the "right" value. - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: source, - handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::PutRecordRes { - key: record.key, - value: record.value, - request_id, - }, - }); + self.queued_events.push_back(ToSwarm::NotifyHandler { + peer_id: source, + handler: NotifyHandler::One(connection), + event: KademliaHandlerIn::PutRecordRes { + key: record.key, + value: record.value, + request_id, + }, + }); return; } @@ -1684,40 +1671,37 @@ where record.key, record.value.len() ); - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::InboundRequest { - request: InboundRequest::PutRecord { - source, - connection, - record: None, - }, + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::InboundRequest { + request: InboundRequest::PutRecord { + source, + connection, + record: None, }, - )); + }, + )); } Err(e) => { info!("Record not stored: {:?}", e); - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: source, - handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::Reset(request_id), - }); + self.queued_events.push_back(ToSwarm::NotifyHandler { + peer_id: source, + handler: NotifyHandler::One(connection), + event: KademliaHandlerIn::Reset(request_id), + }); return; } }, KademliaStoreInserts::FilterBoth => { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::InboundRequest { - request: InboundRequest::PutRecord { - source, - connection, - record: Some(record.clone()), - }, + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::InboundRequest { + request: InboundRequest::PutRecord { + source, + connection, + record: Some(record.clone()), }, - )); + }, + )); } } } @@ -1729,16 +1713,15 @@ where // closest nodes to the target. In addition returning // [`KademliaHandlerIn::PutRecordRes`] does not reveal any internal // information to a possibly malicious remote node. - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: source, - handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::PutRecordRes { - key: record.key, - value: record.value, - request_id, - }, - }) + self.queued_events.push_back(ToSwarm::NotifyHandler { + peer_id: source, + handler: NotifyHandler::One(connection), + event: KademliaHandlerIn::PutRecordRes { + key: record.key, + value: record.value, + request_id, + }, + }) } /// Processes a provider record received from a peer. @@ -1757,22 +1740,20 @@ where return; } - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::InboundRequest { - request: InboundRequest::AddProvider { record: None }, - }, - )); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::InboundRequest { + request: InboundRequest::AddProvider { record: None }, + }, + )); } KademliaStoreInserts::FilterBoth => { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::InboundRequest { - request: InboundRequest::AddProvider { - record: Some(record), - }, + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::InboundRequest { + request: InboundRequest::AddProvider { + record: Some(record), }, - )); + }, + )); } } } @@ -1845,12 +1826,11 @@ where .position(|(p, _)| p == &peer_id) .map(|p| q.inner.pending_rpcs.remove(p)) }) { - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id, - event, - handler: NotifyHandler::Any, - }); + self.queued_events.push_back(ToSwarm::NotifyHandler { + peer_id, + event, + handler: NotifyHandler::Any, + }); } self.connected_peers.insert(peer_id); @@ -2085,24 +2065,22 @@ where KademliaHandlerEvent::FindNodeReq { key, request_id } => { let closer_peers = self.find_closest(&kbucket::Key::new(key), &source); - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::InboundRequest { - request: InboundRequest::FindNode { - num_closer_peers: closer_peers.len(), - }, + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::InboundRequest { + request: InboundRequest::FindNode { + num_closer_peers: closer_peers.len(), }, - )); + }, + )); - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: source, - handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::FindNodeRes { - closer_peers, - request_id, - }, - }); + self.queued_events.push_back(ToSwarm::NotifyHandler { + peer_id: source, + handler: NotifyHandler::One(connection), + event: KademliaHandlerIn::FindNodeRes { + closer_peers, + request_id, + }, + }); } KademliaHandlerEvent::FindNodeRes { @@ -2116,26 +2094,24 @@ where let provider_peers = self.provider_peers(&key, &source); let closer_peers = self.find_closest(&kbucket::Key::new(key), &source); - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::InboundRequest { - request: InboundRequest::GetProvider { - num_closer_peers: closer_peers.len(), - num_provider_peers: provider_peers.len(), - }, + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::InboundRequest { + request: InboundRequest::GetProvider { + num_closer_peers: closer_peers.len(), + num_provider_peers: provider_peers.len(), }, - )); + }, + )); - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: source, - handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::GetProvidersRes { - closer_peers, - provider_peers, - request_id, - }, - }); + self.queued_events.push_back(ToSwarm::NotifyHandler { + peer_id: source, + handler: NotifyHandler::One(connection), + event: KademliaHandlerIn::GetProvidersRes { + closer_peers, + provider_peers, + request_id, + }, + }); } KademliaHandlerEvent::GetProvidersRes { @@ -2157,20 +2133,19 @@ where *providers_found += provider_peers.len(); let providers = provider_peers.iter().map(|p| p.node_id).collect(); - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::OutboundQueryProgressed { - id: user_data, - result: QueryResult::GetProviders(Ok( - GetProvidersOk::FoundProviders { - key: key.clone(), - providers, - }, - )), - step: step.clone(), - stats, - }, - )); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::OutboundQueryProgressed { + id: user_data, + result: QueryResult::GetProviders(Ok( + GetProvidersOk::FoundProviders { + key: key.clone(), + providers, + }, + )), + step: step.clone(), + stats, + }, + )); *step = step.next(); } } @@ -2215,26 +2190,24 @@ where let closer_peers = self.find_closest(&kbucket::Key::new(key), &source); - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::InboundRequest { - request: InboundRequest::GetRecord { - num_closer_peers: closer_peers.len(), - present_locally: record.is_some(), - }, + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::InboundRequest { + request: InboundRequest::GetRecord { + num_closer_peers: closer_peers.len(), + present_locally: record.is_some(), }, - )); + }, + )); - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: source, - handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::GetRecordRes { - record, - closer_peers, - request_id, - }, - }); + self.queued_events.push_back(ToSwarm::NotifyHandler { + peer_id: source, + handler: NotifyHandler::One(connection), + event: KademliaHandlerIn::GetRecordRes { + record, + closer_peers, + request_id, + }, + }); } KademliaHandlerEvent::GetRecordRes { @@ -2258,17 +2231,16 @@ where record, }; - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::OutboundQueryProgressed { - id: user_data, - result: QueryResult::GetRecord(Ok( - GetRecordOk::FoundRecord(record), - )), - step: step.clone(), - stats, - }, - )); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::OutboundQueryProgressed { + id: user_data, + result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord( + record, + ))), + step: step.clone(), + stats, + }, + )); *step = step.next(); } else { @@ -2333,7 +2305,7 @@ where &mut self, cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { let now = Instant::now(); // Calculate the available capacity for queries triggered by background jobs. @@ -2392,7 +2364,7 @@ where addresses: value, old_peer: entry.evicted.map(|n| n.key.into_preimage()), }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + return Poll::Ready(ToSwarm::GenerateEvent(event)); } // Look for a finished query. @@ -2400,12 +2372,12 @@ where match self.queries.poll(now) { QueryPoolState::Finished(q) => { if let Some(event) = self.query_finished(q) { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + return Poll::Ready(ToSwarm::GenerateEvent(event)); } } QueryPoolState::Timeout(q) => { if let Some(event) = self.query_timeout(q) { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + return Poll::Ready(ToSwarm::GenerateEvent(event)); } } QueryPoolState::Waiting(Some((query, peer_id))) => { @@ -2424,15 +2396,14 @@ where } if self.connected_peers.contains(&peer_id) { - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id, - event, - handler: NotifyHandler::Any, - }); + self.queued_events.push_back(ToSwarm::NotifyHandler { + peer_id, + event, + handler: NotifyHandler::Any, + }); } else if &peer_id != self.kbuckets.local_key().preimage() { query.inner.pending_rpcs.push((peer_id, event)); - self.queued_events.push_back(NetworkBehaviourAction::Dial { + self.queued_events.push_back(ToSwarm::Dial { opts: DialOpts::peer_id(peer_id).build(), }); } diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index f2709e90..92e38c04 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -31,8 +31,8 @@ use libp2p_core::{Endpoint, Multiaddr}; use libp2p_identity::PeerId; use libp2p_swarm::behaviour::FromSwarm; use libp2p_swarm::{ - dummy, ConnectionDenied, ConnectionId, ListenAddresses, NetworkBehaviour, - NetworkBehaviourAction, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, + dummy, ConnectionDenied, ConnectionId, ListenAddresses, NetworkBehaviour, PollParameters, + THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use smallvec::SmallVec; use std::collections::hash_map::{Entry, HashMap}; @@ -252,7 +252,7 @@ where &mut self, cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { // Poll ifwatch. while let Poll::Ready(Some(event)) = Pin::new(&mut self.if_watch).poll_next(cx) { match event { @@ -307,7 +307,7 @@ where let event = Event::Discovered(DiscoveredAddrsIter { inner: discovered.into_iter(), }); - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + return Poll::Ready(ToSwarm::GenerateEvent(event)); } // Emit expired event. let now = Instant::now(); @@ -326,7 +326,7 @@ where let event = Event::Expired(ExpiredAddrsIter { inner: expired.into_iter(), }); - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + return Poll::Ready(ToSwarm::GenerateEvent(event)); } if let Some(closest_expiration) = closest_expiration { let mut timer = P::Timer::at(closest_expiration); diff --git a/protocols/perf/src/client/behaviour.rs b/protocols/perf/src/client/behaviour.rs index f7d9d845..dade91c5 100644 --- a/protocols/perf/src/client/behaviour.rs +++ b/protocols/perf/src/client/behaviour.rs @@ -29,8 +29,8 @@ use libp2p_core::Multiaddr; use libp2p_identity::PeerId; use libp2p_swarm::{ derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionHandlerUpgrErr, - ConnectionId, FromSwarm, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, - PollParameters, THandlerInEvent, THandlerOutEvent, + ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler, PollParameters, THandlerInEvent, + THandlerOutEvent, ToSwarm, }; use void::Void; @@ -47,7 +47,7 @@ pub struct Event { #[derive(Default)] pub struct Behaviour { /// Queue of actions to return when polled. - queued_events: VecDeque>>, + queued_events: VecDeque>>, /// Set of connected peers. connected: HashSet, } @@ -64,12 +64,11 @@ impl Behaviour { let id = RunId::next(); - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: server, - handler: NotifyHandler::Any, - event: crate::client::handler::Command { id, params }, - }); + self.queued_events.push_back(ToSwarm::NotifyHandler { + peer_id: server, + handler: NotifyHandler::Any, + event: crate::client::handler::Command { id, params }, + }); Ok(id) } @@ -141,14 +140,14 @@ impl NetworkBehaviour for Behaviour { super::handler::Event { id, result }: THandlerOutEvent, ) { self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent(Event { id, result })); + .push_back(ToSwarm::GenerateEvent(Event { id, result })); } fn poll( &mut self, _cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { if let Some(event) = self.queued_events.pop_front() { return Poll::Ready(event); } diff --git a/protocols/perf/src/server/behaviour.rs b/protocols/perf/src/server/behaviour.rs index 32c0dbda..5d63475c 100644 --- a/protocols/perf/src/server/behaviour.rs +++ b/protocols/perf/src/server/behaviour.rs @@ -27,8 +27,8 @@ use std::{ use libp2p_identity::PeerId; use libp2p_swarm::{ - ConnectionId, FromSwarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters, - THandlerInEvent, THandlerOutEvent, + ConnectionId, FromSwarm, NetworkBehaviour, PollParameters, THandlerInEvent, THandlerOutEvent, + ToSwarm, }; use crate::server::handler::Handler; @@ -44,7 +44,7 @@ pub struct Event { #[derive(Default)] pub struct Behaviour { /// Queue of actions to return when polled. - queued_events: VecDeque>>, + queued_events: VecDeque>>, } impl Behaviour { @@ -100,18 +100,17 @@ impl NetworkBehaviour for Behaviour { _connection_id: ConnectionId, super::handler::Event { stats }: THandlerOutEvent, ) { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent(Event { - remote_peer_id: event_source, - stats, - })) + self.queued_events.push_back(ToSwarm::GenerateEvent(Event { + remote_peer_id: event_source, + stats, + })) } fn poll( &mut self, _cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { if let Some(event) = self.queued_events.pop_front() { return Poll::Ready(event); } diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index 04fab82f..23fe2ba6 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -50,8 +50,8 @@ pub use handler::{Config, Failure, Success}; use libp2p_core::{Endpoint, Multiaddr}; use libp2p_identity::PeerId; use libp2p_swarm::{ - behaviour::FromSwarm, ConnectionDenied, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, - PollParameters, THandler, THandlerInEvent, THandlerOutEvent, + behaviour::FromSwarm, ConnectionDenied, ConnectionId, NetworkBehaviour, PollParameters, + THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use std::{ collections::VecDeque, @@ -154,7 +154,7 @@ impl NetworkBehaviour for Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { if let Some(e) = self.events.pop_back() { let Event { result, peer } = &e; @@ -164,7 +164,7 @@ impl NetworkBehaviour for Behaviour { _ => {} } - Poll::Ready(NetworkBehaviourAction::GenerateEvent(e)) + Poll::Ready(ToSwarm::GenerateEvent(e)) } else { Poll::Pending } diff --git a/protocols/relay/src/behaviour.rs b/protocols/relay/src/behaviour.rs index e8000966..c301a050 100644 --- a/protocols/relay/src/behaviour.rs +++ b/protocols/relay/src/behaviour.rs @@ -35,8 +35,8 @@ use libp2p_identity::PeerId; use libp2p_swarm::behaviour::{ConnectionClosed, FromSwarm}; use libp2p_swarm::{ dummy, ConnectionDenied, ConnectionHandlerUpgrErr, ConnectionId, ExternalAddresses, - NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, THandler, - THandlerInEvent, THandlerOutEvent, + NetworkBehaviour, NotifyHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, + ToSwarm, }; use std::collections::{hash_map, HashMap, HashSet, VecDeque}; use std::num::NonZeroU32; @@ -242,7 +242,7 @@ impl Behaviour { .filter(|c| matches!(c.status, CircuitStatus::Accepted)) { self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::CircuitClosed { + ToSwarm::GenerateEvent(Event::CircuitClosed { src_peer_id: circuit.src_peer_id, dst_peer_id: circuit.dst_peer_id, error: Some(std::io::ErrorKind::ConnectionAborted.into()), @@ -377,7 +377,7 @@ impl NetworkBehaviour for Behaviour { .all(|limiter| { limiter.try_next(event_source, endpoint.get_remote_address(), now) }) { - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { handler: NotifyHandler::One(connection), peer_id: event_source, event: Either::Left(handler::In::DenyReservationReq { @@ -411,7 +411,7 @@ impl NetworkBehaviour for Behaviour { .insert(connection); self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::ReservationReqAccepted { + ToSwarm::GenerateEvent(Event::ReservationReqAccepted { src_peer_id: event_source, renewed, }) @@ -420,7 +420,7 @@ impl NetworkBehaviour for Behaviour { } handler::Event::ReservationReqAcceptFailed { error } => { self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::ReservationReqAcceptFailed { + ToSwarm::GenerateEvent(Event::ReservationReqAcceptFailed { src_peer_id: event_source, error, }) @@ -429,7 +429,7 @@ impl NetworkBehaviour for Behaviour { } handler::Event::ReservationReqDenied {} => { self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::ReservationReqDenied { + ToSwarm::GenerateEvent(Event::ReservationReqDenied { src_peer_id: event_source, }) .into(), @@ -437,7 +437,7 @@ impl NetworkBehaviour for Behaviour { } handler::Event::ReservationReqDenyFailed { error } => { self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::ReservationReqDenyFailed { + ToSwarm::GenerateEvent(Event::ReservationReqDenyFailed { src_peer_id: event_source, error, }) @@ -462,7 +462,7 @@ impl NetworkBehaviour for Behaviour { } self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::ReservationTimedOut { + ToSwarm::GenerateEvent(Event::ReservationTimedOut { src_peer_id: event_source, }) .into(), @@ -491,7 +491,7 @@ impl NetworkBehaviour for Behaviour { limiter.try_next(event_source, endpoint.get_remote_address(), now) }) { // Deny circuit exceeding limits. - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { handler: NotifyHandler::One(connection), peer_id: event_source, event: Either::Left(handler::In::DenyCircuitReq { @@ -514,7 +514,7 @@ impl NetworkBehaviour for Behaviour { dst_connection_id: *dst_conn, }); - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { handler: NotifyHandler::One(*dst_conn), peer_id: event_source, event: Either::Left(handler::In::NegotiateOutboundConnect { @@ -527,7 +527,7 @@ impl NetworkBehaviour for Behaviour { } } else { // Deny circuit request if no reservation present. - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { handler: NotifyHandler::One(connection), peer_id: event_source, event: Either::Left(handler::In::DenyCircuitReq { @@ -541,7 +541,7 @@ impl NetworkBehaviour for Behaviour { } handler::Event::CircuitReqReceiveFailed { error } => { self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::CircuitReqReceiveFailed { + ToSwarm::GenerateEvent(Event::CircuitReqReceiveFailed { src_peer_id: event_source, error, }) @@ -557,7 +557,7 @@ impl NetworkBehaviour for Behaviour { } self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::CircuitReqDenied { + ToSwarm::GenerateEvent(Event::CircuitReqDenied { src_peer_id: event_source, dst_peer_id, }) @@ -574,7 +574,7 @@ impl NetworkBehaviour for Behaviour { } self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::CircuitReqDenyFailed { + ToSwarm::GenerateEvent(Event::CircuitReqDenyFailed { src_peer_id: event_source, dst_peer_id, error, @@ -592,7 +592,7 @@ impl NetworkBehaviour for Behaviour { dst_pending_data, } => { self.queued_actions.push_back( - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { handler: NotifyHandler::One(src_connection_id), peer_id: src_peer_id, event: Either::Left(handler::In::AcceptAndDriveCircuit { @@ -616,7 +616,7 @@ impl NetworkBehaviour for Behaviour { error, } => { self.queued_actions.push_back( - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { handler: NotifyHandler::One(src_connection_id), peer_id: src_peer_id, event: Either::Left(handler::In::DenyCircuitReq { @@ -628,7 +628,7 @@ impl NetworkBehaviour for Behaviour { .into(), ); self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::CircuitReqOutboundConnectFailed { + ToSwarm::GenerateEvent(Event::CircuitReqOutboundConnectFailed { src_peer_id, dst_peer_id: event_source, error, @@ -642,7 +642,7 @@ impl NetworkBehaviour for Behaviour { } => { self.circuits.accepted(circuit_id); self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::CircuitReqAccepted { + ToSwarm::GenerateEvent(Event::CircuitReqAccepted { src_peer_id: event_source, dst_peer_id, }) @@ -656,7 +656,7 @@ impl NetworkBehaviour for Behaviour { } => { self.circuits.remove(circuit_id); self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::CircuitReqAcceptFailed { + ToSwarm::GenerateEvent(Event::CircuitReqAcceptFailed { src_peer_id: event_source, dst_peer_id, error, @@ -672,7 +672,7 @@ impl NetworkBehaviour for Behaviour { self.circuits.remove(circuit_id); self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::CircuitClosed { + ToSwarm::GenerateEvent(Event::CircuitClosed { src_peer_id: event_source, dst_peer_id, error, @@ -687,7 +687,7 @@ impl NetworkBehaviour for Behaviour { &mut self, _cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { if let Some(action) = self.queued_actions.pop_front() { return Poll::Ready(action.build(self.local_peer_id, &self.external_addresses)); } @@ -786,11 +786,11 @@ impl Add for CircuitId { } } -/// A [`NetworkBehaviourAction`], either complete, or still requiring data from [`PollParameters`] +/// A [`ToSwarm`], either complete, or still requiring data from [`PollParameters`] /// before being returned in [`Behaviour::poll`]. #[allow(clippy::large_enum_variant)] enum Action { - Done(NetworkBehaviourAction>), + Done(ToSwarm>), AcceptReservationPrototype { inbound_reservation_req: inbound_hop::ReservationReq, handler: NotifyHandler, @@ -798,8 +798,8 @@ enum Action { }, } -impl From>> for Action { - fn from(action: NetworkBehaviourAction>) -> Self { +impl From>> for Action { + fn from(action: ToSwarm>) -> Self { Self::Done(action) } } @@ -809,14 +809,14 @@ impl Action { self, local_peer_id: PeerId, external_addresses: &ExternalAddresses, - ) -> NetworkBehaviourAction> { + ) -> ToSwarm> { match self { Action::Done(action) => action, Action::AcceptReservationPrototype { inbound_reservation_req, handler, peer_id, - } => NetworkBehaviourAction::NotifyHandler { + } => ToSwarm::NotifyHandler { handler, peer_id, event: Either::Left(handler::In::AcceptReservationReq { diff --git a/protocols/relay/src/priv_client.rs b/protocols/relay/src/priv_client.rs index 63140936..2a24607a 100644 --- a/protocols/relay/src/priv_client.rs +++ b/protocols/relay/src/priv_client.rs @@ -40,8 +40,8 @@ use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm use libp2p_swarm::dial_opts::DialOpts; use libp2p_swarm::{ dummy, ConnectionDenied, ConnectionHandler, ConnectionHandlerUpgrErr, ConnectionId, - DialFailure, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, - PollParameters, THandler, THandlerInEvent, THandlerOutEvent, + DialFailure, NegotiatedSubstream, NetworkBehaviour, NotifyHandler, PollParameters, THandler, + THandlerInEvent, THandlerOutEvent, ToSwarm, }; use std::collections::{hash_map, HashMap, VecDeque}; use std::io::{Error, ErrorKind, IoSlice}; @@ -104,7 +104,7 @@ pub struct Behaviour { directly_connected_peers: HashMap>, /// Queue of actions to return when polled. - queued_actions: VecDeque>>, + queued_actions: VecDeque>>, pending_handler_commands: HashMap, } @@ -219,12 +219,11 @@ impl NetworkBehaviour for Behaviour { } if let Some(event) = self.pending_handler_commands.remove(&connection_id) { - self.queued_actions - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler: NotifyHandler::One(connection_id), - event: Either::Left(event), - }) + self.queued_actions.push_back(ToSwarm::NotifyHandler { + peer_id, + handler: NotifyHandler::One(connection_id), + event: Either::Left(event), + }) } } FromSwarm::ConnectionClosed(connection_closed) => { @@ -296,15 +295,14 @@ impl NetworkBehaviour for Behaviour { } }; - self.queued_actions - .push_back(NetworkBehaviourAction::GenerateEvent(event)) + self.queued_actions.push_back(ToSwarm::GenerateEvent(event)) } fn poll( &mut self, cx: &mut Context<'_>, _poll_parameters: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { if let Some(action) = self.queued_actions.pop_front() { return Poll::Ready(action); } @@ -320,7 +318,7 @@ impl NetworkBehaviour for Behaviour { .get(&relay_peer_id) .and_then(|cs| cs.get(0)) { - Some(connection_id) => NetworkBehaviourAction::NotifyHandler { + Some(connection_id) => ToSwarm::NotifyHandler { peer_id: relay_peer_id, handler: NotifyHandler::One(*connection_id), event: Either::Left(handler::In::Reserve { to_listener }), @@ -334,7 +332,7 @@ impl NetworkBehaviour for Behaviour { self.pending_handler_commands .insert(relayed_connection_id, handler::In::Reserve { to_listener }); - NetworkBehaviourAction::Dial { opts } + ToSwarm::Dial { opts } } } } @@ -350,7 +348,7 @@ impl NetworkBehaviour for Behaviour { .get(&relay_peer_id) .and_then(|cs| cs.get(0)) { - Some(connection_id) => NetworkBehaviourAction::NotifyHandler { + Some(connection_id) => ToSwarm::NotifyHandler { peer_id: relay_peer_id, handler: NotifyHandler::One(*connection_id), event: Either::Left(handler::In::EstablishCircuit { @@ -373,7 +371,7 @@ impl NetworkBehaviour for Behaviour { }, ); - NetworkBehaviourAction::Dial { opts } + ToSwarm::Dial { opts } } } } diff --git a/protocols/rendezvous/src/client.rs b/protocols/rendezvous/src/client.rs index 718eee1e..324d352c 100644 --- a/protocols/rendezvous/src/client.rs +++ b/protocols/rendezvous/src/client.rs @@ -33,8 +33,7 @@ use libp2p_identity::{Keypair, PeerId, SigningError}; use libp2p_swarm::behaviour::FromSwarm; use libp2p_swarm::{ CloseConnection, ConnectionDenied, ConnectionId, ExternalAddresses, NetworkBehaviour, - NetworkBehaviourAction, NotifyHandler, PollParameters, THandler, THandlerInEvent, - THandlerOutEvent, + NotifyHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use std::collections::{HashMap, VecDeque}; use std::iter::FromIterator; @@ -42,7 +41,7 @@ use std::task::{Context, Poll}; use void::Void; pub struct Behaviour { - events: VecDeque>>, + events: VecDeque>>, keypair: Keypair, pending_register_requests: Vec<(Namespace, PeerId, Option)>, @@ -75,7 +74,7 @@ impl Behaviour { /// Register our external addresses in the given namespace with the given rendezvous peer. /// /// External addresses are either manually added via [`libp2p_swarm::Swarm::add_external_address`] or reported - /// by other [`NetworkBehaviour`]s via [`NetworkBehaviourAction::ReportObservedAddr`]. + /// by other [`NetworkBehaviour`]s via [`ToSwarm::ReportObservedAddr`]. pub fn register(&mut self, namespace: Namespace, rendezvous_node: PeerId, ttl: Option) { self.pending_register_requests .push((namespace, rendezvous_node, ttl)); @@ -83,14 +82,13 @@ impl Behaviour { /// Unregister ourselves from the given namespace with the given rendezvous peer. pub fn unregister(&mut self, namespace: Namespace, rendezvous_node: PeerId) { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: rendezvous_node, - event: handler::OutboundInEvent::NewSubstream { - open_info: OpenInfo::UnregisterRequest(namespace), - }, - handler: NotifyHandler::Any, - }); + self.events.push_back(ToSwarm::NotifyHandler { + peer_id: rendezvous_node, + event: handler::OutboundInEvent::NewSubstream { + open_info: OpenInfo::UnregisterRequest(namespace), + }, + handler: NotifyHandler::Any, + }); } /// Discover other peers at a given rendezvous peer. @@ -107,18 +105,17 @@ impl Behaviour { limit: Option, rendezvous_node: PeerId, ) { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: rendezvous_node, - event: handler::OutboundInEvent::NewSubstream { - open_info: OpenInfo::DiscoverRequest { - namespace: ns, - cookie, - limit, - }, + self.events.push_back(ToSwarm::NotifyHandler { + peer_id: rendezvous_node, + event: handler::OutboundInEvent::NewSubstream { + open_info: OpenInfo::DiscoverRequest { + namespace: ns, + cookie, + limit, }, - handler: NotifyHandler::Any, - }); + }, + handler: NotifyHandler::Any, + }); } } @@ -233,7 +230,7 @@ impl NetworkBehaviour for Behaviour { handler::OutboundOutEvent::OutboundError { error, .. } => { log::warn!("Connection with peer {} failed: {}", peer_id, error); - vec![NetworkBehaviourAction::CloseConnection { + vec![ToSwarm::CloseConnection { peer_id, connection: CloseConnection::One(connection_id), }] @@ -247,7 +244,7 @@ impl NetworkBehaviour for Behaviour { &mut self, cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); } @@ -259,13 +256,13 @@ impl NetworkBehaviour for Behaviour { let external_addresses = self.external_addresses.iter().cloned().collect::>(); if external_addresses.is_empty() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent( - Event::RegisterFailed(RegisterError::NoExternalAddresses), - )); + return Poll::Ready(ToSwarm::GenerateEvent(Event::RegisterFailed( + RegisterError::NoExternalAddresses, + ))); } let action = match PeerRecord::new(&self.keypair, external_addresses) { - Ok(peer_record) => NetworkBehaviourAction::NotifyHandler { + Ok(peer_record) => ToSwarm::NotifyHandler { peer_id: rendezvous_node, event: handler::OutboundInEvent::NewSubstream { open_info: OpenInfo::RegisterRequest(NewRegistration { @@ -276,7 +273,7 @@ impl NetworkBehaviour for Behaviour { }, handler: NotifyHandler::Any, }, - Err(signing_error) => NetworkBehaviourAction::GenerateEvent(Event::RegisterFailed( + Err(signing_error) => ToSwarm::GenerateEvent(Event::RegisterFailed( RegisterError::FailedToMakeRecord(signing_error), )), }; @@ -288,7 +285,7 @@ impl NetworkBehaviour for Behaviour { futures::ready!(self.expiring_registrations.poll_next_unpin(cx)) { self.discovered_peers.remove(&expired_registration); - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(Event::Expired { + return Poll::Ready(ToSwarm::GenerateEvent(Event::Expired { peer: expired_registration.0, })); } @@ -321,23 +318,23 @@ fn handle_outbound_event( peer_id: PeerId, discovered_peers: &mut HashMap<(PeerId, Namespace), Vec>, expiring_registrations: &mut FuturesUnordered>, -) -> Vec>> { +) -> Vec>> { match event { outbound::OutEvent::Registered { namespace, ttl } => { - vec![NetworkBehaviourAction::GenerateEvent(Event::Registered { + vec![ToSwarm::GenerateEvent(Event::Registered { rendezvous_node: peer_id, ttl, namespace, })] } outbound::OutEvent::RegisterFailed(namespace, error) => { - vec![NetworkBehaviourAction::GenerateEvent( - Event::RegisterFailed(RegisterError::Remote { + vec![ToSwarm::GenerateEvent(Event::RegisterFailed( + RegisterError::Remote { rendezvous_node: peer_id, namespace, error, - }), - )] + }, + ))] } outbound::OutEvent::Discovered { registrations, @@ -361,20 +358,18 @@ fn handle_outbound_event( .boxed() })); - vec![NetworkBehaviourAction::GenerateEvent(Event::Discovered { + vec![ToSwarm::GenerateEvent(Event::Discovered { rendezvous_node: peer_id, registrations, cookie, })] } outbound::OutEvent::DiscoverFailed { namespace, error } => { - vec![NetworkBehaviourAction::GenerateEvent( - Event::DiscoverFailed { - rendezvous_node: peer_id, - namespace, - error, - }, - )] + vec![ToSwarm::GenerateEvent(Event::DiscoverFailed { + rendezvous_node: peer_id, + namespace, + error, + })] } } } diff --git a/protocols/rendezvous/src/server.rs b/protocols/rendezvous/src/server.rs index 609428bf..1311d4f9 100644 --- a/protocols/rendezvous/src/server.rs +++ b/protocols/rendezvous/src/server.rs @@ -31,8 +31,8 @@ use libp2p_core::{Endpoint, Multiaddr}; use libp2p_identity::PeerId; use libp2p_swarm::behaviour::FromSwarm; use libp2p_swarm::{ - CloseConnection, ConnectionDenied, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, - NotifyHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, + CloseConnection, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, + PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use std::collections::{HashMap, HashSet, VecDeque}; use std::iter::FromIterator; @@ -41,7 +41,7 @@ use std::time::Duration; use void::Void; pub struct Behaviour { - events: VecDeque>>, + events: VecDeque>>, registrations: Registrations, } @@ -150,7 +150,7 @@ impl NetworkBehaviour for Behaviour { handler::InboundOutEvent::InboundError { error, .. } => { log::warn!("Connection with peer {} failed: {}", peer_id, error); - vec![NetworkBehaviourAction::CloseConnection { + vec![ToSwarm::CloseConnection { peer_id, connection: CloseConnection::One(connection), }] @@ -165,11 +165,11 @@ impl NetworkBehaviour for Behaviour { &mut self, cx: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { if let Poll::Ready(ExpiredRegistration(registration)) = self.registrations.poll(cx) { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent( - Event::RegistrationExpired(registration), - )); + return Poll::Ready(ToSwarm::GenerateEvent(Event::RegistrationExpired( + registration, + ))); } if let Some(event) = self.events.pop_front() { @@ -203,7 +203,7 @@ fn handle_inbound_event( connection: ConnectionId, id: InboundSubstreamId, registrations: &mut Registrations, -) -> Vec>> { +) -> Vec>> { match event { // bad registration inbound::OutEvent::RegistrationRequested(registration) @@ -212,7 +212,7 @@ fn handle_inbound_event( let error = ErrorCode::NotAuthorized; vec![ - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(connection), event: handler::InboundInEvent::NotifyInboundSubstream { @@ -220,7 +220,7 @@ fn handle_inbound_event( message: inbound::InEvent::DeclineRegisterRequest(error), }, }, - NetworkBehaviourAction::GenerateEvent(Event::PeerNotRegistered { + ToSwarm::GenerateEvent(Event::PeerNotRegistered { peer: peer_id, namespace: registration.namespace, error, @@ -233,7 +233,7 @@ fn handle_inbound_event( match registrations.add(registration) { Ok(registration) => { vec![ - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(connection), event: handler::InboundInEvent::NotifyInboundSubstream { @@ -243,7 +243,7 @@ fn handle_inbound_event( }, }, }, - NetworkBehaviourAction::GenerateEvent(Event::PeerRegistered { + ToSwarm::GenerateEvent(Event::PeerRegistered { peer: peer_id, registration, }), @@ -253,7 +253,7 @@ fn handle_inbound_event( let error = ErrorCode::InvalidTtl; vec![ - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(connection), event: handler::InboundInEvent::NotifyInboundSubstream { @@ -261,7 +261,7 @@ fn handle_inbound_event( message: inbound::InEvent::DeclineRegisterRequest(error), }, }, - NetworkBehaviourAction::GenerateEvent(Event::PeerNotRegistered { + ToSwarm::GenerateEvent(Event::PeerNotRegistered { peer: peer_id, namespace, error, @@ -279,7 +279,7 @@ fn handle_inbound_event( let discovered = registrations.cloned().collect::>(); vec![ - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(connection), event: handler::InboundInEvent::NotifyInboundSubstream { @@ -290,7 +290,7 @@ fn handle_inbound_event( }, }, }, - NetworkBehaviourAction::GenerateEvent(Event::DiscoverServed { + ToSwarm::GenerateEvent(Event::DiscoverServed { enquirer: peer_id, registrations: discovered, }), @@ -300,7 +300,7 @@ fn handle_inbound_event( let error = ErrorCode::InvalidCookie; vec![ - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(connection), event: handler::InboundInEvent::NotifyInboundSubstream { @@ -308,7 +308,7 @@ fn handle_inbound_event( message: inbound::InEvent::DeclineDiscoverRequest(error), }, }, - NetworkBehaviourAction::GenerateEvent(Event::DiscoverNotServed { + ToSwarm::GenerateEvent(Event::DiscoverNotServed { enquirer: peer_id, error, }), @@ -318,12 +318,10 @@ fn handle_inbound_event( inbound::OutEvent::UnregisterRequested(namespace) => { registrations.remove(namespace.clone(), peer_id); - vec![NetworkBehaviourAction::GenerateEvent( - Event::PeerUnregistered { - peer: peer_id, - namespace, - }, - )] + vec![ToSwarm::GenerateEvent(Event::PeerUnregistered { + peer: peer_id, + namespace, + })] } } } diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 707be901..d9f2220d 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -75,8 +75,8 @@ use libp2p_identity::PeerId; use libp2p_swarm::{ behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}, dial_opts::DialOpts, - ConnectionDenied, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, - PollParameters, THandler, THandlerInEvent, THandlerOutEvent, + ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, PollParameters, THandler, + THandlerInEvent, THandlerOutEvent, ToSwarm, }; use smallvec::SmallVec; use std::{ @@ -350,9 +350,8 @@ where /// The protocol codec for reading and writing requests and responses. codec: TCodec, /// Pending events to return from `poll`. - pending_events: VecDeque< - NetworkBehaviourAction, RequestProtocol>, - >, + pending_events: + VecDeque, RequestProtocol>>, /// The currently connected peers, their pending outbound and inbound responses and their known, /// reachable addresses, if any. connected: HashMap>, @@ -419,7 +418,7 @@ where }; if let Some(request) = self.try_send_request(peer, request) { - self.pending_events.push_back(NetworkBehaviourAction::Dial { + self.pending_events.push_back(ToSwarm::Dial { opts: DialOpts::peer_id(*peer).build(), }); self.pending_outbound_requests @@ -538,12 +537,11 @@ where let ix = (request.request_id.0 as usize) % connections.len(); let conn = &mut connections[ix]; conn.pending_inbound_responses.insert(request.request_id); - self.pending_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: *peer, - handler: NotifyHandler::One(conn.id), - event: request, - }); + self.pending_events.push_back(ToSwarm::NotifyHandler { + peer_id: *peer, + handler: NotifyHandler::One(conn.id), + event: request, + }); None } else { Some(request) @@ -675,24 +673,20 @@ where for request_id in connection.pending_outbound_responses { self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::InboundFailure { - peer: peer_id, - request_id, - error: InboundFailure::ConnectionClosed, - }, - )); + .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { + peer: peer_id, + request_id, + error: InboundFailure::ConnectionClosed, + })); } for request_id in connection.pending_inbound_responses { self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::OutboundFailure { - peer: peer_id, - request_id, - error: OutboundFailure::ConnectionClosed, - }, - )); + .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { + peer: peer_id, + request_id, + error: OutboundFailure::ConnectionClosed, + })); } } @@ -707,13 +701,11 @@ where if let Some(pending) = self.pending_outbound_requests.remove(&peer) { for request in pending { self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::OutboundFailure { - peer, - request_id: request.request_id, - error: OutboundFailure::DialFailure, - }, - )); + .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { + peer, + request_id: request.request_id, + error: OutboundFailure::DialFailure, + })); } } } @@ -825,10 +817,7 @@ where response, }; self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent(Event::Message { - peer, - message, - })); + .push_back(ToSwarm::GenerateEvent(Event::Message { peer, message })); } handler::Event::Request { request_id, @@ -842,10 +831,7 @@ where channel, }; self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent(Event::Message { - peer, - message, - })); + .push_back(ToSwarm::GenerateEvent(Event::Message { peer, message })); match self.get_connection_mut(&peer, connection) { Some(connection) => { @@ -854,14 +840,13 @@ where } // Connection closed after `Event::Request` has been emitted. None => { - self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::InboundFailure { - peer, - request_id, - error: InboundFailure::ConnectionClosed, - }, - )); + self.pending_events.push_back(ToSwarm::GenerateEvent( + Event::InboundFailure { + peer, + request_id, + error: InboundFailure::ConnectionClosed, + }, + )); } } } @@ -873,7 +858,7 @@ where ); self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent(Event::ResponseSent { + .push_back(ToSwarm::GenerateEvent(Event::ResponseSent { peer, request_id, })); @@ -886,13 +871,11 @@ where ); self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::InboundFailure { - peer, - request_id, - error: InboundFailure::ResponseOmission, - }, - )); + .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { + peer, + request_id, + error: InboundFailure::ResponseOmission, + })); } handler::Event::OutboundTimeout(request_id) => { let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); @@ -902,13 +885,11 @@ where ); self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::OutboundFailure { - peer, - request_id, - error: OutboundFailure::Timeout, - }, - )); + .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { + peer, + request_id, + error: OutboundFailure::Timeout, + })); } handler::Event::InboundTimeout(request_id) => { // Note: `Event::InboundTimeout` is emitted both for timing @@ -918,13 +899,11 @@ where self.remove_pending_outbound_response(&peer, connection, request_id); self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::InboundFailure { - peer, - request_id, - error: InboundFailure::Timeout, - }, - )); + .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { + peer, + request_id, + error: InboundFailure::Timeout, + })); } handler::Event::OutboundUnsupportedProtocols(request_id) => { let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); @@ -934,26 +913,22 @@ where ); self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::OutboundFailure { - peer, - request_id, - error: OutboundFailure::UnsupportedProtocols, - }, - )); + .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure { + peer, + request_id, + error: OutboundFailure::UnsupportedProtocols, + })); } handler::Event::InboundUnsupportedProtocols(request_id) => { // Note: No need to call `self.remove_pending_outbound_response`, // `Event::Request` was never emitted for this request and // thus request was never added to `pending_outbound_responses`. self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::InboundFailure { - peer, - request_id, - error: InboundFailure::UnsupportedProtocols, - }, - )); + .push_back(ToSwarm::GenerateEvent(Event::InboundFailure { + peer, + request_id, + error: InboundFailure::UnsupportedProtocols, + })); } } } @@ -962,7 +937,7 @@ where &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { if let Some(ev) = self.pending_events.pop_front() { return Poll::Ready(ev); } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index d4e52bab..04a32cf9 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -62,7 +62,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { let multiaddr = quote! { #prelude_path::Multiaddr }; let trait_to_impl = quote! { #prelude_path::NetworkBehaviour }; let either_ident = quote! { #prelude_path::Either }; - let network_behaviour_action = quote! { #prelude_path::NetworkBehaviourAction }; + let network_behaviour_action = quote! { #prelude_path::ToSwarm }; let connection_handler = quote! { #prelude_path::ConnectionHandler }; let proto_select_ident = quote! { #prelude_path::ConnectionHandlerSelect }; let peer_id = quote! { #prelude_path::PeerId }; diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index bb9f93a2..70061416 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -9,9 +9,15 @@ - Deprecate `Swarm::ban_peer_id` in favor of the new `libp2p::allow_block_list` module. See [PR 3590]. +- Rename `NetworkBehaviourAction` to `ToSwarm`. + A deprecated type-alias is provided to ease the transition. + The new name is meant to better indicate the message-passing relationship between `Swarm` and `NetworkBehaviour`. + See [PR XXXX]. + [PR 3386]: https://github.com/libp2p/rust-libp2p/pull/3386 [PR 3652]: https://github.com/libp2p/rust-libp2p/pull/3652 [PR 3590]: https://github.com/libp2p/rust-libp2p/pull/3590 +[PR XXXX]: https://github.com/libp2p/rust-libp2p/pull/XXXX # 0.42.0 diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index fcb61443..9fd014bd 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -141,7 +141,7 @@ pub trait NetworkBehaviour: 'static { /// sent from the handler to the behaviour are invoked with /// [`NetworkBehaviour::on_connection_handler_event`], /// and the behaviour can send a message to the handler by making [`NetworkBehaviour::poll`] - /// return [`NetworkBehaviourAction::NotifyHandler`]. + /// return [`ToSwarm::NotifyHandler`]. /// /// Note that the handler is returned to the [`NetworkBehaviour`] on connection failure and /// connection closing. @@ -276,7 +276,7 @@ pub trait NetworkBehaviour: 'static { &mut self, cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll>>; + ) -> Poll>>; } /// Parameters passed to `poll()`, that the `NetworkBehaviour` has access to. @@ -318,12 +318,14 @@ pub trait PollParameters { fn local_peer_id(&self) -> &PeerId; } -/// An action that a [`NetworkBehaviour`] can trigger in the [`Swarm`] -/// in whose context it is executing. +#[deprecated(note = "Use `ToSwarm` instead.")] +pub type NetworkBehaviourAction = ToSwarm; + +/// A command issued from a [`NetworkBehaviour`] for the [`Swarm`]. /// /// [`Swarm`]: super::Swarm #[derive(Debug)] -pub enum NetworkBehaviourAction { +pub enum ToSwarm { /// Instructs the `Swarm` to return an event when it is being polled. GenerateEvent(TOutEvent), @@ -381,7 +383,7 @@ pub enum NetworkBehaviourAction { /// with the given peer. /// /// Note: Closing a connection via - /// [`NetworkBehaviourAction::CloseConnection`] does not inform the + /// [`ToSwarm::CloseConnection`] does not inform the /// corresponding [`ConnectionHandler`](crate::ConnectionHandler). /// Closing a connection via a [`ConnectionHandler`](crate::ConnectionHandler) can be done /// either in a collaborative manner across [`ConnectionHandler`](crate::ConnectionHandler)s @@ -395,31 +397,31 @@ pub enum NetworkBehaviourAction { }, } -impl NetworkBehaviourAction { +impl ToSwarm { /// Map the handler event. pub fn map_in( self, f: impl FnOnce(TInEventOld) -> TInEventNew, - ) -> NetworkBehaviourAction { + ) -> ToSwarm { match self { - NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(e), - NetworkBehaviourAction::Dial { opts } => NetworkBehaviourAction::Dial { opts }, - NetworkBehaviourAction::NotifyHandler { + ToSwarm::GenerateEvent(e) => ToSwarm::GenerateEvent(e), + ToSwarm::Dial { opts } => ToSwarm::Dial { opts }, + ToSwarm::NotifyHandler { peer_id, handler, event, - } => NetworkBehaviourAction::NotifyHandler { + } => ToSwarm::NotifyHandler { peer_id, handler, event: f(event), }, - NetworkBehaviourAction::ReportObservedAddr { address, score } => { - NetworkBehaviourAction::ReportObservedAddr { address, score } + ToSwarm::ReportObservedAddr { address, score } => { + ToSwarm::ReportObservedAddr { address, score } } - NetworkBehaviourAction::CloseConnection { + ToSwarm::CloseConnection { peer_id, connection, - } => NetworkBehaviourAction::CloseConnection { + } => ToSwarm::CloseConnection { peer_id, connection, }, @@ -427,31 +429,28 @@ impl NetworkBehaviourAction { } } -impl NetworkBehaviourAction { +impl ToSwarm { /// Map the event the swarm will return. - pub fn map_out( - self, - f: impl FnOnce(TOutEvent) -> E, - ) -> NetworkBehaviourAction { + pub fn map_out(self, f: impl FnOnce(TOutEvent) -> E) -> ToSwarm { match self { - NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(f(e)), - NetworkBehaviourAction::Dial { opts } => NetworkBehaviourAction::Dial { opts }, - NetworkBehaviourAction::NotifyHandler { + ToSwarm::GenerateEvent(e) => ToSwarm::GenerateEvent(f(e)), + ToSwarm::Dial { opts } => ToSwarm::Dial { opts }, + ToSwarm::NotifyHandler { peer_id, handler, event, - } => NetworkBehaviourAction::NotifyHandler { + } => ToSwarm::NotifyHandler { peer_id, handler, event, }, - NetworkBehaviourAction::ReportObservedAddr { address, score } => { - NetworkBehaviourAction::ReportObservedAddr { address, score } + ToSwarm::ReportObservedAddr { address, score } => { + ToSwarm::ReportObservedAddr { address, score } } - NetworkBehaviourAction::CloseConnection { + ToSwarm::CloseConnection { peer_id, connection, - } => NetworkBehaviourAction::CloseConnection { + } => ToSwarm::CloseConnection { peer_id, connection, }, diff --git a/swarm/src/behaviour/either.rs b/swarm/src/behaviour/either.rs index ef71f8e5..bf59949c 100644 --- a/swarm/src/behaviour/either.rs +++ b/swarm/src/behaviour/either.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::behaviour::{self, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use crate::behaviour::{self, NetworkBehaviour, PollParameters, ToSwarm}; use crate::connection::ConnectionId; use crate::{ConnectionDenied, THandler, THandlerInEvent, THandlerOutEvent}; use either::Either; @@ -156,7 +156,7 @@ where &mut self, cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { let event = match self { Either::Left(behaviour) => futures::ready!(behaviour.poll(cx, params)) .map_out(Either::Left) diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index 395e3804..bd4678a5 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -27,8 +27,8 @@ use crate::handler::{ }; use crate::upgrade::SendWrapper; use crate::{ - ConnectionDenied, NetworkBehaviour, NetworkBehaviourAction, PollParameters, THandler, - THandlerInEvent, THandlerOutEvent, + ConnectionDenied, NetworkBehaviour, PollParameters, THandler, THandlerInEvent, + THandlerOutEvent, ToSwarm, }; use either::Either; use futures::future; @@ -182,7 +182,7 @@ where &mut self, cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { if let Some(inner) = self.inner.as_mut() { inner.poll(cx, params) } else { diff --git a/swarm/src/dial_opts.rs b/swarm/src/dial_opts.rs index c43a7a52..6ee60831 100644 --- a/swarm/src/dial_opts.rs +++ b/swarm/src/dial_opts.rs @@ -30,7 +30,7 @@ use std::num::NonZeroU8; /// Options to configure a dial to a known or unknown peer. /// /// Used in [`Swarm::dial`](crate::Swarm::dial) and -/// [`NetworkBehaviourAction::Dial`](crate::behaviour::NetworkBehaviourAction::Dial). +/// [`ToSwarm::Dial`](crate::behaviour::ToSwarm::Dial). /// /// To construct use either of: /// diff --git a/swarm/src/dummy.rs b/swarm/src/dummy.rs index 9396344c..4497540a 100644 --- a/swarm/src/dummy.rs +++ b/swarm/src/dummy.rs @@ -1,4 +1,4 @@ -use crate::behaviour::{FromSwarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use crate::behaviour::{FromSwarm, NetworkBehaviour, PollParameters, ToSwarm}; use crate::connection::ConnectionId; use crate::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, @@ -54,7 +54,7 @@ impl NetworkBehaviour for Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { Poll::Pending } diff --git a/swarm/src/keep_alive.rs b/swarm/src/keep_alive.rs index 119b8ac8..c22a926a 100644 --- a/swarm/src/keep_alive.rs +++ b/swarm/src/keep_alive.rs @@ -1,4 +1,4 @@ -use crate::behaviour::{FromSwarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use crate::behaviour::{FromSwarm, NetworkBehaviour, PollParameters, ToSwarm}; use crate::connection::ConnectionId; use crate::handler::{ ConnectionEvent, ConnectionHandlerEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound, @@ -57,7 +57,7 @@ impl NetworkBehaviour for Behaviour { &mut self, _: &mut Context<'_>, _: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { Poll::Pending } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 41809f97..20a73b6c 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -90,11 +90,13 @@ pub mod derive_prelude { pub use crate::ConnectionHandlerSelect; pub use crate::DialError; pub use crate::NetworkBehaviour; + #[allow(deprecated)] pub use crate::NetworkBehaviourAction; pub use crate::PollParameters; pub use crate::THandler; pub use crate::THandlerInEvent; pub use crate::THandlerOutEvent; + pub use crate::ToSwarm; pub use either::Either; pub use futures::prelude as futures; pub use libp2p_core::transport::ListenerId; @@ -106,11 +108,13 @@ pub mod derive_prelude { #[allow(deprecated)] pub use crate::connection::ConnectionLimit; +#[allow(deprecated)] +pub use behaviour::NetworkBehaviourAction; pub use behaviour::{ AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredExternalAddr, ExpiredListenAddr, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure, - ListenerClosed, ListenerError, NetworkBehaviour, NetworkBehaviourAction, NewExternalAddr, - NewListenAddr, NotifyHandler, PollParameters, + ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddr, NewListenAddr, NotifyHandler, + PollParameters, ToSwarm, }; #[allow(deprecated)] pub use connection::pool::{ConnectionCounters, ConnectionLimits}; @@ -699,7 +703,7 @@ where /// order in which addresses are used to connect to) as well as /// how long the address is retained in the list, depending on /// how frequently it is reported by the `NetworkBehaviour` via - /// [`NetworkBehaviourAction::ReportObservedAddr`] or explicitly + /// [`ToSwarm::ReportObservedAddr`] or explicitly /// through this method. pub fn add_external_address(&mut self, a: Multiaddr, s: AddressScore) -> AddAddressResult { let result = self.external_addrs.add(a.clone(), s); @@ -1188,13 +1192,11 @@ where fn handle_behaviour_event( &mut self, - event: NetworkBehaviourAction>, + event: ToSwarm>, ) -> Option>> { match event { - NetworkBehaviourAction::GenerateEvent(event) => { - return Some(SwarmEvent::Behaviour(event)) - } - NetworkBehaviourAction::Dial { opts } => { + ToSwarm::GenerateEvent(event) => return Some(SwarmEvent::Behaviour(event)), + ToSwarm::Dial { opts } => { let peer_id = opts.get_or_parse_peer_id(); if let Ok(()) = self.dial(opts) { if let Ok(Some(peer_id)) = peer_id { @@ -1202,7 +1204,7 @@ where } } } - NetworkBehaviourAction::NotifyHandler { + ToSwarm::NotifyHandler { peer_id, handler, event, @@ -1221,7 +1223,7 @@ where self.pending_event = Some((peer_id, handler, event)); } - NetworkBehaviourAction::ReportObservedAddr { address, score } => { + ToSwarm::ReportObservedAddr { address, score } => { // Maps the given `observed_addr`, representing an address of the local // node observed by a remote peer, onto the locally known listen addresses // to yield one or more addresses of the local node that may be publicly @@ -1251,7 +1253,7 @@ where self.add_external_address(addr, score); } } - NetworkBehaviourAction::CloseConnection { + ToSwarm::CloseConnection { peer_id, connection, } => match connection { @@ -2314,7 +2316,7 @@ mod tests { /// Establishes multiple connections between two peers, /// after which one peer disconnects the other - /// using [`NetworkBehaviourAction::CloseConnection`] returned by a [`NetworkBehaviour`]. + /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`]. /// /// The test expects both behaviours to be notified via calls to [`NetworkBehaviour::on_swarm_event`] /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`] @@ -2352,12 +2354,14 @@ mod tests { if reconnected { return Poll::Ready(()); } - swarm2.behaviour.inner().next_action.replace( - NetworkBehaviourAction::CloseConnection { + swarm2 + .behaviour + .inner() + .next_action + .replace(ToSwarm::CloseConnection { peer_id: swarm1_id, connection: CloseConnection::All, - }, - ); + }); state = State::Disconnecting; continue; } @@ -2382,7 +2386,7 @@ mod tests { /// Establishes multiple connections between two peers, /// after which one peer closes a single connection - /// using [`NetworkBehaviourAction::CloseConnection`] returned by a [`NetworkBehaviour`]. + /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`]. /// /// The test expects both behaviours to be notified via calls to [`NetworkBehaviour::on_swarm_event`] /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`] @@ -2421,7 +2425,7 @@ mod tests { let conn_id = swarm2.behaviour.on_connection_established[num_connections / 2].1; swarm2.behaviour.inner().next_action.replace( - NetworkBehaviourAction::CloseConnection { + ToSwarm::CloseConnection { peer_id: swarm1_id, connection: CloseConnection::One(conn_id), }, diff --git a/swarm/src/test.rs b/swarm/src/test.rs index caf4fb99..ab5262ad 100644 --- a/swarm/src/test.rs +++ b/swarm/src/test.rs @@ -23,8 +23,8 @@ use crate::behaviour::{ FromSwarm, ListenerClosed, ListenerError, NewExternalAddr, NewListenAddr, NewListener, }; use crate::{ - ConnectionDenied, ConnectionHandler, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, - PollParameters, THandler, THandlerInEvent, THandlerOutEvent, + ConnectionDenied, ConnectionHandler, ConnectionId, NetworkBehaviour, PollParameters, THandler, + THandlerInEvent, THandlerOutEvent, ToSwarm, }; use libp2p_core::{multiaddr::Multiaddr, transport::ListenerId, ConnectedPoint, Endpoint}; use libp2p_identity::PeerId; @@ -48,7 +48,7 @@ where /// The next action to return from `poll`. /// /// An action is only returned once. - pub next_action: Option>, + pub next_action: Option>, } impl MockBehaviour @@ -114,7 +114,7 @@ where &mut self, _: &mut Context, _: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { self.next_action.take().map_or(Poll::Pending, Poll::Ready) } @@ -579,7 +579,7 @@ where &mut self, cx: &mut Context, args: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { self.poll += 1; self.inner.poll(cx, args) } diff --git a/swarm/tests/swarm_derive.rs b/swarm/tests/swarm_derive.rs index c35101b7..485e8b46 100644 --- a/swarm/tests/swarm_derive.rs +++ b/swarm/tests/swarm_derive.rs @@ -457,7 +457,7 @@ fn multiple_behaviour_attributes() { #[test] fn custom_out_event_no_type_parameters() { use libp2p_identity::PeerId; - use libp2p_swarm::{ConnectionId, NetworkBehaviourAction, PollParameters}; + use libp2p_swarm::{ConnectionId, PollParameters, ToSwarm}; use std::task::Context; use std::task::Poll; @@ -502,7 +502,7 @@ fn custom_out_event_no_type_parameters() { &mut self, _ctx: &mut Context, _: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { Poll::Pending }