feat(swarm): rename NetworkBehaviourAction to ToSwarm

Resolves #3123.

Pull-Request: #3658.
This commit is contained in:
Thomas Eizinger
2023-03-24 14:43:49 +01:00
committed by GitHub
parent 7ffa63b656
commit dcbc04e89e
32 changed files with 698 additions and 807 deletions

View File

@ -310,7 +310,7 @@ request without having to guess.
When accepting a **command** that eventually results in a response through an event require that When accepting a **command** that eventually results in a response through an event require that
command to contain a unique ID, which is later on contained in the asynchronous response event. One command to contain a unique ID, which is later on contained in the asynchronous response event. One
such example is the `Swarm` accepting a `NetworkBehaviourAction::Dial` from the `NetworkBehaviour`. such example is the `Swarm` accepting a `ToSwarm::Dial` from the `NetworkBehaviour`.
``` rust ``` rust
struct Command { struct Command {

View File

@ -65,7 +65,7 @@ use libp2p_core::{Endpoint, Multiaddr};
use libp2p_identity::PeerId; use libp2p_identity::PeerId;
use libp2p_swarm::{ use libp2p_swarm::{
dummy, CloseConnection, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, dummy, CloseConnection, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour,
NetworkBehaviourAction, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
}; };
use std::collections::{HashSet, VecDeque}; use std::collections::{HashSet, VecDeque};
use std::fmt; use std::fmt;
@ -261,9 +261,9 @@ where
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
if let Some(peer) = self.close_connections.pop_front() { if let Some(peer) = self.close_connections.pop_front() {
return Poll::Ready(NetworkBehaviourAction::CloseConnection { return Poll::Ready(ToSwarm::CloseConnection {
peer_id: peer, peer_id: peer,
connection: CloseConnection::All, connection: CloseConnection::All,
}); });

View File

@ -22,7 +22,7 @@ use libp2p_core::{Endpoint, Multiaddr};
use libp2p_identity::PeerId; use libp2p_identity::PeerId;
use libp2p_swarm::{ use libp2p_swarm::{
dummy, ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, dummy, ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour,
NetworkBehaviourAction, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
}; };
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::fmt; use std::fmt;
@ -355,7 +355,7 @@ impl NetworkBehaviour for Behaviour {
&mut self, &mut self,
_: &mut Context<'_>, _: &mut Context<'_>,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
Poll::Pending Poll::Pending
} }
} }

View File

@ -39,7 +39,7 @@ use libp2p_swarm::{
ExpiredListenAddr, FromSwarm, ExpiredListenAddr, FromSwarm,
}, },
ConnectionDenied, ConnectionId, ExternalAddresses, ListenAddresses, NetworkBehaviour, ConnectionDenied, ConnectionId, ExternalAddresses, ListenAddresses, NetworkBehaviour,
NetworkBehaviourAction, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
}; };
use std::{ use std::{
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
@ -208,9 +208,7 @@ pub struct Behaviour {
last_probe: Option<Instant>, last_probe: Option<Instant>,
pending_actions: VecDeque< pending_actions: VecDeque<ToSwarm<<Self as NetworkBehaviour>::OutEvent, THandlerInEvent<Self>>>,
NetworkBehaviourAction<<Self as NetworkBehaviour>::OutEvent, THandlerInEvent<Self>>,
>,
probe_id: ProbeId, probe_id: ProbeId,
@ -336,9 +334,7 @@ impl Behaviour {
} => { } => {
if let Some(event) = self.as_server().on_outbound_connection(&peer, address) { if let Some(event) = self.as_server().on_outbound_connection(&peer, address) {
self.pending_actions self.pending_actions
.push_back(NetworkBehaviourAction::GenerateEvent(Event::InboundProbe( .push_back(ToSwarm::GenerateEvent(Event::InboundProbe(event)));
event,
)));
} }
} }
ConnectedPoint::Dialer { ConnectedPoint::Dialer {
@ -399,9 +395,7 @@ impl Behaviour {
})); }));
if let Some(event) = self.as_server().on_outbound_dial_error(peer_id, error) { if let Some(event) = self.as_server().on_outbound_dial_error(peer_id, error) {
self.pending_actions self.pending_actions
.push_back(NetworkBehaviourAction::GenerateEvent(Event::InboundProbe( .push_back(ToSwarm::GenerateEvent(Event::InboundProbe(event)));
event,
)));
} }
} }
@ -441,7 +435,7 @@ impl NetworkBehaviour for Behaviour {
} }
match self.inner.poll(cx, params) { match self.inner.poll(cx, params) {
Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { Poll::Ready(ToSwarm::GenerateEvent(event)) => {
let actions = match event { let actions = match event {
request_response::Event::Message { request_response::Event::Message {
message: request_response::Message::Response { .. }, message: request_response::Message::Response { .. },
@ -474,9 +468,7 @@ impl NetworkBehaviour for Behaviour {
match self.as_client().poll_auto_probe(cx) { match self.as_client().poll_auto_probe(cx) {
Poll::Ready(event) => { Poll::Ready(event) => {
self.pending_actions self.pending_actions
.push_back(NetworkBehaviourAction::GenerateEvent(Event::OutboundProbe( .push_back(ToSwarm::GenerateEvent(Event::OutboundProbe(event)));
event,
)));
continue; continue;
} }
Poll::Pending => {} Poll::Pending => {}
@ -601,8 +593,7 @@ impl NetworkBehaviour for Behaviour {
} }
} }
type Action = type Action = ToSwarm<<Behaviour as NetworkBehaviour>::OutEvent, THandlerInEvent<Behaviour>>;
NetworkBehaviourAction<<Behaviour as NetworkBehaviour>::OutEvent, THandlerInEvent<Behaviour>>;
// Trait implemented for `AsClient` and `AsServer` to handle events from the inner [`request_response::Behaviour`] Protocol. // Trait implemented for `AsClient` and `AsServer` to handle events from the inner [`request_response::Behaviour`] Protocol.
trait HandleInnerEvent { trait HandleInnerEvent {

View File

@ -31,8 +31,7 @@ use libp2p_core::Multiaddr;
use libp2p_identity::PeerId; use libp2p_identity::PeerId;
use libp2p_request_response::{self as request_response, OutboundFailure, RequestId}; use libp2p_request_response::{self as request_response, OutboundFailure, RequestId};
use libp2p_swarm::{ use libp2p_swarm::{
AddressScore, ConnectionId, ExternalAddresses, ListenAddresses, NetworkBehaviourAction, AddressScore, ConnectionId, ExternalAddresses, ListenAddresses, PollParameters, ToSwarm,
PollParameters,
}; };
use rand::{seq::SliceRandom, thread_rng}; use rand::{seq::SliceRandom, thread_rng};
use std::{ use std::{
@ -143,17 +142,13 @@ impl<'a> HandleInnerEvent for AsClient<'a> {
let mut actions = VecDeque::with_capacity(3); let mut actions = VecDeque::with_capacity(3);
actions.push_back(NetworkBehaviourAction::GenerateEvent(Event::OutboundProbe( actions.push_back(ToSwarm::GenerateEvent(Event::OutboundProbe(event)));
event,
)));
if let Some(old) = self.handle_reported_status(response.result.clone().into()) { if let Some(old) = self.handle_reported_status(response.result.clone().into()) {
actions.push_back(NetworkBehaviourAction::GenerateEvent( actions.push_back(ToSwarm::GenerateEvent(Event::StatusChanged {
Event::StatusChanged { old,
old, new: self.nat_status.clone(),
new: self.nat_status.clone(), }));
},
));
} }
if let Ok(address) = response.result { if let Ok(address) = response.result {
@ -165,7 +160,7 @@ impl<'a> HandleInnerEvent for AsClient<'a> {
.find_map(|r| (r.addr == address).then_some(r.score)) .find_map(|r| (r.addr == address).then_some(r.score))
.unwrap_or(AddressScore::Finite(0)); .unwrap_or(AddressScore::Finite(0));
if let AddressScore::Finite(finite_score) = score { if let AddressScore::Finite(finite_score) = score {
actions.push_back(NetworkBehaviourAction::ReportObservedAddr { actions.push_back(ToSwarm::ReportObservedAddr {
address, address,
score: AddressScore::Finite(finite_score + 1), score: AddressScore::Finite(finite_score + 1),
}); });
@ -191,7 +186,7 @@ impl<'a> HandleInnerEvent for AsClient<'a> {
self.schedule_probe.reset(Duration::ZERO); self.schedule_probe.reset(Duration::ZERO);
VecDeque::from([NetworkBehaviourAction::GenerateEvent(Event::OutboundProbe( VecDeque::from([ToSwarm::GenerateEvent(Event::OutboundProbe(
OutboundProbeEvent::Error { OutboundProbeEvent::Error {
probe_id, probe_id,
peer: Some(peer), peer: Some(peer),

View File

@ -30,7 +30,7 @@ use libp2p_request_response::{
}; };
use libp2p_swarm::{ use libp2p_swarm::{
dial_opts::{DialOpts, PeerCondition}, dial_opts::{DialOpts, PeerCondition},
ConnectionId, DialError, NetworkBehaviourAction, PollParameters, ConnectionId, DialError, PollParameters, ToSwarm,
}; };
use std::{ use std::{
collections::{HashMap, HashSet, VecDeque}, collections::{HashMap, HashSet, VecDeque},
@ -124,14 +124,14 @@ impl<'a> HandleInnerEvent for AsServer<'a> {
self.throttled_clients.push((peer, Instant::now())); self.throttled_clients.push((peer, Instant::now()));
VecDeque::from([ VecDeque::from([
NetworkBehaviourAction::GenerateEvent(Event::InboundProbe( ToSwarm::GenerateEvent(Event::InboundProbe(
InboundProbeEvent::Request { InboundProbeEvent::Request {
probe_id, probe_id,
peer, peer,
addresses: addrs.clone(), addresses: addrs.clone(),
}, },
)), )),
NetworkBehaviourAction::Dial { ToSwarm::Dial {
opts: DialOpts::peer_id(peer) opts: DialOpts::peer_id(peer)
.condition(PeerCondition::Always) .condition(PeerCondition::Always)
.override_dial_concurrency_factor( .override_dial_concurrency_factor(
@ -155,13 +155,13 @@ impl<'a> HandleInnerEvent for AsServer<'a> {
}; };
let _ = self.inner.send_response(channel, response); let _ = self.inner.send_response(channel, response);
VecDeque::from([NetworkBehaviourAction::GenerateEvent( VecDeque::from([ToSwarm::GenerateEvent(Event::InboundProbe(
Event::InboundProbe(InboundProbeEvent::Error { InboundProbeEvent::Error {
probe_id, probe_id,
peer, peer,
error: InboundProbeError::Response(error), error: InboundProbeError::Response(error),
}), },
)]) ))])
} }
} }
} }
@ -183,7 +183,7 @@ impl<'a> HandleInnerEvent for AsServer<'a> {
_ => self.probe_id.next(), _ => self.probe_id.next(),
}; };
VecDeque::from([NetworkBehaviourAction::GenerateEvent(Event::InboundProbe( VecDeque::from([ToSwarm::GenerateEvent(Event::InboundProbe(
InboundProbeEvent::Error { InboundProbeEvent::Error {
probe_id, probe_id,
peer, peer,

View File

@ -30,8 +30,8 @@ use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailu
use libp2p_swarm::dial_opts::{self, DialOpts}; use libp2p_swarm::dial_opts::{self, DialOpts};
use libp2p_swarm::{dummy, ConnectionDenied, ConnectionId, THandler, THandlerOutEvent}; use libp2p_swarm::{dummy, ConnectionDenied, ConnectionId, THandler, THandlerOutEvent};
use libp2p_swarm::{ use libp2p_swarm::{
ConnectionHandlerUpgrErr, ExternalAddresses, NetworkBehaviour, NetworkBehaviourAction, ConnectionHandlerUpgrErr, ExternalAddresses, NetworkBehaviour, NotifyHandler, PollParameters,
NotifyHandler, PollParameters, THandlerInEvent, THandlerInEvent, ToSwarm,
}; };
use std::collections::{HashMap, HashSet, VecDeque}; use std::collections::{HashMap, HashSet, VecDeque};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@ -70,9 +70,7 @@ pub enum Error {
pub struct Behaviour { pub struct Behaviour {
/// Queue of actions to return when polled. /// Queue of actions to return when polled.
queued_events: VecDeque< queued_events: VecDeque<ToSwarm<Event, Either<handler::relayed::Command, Either<Void, Void>>>>,
NetworkBehaviourAction<Event, Either<handler::relayed::Command, Either<Void, Void>>>,
>,
/// All direct (non-relayed) connections. /// All direct (non-relayed) connections.
direct_connections: HashMap<PeerId, HashSet<ConnectionId>>, direct_connections: HashMap<PeerId, HashSet<ConnectionId>>,
@ -130,7 +128,7 @@ impl Behaviour {
// //
// https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol // https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol
self.queued_events.extend([ self.queued_events.extend([
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
peer_id, peer_id,
handler: NotifyHandler::One(connection_id), handler: NotifyHandler::One(connection_id),
event: Either::Left(handler::relayed::Command::Connect { event: Either::Left(handler::relayed::Command::Connect {
@ -138,15 +136,13 @@ impl Behaviour {
attempt: 1, attempt: 1,
}), }),
}, },
NetworkBehaviourAction::GenerateEvent( ToSwarm::GenerateEvent(Event::InitiatedDirectConnectionUpgrade {
Event::InitiatedDirectConnectionUpgrade { remote_peer_id: peer_id,
remote_peer_id: peer_id, local_relayed_addr: match connected_point {
local_relayed_addr: match connected_point { ConnectedPoint::Listener { local_addr, .. } => local_addr.clone(),
ConnectedPoint::Listener { local_addr, .. } => local_addr.clone(), ConnectedPoint::Dialer { .. } => unreachable!("Due to outer if."),
ConnectedPoint::Dialer { .. } => unreachable!("Due to outer if."),
},
}, },
), }),
]); ]);
} }
} else { } else {
@ -190,23 +186,22 @@ impl Behaviour {
}; };
if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS { if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS {
self.queued_events self.queued_events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { handler: NotifyHandler::One(relayed_connection_id),
handler: NotifyHandler::One(relayed_connection_id), peer_id,
peer_id, event: Either::Left(handler::relayed::Command::Connect {
event: Either::Left(handler::relayed::Command::Connect { attempt: attempt + 1,
attempt: attempt + 1, obs_addrs: self.observed_addreses(),
obs_addrs: self.observed_addreses(), }),
}), })
})
} else { } else {
self.queued_events.extend([ self.queued_events.extend([
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
peer_id, peer_id,
handler: NotifyHandler::One(relayed_connection_id), handler: NotifyHandler::One(relayed_connection_id),
event: Either::Left(handler::relayed::Command::UpgradeFinishedDontKeepAlive), event: Either::Left(handler::relayed::Command::UpgradeFinishedDontKeepAlive),
}, },
NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { ToSwarm::GenerateEvent(Event::DirectConnectionUpgradeFailed {
remote_peer_id: peer_id, remote_peer_id: peer_id,
error: Error::Dial, error: Error::Dial,
}), }),
@ -341,7 +336,7 @@ impl NetworkBehaviour for Behaviour {
remote_addr, remote_addr,
}) => { }) => {
self.queued_events.extend([ self.queued_events.extend([
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
handler: NotifyHandler::One(relayed_connection_id), handler: NotifyHandler::One(relayed_connection_id),
peer_id: event_source, peer_id: event_source,
event: Either::Left(handler::relayed::Command::AcceptInboundConnect { event: Either::Left(handler::relayed::Command::AcceptInboundConnect {
@ -349,22 +344,19 @@ impl NetworkBehaviour for Behaviour {
obs_addrs: self.observed_addreses(), obs_addrs: self.observed_addreses(),
}), }),
}, },
NetworkBehaviourAction::GenerateEvent( ToSwarm::GenerateEvent(Event::RemoteInitiatedDirectConnectionUpgrade {
Event::RemoteInitiatedDirectConnectionUpgrade { remote_peer_id: event_source,
remote_peer_id: event_source, remote_relayed_addr: remote_addr,
remote_relayed_addr: remote_addr, }),
},
),
]); ]);
} }
Either::Left(handler::relayed::Event::InboundNegotiationFailed { error }) => { Either::Left(handler::relayed::Event::InboundNegotiationFailed { error }) => {
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(
.push_back(NetworkBehaviourAction::GenerateEvent( Event::DirectConnectionUpgradeFailed {
Event::DirectConnectionUpgradeFailed { remote_peer_id: event_source,
remote_peer_id: event_source, error: Error::Handler(error),
error: Error::Handler(error), },
}, ));
));
} }
Either::Left(handler::relayed::Event::InboundConnectNegotiated(remote_addrs)) => { Either::Left(handler::relayed::Event::InboundConnectNegotiated(remote_addrs)) => {
let opts = DialOpts::peer_id(event_source) let opts = DialOpts::peer_id(event_source)
@ -376,17 +368,15 @@ impl NetworkBehaviour for Behaviour {
self.direct_to_relayed_connections self.direct_to_relayed_connections
.insert(maybe_direct_connection_id, relayed_connection_id); .insert(maybe_direct_connection_id, relayed_connection_id);
self.queued_events self.queued_events.push_back(ToSwarm::Dial { opts });
.push_back(NetworkBehaviourAction::Dial { opts });
} }
Either::Left(handler::relayed::Event::OutboundNegotiationFailed { error }) => { Either::Left(handler::relayed::Event::OutboundNegotiationFailed { error }) => {
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(
.push_back(NetworkBehaviourAction::GenerateEvent( Event::DirectConnectionUpgradeFailed {
Event::DirectConnectionUpgradeFailed { remote_peer_id: event_source,
remote_peer_id: event_source, error: Error::Handler(error),
error: Error::Handler(error), },
}, ));
));
} }
Either::Left(handler::relayed::Event::OutboundConnectNegotiated { remote_addrs }) => { Either::Left(handler::relayed::Event::OutboundConnectNegotiated { remote_addrs }) => {
let opts = DialOpts::peer_id(event_source) let opts = DialOpts::peer_id(event_source)
@ -403,23 +393,20 @@ impl NetworkBehaviour for Behaviour {
.outgoing_direct_connection_attempts .outgoing_direct_connection_attempts
.entry((relayed_connection_id, event_source)) .entry((relayed_connection_id, event_source))
.or_default() += 1; .or_default() += 1;
self.queued_events self.queued_events.push_back(ToSwarm::Dial { opts });
.push_back(NetworkBehaviourAction::Dial { opts });
} }
Either::Right(Either::Left(handler::direct::Event::DirectConnectionEstablished)) => { Either::Right(Either::Left(handler::direct::Event::DirectConnectionEstablished)) => {
self.queued_events.extend([ self.queued_events.extend([
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
peer_id: event_source, peer_id: event_source,
handler: NotifyHandler::One(relayed_connection_id), handler: NotifyHandler::One(relayed_connection_id),
event: Either::Left( event: Either::Left(
handler::relayed::Command::UpgradeFinishedDontKeepAlive, handler::relayed::Command::UpgradeFinishedDontKeepAlive,
), ),
}, },
NetworkBehaviourAction::GenerateEvent( ToSwarm::GenerateEvent(Event::DirectConnectionUpgradeSucceeded {
Event::DirectConnectionUpgradeSucceeded { remote_peer_id: event_source,
remote_peer_id: event_source, }),
},
),
]); ]);
} }
Either::Right(Either::Right(never)) => void::unreachable(never), Either::Right(Either::Right(never)) => void::unreachable(never),
@ -430,7 +417,7 @@ impl NetworkBehaviour for Behaviour {
&mut self, &mut self,
_cx: &mut Context<'_>, _cx: &mut Context<'_>,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
if let Some(event) = self.queued_events.pop_front() { if let Some(event) = self.queued_events.pop_front() {
return Poll::Ready(event); return Poll::Ready(event);
} }

View File

@ -30,8 +30,8 @@ use libp2p_core::{Endpoint, Multiaddr};
use libp2p_identity::PeerId; use libp2p_identity::PeerId;
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm}; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm};
use libp2p_swarm::{ use libp2p_swarm::{
dial_opts::DialOpts, ConnectionDenied, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, dial_opts::DialOpts, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler,
NotifyHandler, OneShotHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, OneShotHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
}; };
use log::warn; use log::warn;
use smallvec::SmallVec; use smallvec::SmallVec;
@ -42,7 +42,7 @@ use std::{collections::VecDeque, iter};
/// Network behaviour that handles the floodsub protocol. /// Network behaviour that handles the floodsub protocol.
pub struct Floodsub { pub struct Floodsub {
/// Events that need to be yielded to the outside when polling. /// Events that need to be yielded to the outside when polling.
events: VecDeque<NetworkBehaviourAction<FloodsubEvent, FloodsubRpc>>, events: VecDeque<ToSwarm<FloodsubEvent, FloodsubRpc>>,
config: FloodsubConfig, config: FloodsubConfig,
@ -87,23 +87,22 @@ impl Floodsub {
// Send our topics to this node if we're already connected to it. // Send our topics to this node if we're already connected to it.
if self.connected_peers.contains_key(&peer_id) { if self.connected_peers.contains_key(&peer_id) {
for topic in self.subscribed_topics.iter().cloned() { for topic in self.subscribed_topics.iter().cloned() {
self.events self.events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id,
peer_id, handler: NotifyHandler::Any,
handler: NotifyHandler::Any, event: FloodsubRpc {
event: FloodsubRpc { messages: Vec::new(),
messages: Vec::new(), subscriptions: vec![FloodsubSubscription {
subscriptions: vec![FloodsubSubscription { topic,
topic, action: FloodsubSubscriptionAction::Subscribe,
action: FloodsubSubscriptionAction::Subscribe, }],
}], },
}, });
});
} }
} }
if self.target_peers.insert(peer_id) { if self.target_peers.insert(peer_id) {
self.events.push_back(NetworkBehaviourAction::Dial { self.events.push_back(ToSwarm::Dial {
opts: DialOpts::peer_id(peer_id).build(), opts: DialOpts::peer_id(peer_id).build(),
}); });
} }
@ -124,18 +123,17 @@ impl Floodsub {
} }
for peer in self.connected_peers.keys() { for peer in self.connected_peers.keys() {
self.events self.events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: *peer,
peer_id: *peer, handler: NotifyHandler::Any,
handler: NotifyHandler::Any, event: FloodsubRpc {
event: FloodsubRpc { messages: Vec::new(),
messages: Vec::new(), subscriptions: vec![FloodsubSubscription {
subscriptions: vec![FloodsubSubscription { topic: topic.clone(),
topic: topic.clone(), action: FloodsubSubscriptionAction::Subscribe,
action: FloodsubSubscriptionAction::Subscribe, }],
}], },
}, });
});
} }
self.subscribed_topics.push(topic); self.subscribed_topics.push(topic);
@ -156,18 +154,17 @@ impl Floodsub {
self.subscribed_topics.remove(pos); self.subscribed_topics.remove(pos);
for peer in self.connected_peers.keys() { for peer in self.connected_peers.keys() {
self.events self.events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: *peer,
peer_id: *peer, handler: NotifyHandler::Any,
handler: NotifyHandler::Any, event: FloodsubRpc {
event: FloodsubRpc { messages: Vec::new(),
messages: Vec::new(), subscriptions: vec![FloodsubSubscription {
subscriptions: vec![FloodsubSubscription { topic: topic.clone(),
topic: topic.clone(), action: FloodsubSubscriptionAction::Unsubscribe,
action: FloodsubSubscriptionAction::Unsubscribe, }],
}], },
}, });
});
} }
true true
@ -233,9 +230,10 @@ impl Floodsub {
); );
} }
if self.config.subscribe_local_messages { if self.config.subscribe_local_messages {
self.events.push_back(NetworkBehaviourAction::GenerateEvent( self.events
FloodsubEvent::Message(message.clone()), .push_back(ToSwarm::GenerateEvent(FloodsubEvent::Message(
)); message.clone(),
)));
} }
} }
// Don't publish the message if we have to check subscriptions // Don't publish the message if we have to check subscriptions
@ -259,15 +257,14 @@ impl Floodsub {
continue; continue;
} }
self.events self.events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: *peer_id,
peer_id: *peer_id, handler: NotifyHandler::Any,
handler: NotifyHandler::Any, event: FloodsubRpc {
event: FloodsubRpc { subscriptions: Vec::new(),
subscriptions: Vec::new(), messages: vec![message.clone()],
messages: vec![message.clone()], },
}, });
});
} }
} }
@ -287,18 +284,17 @@ impl Floodsub {
// We need to send our subscriptions to the newly-connected node. // We need to send our subscriptions to the newly-connected node.
if self.target_peers.contains(&peer_id) { if self.target_peers.contains(&peer_id) {
for topic in self.subscribed_topics.iter().cloned() { for topic in self.subscribed_topics.iter().cloned() {
self.events self.events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id,
peer_id, handler: NotifyHandler::Any,
handler: NotifyHandler::Any, event: FloodsubRpc {
event: FloodsubRpc { messages: Vec::new(),
messages: Vec::new(), subscriptions: vec![FloodsubSubscription {
subscriptions: vec![FloodsubSubscription { topic,
topic, action: FloodsubSubscriptionAction::Subscribe,
action: FloodsubSubscriptionAction::Subscribe, }],
}], },
}, });
});
} }
} }
@ -324,7 +320,7 @@ impl Floodsub {
// We can be disconnected by the remote in case of inactivity for example, so we always // We can be disconnected by the remote in case of inactivity for example, so we always
// try to reconnect. // try to reconnect.
if self.target_peers.contains(&peer_id) { if self.target_peers.contains(&peer_id) {
self.events.push_back(NetworkBehaviourAction::Dial { self.events.push_back(ToSwarm::Dial {
opts: DialOpts::peer_id(peer_id).build(), opts: DialOpts::peer_id(peer_id).build(),
}); });
} }
@ -377,12 +373,11 @@ impl NetworkBehaviour for Floodsub {
if !remote_peer_topics.contains(&subscription.topic) { if !remote_peer_topics.contains(&subscription.topic) {
remote_peer_topics.push(subscription.topic.clone()); remote_peer_topics.push(subscription.topic.clone());
} }
self.events.push_back(NetworkBehaviourAction::GenerateEvent( self.events
FloodsubEvent::Subscribed { .push_back(ToSwarm::GenerateEvent(FloodsubEvent::Subscribed {
peer_id: propagation_source, peer_id: propagation_source,
topic: subscription.topic, topic: subscription.topic,
}, }));
));
} }
FloodsubSubscriptionAction::Unsubscribe => { FloodsubSubscriptionAction::Unsubscribe => {
if let Some(pos) = remote_peer_topics if let Some(pos) = remote_peer_topics
@ -391,12 +386,11 @@ impl NetworkBehaviour for Floodsub {
{ {
remote_peer_topics.remove(pos); remote_peer_topics.remove(pos);
} }
self.events.push_back(NetworkBehaviourAction::GenerateEvent( self.events
FloodsubEvent::Unsubscribed { .push_back(ToSwarm::GenerateEvent(FloodsubEvent::Unsubscribed {
peer_id: propagation_source, peer_id: propagation_source,
topic: subscription.topic, topic: subscription.topic,
}, }));
));
} }
} }
} }
@ -427,8 +421,7 @@ impl NetworkBehaviour for Floodsub {
.any(|t| message.topics.iter().any(|u| t == u)) .any(|t| message.topics.iter().any(|u| t == u))
{ {
let event = FloodsubEvent::Message(message.clone()); let event = FloodsubEvent::Message(message.clone());
self.events self.events.push_back(ToSwarm::GenerateEvent(event));
.push_back(NetworkBehaviourAction::GenerateEvent(event));
} }
// Propagate the message to everyone else who is subscribed to any of the topics. // Propagate the message to everyone else who is subscribed to any of the topics.
@ -465,12 +458,11 @@ impl NetworkBehaviour for Floodsub {
} }
for (peer_id, rpc) in rpcs_to_dispatch { for (peer_id, rpc) in rpcs_to_dispatch {
self.events self.events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id,
peer_id, handler: NotifyHandler::Any,
handler: NotifyHandler::Any, event: rpc,
event: rpc, });
});
} }
} }
@ -478,7 +470,7 @@ impl NetworkBehaviour for Floodsub {
&mut self, &mut self,
_: &mut Context<'_>, _: &mut Context<'_>,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
if let Some(event) = self.events.pop_front() { if let Some(event) = self.events.pop_front() {
return Poll::Ready(event); return Poll::Ready(event);
} }

View File

@ -40,8 +40,8 @@ use libp2p_identity::PeerId;
use libp2p_swarm::{ use libp2p_swarm::{
behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm}, behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, FromSwarm},
dial_opts::DialOpts, dial_opts::DialOpts,
ConnectionDenied, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, PollParameters, THandler,
PollParameters, THandler, THandlerInEvent, THandlerOutEvent, THandlerInEvent, THandlerOutEvent, ToSwarm,
}; };
use wasm_timer::Instant; use wasm_timer::Instant;
@ -218,7 +218,7 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
config: Config, config: Config,
/// Events that need to be yielded to the outside when polling. /// Events that need to be yielded to the outside when polling.
events: VecDeque<NetworkBehaviourAction<Event, HandlerIn>>, events: VecDeque<ToSwarm<Event, HandlerIn>>,
/// Pools non-urgent control messages between heartbeats. /// Pools non-urgent control messages between heartbeats.
control_pool: HashMap<PeerId, Vec<ControlAction>>, control_pool: HashMap<PeerId, Vec<ControlAction>>,
@ -1133,7 +1133,7 @@ where
if !self.peer_topics.contains_key(peer_id) { if !self.peer_topics.contains_key(peer_id) {
// Connect to peer // Connect to peer
debug!("Connecting to explicit peer {:?}", peer_id); debug!("Connecting to explicit peer {:?}", peer_id);
self.events.push_back(NetworkBehaviourAction::Dial { self.events.push_back(ToSwarm::Dial {
opts: DialOpts::peer_id(*peer_id).build(), opts: DialOpts::peer_id(*peer_id).build(),
}); });
} }
@ -1632,7 +1632,7 @@ where
self.px_peers.insert(peer_id); self.px_peers.insert(peer_id);
// dial peer // dial peer
self.events.push_back(NetworkBehaviourAction::Dial { self.events.push_back(ToSwarm::Dial {
opts: DialOpts::peer_id(peer_id).build(), opts: DialOpts::peer_id(peer_id).build(),
}); });
} }
@ -1818,7 +1818,7 @@ where
if self.mesh.contains_key(&message.topic) { if self.mesh.contains_key(&message.topic) {
debug!("Sending received message to user"); debug!("Sending received message to user");
self.events self.events
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Message { .push_back(ToSwarm::GenerateEvent(Event::Message {
propagation_source: *propagation_source, propagation_source: *propagation_source,
message_id: msg_id.clone(), message_id: msg_id.clone(),
message, message,
@ -1994,12 +1994,10 @@ where
} }
} }
// generates a subscription event to be polled // generates a subscription event to be polled
application_event.push(NetworkBehaviourAction::GenerateEvent( application_event.push(ToSwarm::GenerateEvent(Event::Subscribed {
Event::Subscribed { peer_id: *propagation_source,
peer_id: *propagation_source, topic: topic_hash.clone(),
topic: topic_hash.clone(), }));
},
));
} }
SubscriptionAction::Unsubscribe => { SubscriptionAction::Unsubscribe => {
if peer_list.remove(propagation_source) { if peer_list.remove(propagation_source) {
@ -2014,12 +2012,10 @@ where
subscribed_topics.remove(topic_hash); subscribed_topics.remove(topic_hash);
unsubscribed_peers.push((*propagation_source, topic_hash.clone())); unsubscribed_peers.push((*propagation_source, topic_hash.clone()));
// generate an unsubscribe event to be polled // generate an unsubscribe event to be polled
application_event.push(NetworkBehaviourAction::GenerateEvent( application_event.push(ToSwarm::GenerateEvent(Event::Unsubscribed {
Event::Unsubscribed { peer_id: *propagation_source,
peer_id: *propagation_source, topic: topic_hash.clone(),
topic: topic_hash.clone(), }));
},
));
} }
} }
@ -2890,12 +2886,11 @@ where
let messages = self.fragment_message(message)?; let messages = self.fragment_message(message)?;
for message in messages { for message in messages {
self.events self.events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id,
peer_id, event: HandlerIn::Message(message),
event: HandlerIn::Message(message), handler: NotifyHandler::Any,
handler: NotifyHandler::Any, })
})
} }
Ok(()) Ok(())
} }
@ -3150,12 +3145,11 @@ where
for topic in topics { for topic in topics {
if let Some(mesh_peers) = self.mesh.get(topic) { if let Some(mesh_peers) = self.mesh.get(topic) {
if mesh_peers.contains(&peer_id) { if mesh_peers.contains(&peer_id) {
self.events self.events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id,
peer_id, event: HandlerIn::JoinedMesh,
event: HandlerIn::JoinedMesh, handler: NotifyHandler::One(connections.connections[0]),
handler: NotifyHandler::One(connections.connections[0]), });
});
break; break;
} }
} }
@ -3338,11 +3332,10 @@ where
"Peer does not support gossipsub protocols. {}", "Peer does not support gossipsub protocols. {}",
propagation_source propagation_source
); );
self.events.push_back(NetworkBehaviourAction::GenerateEvent( self.events
Event::GossipsubNotSupported { .push_back(ToSwarm::GenerateEvent(Event::GossipsubNotSupported {
peer_id: propagation_source, peer_id: propagation_source,
}, }));
));
} else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) { } else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) {
// Only change the value if the old value is Floodsub (the default set in // Only change the value if the old value is Floodsub (the default set in
// `NetworkBehaviour::on_event` with FromSwarm::ConnectionEstablished). // `NetworkBehaviour::on_event` with FromSwarm::ConnectionEstablished).
@ -3450,7 +3443,7 @@ where
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
if let Some(event) = self.events.pop_front() { if let Some(event) = self.events.pop_front() {
return Poll::Ready(event); return Poll::Ready(event);
} }
@ -3499,7 +3492,7 @@ fn peer_added_to_mesh(
new_topics: Vec<&TopicHash>, new_topics: Vec<&TopicHash>,
mesh: &HashMap<TopicHash, BTreeSet<PeerId>>, mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
known_topics: Option<&BTreeSet<TopicHash>>, known_topics: Option<&BTreeSet<TopicHash>>,
events: &mut VecDeque<NetworkBehaviourAction<Event, HandlerIn>>, events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
connections: &HashMap<PeerId, PeerConnections>, connections: &HashMap<PeerId, PeerConnections>,
) { ) {
// Ensure there is an active connection // Ensure there is an active connection
@ -3525,7 +3518,7 @@ fn peer_added_to_mesh(
} }
} }
// This is the first mesh the peer has joined, inform the handler // This is the first mesh the peer has joined, inform the handler
events.push_back(NetworkBehaviourAction::NotifyHandler { events.push_back(ToSwarm::NotifyHandler {
peer_id, peer_id,
event: HandlerIn::JoinedMesh, event: HandlerIn::JoinedMesh,
handler: NotifyHandler::One(connection_id), handler: NotifyHandler::One(connection_id),
@ -3540,7 +3533,7 @@ fn peer_removed_from_mesh(
old_topic: &TopicHash, old_topic: &TopicHash,
mesh: &HashMap<TopicHash, BTreeSet<PeerId>>, mesh: &HashMap<TopicHash, BTreeSet<PeerId>>,
known_topics: Option<&BTreeSet<TopicHash>>, known_topics: Option<&BTreeSet<TopicHash>>,
events: &mut VecDeque<NetworkBehaviourAction<Event, HandlerIn>>, events: &mut VecDeque<ToSwarm<Event, HandlerIn>>,
connections: &HashMap<PeerId, PeerConnections>, connections: &HashMap<PeerId, PeerConnections>,
) { ) {
// Ensure there is an active connection // Ensure there is an active connection
@ -3564,7 +3557,7 @@ fn peer_removed_from_mesh(
} }
} }
// The peer is not in any other mesh, inform the handler // The peer is not in any other mesh, inform the handler
events.push_back(NetworkBehaviourAction::NotifyHandler { events.push_back(ToSwarm::NotifyHandler {
peer_id, peer_id,
event: HandlerIn::LeftMesh, event: HandlerIn::LeftMesh,
handler: NotifyHandler::One(*connection_id), handler: NotifyHandler::One(*connection_id),

View File

@ -412,7 +412,7 @@ fn test_subscribe() {
.events .events
.iter() .iter()
.fold(vec![], |mut collected_subscriptions, e| match e { .fold(vec![], |mut collected_subscriptions, e| match e {
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
event: HandlerIn::Message(ref message), event: HandlerIn::Message(ref message),
.. ..
} => { } => {
@ -480,7 +480,7 @@ fn test_unsubscribe() {
.events .events
.iter() .iter()
.fold(vec![], |mut collected_subscriptions, e| match e { .fold(vec![], |mut collected_subscriptions, e| match e {
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
event: HandlerIn::Message(ref message), event: HandlerIn::Message(ref message),
.. ..
} => { } => {
@ -668,7 +668,7 @@ fn test_publish_without_flood_publishing() {
.events .events
.iter() .iter()
.fold(vec![], |mut collected_publish, e| match e { .fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
event: HandlerIn::Message(ref message), event: HandlerIn::Message(ref message),
.. ..
} => { } => {
@ -758,7 +758,7 @@ fn test_fanout() {
.events .events
.iter() .iter()
.fold(vec![], |mut collected_publish, e| match e { .fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
event: HandlerIn::Message(ref message), event: HandlerIn::Message(ref message),
.. ..
} => { } => {
@ -811,7 +811,7 @@ fn test_inject_connected() {
.events .events
.iter() .iter()
.filter(|e| match e { .filter(|e| match e {
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
event: HandlerIn::Message(ref m), event: HandlerIn::Message(ref m),
.. ..
} => !m.subscriptions.is_empty(), } => !m.subscriptions.is_empty(),
@ -821,7 +821,7 @@ fn test_inject_connected() {
// check that there are two subscriptions sent to each peer // check that there are two subscriptions sent to each peer
for sevent in send_events.clone() { for sevent in send_events.clone() {
if let NetworkBehaviourAction::NotifyHandler { if let ToSwarm::NotifyHandler {
event: HandlerIn::Message(ref m), event: HandlerIn::Message(ref m),
.. ..
} = sevent } = sevent
@ -1054,7 +1054,7 @@ fn test_handle_iwant_msg_cached() {
.events .events
.iter() .iter()
.fold(vec![], |mut collected_messages, e| match e { .fold(vec![], |mut collected_messages, e| match e {
NetworkBehaviourAction::NotifyHandler { event, .. } => { ToSwarm::NotifyHandler { event, .. } => {
if let HandlerIn::Message(ref m) = event { if let HandlerIn::Message(ref m) = event {
let event = proto_to_message(m); let event = proto_to_message(m);
for c in &event.messages { for c in &event.messages {
@ -1112,7 +1112,7 @@ fn test_handle_iwant_msg_cached_shifted() {
// is the message is being sent? // is the message is being sent?
let message_exists = gs.events.iter().any(|e| match e { let message_exists = gs.events.iter().any(|e| match e {
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
event: HandlerIn::Message(ref m), event: HandlerIn::Message(ref m),
.. ..
} => { } => {
@ -1353,7 +1353,7 @@ fn count_control_msgs<D: DataTransform, F: TopicSubscriptionFilter>(
+ gs.events + gs.events
.iter() .iter()
.map(|e| match e { .map(|e| match e {
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
peer_id, peer_id,
event: HandlerIn::Message(ref m), event: HandlerIn::Message(ref m),
.. ..
@ -1394,7 +1394,7 @@ fn test_explicit_peer_gets_connected() {
.events .events
.iter() .iter()
.filter(|e| match e { .filter(|e| match e {
NetworkBehaviourAction::Dial { opts } => opts.get_peer_id() == Some(peer), ToSwarm::Dial { opts } => opts.get_peer_id() == Some(peer),
_ => false, _ => false,
}) })
.count(); .count();
@ -1435,7 +1435,7 @@ fn test_explicit_peer_reconnects() {
gs.events gs.events
.iter() .iter()
.filter(|e| match e { .filter(|e| match e {
NetworkBehaviourAction::Dial { opts } => opts.get_peer_id() == Some(*peer), ToSwarm::Dial { opts } => opts.get_peer_id() == Some(*peer),
_ => false, _ => false,
}) })
.count(), .count(),
@ -1450,7 +1450,7 @@ fn test_explicit_peer_reconnects() {
gs.events gs.events
.iter() .iter()
.filter(|e| match e { .filter(|e| match e {
NetworkBehaviourAction::Dial { opts } => opts.get_peer_id() == Some(*peer), ToSwarm::Dial { opts } => opts.get_peer_id() == Some(*peer),
_ => false, _ => false,
}) })
.count() .count()
@ -1574,7 +1574,7 @@ fn do_forward_messages_to_explicit_peers() {
gs.events gs.events
.iter() .iter()
.filter(|e| match e { .filter(|e| match e {
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
peer_id, peer_id,
event: HandlerIn::Message(ref m), event: HandlerIn::Message(ref m),
.. ..
@ -1830,7 +1830,7 @@ fn test_connect_to_px_peers_on_handle_prune() {
.events .events
.iter() .iter()
.filter_map(|e| match e { .filter_map(|e| match e {
NetworkBehaviourAction::Dial { opts } => opts.get_peer_id(), ToSwarm::Dial { opts } => opts.get_peer_id(),
_ => None, _ => None,
}) })
.collect(); .collect();
@ -2122,7 +2122,7 @@ fn test_flood_publish() {
.events .events
.iter() .iter()
.fold(vec![], |mut collected_publish, e| match e { .fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::NotifyHandler { event, .. } => { ToSwarm::NotifyHandler { event, .. } => {
if let HandlerIn::Message(ref m) = event { if let HandlerIn::Message(ref m) = event {
let event = proto_to_message(m); let event = proto_to_message(m);
for s in &event.messages { for s in &event.messages {
@ -2488,7 +2488,7 @@ fn test_ignore_px_from_negative_scored_peer() {
assert_eq!( assert_eq!(
gs.events gs.events
.iter() .iter()
.filter(|e| matches!(e, NetworkBehaviourAction::Dial { .. })) .filter(|e| matches!(e, ToSwarm::Dial { .. }))
.count(), .count(),
0 0
); );
@ -2683,7 +2683,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() {
.events .events
.iter() .iter()
.fold(vec![], |mut collected_messages, e| match e { .fold(vec![], |mut collected_messages, e| match e {
NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { ToSwarm::NotifyHandler { event, peer_id, .. } => {
if let HandlerIn::Message(ref m) = event { if let HandlerIn::Message(ref m) = event {
let event = proto_to_message(m); let event = proto_to_message(m);
for c in &event.messages { for c in &event.messages {
@ -2831,7 +2831,7 @@ fn test_do_not_publish_to_peer_below_publish_threshold() {
.events .events
.iter() .iter()
.fold(vec![], |mut collected_publish, e| match e { .fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { ToSwarm::NotifyHandler { event, peer_id, .. } => {
if let HandlerIn::Message(ref m) = event { if let HandlerIn::Message(ref m) = event {
let event = proto_to_message(m); let event = proto_to_message(m);
for s in &event.messages { for s in &event.messages {
@ -2888,7 +2888,7 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() {
.events .events
.iter() .iter()
.fold(vec![], |mut collected_publish, e| match e { .fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { ToSwarm::NotifyHandler { event, peer_id, .. } => {
if let HandlerIn::Message(ref m) = event { if let HandlerIn::Message(ref m) = event {
let event = proto_to_message(m); let event = proto_to_message(m);
for s in &event.messages { for s in &event.messages {
@ -3012,7 +3012,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {
assert_eq!(gs.events.len(), 1); assert_eq!(gs.events.len(), 1);
assert!(matches!( assert!(matches!(
gs.events[0], gs.events[0],
NetworkBehaviourAction::GenerateEvent(Event::Subscribed { .. }) ToSwarm::GenerateEvent(Event::Subscribed { .. })
)); ));
let control_action = ControlAction::IHave { let control_action = ControlAction::IHave {
@ -3078,7 +3078,7 @@ fn test_ignore_px_from_peers_below_accept_px_threshold() {
assert_eq!( assert_eq!(
gs.events gs.events
.iter() .iter()
.filter(|e| matches!(e, NetworkBehaviourAction::Dial { .. })) .filter(|e| matches!(e, ToSwarm::Dial { .. }))
.count(), .count(),
0 0
); );
@ -3100,7 +3100,7 @@ fn test_ignore_px_from_peers_below_accept_px_threshold() {
assert!( assert!(
gs.events gs.events
.iter() .iter()
.filter(|e| matches!(e, NetworkBehaviourAction::Dial { .. })) .filter(|e| matches!(e, ToSwarm::Dial { .. }))
.count() .count()
> 0 > 0
); );
@ -4413,7 +4413,7 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() {
gs.events gs.events
.iter() .iter()
.map(|e| match e { .map(|e| match e {
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
event: HandlerIn::Message(ref m), event: HandlerIn::Message(ref m),
.. ..
} => { } => {
@ -4819,7 +4819,7 @@ fn test_publish_to_floodsub_peers_without_flood_publish() {
.events .events
.iter() .iter()
.fold(vec![], |mut collected_publish, e| match e { .fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { ToSwarm::NotifyHandler { peer_id, event, .. } => {
if peer_id == &p1 || peer_id == &p2 { if peer_id == &p1 || peer_id == &p2 {
if let HandlerIn::Message(ref m) = event { if let HandlerIn::Message(ref m) = event {
let event = proto_to_message(m); let event = proto_to_message(m);
@ -4876,7 +4876,7 @@ fn test_do_not_use_floodsub_in_fanout() {
.events .events
.iter() .iter()
.fold(vec![], |mut collected_publish, e| match e { .fold(vec![], |mut collected_publish, e| match e {
NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { ToSwarm::NotifyHandler { peer_id, event, .. } => {
if peer_id == &p1 || peer_id == &p2 { if peer_id == &p1 || peer_id == &p2 {
if let HandlerIn::Message(ref m) = event { if let HandlerIn::Message(ref m) = event {
let event = proto_to_message(m); let event = proto_to_message(m);
@ -5190,7 +5190,7 @@ fn test_subscribe_and_graft_with_negative_score() {
let forward_messages_to_p1 = |gs1: &mut Behaviour<_, _>, gs2: &mut Behaviour<_, _>| { let forward_messages_to_p1 = |gs1: &mut Behaviour<_, _>, gs2: &mut Behaviour<_, _>| {
//collect messages to p1 //collect messages to p1
let messages_to_p1 = gs2.events.drain(..).filter_map(|e| match e { let messages_to_p1 = gs2.events.drain(..).filter_map(|e| match e {
NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { ToSwarm::NotifyHandler { peer_id, event, .. } => {
if peer_id == p1 { if peer_id == p1 {
if let HandlerIn::Message(m) = event { if let HandlerIn::Message(m) = event {
Some(m) Some(m)

View File

@ -26,8 +26,8 @@ use libp2p_identity::PublicKey;
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p_swarm::{ use libp2p_swarm::{
dial_opts::DialOpts, AddressScore, ConnectionDenied, ConnectionHandlerUpgrErr, DialError, dial_opts::DialOpts, AddressScore, ConnectionDenied, ConnectionHandlerUpgrErr, DialError,
ExternalAddresses, ListenAddresses, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, ExternalAddresses, ListenAddresses, NetworkBehaviour, NotifyHandler, PollParameters,
PollParameters, THandlerInEvent, THandlerInEvent, ToSwarm,
}; };
use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent}; use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent};
use lru::LruCache; use lru::LruCache;
@ -44,7 +44,7 @@ use std::{
/// about them, and answers identify queries from other nodes. /// about them, and answers identify queries from other nodes.
/// ///
/// All external addresses of the local node supposedly observed by remotes /// All external addresses of the local node supposedly observed by remotes
/// are reported via [`NetworkBehaviourAction::ReportObservedAddr`] with a /// are reported via [`ToSwarm::ReportObservedAddr`] with a
/// [score](AddressScore) of `1`. /// [score](AddressScore) of `1`.
pub struct Behaviour { pub struct Behaviour {
config: Config, config: Config,
@ -55,7 +55,7 @@ pub struct Behaviour {
/// with current information about the local peer. /// with current information about the local peer.
requests: Vec<Request>, requests: Vec<Request>,
/// Pending events to be emitted when polled. /// Pending events to be emitted when polled.
events: VecDeque<NetworkBehaviourAction<Event, InEvent>>, events: VecDeque<ToSwarm<Event, InEvent>>,
/// The addresses of all peers that we have discovered. /// The addresses of all peers that we have discovered.
discovered_peers: PeerCache, discovered_peers: PeerCache,
@ -200,7 +200,7 @@ impl Behaviour {
if !self.requests.contains(&request) { if !self.requests.contains(&request) {
self.requests.push(request); self.requests.push(request);
self.events.push_back(NetworkBehaviourAction::Dial { self.events.push_back(ToSwarm::Dial {
opts: DialOpts::peer_id(p).build(), opts: DialOpts::peer_id(p).build(),
}); });
} }
@ -293,27 +293,19 @@ impl NetworkBehaviour for Behaviour {
let observed = info.observed_addr.clone(); let observed = info.observed_addr.clone();
self.events self.events
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Received { .push_back(ToSwarm::GenerateEvent(Event::Received { peer_id, info }));
peer_id, self.events.push_back(ToSwarm::ReportObservedAddr {
info, address: observed,
})); score: AddressScore::Finite(1),
self.events });
.push_back(NetworkBehaviourAction::ReportObservedAddr {
address: observed,
score: AddressScore::Finite(1),
});
} }
handler::Event::Identification(peer) => { handler::Event::Identification(peer) => {
self.events self.events
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Sent { .push_back(ToSwarm::GenerateEvent(Event::Sent { peer_id: peer }));
peer_id: peer,
}));
} }
handler::Event::IdentificationPushed => { handler::Event::IdentificationPushed => {
self.events self.events
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Pushed { .push_back(ToSwarm::GenerateEvent(Event::Pushed { peer_id }));
peer_id,
}));
} }
handler::Event::Identify => { handler::Event::Identify => {
self.requests.push(Request { self.requests.push(Request {
@ -323,10 +315,7 @@ impl NetworkBehaviour for Behaviour {
} }
handler::Event::IdentificationError(error) => { handler::Event::IdentificationError(error) => {
self.events self.events
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Error { .push_back(ToSwarm::GenerateEvent(Event::Error { peer_id, error }));
peer_id,
error,
}));
} }
} }
} }
@ -335,7 +324,7 @@ impl NetworkBehaviour for Behaviour {
&mut self, &mut self,
_cx: &mut Context<'_>, _cx: &mut Context<'_>,
params: &mut impl PollParameters, params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
if let Some(event) = self.events.pop_front() { if let Some(event) = self.events.pop_front() {
return Poll::Ready(event); return Poll::Ready(event);
} }
@ -345,7 +334,7 @@ impl NetworkBehaviour for Behaviour {
Some(Request { Some(Request {
peer_id, peer_id,
protocol: Protocol::Push, protocol: Protocol::Push,
}) => Poll::Ready(NetworkBehaviourAction::NotifyHandler { }) => Poll::Ready(ToSwarm::NotifyHandler {
peer_id, peer_id,
handler: NotifyHandler::Any, handler: NotifyHandler::Any,
event: InEvent { event: InEvent {
@ -362,7 +351,7 @@ impl NetworkBehaviour for Behaviour {
Some(Request { Some(Request {
peer_id, peer_id,
protocol: Protocol::Identify(connection_id), protocol: Protocol::Identify(connection_id),
}) => Poll::Ready(NetworkBehaviourAction::NotifyHandler { }) => Poll::Ready(ToSwarm::NotifyHandler {
peer_id, peer_id,
handler: NotifyHandler::One(connection_id), handler: NotifyHandler::One(connection_id),
event: InEvent { event: InEvent {

View File

@ -47,8 +47,8 @@ use libp2p_swarm::behaviour::{
use libp2p_swarm::{ use libp2p_swarm::{
dial_opts::{self, DialOpts}, dial_opts::{self, DialOpts},
ConnectionDenied, ConnectionId, DialError, ExternalAddresses, ListenAddresses, ConnectionDenied, ConnectionId, DialError, ExternalAddresses, ListenAddresses,
NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, THandler, NetworkBehaviour, NotifyHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent,
THandlerInEvent, THandlerOutEvent, ToSwarm,
}; };
use log::{debug, info, warn}; use log::{debug, info, warn};
use smallvec::SmallVec; use smallvec::SmallVec;
@ -103,7 +103,7 @@ pub struct Kademlia<TStore> {
connection_idle_timeout: Duration, connection_idle_timeout: Duration,
/// Queued events to return when the behaviour is being polled. /// Queued events to return when the behaviour is being polled.
queued_events: VecDeque<NetworkBehaviourAction<KademliaEvent, KademliaHandlerIn<QueryId>>>, queued_events: VecDeque<ToSwarm<KademliaEvent, KademliaHandlerIn<QueryId>>>,
listen_addresses: ListenAddresses, listen_addresses: ListenAddresses,
@ -522,20 +522,19 @@ where
match self.kbuckets.entry(&key) { match self.kbuckets.entry(&key) {
kbucket::Entry::Present(mut entry, _) => { kbucket::Entry::Present(mut entry, _) => {
if entry.value().insert(address) { if entry.value().insert(address) {
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(
.push_back(NetworkBehaviourAction::GenerateEvent( KademliaEvent::RoutingUpdated {
KademliaEvent::RoutingUpdated { peer: *peer,
peer: *peer, is_new_peer: false,
is_new_peer: false, addresses: entry.value().clone(),
addresses: entry.value().clone(), old_peer: None,
old_peer: None, bucket_range: self
bucket_range: self .kbuckets
.kbuckets .bucket(&key)
.bucket(&key) .map(|b| b.range())
.map(|b| b.range()) .expect("Not kbucket::Entry::SelfEntry."),
.expect("Not kbucket::Entry::SelfEntry."), },
}, ))
))
} }
RoutingUpdate::Success RoutingUpdate::Success
} }
@ -552,20 +551,19 @@ where
}; };
match entry.insert(addresses.clone(), status) { match entry.insert(addresses.clone(), status) {
kbucket::InsertResult::Inserted => { kbucket::InsertResult::Inserted => {
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(
.push_back(NetworkBehaviourAction::GenerateEvent( KademliaEvent::RoutingUpdated {
KademliaEvent::RoutingUpdated { peer: *peer,
peer: *peer, is_new_peer: true,
is_new_peer: true, addresses,
addresses, old_peer: None,
old_peer: None, bucket_range: self
bucket_range: self .kbuckets
.kbuckets .bucket(&key)
.bucket(&key) .map(|b| b.range())
.map(|b| b.range()) .expect("Not kbucket::Entry::SelfEntry."),
.expect("Not kbucket::Entry::SelfEntry."), },
}, ));
));
RoutingUpdate::Success RoutingUpdate::Success
} }
kbucket::InsertResult::Full => { kbucket::InsertResult::Full => {
@ -573,7 +571,7 @@ where
RoutingUpdate::Failed RoutingUpdate::Failed
} }
kbucket::InsertResult::Pending { disconnected } => { kbucket::InsertResult::Pending { disconnected } => {
self.queued_events.push_back(NetworkBehaviourAction::Dial { self.queued_events.push_back(ToSwarm::Dial {
opts: DialOpts::peer_id(disconnected.into_preimage()).build(), opts: DialOpts::peer_id(disconnected.into_preimage()).build(),
}); });
RoutingUpdate::Pending RoutingUpdate::Pending
@ -727,15 +725,14 @@ where
let stats = QueryStats::empty(); let stats = QueryStats::empty();
if let Some(record) = record { if let Some(record) = record {
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(
.push_back(NetworkBehaviourAction::GenerateEvent( KademliaEvent::OutboundQueryProgressed {
KademliaEvent::OutboundQueryProgressed { id,
id, result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))),
result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))), step,
step, stats,
stats, },
}, ));
));
} }
id id
@ -975,18 +972,17 @@ where
let stats = QueryStats::empty(); let stats = QueryStats::empty();
if !providers.is_empty() { if !providers.is_empty() {
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(
.push_back(NetworkBehaviourAction::GenerateEvent( KademliaEvent::OutboundQueryProgressed {
KademliaEvent::OutboundQueryProgressed { id,
id, result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { key,
key, providers,
providers, })),
})), step,
step, stats,
stats, },
}, ));
));
} }
id id
} }
@ -1134,20 +1130,19 @@ where
} }
if let Some(address) = address { if let Some(address) = address {
if entry.value().insert(address) { if entry.value().insert(address) {
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(
.push_back(NetworkBehaviourAction::GenerateEvent( KademliaEvent::RoutingUpdated {
KademliaEvent::RoutingUpdated { peer,
peer, is_new_peer: false,
is_new_peer: false, addresses: entry.value().clone(),
addresses: entry.value().clone(), old_peer: None,
old_peer: None, bucket_range: self
bucket_range: self .kbuckets
.kbuckets .bucket(&key)
.bucket(&key) .map(|b| b.range())
.map(|b| b.range()) .expect("Not kbucket::Entry::SelfEntry."),
.expect("Not kbucket::Entry::SelfEntry."), },
}, ))
))
} }
} }
} }
@ -1168,16 +1163,14 @@ where
} }
match (address, self.kbucket_inserts) { match (address, self.kbucket_inserts) {
(None, _) => { (None, _) => {
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(
.push_back(NetworkBehaviourAction::GenerateEvent( KademliaEvent::UnroutablePeer { peer },
KademliaEvent::UnroutablePeer { peer }, ));
));
} }
(Some(a), KademliaBucketInserts::Manual) => { (Some(a), KademliaBucketInserts::Manual) => {
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(
.push_back(NetworkBehaviourAction::GenerateEvent( KademliaEvent::RoutablePeer { peer, address: a },
KademliaEvent::RoutablePeer { peer, address: a }, ));
));
} }
(Some(a), KademliaBucketInserts::OnConnected) => { (Some(a), KademliaBucketInserts::OnConnected) => {
let addresses = Addresses::new(a); let addresses = Addresses::new(a);
@ -1194,25 +1187,20 @@ where
.map(|b| b.range()) .map(|b| b.range())
.expect("Not kbucket::Entry::SelfEntry."), .expect("Not kbucket::Entry::SelfEntry."),
}; };
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(event));
.push_back(NetworkBehaviourAction::GenerateEvent(event));
} }
kbucket::InsertResult::Full => { kbucket::InsertResult::Full => {
debug!("Bucket full. Peer not added to routing table: {}", peer); debug!("Bucket full. Peer not added to routing table: {}", peer);
let address = addresses.first().clone(); let address = addresses.first().clone();
self.queued_events.push_back( self.queued_events.push_back(ToSwarm::GenerateEvent(
NetworkBehaviourAction::GenerateEvent( KademliaEvent::RoutablePeer { peer, address },
KademliaEvent::RoutablePeer { peer, address }, ));
),
);
} }
kbucket::InsertResult::Pending { disconnected } => { kbucket::InsertResult::Pending { disconnected } => {
let address = addresses.first().clone(); let address = addresses.first().clone();
self.queued_events.push_back( self.queued_events.push_back(ToSwarm::GenerateEvent(
NetworkBehaviourAction::GenerateEvent( KademliaEvent::PendingRoutablePeer { peer, address },
KademliaEvent::PendingRoutablePeer { peer, address }, ));
),
);
// `disconnected` might already be in the process of re-connecting. // `disconnected` might already be in the process of re-connecting.
// In other words `disconnected` might have already re-connected but // In other words `disconnected` might have already re-connected but
@ -1221,7 +1209,7 @@ where
// //
// Only try dialing peer if not currently connected. // Only try dialing peer if not currently connected.
if !self.connected_peers.contains(disconnected.preimage()) { if !self.connected_peers.contains(disconnected.preimage()) {
self.queued_events.push_back(NetworkBehaviourAction::Dial { self.queued_events.push_back(ToSwarm::Dial {
opts: DialOpts::peer_id(disconnected.into_preimage()) opts: DialOpts::peer_id(disconnected.into_preimage())
.build(), .build(),
}) })
@ -1624,16 +1612,15 @@ where
// If the (alleged) publisher is the local node, do nothing. The record of // If the (alleged) publisher is the local node, do nothing. The record of
// the original publisher should never change as a result of replication // the original publisher should never change as a result of replication
// and the publisher is always assumed to have the "right" value. // and the publisher is always assumed to have the "right" value.
self.queued_events self.queued_events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: source,
peer_id: source, handler: NotifyHandler::One(connection),
handler: NotifyHandler::One(connection), event: KademliaHandlerIn::PutRecordRes {
event: KademliaHandlerIn::PutRecordRes { key: record.key,
key: record.key, value: record.value,
value: record.value, request_id,
request_id, },
}, });
});
return; return;
} }
@ -1684,40 +1671,37 @@ where
record.key, record.key,
record.value.len() record.value.len()
); );
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(
.push_back(NetworkBehaviourAction::GenerateEvent( KademliaEvent::InboundRequest {
KademliaEvent::InboundRequest { request: InboundRequest::PutRecord {
request: InboundRequest::PutRecord { source,
source, connection,
connection, record: None,
record: None,
},
}, },
)); },
));
} }
Err(e) => { Err(e) => {
info!("Record not stored: {:?}", e); info!("Record not stored: {:?}", e);
self.queued_events self.queued_events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: source,
peer_id: source, handler: NotifyHandler::One(connection),
handler: NotifyHandler::One(connection), event: KademliaHandlerIn::Reset(request_id),
event: KademliaHandlerIn::Reset(request_id), });
});
return; return;
} }
}, },
KademliaStoreInserts::FilterBoth => { KademliaStoreInserts::FilterBoth => {
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(
.push_back(NetworkBehaviourAction::GenerateEvent( KademliaEvent::InboundRequest {
KademliaEvent::InboundRequest { request: InboundRequest::PutRecord {
request: InboundRequest::PutRecord { source,
source, connection,
connection, record: Some(record.clone()),
record: Some(record.clone()),
},
}, },
)); },
));
} }
} }
} }
@ -1729,16 +1713,15 @@ where
// closest nodes to the target. In addition returning // closest nodes to the target. In addition returning
// [`KademliaHandlerIn::PutRecordRes`] does not reveal any internal // [`KademliaHandlerIn::PutRecordRes`] does not reveal any internal
// information to a possibly malicious remote node. // information to a possibly malicious remote node.
self.queued_events self.queued_events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: source,
peer_id: source, handler: NotifyHandler::One(connection),
handler: NotifyHandler::One(connection), event: KademliaHandlerIn::PutRecordRes {
event: KademliaHandlerIn::PutRecordRes { key: record.key,
key: record.key, value: record.value,
value: record.value, request_id,
request_id, },
}, })
})
} }
/// Processes a provider record received from a peer. /// Processes a provider record received from a peer.
@ -1757,22 +1740,20 @@ where
return; return;
} }
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(
.push_back(NetworkBehaviourAction::GenerateEvent( KademliaEvent::InboundRequest {
KademliaEvent::InboundRequest { request: InboundRequest::AddProvider { record: None },
request: InboundRequest::AddProvider { record: None }, },
}, ));
));
} }
KademliaStoreInserts::FilterBoth => { KademliaStoreInserts::FilterBoth => {
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(
.push_back(NetworkBehaviourAction::GenerateEvent( KademliaEvent::InboundRequest {
KademliaEvent::InboundRequest { request: InboundRequest::AddProvider {
request: InboundRequest::AddProvider { record: Some(record),
record: Some(record),
},
}, },
)); },
));
} }
} }
} }
@ -1845,12 +1826,11 @@ where
.position(|(p, _)| p == &peer_id) .position(|(p, _)| p == &peer_id)
.map(|p| q.inner.pending_rpcs.remove(p)) .map(|p| q.inner.pending_rpcs.remove(p))
}) { }) {
self.queued_events self.queued_events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id,
peer_id, event,
event, handler: NotifyHandler::Any,
handler: NotifyHandler::Any, });
});
} }
self.connected_peers.insert(peer_id); self.connected_peers.insert(peer_id);
@ -2085,24 +2065,22 @@ where
KademliaHandlerEvent::FindNodeReq { key, request_id } => { KademliaHandlerEvent::FindNodeReq { key, request_id } => {
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source); let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(
.push_back(NetworkBehaviourAction::GenerateEvent( KademliaEvent::InboundRequest {
KademliaEvent::InboundRequest { request: InboundRequest::FindNode {
request: InboundRequest::FindNode { num_closer_peers: closer_peers.len(),
num_closer_peers: closer_peers.len(),
},
}, },
)); },
));
self.queued_events self.queued_events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: source,
peer_id: source, handler: NotifyHandler::One(connection),
handler: NotifyHandler::One(connection), event: KademliaHandlerIn::FindNodeRes {
event: KademliaHandlerIn::FindNodeRes { closer_peers,
closer_peers, request_id,
request_id, },
}, });
});
} }
KademliaHandlerEvent::FindNodeRes { KademliaHandlerEvent::FindNodeRes {
@ -2116,26 +2094,24 @@ where
let provider_peers = self.provider_peers(&key, &source); let provider_peers = self.provider_peers(&key, &source);
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source); let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(
.push_back(NetworkBehaviourAction::GenerateEvent( KademliaEvent::InboundRequest {
KademliaEvent::InboundRequest { request: InboundRequest::GetProvider {
request: InboundRequest::GetProvider { num_closer_peers: closer_peers.len(),
num_closer_peers: closer_peers.len(), num_provider_peers: provider_peers.len(),
num_provider_peers: provider_peers.len(),
},
}, },
)); },
));
self.queued_events self.queued_events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: source,
peer_id: source, handler: NotifyHandler::One(connection),
handler: NotifyHandler::One(connection), event: KademliaHandlerIn::GetProvidersRes {
event: KademliaHandlerIn::GetProvidersRes { closer_peers,
closer_peers, provider_peers,
provider_peers, request_id,
request_id, },
}, });
});
} }
KademliaHandlerEvent::GetProvidersRes { KademliaHandlerEvent::GetProvidersRes {
@ -2157,20 +2133,19 @@ where
*providers_found += provider_peers.len(); *providers_found += provider_peers.len();
let providers = provider_peers.iter().map(|p| p.node_id).collect(); let providers = provider_peers.iter().map(|p| p.node_id).collect();
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(
.push_back(NetworkBehaviourAction::GenerateEvent( KademliaEvent::OutboundQueryProgressed {
KademliaEvent::OutboundQueryProgressed { id: user_data,
id: user_data, result: QueryResult::GetProviders(Ok(
result: QueryResult::GetProviders(Ok( GetProvidersOk::FoundProviders {
GetProvidersOk::FoundProviders { key: key.clone(),
key: key.clone(), providers,
providers, },
}, )),
)), step: step.clone(),
step: step.clone(), stats,
stats, },
}, ));
));
*step = step.next(); *step = step.next();
} }
} }
@ -2215,26 +2190,24 @@ where
let closer_peers = self.find_closest(&kbucket::Key::new(key), &source); let closer_peers = self.find_closest(&kbucket::Key::new(key), &source);
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(
.push_back(NetworkBehaviourAction::GenerateEvent( KademliaEvent::InboundRequest {
KademliaEvent::InboundRequest { request: InboundRequest::GetRecord {
request: InboundRequest::GetRecord { num_closer_peers: closer_peers.len(),
num_closer_peers: closer_peers.len(), present_locally: record.is_some(),
present_locally: record.is_some(),
},
}, },
)); },
));
self.queued_events self.queued_events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: source,
peer_id: source, handler: NotifyHandler::One(connection),
handler: NotifyHandler::One(connection), event: KademliaHandlerIn::GetRecordRes {
event: KademliaHandlerIn::GetRecordRes { record,
record, closer_peers,
closer_peers, request_id,
request_id, },
}, });
});
} }
KademliaHandlerEvent::GetRecordRes { KademliaHandlerEvent::GetRecordRes {
@ -2258,17 +2231,16 @@ where
record, record,
}; };
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(
.push_back(NetworkBehaviourAction::GenerateEvent( KademliaEvent::OutboundQueryProgressed {
KademliaEvent::OutboundQueryProgressed { id: user_data,
id: user_data, result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(
result: QueryResult::GetRecord(Ok( record,
GetRecordOk::FoundRecord(record), ))),
)), step: step.clone(),
step: step.clone(), stats,
stats, },
}, ));
));
*step = step.next(); *step = step.next();
} else { } else {
@ -2333,7 +2305,7 @@ where
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
let now = Instant::now(); let now = Instant::now();
// Calculate the available capacity for queries triggered by background jobs. // Calculate the available capacity for queries triggered by background jobs.
@ -2392,7 +2364,7 @@ where
addresses: value, addresses: value,
old_peer: entry.evicted.map(|n| n.key.into_preimage()), old_peer: entry.evicted.map(|n| n.key.into_preimage()),
}; };
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); return Poll::Ready(ToSwarm::GenerateEvent(event));
} }
// Look for a finished query. // Look for a finished query.
@ -2400,12 +2372,12 @@ where
match self.queries.poll(now) { match self.queries.poll(now) {
QueryPoolState::Finished(q) => { QueryPoolState::Finished(q) => {
if let Some(event) = self.query_finished(q) { if let Some(event) = self.query_finished(q) {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); return Poll::Ready(ToSwarm::GenerateEvent(event));
} }
} }
QueryPoolState::Timeout(q) => { QueryPoolState::Timeout(q) => {
if let Some(event) = self.query_timeout(q) { if let Some(event) = self.query_timeout(q) {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); return Poll::Ready(ToSwarm::GenerateEvent(event));
} }
} }
QueryPoolState::Waiting(Some((query, peer_id))) => { QueryPoolState::Waiting(Some((query, peer_id))) => {
@ -2424,15 +2396,14 @@ where
} }
if self.connected_peers.contains(&peer_id) { if self.connected_peers.contains(&peer_id) {
self.queued_events self.queued_events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id,
peer_id, event,
event, handler: NotifyHandler::Any,
handler: NotifyHandler::Any, });
});
} else if &peer_id != self.kbuckets.local_key().preimage() { } else if &peer_id != self.kbuckets.local_key().preimage() {
query.inner.pending_rpcs.push((peer_id, event)); query.inner.pending_rpcs.push((peer_id, event));
self.queued_events.push_back(NetworkBehaviourAction::Dial { self.queued_events.push_back(ToSwarm::Dial {
opts: DialOpts::peer_id(peer_id).build(), opts: DialOpts::peer_id(peer_id).build(),
}); });
} }

View File

@ -31,8 +31,8 @@ use libp2p_core::{Endpoint, Multiaddr};
use libp2p_identity::PeerId; use libp2p_identity::PeerId;
use libp2p_swarm::behaviour::FromSwarm; use libp2p_swarm::behaviour::FromSwarm;
use libp2p_swarm::{ use libp2p_swarm::{
dummy, ConnectionDenied, ConnectionId, ListenAddresses, NetworkBehaviour, dummy, ConnectionDenied, ConnectionId, ListenAddresses, NetworkBehaviour, PollParameters,
NetworkBehaviourAction, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
}; };
use smallvec::SmallVec; use smallvec::SmallVec;
use std::collections::hash_map::{Entry, HashMap}; use std::collections::hash_map::{Entry, HashMap};
@ -252,7 +252,7 @@ where
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
// Poll ifwatch. // Poll ifwatch.
while let Poll::Ready(Some(event)) = Pin::new(&mut self.if_watch).poll_next(cx) { while let Poll::Ready(Some(event)) = Pin::new(&mut self.if_watch).poll_next(cx) {
match event { match event {
@ -307,7 +307,7 @@ where
let event = Event::Discovered(DiscoveredAddrsIter { let event = Event::Discovered(DiscoveredAddrsIter {
inner: discovered.into_iter(), inner: discovered.into_iter(),
}); });
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); return Poll::Ready(ToSwarm::GenerateEvent(event));
} }
// Emit expired event. // Emit expired event.
let now = Instant::now(); let now = Instant::now();
@ -326,7 +326,7 @@ where
let event = Event::Expired(ExpiredAddrsIter { let event = Event::Expired(ExpiredAddrsIter {
inner: expired.into_iter(), inner: expired.into_iter(),
}); });
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); return Poll::Ready(ToSwarm::GenerateEvent(event));
} }
if let Some(closest_expiration) = closest_expiration { if let Some(closest_expiration) = closest_expiration {
let mut timer = P::Timer::at(closest_expiration); let mut timer = P::Timer::at(closest_expiration);

View File

@ -29,8 +29,8 @@ use libp2p_core::Multiaddr;
use libp2p_identity::PeerId; use libp2p_identity::PeerId;
use libp2p_swarm::{ use libp2p_swarm::{
derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionHandlerUpgrErr, derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionHandlerUpgrErr,
ConnectionId, FromSwarm, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler, PollParameters, THandlerInEvent,
PollParameters, THandlerInEvent, THandlerOutEvent, THandlerOutEvent, ToSwarm,
}; };
use void::Void; use void::Void;
@ -47,7 +47,7 @@ pub struct Event {
#[derive(Default)] #[derive(Default)]
pub struct Behaviour { pub struct Behaviour {
/// Queue of actions to return when polled. /// Queue of actions to return when polled.
queued_events: VecDeque<NetworkBehaviourAction<Event, THandlerInEvent<Self>>>, queued_events: VecDeque<ToSwarm<Event, THandlerInEvent<Self>>>,
/// Set of connected peers. /// Set of connected peers.
connected: HashSet<PeerId>, connected: HashSet<PeerId>,
} }
@ -64,12 +64,11 @@ impl Behaviour {
let id = RunId::next(); let id = RunId::next();
self.queued_events self.queued_events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: server,
peer_id: server, handler: NotifyHandler::Any,
handler: NotifyHandler::Any, event: crate::client::handler::Command { id, params },
event: crate::client::handler::Command { id, params }, });
});
Ok(id) Ok(id)
} }
@ -141,14 +140,14 @@ impl NetworkBehaviour for Behaviour {
super::handler::Event { id, result }: THandlerOutEvent<Self>, super::handler::Event { id, result }: THandlerOutEvent<Self>,
) { ) {
self.queued_events self.queued_events
.push_back(NetworkBehaviourAction::GenerateEvent(Event { id, result })); .push_back(ToSwarm::GenerateEvent(Event { id, result }));
} }
fn poll( fn poll(
&mut self, &mut self,
_cx: &mut Context<'_>, _cx: &mut Context<'_>,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
if let Some(event) = self.queued_events.pop_front() { if let Some(event) = self.queued_events.pop_front() {
return Poll::Ready(event); return Poll::Ready(event);
} }

View File

@ -27,8 +27,8 @@ use std::{
use libp2p_identity::PeerId; use libp2p_identity::PeerId;
use libp2p_swarm::{ use libp2p_swarm::{
ConnectionId, FromSwarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters, ConnectionId, FromSwarm, NetworkBehaviour, PollParameters, THandlerInEvent, THandlerOutEvent,
THandlerInEvent, THandlerOutEvent, ToSwarm,
}; };
use crate::server::handler::Handler; use crate::server::handler::Handler;
@ -44,7 +44,7 @@ pub struct Event {
#[derive(Default)] #[derive(Default)]
pub struct Behaviour { pub struct Behaviour {
/// Queue of actions to return when polled. /// Queue of actions to return when polled.
queued_events: VecDeque<NetworkBehaviourAction<Event, THandlerInEvent<Self>>>, queued_events: VecDeque<ToSwarm<Event, THandlerInEvent<Self>>>,
} }
impl Behaviour { impl Behaviour {
@ -100,18 +100,17 @@ impl NetworkBehaviour for Behaviour {
_connection_id: ConnectionId, _connection_id: ConnectionId,
super::handler::Event { stats }: THandlerOutEvent<Self>, super::handler::Event { stats }: THandlerOutEvent<Self>,
) { ) {
self.queued_events self.queued_events.push_back(ToSwarm::GenerateEvent(Event {
.push_back(NetworkBehaviourAction::GenerateEvent(Event { remote_peer_id: event_source,
remote_peer_id: event_source, stats,
stats, }))
}))
} }
fn poll( fn poll(
&mut self, &mut self,
_cx: &mut Context<'_>, _cx: &mut Context<'_>,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
if let Some(event) = self.queued_events.pop_front() { if let Some(event) = self.queued_events.pop_front() {
return Poll::Ready(event); return Poll::Ready(event);
} }

View File

@ -50,8 +50,8 @@ pub use handler::{Config, Failure, Success};
use libp2p_core::{Endpoint, Multiaddr}; use libp2p_core::{Endpoint, Multiaddr};
use libp2p_identity::PeerId; use libp2p_identity::PeerId;
use libp2p_swarm::{ use libp2p_swarm::{
behaviour::FromSwarm, ConnectionDenied, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, behaviour::FromSwarm, ConnectionDenied, ConnectionId, NetworkBehaviour, PollParameters,
PollParameters, THandler, THandlerInEvent, THandlerOutEvent, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
}; };
use std::{ use std::{
collections::VecDeque, collections::VecDeque,
@ -154,7 +154,7 @@ impl NetworkBehaviour for Behaviour {
&mut self, &mut self,
_: &mut Context<'_>, _: &mut Context<'_>,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
if let Some(e) = self.events.pop_back() { if let Some(e) = self.events.pop_back() {
let Event { result, peer } = &e; let Event { result, peer } = &e;
@ -164,7 +164,7 @@ impl NetworkBehaviour for Behaviour {
_ => {} _ => {}
} }
Poll::Ready(NetworkBehaviourAction::GenerateEvent(e)) Poll::Ready(ToSwarm::GenerateEvent(e))
} else { } else {
Poll::Pending Poll::Pending
} }

View File

@ -35,8 +35,8 @@ use libp2p_identity::PeerId;
use libp2p_swarm::behaviour::{ConnectionClosed, FromSwarm}; use libp2p_swarm::behaviour::{ConnectionClosed, FromSwarm};
use libp2p_swarm::{ use libp2p_swarm::{
dummy, ConnectionDenied, ConnectionHandlerUpgrErr, ConnectionId, ExternalAddresses, dummy, ConnectionDenied, ConnectionHandlerUpgrErr, ConnectionId, ExternalAddresses,
NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, THandler, NetworkBehaviour, NotifyHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent,
THandlerInEvent, THandlerOutEvent, ToSwarm,
}; };
use std::collections::{hash_map, HashMap, HashSet, VecDeque}; use std::collections::{hash_map, HashMap, HashSet, VecDeque};
use std::num::NonZeroU32; use std::num::NonZeroU32;
@ -242,7 +242,7 @@ impl Behaviour {
.filter(|c| matches!(c.status, CircuitStatus::Accepted)) .filter(|c| matches!(c.status, CircuitStatus::Accepted))
{ {
self.queued_actions.push_back( self.queued_actions.push_back(
NetworkBehaviourAction::GenerateEvent(Event::CircuitClosed { ToSwarm::GenerateEvent(Event::CircuitClosed {
src_peer_id: circuit.src_peer_id, src_peer_id: circuit.src_peer_id,
dst_peer_id: circuit.dst_peer_id, dst_peer_id: circuit.dst_peer_id,
error: Some(std::io::ErrorKind::ConnectionAborted.into()), error: Some(std::io::ErrorKind::ConnectionAborted.into()),
@ -377,7 +377,7 @@ impl NetworkBehaviour for Behaviour {
.all(|limiter| { .all(|limiter| {
limiter.try_next(event_source, endpoint.get_remote_address(), now) limiter.try_next(event_source, endpoint.get_remote_address(), now)
}) { }) {
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
handler: NotifyHandler::One(connection), handler: NotifyHandler::One(connection),
peer_id: event_source, peer_id: event_source,
event: Either::Left(handler::In::DenyReservationReq { event: Either::Left(handler::In::DenyReservationReq {
@ -411,7 +411,7 @@ impl NetworkBehaviour for Behaviour {
.insert(connection); .insert(connection);
self.queued_actions.push_back( self.queued_actions.push_back(
NetworkBehaviourAction::GenerateEvent(Event::ReservationReqAccepted { ToSwarm::GenerateEvent(Event::ReservationReqAccepted {
src_peer_id: event_source, src_peer_id: event_source,
renewed, renewed,
}) })
@ -420,7 +420,7 @@ impl NetworkBehaviour for Behaviour {
} }
handler::Event::ReservationReqAcceptFailed { error } => { handler::Event::ReservationReqAcceptFailed { error } => {
self.queued_actions.push_back( self.queued_actions.push_back(
NetworkBehaviourAction::GenerateEvent(Event::ReservationReqAcceptFailed { ToSwarm::GenerateEvent(Event::ReservationReqAcceptFailed {
src_peer_id: event_source, src_peer_id: event_source,
error, error,
}) })
@ -429,7 +429,7 @@ impl NetworkBehaviour for Behaviour {
} }
handler::Event::ReservationReqDenied {} => { handler::Event::ReservationReqDenied {} => {
self.queued_actions.push_back( self.queued_actions.push_back(
NetworkBehaviourAction::GenerateEvent(Event::ReservationReqDenied { ToSwarm::GenerateEvent(Event::ReservationReqDenied {
src_peer_id: event_source, src_peer_id: event_source,
}) })
.into(), .into(),
@ -437,7 +437,7 @@ impl NetworkBehaviour for Behaviour {
} }
handler::Event::ReservationReqDenyFailed { error } => { handler::Event::ReservationReqDenyFailed { error } => {
self.queued_actions.push_back( self.queued_actions.push_back(
NetworkBehaviourAction::GenerateEvent(Event::ReservationReqDenyFailed { ToSwarm::GenerateEvent(Event::ReservationReqDenyFailed {
src_peer_id: event_source, src_peer_id: event_source,
error, error,
}) })
@ -462,7 +462,7 @@ impl NetworkBehaviour for Behaviour {
} }
self.queued_actions.push_back( self.queued_actions.push_back(
NetworkBehaviourAction::GenerateEvent(Event::ReservationTimedOut { ToSwarm::GenerateEvent(Event::ReservationTimedOut {
src_peer_id: event_source, src_peer_id: event_source,
}) })
.into(), .into(),
@ -491,7 +491,7 @@ impl NetworkBehaviour for Behaviour {
limiter.try_next(event_source, endpoint.get_remote_address(), now) limiter.try_next(event_source, endpoint.get_remote_address(), now)
}) { }) {
// Deny circuit exceeding limits. // Deny circuit exceeding limits.
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
handler: NotifyHandler::One(connection), handler: NotifyHandler::One(connection),
peer_id: event_source, peer_id: event_source,
event: Either::Left(handler::In::DenyCircuitReq { event: Either::Left(handler::In::DenyCircuitReq {
@ -514,7 +514,7 @@ impl NetworkBehaviour for Behaviour {
dst_connection_id: *dst_conn, dst_connection_id: *dst_conn,
}); });
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
handler: NotifyHandler::One(*dst_conn), handler: NotifyHandler::One(*dst_conn),
peer_id: event_source, peer_id: event_source,
event: Either::Left(handler::In::NegotiateOutboundConnect { event: Either::Left(handler::In::NegotiateOutboundConnect {
@ -527,7 +527,7 @@ impl NetworkBehaviour for Behaviour {
} }
} else { } else {
// Deny circuit request if no reservation present. // Deny circuit request if no reservation present.
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
handler: NotifyHandler::One(connection), handler: NotifyHandler::One(connection),
peer_id: event_source, peer_id: event_source,
event: Either::Left(handler::In::DenyCircuitReq { event: Either::Left(handler::In::DenyCircuitReq {
@ -541,7 +541,7 @@ impl NetworkBehaviour for Behaviour {
} }
handler::Event::CircuitReqReceiveFailed { error } => { handler::Event::CircuitReqReceiveFailed { error } => {
self.queued_actions.push_back( self.queued_actions.push_back(
NetworkBehaviourAction::GenerateEvent(Event::CircuitReqReceiveFailed { ToSwarm::GenerateEvent(Event::CircuitReqReceiveFailed {
src_peer_id: event_source, src_peer_id: event_source,
error, error,
}) })
@ -557,7 +557,7 @@ impl NetworkBehaviour for Behaviour {
} }
self.queued_actions.push_back( self.queued_actions.push_back(
NetworkBehaviourAction::GenerateEvent(Event::CircuitReqDenied { ToSwarm::GenerateEvent(Event::CircuitReqDenied {
src_peer_id: event_source, src_peer_id: event_source,
dst_peer_id, dst_peer_id,
}) })
@ -574,7 +574,7 @@ impl NetworkBehaviour for Behaviour {
} }
self.queued_actions.push_back( self.queued_actions.push_back(
NetworkBehaviourAction::GenerateEvent(Event::CircuitReqDenyFailed { ToSwarm::GenerateEvent(Event::CircuitReqDenyFailed {
src_peer_id: event_source, src_peer_id: event_source,
dst_peer_id, dst_peer_id,
error, error,
@ -592,7 +592,7 @@ impl NetworkBehaviour for Behaviour {
dst_pending_data, dst_pending_data,
} => { } => {
self.queued_actions.push_back( self.queued_actions.push_back(
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
handler: NotifyHandler::One(src_connection_id), handler: NotifyHandler::One(src_connection_id),
peer_id: src_peer_id, peer_id: src_peer_id,
event: Either::Left(handler::In::AcceptAndDriveCircuit { event: Either::Left(handler::In::AcceptAndDriveCircuit {
@ -616,7 +616,7 @@ impl NetworkBehaviour for Behaviour {
error, error,
} => { } => {
self.queued_actions.push_back( self.queued_actions.push_back(
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
handler: NotifyHandler::One(src_connection_id), handler: NotifyHandler::One(src_connection_id),
peer_id: src_peer_id, peer_id: src_peer_id,
event: Either::Left(handler::In::DenyCircuitReq { event: Either::Left(handler::In::DenyCircuitReq {
@ -628,7 +628,7 @@ impl NetworkBehaviour for Behaviour {
.into(), .into(),
); );
self.queued_actions.push_back( self.queued_actions.push_back(
NetworkBehaviourAction::GenerateEvent(Event::CircuitReqOutboundConnectFailed { ToSwarm::GenerateEvent(Event::CircuitReqOutboundConnectFailed {
src_peer_id, src_peer_id,
dst_peer_id: event_source, dst_peer_id: event_source,
error, error,
@ -642,7 +642,7 @@ impl NetworkBehaviour for Behaviour {
} => { } => {
self.circuits.accepted(circuit_id); self.circuits.accepted(circuit_id);
self.queued_actions.push_back( self.queued_actions.push_back(
NetworkBehaviourAction::GenerateEvent(Event::CircuitReqAccepted { ToSwarm::GenerateEvent(Event::CircuitReqAccepted {
src_peer_id: event_source, src_peer_id: event_source,
dst_peer_id, dst_peer_id,
}) })
@ -656,7 +656,7 @@ impl NetworkBehaviour for Behaviour {
} => { } => {
self.circuits.remove(circuit_id); self.circuits.remove(circuit_id);
self.queued_actions.push_back( self.queued_actions.push_back(
NetworkBehaviourAction::GenerateEvent(Event::CircuitReqAcceptFailed { ToSwarm::GenerateEvent(Event::CircuitReqAcceptFailed {
src_peer_id: event_source, src_peer_id: event_source,
dst_peer_id, dst_peer_id,
error, error,
@ -672,7 +672,7 @@ impl NetworkBehaviour for Behaviour {
self.circuits.remove(circuit_id); self.circuits.remove(circuit_id);
self.queued_actions.push_back( self.queued_actions.push_back(
NetworkBehaviourAction::GenerateEvent(Event::CircuitClosed { ToSwarm::GenerateEvent(Event::CircuitClosed {
src_peer_id: event_source, src_peer_id: event_source,
dst_peer_id, dst_peer_id,
error, error,
@ -687,7 +687,7 @@ impl NetworkBehaviour for Behaviour {
&mut self, &mut self,
_cx: &mut Context<'_>, _cx: &mut Context<'_>,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
if let Some(action) = self.queued_actions.pop_front() { if let Some(action) = self.queued_actions.pop_front() {
return Poll::Ready(action.build(self.local_peer_id, &self.external_addresses)); return Poll::Ready(action.build(self.local_peer_id, &self.external_addresses));
} }
@ -786,11 +786,11 @@ impl Add<u64> for CircuitId {
} }
} }
/// A [`NetworkBehaviourAction`], either complete, or still requiring data from [`PollParameters`] /// A [`ToSwarm`], either complete, or still requiring data from [`PollParameters`]
/// before being returned in [`Behaviour::poll`]. /// before being returned in [`Behaviour::poll`].
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
enum Action { enum Action {
Done(NetworkBehaviourAction<Event, Either<handler::In, Void>>), Done(ToSwarm<Event, Either<handler::In, Void>>),
AcceptReservationPrototype { AcceptReservationPrototype {
inbound_reservation_req: inbound_hop::ReservationReq, inbound_reservation_req: inbound_hop::ReservationReq,
handler: NotifyHandler, handler: NotifyHandler,
@ -798,8 +798,8 @@ enum Action {
}, },
} }
impl From<NetworkBehaviourAction<Event, Either<handler::In, Void>>> for Action { impl From<ToSwarm<Event, Either<handler::In, Void>>> for Action {
fn from(action: NetworkBehaviourAction<Event, Either<handler::In, Void>>) -> Self { fn from(action: ToSwarm<Event, Either<handler::In, Void>>) -> Self {
Self::Done(action) Self::Done(action)
} }
} }
@ -809,14 +809,14 @@ impl Action {
self, self,
local_peer_id: PeerId, local_peer_id: PeerId,
external_addresses: &ExternalAddresses, external_addresses: &ExternalAddresses,
) -> NetworkBehaviourAction<Event, Either<handler::In, Void>> { ) -> ToSwarm<Event, Either<handler::In, Void>> {
match self { match self {
Action::Done(action) => action, Action::Done(action) => action,
Action::AcceptReservationPrototype { Action::AcceptReservationPrototype {
inbound_reservation_req, inbound_reservation_req,
handler, handler,
peer_id, peer_id,
} => NetworkBehaviourAction::NotifyHandler { } => ToSwarm::NotifyHandler {
handler, handler,
peer_id, peer_id,
event: Either::Left(handler::In::AcceptReservationReq { event: Either::Left(handler::In::AcceptReservationReq {

View File

@ -40,8 +40,8 @@ use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm
use libp2p_swarm::dial_opts::DialOpts; use libp2p_swarm::dial_opts::DialOpts;
use libp2p_swarm::{ use libp2p_swarm::{
dummy, ConnectionDenied, ConnectionHandler, ConnectionHandlerUpgrErr, ConnectionId, dummy, ConnectionDenied, ConnectionHandler, ConnectionHandlerUpgrErr, ConnectionId,
DialFailure, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, DialFailure, NegotiatedSubstream, NetworkBehaviour, NotifyHandler, PollParameters, THandler,
PollParameters, THandler, THandlerInEvent, THandlerOutEvent, THandlerInEvent, THandlerOutEvent, ToSwarm,
}; };
use std::collections::{hash_map, HashMap, VecDeque}; use std::collections::{hash_map, HashMap, VecDeque};
use std::io::{Error, ErrorKind, IoSlice}; use std::io::{Error, ErrorKind, IoSlice};
@ -104,7 +104,7 @@ pub struct Behaviour {
directly_connected_peers: HashMap<PeerId, Vec<ConnectionId>>, directly_connected_peers: HashMap<PeerId, Vec<ConnectionId>>,
/// Queue of actions to return when polled. /// Queue of actions to return when polled.
queued_actions: VecDeque<NetworkBehaviourAction<Event, Either<handler::In, Void>>>, queued_actions: VecDeque<ToSwarm<Event, Either<handler::In, Void>>>,
pending_handler_commands: HashMap<ConnectionId, handler::In>, pending_handler_commands: HashMap<ConnectionId, handler::In>,
} }
@ -219,12 +219,11 @@ impl NetworkBehaviour for Behaviour {
} }
if let Some(event) = self.pending_handler_commands.remove(&connection_id) { if let Some(event) = self.pending_handler_commands.remove(&connection_id) {
self.queued_actions self.queued_actions.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id,
peer_id, handler: NotifyHandler::One(connection_id),
handler: NotifyHandler::One(connection_id), event: Either::Left(event),
event: Either::Left(event), })
})
} }
} }
FromSwarm::ConnectionClosed(connection_closed) => { FromSwarm::ConnectionClosed(connection_closed) => {
@ -296,15 +295,14 @@ impl NetworkBehaviour for Behaviour {
} }
}; };
self.queued_actions self.queued_actions.push_back(ToSwarm::GenerateEvent(event))
.push_back(NetworkBehaviourAction::GenerateEvent(event))
} }
fn poll( fn poll(
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
_poll_parameters: &mut impl PollParameters, _poll_parameters: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
if let Some(action) = self.queued_actions.pop_front() { if let Some(action) = self.queued_actions.pop_front() {
return Poll::Ready(action); return Poll::Ready(action);
} }
@ -320,7 +318,7 @@ impl NetworkBehaviour for Behaviour {
.get(&relay_peer_id) .get(&relay_peer_id)
.and_then(|cs| cs.get(0)) .and_then(|cs| cs.get(0))
{ {
Some(connection_id) => NetworkBehaviourAction::NotifyHandler { Some(connection_id) => ToSwarm::NotifyHandler {
peer_id: relay_peer_id, peer_id: relay_peer_id,
handler: NotifyHandler::One(*connection_id), handler: NotifyHandler::One(*connection_id),
event: Either::Left(handler::In::Reserve { to_listener }), event: Either::Left(handler::In::Reserve { to_listener }),
@ -334,7 +332,7 @@ impl NetworkBehaviour for Behaviour {
self.pending_handler_commands self.pending_handler_commands
.insert(relayed_connection_id, handler::In::Reserve { to_listener }); .insert(relayed_connection_id, handler::In::Reserve { to_listener });
NetworkBehaviourAction::Dial { opts } ToSwarm::Dial { opts }
} }
} }
} }
@ -350,7 +348,7 @@ impl NetworkBehaviour for Behaviour {
.get(&relay_peer_id) .get(&relay_peer_id)
.and_then(|cs| cs.get(0)) .and_then(|cs| cs.get(0))
{ {
Some(connection_id) => NetworkBehaviourAction::NotifyHandler { Some(connection_id) => ToSwarm::NotifyHandler {
peer_id: relay_peer_id, peer_id: relay_peer_id,
handler: NotifyHandler::One(*connection_id), handler: NotifyHandler::One(*connection_id),
event: Either::Left(handler::In::EstablishCircuit { event: Either::Left(handler::In::EstablishCircuit {
@ -373,7 +371,7 @@ impl NetworkBehaviour for Behaviour {
}, },
); );
NetworkBehaviourAction::Dial { opts } ToSwarm::Dial { opts }
} }
} }
} }

View File

@ -33,8 +33,7 @@ use libp2p_identity::{Keypair, PeerId, SigningError};
use libp2p_swarm::behaviour::FromSwarm; use libp2p_swarm::behaviour::FromSwarm;
use libp2p_swarm::{ use libp2p_swarm::{
CloseConnection, ConnectionDenied, ConnectionId, ExternalAddresses, NetworkBehaviour, CloseConnection, ConnectionDenied, ConnectionId, ExternalAddresses, NetworkBehaviour,
NetworkBehaviourAction, NotifyHandler, PollParameters, THandler, THandlerInEvent, NotifyHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
THandlerOutEvent,
}; };
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::iter::FromIterator; use std::iter::FromIterator;
@ -42,7 +41,7 @@ use std::task::{Context, Poll};
use void::Void; use void::Void;
pub struct Behaviour { pub struct Behaviour {
events: VecDeque<NetworkBehaviourAction<Event, InEvent<outbound::OpenInfo, Void, Void>>>, events: VecDeque<ToSwarm<Event, InEvent<outbound::OpenInfo, Void, Void>>>,
keypair: Keypair, keypair: Keypair,
pending_register_requests: Vec<(Namespace, PeerId, Option<Ttl>)>, pending_register_requests: Vec<(Namespace, PeerId, Option<Ttl>)>,
@ -75,7 +74,7 @@ impl Behaviour {
/// Register our external addresses in the given namespace with the given rendezvous peer. /// Register our external addresses in the given namespace with the given rendezvous peer.
/// ///
/// External addresses are either manually added via [`libp2p_swarm::Swarm::add_external_address`] or reported /// External addresses are either manually added via [`libp2p_swarm::Swarm::add_external_address`] or reported
/// by other [`NetworkBehaviour`]s via [`NetworkBehaviourAction::ReportObservedAddr`]. /// by other [`NetworkBehaviour`]s via [`ToSwarm::ReportObservedAddr`].
pub fn register(&mut self, namespace: Namespace, rendezvous_node: PeerId, ttl: Option<Ttl>) { pub fn register(&mut self, namespace: Namespace, rendezvous_node: PeerId, ttl: Option<Ttl>) {
self.pending_register_requests self.pending_register_requests
.push((namespace, rendezvous_node, ttl)); .push((namespace, rendezvous_node, ttl));
@ -83,14 +82,13 @@ impl Behaviour {
/// Unregister ourselves from the given namespace with the given rendezvous peer. /// Unregister ourselves from the given namespace with the given rendezvous peer.
pub fn unregister(&mut self, namespace: Namespace, rendezvous_node: PeerId) { pub fn unregister(&mut self, namespace: Namespace, rendezvous_node: PeerId) {
self.events self.events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: rendezvous_node,
peer_id: rendezvous_node, event: handler::OutboundInEvent::NewSubstream {
event: handler::OutboundInEvent::NewSubstream { open_info: OpenInfo::UnregisterRequest(namespace),
open_info: OpenInfo::UnregisterRequest(namespace), },
}, handler: NotifyHandler::Any,
handler: NotifyHandler::Any, });
});
} }
/// Discover other peers at a given rendezvous peer. /// Discover other peers at a given rendezvous peer.
@ -107,18 +105,17 @@ impl Behaviour {
limit: Option<u64>, limit: Option<u64>,
rendezvous_node: PeerId, rendezvous_node: PeerId,
) { ) {
self.events self.events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: rendezvous_node,
peer_id: rendezvous_node, event: handler::OutboundInEvent::NewSubstream {
event: handler::OutboundInEvent::NewSubstream { open_info: OpenInfo::DiscoverRequest {
open_info: OpenInfo::DiscoverRequest { namespace: ns,
namespace: ns, cookie,
cookie, limit,
limit,
},
}, },
handler: NotifyHandler::Any, },
}); handler: NotifyHandler::Any,
});
} }
} }
@ -233,7 +230,7 @@ impl NetworkBehaviour for Behaviour {
handler::OutboundOutEvent::OutboundError { error, .. } => { handler::OutboundOutEvent::OutboundError { error, .. } => {
log::warn!("Connection with peer {} failed: {}", peer_id, error); log::warn!("Connection with peer {} failed: {}", peer_id, error);
vec![NetworkBehaviourAction::CloseConnection { vec![ToSwarm::CloseConnection {
peer_id, peer_id,
connection: CloseConnection::One(connection_id), connection: CloseConnection::One(connection_id),
}] }]
@ -247,7 +244,7 @@ impl NetworkBehaviour for Behaviour {
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
if let Some(event) = self.events.pop_front() { if let Some(event) = self.events.pop_front() {
return Poll::Ready(event); return Poll::Ready(event);
} }
@ -259,13 +256,13 @@ impl NetworkBehaviour for Behaviour {
let external_addresses = self.external_addresses.iter().cloned().collect::<Vec<_>>(); let external_addresses = self.external_addresses.iter().cloned().collect::<Vec<_>>();
if external_addresses.is_empty() { if external_addresses.is_empty() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent( return Poll::Ready(ToSwarm::GenerateEvent(Event::RegisterFailed(
Event::RegisterFailed(RegisterError::NoExternalAddresses), RegisterError::NoExternalAddresses,
)); )));
} }
let action = match PeerRecord::new(&self.keypair, external_addresses) { let action = match PeerRecord::new(&self.keypair, external_addresses) {
Ok(peer_record) => NetworkBehaviourAction::NotifyHandler { Ok(peer_record) => ToSwarm::NotifyHandler {
peer_id: rendezvous_node, peer_id: rendezvous_node,
event: handler::OutboundInEvent::NewSubstream { event: handler::OutboundInEvent::NewSubstream {
open_info: OpenInfo::RegisterRequest(NewRegistration { open_info: OpenInfo::RegisterRequest(NewRegistration {
@ -276,7 +273,7 @@ impl NetworkBehaviour for Behaviour {
}, },
handler: NotifyHandler::Any, handler: NotifyHandler::Any,
}, },
Err(signing_error) => NetworkBehaviourAction::GenerateEvent(Event::RegisterFailed( Err(signing_error) => ToSwarm::GenerateEvent(Event::RegisterFailed(
RegisterError::FailedToMakeRecord(signing_error), RegisterError::FailedToMakeRecord(signing_error),
)), )),
}; };
@ -288,7 +285,7 @@ impl NetworkBehaviour for Behaviour {
futures::ready!(self.expiring_registrations.poll_next_unpin(cx)) futures::ready!(self.expiring_registrations.poll_next_unpin(cx))
{ {
self.discovered_peers.remove(&expired_registration); self.discovered_peers.remove(&expired_registration);
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(Event::Expired { return Poll::Ready(ToSwarm::GenerateEvent(Event::Expired {
peer: expired_registration.0, peer: expired_registration.0,
})); }));
} }
@ -321,23 +318,23 @@ fn handle_outbound_event(
peer_id: PeerId, peer_id: PeerId,
discovered_peers: &mut HashMap<(PeerId, Namespace), Vec<Multiaddr>>, discovered_peers: &mut HashMap<(PeerId, Namespace), Vec<Multiaddr>>,
expiring_registrations: &mut FuturesUnordered<BoxFuture<'static, (PeerId, Namespace)>>, expiring_registrations: &mut FuturesUnordered<BoxFuture<'static, (PeerId, Namespace)>>,
) -> Vec<NetworkBehaviourAction<Event, THandlerInEvent<Behaviour>>> { ) -> Vec<ToSwarm<Event, THandlerInEvent<Behaviour>>> {
match event { match event {
outbound::OutEvent::Registered { namespace, ttl } => { outbound::OutEvent::Registered { namespace, ttl } => {
vec![NetworkBehaviourAction::GenerateEvent(Event::Registered { vec![ToSwarm::GenerateEvent(Event::Registered {
rendezvous_node: peer_id, rendezvous_node: peer_id,
ttl, ttl,
namespace, namespace,
})] })]
} }
outbound::OutEvent::RegisterFailed(namespace, error) => { outbound::OutEvent::RegisterFailed(namespace, error) => {
vec![NetworkBehaviourAction::GenerateEvent( vec![ToSwarm::GenerateEvent(Event::RegisterFailed(
Event::RegisterFailed(RegisterError::Remote { RegisterError::Remote {
rendezvous_node: peer_id, rendezvous_node: peer_id,
namespace, namespace,
error, error,
}), },
)] ))]
} }
outbound::OutEvent::Discovered { outbound::OutEvent::Discovered {
registrations, registrations,
@ -361,20 +358,18 @@ fn handle_outbound_event(
.boxed() .boxed()
})); }));
vec![NetworkBehaviourAction::GenerateEvent(Event::Discovered { vec![ToSwarm::GenerateEvent(Event::Discovered {
rendezvous_node: peer_id, rendezvous_node: peer_id,
registrations, registrations,
cookie, cookie,
})] })]
} }
outbound::OutEvent::DiscoverFailed { namespace, error } => { outbound::OutEvent::DiscoverFailed { namespace, error } => {
vec![NetworkBehaviourAction::GenerateEvent( vec![ToSwarm::GenerateEvent(Event::DiscoverFailed {
Event::DiscoverFailed { rendezvous_node: peer_id,
rendezvous_node: peer_id, namespace,
namespace, error,
error, })]
},
)]
} }
} }
} }

View File

@ -31,8 +31,8 @@ use libp2p_core::{Endpoint, Multiaddr};
use libp2p_identity::PeerId; use libp2p_identity::PeerId;
use libp2p_swarm::behaviour::FromSwarm; use libp2p_swarm::behaviour::FromSwarm;
use libp2p_swarm::{ use libp2p_swarm::{
CloseConnection, ConnectionDenied, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, CloseConnection, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler,
NotifyHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
}; };
use std::collections::{HashMap, HashSet, VecDeque}; use std::collections::{HashMap, HashSet, VecDeque};
use std::iter::FromIterator; use std::iter::FromIterator;
@ -41,7 +41,7 @@ use std::time::Duration;
use void::Void; use void::Void;
pub struct Behaviour { pub struct Behaviour {
events: VecDeque<NetworkBehaviourAction<Event, InEvent<(), inbound::InEvent, Void>>>, events: VecDeque<ToSwarm<Event, InEvent<(), inbound::InEvent, Void>>>,
registrations: Registrations, registrations: Registrations,
} }
@ -150,7 +150,7 @@ impl NetworkBehaviour for Behaviour {
handler::InboundOutEvent::InboundError { error, .. } => { handler::InboundOutEvent::InboundError { error, .. } => {
log::warn!("Connection with peer {} failed: {}", peer_id, error); log::warn!("Connection with peer {} failed: {}", peer_id, error);
vec![NetworkBehaviourAction::CloseConnection { vec![ToSwarm::CloseConnection {
peer_id, peer_id,
connection: CloseConnection::One(connection), connection: CloseConnection::One(connection),
}] }]
@ -165,11 +165,11 @@ impl NetworkBehaviour for Behaviour {
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
if let Poll::Ready(ExpiredRegistration(registration)) = self.registrations.poll(cx) { if let Poll::Ready(ExpiredRegistration(registration)) = self.registrations.poll(cx) {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent( return Poll::Ready(ToSwarm::GenerateEvent(Event::RegistrationExpired(
Event::RegistrationExpired(registration), registration,
)); )));
} }
if let Some(event) = self.events.pop_front() { if let Some(event) = self.events.pop_front() {
@ -203,7 +203,7 @@ fn handle_inbound_event(
connection: ConnectionId, connection: ConnectionId,
id: InboundSubstreamId, id: InboundSubstreamId,
registrations: &mut Registrations, registrations: &mut Registrations,
) -> Vec<NetworkBehaviourAction<Event, THandlerInEvent<Behaviour>>> { ) -> Vec<ToSwarm<Event, THandlerInEvent<Behaviour>>> {
match event { match event {
// bad registration // bad registration
inbound::OutEvent::RegistrationRequested(registration) inbound::OutEvent::RegistrationRequested(registration)
@ -212,7 +212,7 @@ fn handle_inbound_event(
let error = ErrorCode::NotAuthorized; let error = ErrorCode::NotAuthorized;
vec![ vec![
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
peer_id, peer_id,
handler: NotifyHandler::One(connection), handler: NotifyHandler::One(connection),
event: handler::InboundInEvent::NotifyInboundSubstream { event: handler::InboundInEvent::NotifyInboundSubstream {
@ -220,7 +220,7 @@ fn handle_inbound_event(
message: inbound::InEvent::DeclineRegisterRequest(error), message: inbound::InEvent::DeclineRegisterRequest(error),
}, },
}, },
NetworkBehaviourAction::GenerateEvent(Event::PeerNotRegistered { ToSwarm::GenerateEvent(Event::PeerNotRegistered {
peer: peer_id, peer: peer_id,
namespace: registration.namespace, namespace: registration.namespace,
error, error,
@ -233,7 +233,7 @@ fn handle_inbound_event(
match registrations.add(registration) { match registrations.add(registration) {
Ok(registration) => { Ok(registration) => {
vec![ vec![
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
peer_id, peer_id,
handler: NotifyHandler::One(connection), handler: NotifyHandler::One(connection),
event: handler::InboundInEvent::NotifyInboundSubstream { event: handler::InboundInEvent::NotifyInboundSubstream {
@ -243,7 +243,7 @@ fn handle_inbound_event(
}, },
}, },
}, },
NetworkBehaviourAction::GenerateEvent(Event::PeerRegistered { ToSwarm::GenerateEvent(Event::PeerRegistered {
peer: peer_id, peer: peer_id,
registration, registration,
}), }),
@ -253,7 +253,7 @@ fn handle_inbound_event(
let error = ErrorCode::InvalidTtl; let error = ErrorCode::InvalidTtl;
vec![ vec![
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
peer_id, peer_id,
handler: NotifyHandler::One(connection), handler: NotifyHandler::One(connection),
event: handler::InboundInEvent::NotifyInboundSubstream { event: handler::InboundInEvent::NotifyInboundSubstream {
@ -261,7 +261,7 @@ fn handle_inbound_event(
message: inbound::InEvent::DeclineRegisterRequest(error), message: inbound::InEvent::DeclineRegisterRequest(error),
}, },
}, },
NetworkBehaviourAction::GenerateEvent(Event::PeerNotRegistered { ToSwarm::GenerateEvent(Event::PeerNotRegistered {
peer: peer_id, peer: peer_id,
namespace, namespace,
error, error,
@ -279,7 +279,7 @@ fn handle_inbound_event(
let discovered = registrations.cloned().collect::<Vec<_>>(); let discovered = registrations.cloned().collect::<Vec<_>>();
vec![ vec![
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
peer_id, peer_id,
handler: NotifyHandler::One(connection), handler: NotifyHandler::One(connection),
event: handler::InboundInEvent::NotifyInboundSubstream { event: handler::InboundInEvent::NotifyInboundSubstream {
@ -290,7 +290,7 @@ fn handle_inbound_event(
}, },
}, },
}, },
NetworkBehaviourAction::GenerateEvent(Event::DiscoverServed { ToSwarm::GenerateEvent(Event::DiscoverServed {
enquirer: peer_id, enquirer: peer_id,
registrations: discovered, registrations: discovered,
}), }),
@ -300,7 +300,7 @@ fn handle_inbound_event(
let error = ErrorCode::InvalidCookie; let error = ErrorCode::InvalidCookie;
vec![ vec![
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
peer_id, peer_id,
handler: NotifyHandler::One(connection), handler: NotifyHandler::One(connection),
event: handler::InboundInEvent::NotifyInboundSubstream { event: handler::InboundInEvent::NotifyInboundSubstream {
@ -308,7 +308,7 @@ fn handle_inbound_event(
message: inbound::InEvent::DeclineDiscoverRequest(error), message: inbound::InEvent::DeclineDiscoverRequest(error),
}, },
}, },
NetworkBehaviourAction::GenerateEvent(Event::DiscoverNotServed { ToSwarm::GenerateEvent(Event::DiscoverNotServed {
enquirer: peer_id, enquirer: peer_id,
error, error,
}), }),
@ -318,12 +318,10 @@ fn handle_inbound_event(
inbound::OutEvent::UnregisterRequested(namespace) => { inbound::OutEvent::UnregisterRequested(namespace) => {
registrations.remove(namespace.clone(), peer_id); registrations.remove(namespace.clone(), peer_id);
vec![NetworkBehaviourAction::GenerateEvent( vec![ToSwarm::GenerateEvent(Event::PeerUnregistered {
Event::PeerUnregistered { peer: peer_id,
peer: peer_id, namespace,
namespace, })]
},
)]
} }
} }
} }

View File

@ -75,8 +75,8 @@ use libp2p_identity::PeerId;
use libp2p_swarm::{ use libp2p_swarm::{
behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}, behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
dial_opts::DialOpts, dial_opts::DialOpts,
ConnectionDenied, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, PollParameters, THandler,
PollParameters, THandler, THandlerInEvent, THandlerOutEvent, THandlerInEvent, THandlerOutEvent, ToSwarm,
}; };
use smallvec::SmallVec; use smallvec::SmallVec;
use std::{ use std::{
@ -350,9 +350,8 @@ where
/// The protocol codec for reading and writing requests and responses. /// The protocol codec for reading and writing requests and responses.
codec: TCodec, codec: TCodec,
/// Pending events to return from `poll`. /// Pending events to return from `poll`.
pending_events: VecDeque< pending_events:
NetworkBehaviourAction<Event<TCodec::Request, TCodec::Response>, RequestProtocol<TCodec>>, VecDeque<ToSwarm<Event<TCodec::Request, TCodec::Response>, RequestProtocol<TCodec>>>,
>,
/// The currently connected peers, their pending outbound and inbound responses and their known, /// The currently connected peers, their pending outbound and inbound responses and their known,
/// reachable addresses, if any. /// reachable addresses, if any.
connected: HashMap<PeerId, SmallVec<[Connection; 2]>>, connected: HashMap<PeerId, SmallVec<[Connection; 2]>>,
@ -419,7 +418,7 @@ where
}; };
if let Some(request) = self.try_send_request(peer, request) { if let Some(request) = self.try_send_request(peer, request) {
self.pending_events.push_back(NetworkBehaviourAction::Dial { self.pending_events.push_back(ToSwarm::Dial {
opts: DialOpts::peer_id(*peer).build(), opts: DialOpts::peer_id(*peer).build(),
}); });
self.pending_outbound_requests self.pending_outbound_requests
@ -538,12 +537,11 @@ where
let ix = (request.request_id.0 as usize) % connections.len(); let ix = (request.request_id.0 as usize) % connections.len();
let conn = &mut connections[ix]; let conn = &mut connections[ix];
conn.pending_inbound_responses.insert(request.request_id); conn.pending_inbound_responses.insert(request.request_id);
self.pending_events self.pending_events.push_back(ToSwarm::NotifyHandler {
.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: *peer,
peer_id: *peer, handler: NotifyHandler::One(conn.id),
handler: NotifyHandler::One(conn.id), event: request,
event: request, });
});
None None
} else { } else {
Some(request) Some(request)
@ -675,24 +673,20 @@ where
for request_id in connection.pending_outbound_responses { for request_id in connection.pending_outbound_responses {
self.pending_events self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent( .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
Event::InboundFailure { peer: peer_id,
peer: peer_id, request_id,
request_id, error: InboundFailure::ConnectionClosed,
error: InboundFailure::ConnectionClosed, }));
},
));
} }
for request_id in connection.pending_inbound_responses { for request_id in connection.pending_inbound_responses {
self.pending_events self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent( .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
Event::OutboundFailure { peer: peer_id,
peer: peer_id, request_id,
request_id, error: OutboundFailure::ConnectionClosed,
error: OutboundFailure::ConnectionClosed, }));
},
));
} }
} }
@ -707,13 +701,11 @@ where
if let Some(pending) = self.pending_outbound_requests.remove(&peer) { if let Some(pending) = self.pending_outbound_requests.remove(&peer) {
for request in pending { for request in pending {
self.pending_events self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent( .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
Event::OutboundFailure { peer,
peer, request_id: request.request_id,
request_id: request.request_id, error: OutboundFailure::DialFailure,
error: OutboundFailure::DialFailure, }));
},
));
} }
} }
} }
@ -825,10 +817,7 @@ where
response, response,
}; };
self.pending_events self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Message { .push_back(ToSwarm::GenerateEvent(Event::Message { peer, message }));
peer,
message,
}));
} }
handler::Event::Request { handler::Event::Request {
request_id, request_id,
@ -842,10 +831,7 @@ where
channel, channel,
}; };
self.pending_events self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Message { .push_back(ToSwarm::GenerateEvent(Event::Message { peer, message }));
peer,
message,
}));
match self.get_connection_mut(&peer, connection) { match self.get_connection_mut(&peer, connection) {
Some(connection) => { Some(connection) => {
@ -854,14 +840,13 @@ where
} }
// Connection closed after `Event::Request` has been emitted. // Connection closed after `Event::Request` has been emitted.
None => { None => {
self.pending_events self.pending_events.push_back(ToSwarm::GenerateEvent(
.push_back(NetworkBehaviourAction::GenerateEvent( Event::InboundFailure {
Event::InboundFailure { peer,
peer, request_id,
request_id, error: InboundFailure::ConnectionClosed,
error: InboundFailure::ConnectionClosed, },
}, ));
));
} }
} }
} }
@ -873,7 +858,7 @@ where
); );
self.pending_events self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent(Event::ResponseSent { .push_back(ToSwarm::GenerateEvent(Event::ResponseSent {
peer, peer,
request_id, request_id,
})); }));
@ -886,13 +871,11 @@ where
); );
self.pending_events self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent( .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
Event::InboundFailure { peer,
peer, request_id,
request_id, error: InboundFailure::ResponseOmission,
error: InboundFailure::ResponseOmission, }));
},
));
} }
handler::Event::OutboundTimeout(request_id) => { handler::Event::OutboundTimeout(request_id) => {
let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
@ -902,13 +885,11 @@ where
); );
self.pending_events self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent( .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
Event::OutboundFailure { peer,
peer, request_id,
request_id, error: OutboundFailure::Timeout,
error: OutboundFailure::Timeout, }));
},
));
} }
handler::Event::InboundTimeout(request_id) => { handler::Event::InboundTimeout(request_id) => {
// Note: `Event::InboundTimeout` is emitted both for timing // Note: `Event::InboundTimeout` is emitted both for timing
@ -918,13 +899,11 @@ where
self.remove_pending_outbound_response(&peer, connection, request_id); self.remove_pending_outbound_response(&peer, connection, request_id);
self.pending_events self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent( .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
Event::InboundFailure { peer,
peer, request_id,
request_id, error: InboundFailure::Timeout,
error: InboundFailure::Timeout, }));
},
));
} }
handler::Event::OutboundUnsupportedProtocols(request_id) => { handler::Event::OutboundUnsupportedProtocols(request_id) => {
let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
@ -934,26 +913,22 @@ where
); );
self.pending_events self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent( .push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
Event::OutboundFailure { peer,
peer, request_id,
request_id, error: OutboundFailure::UnsupportedProtocols,
error: OutboundFailure::UnsupportedProtocols, }));
},
));
} }
handler::Event::InboundUnsupportedProtocols(request_id) => { handler::Event::InboundUnsupportedProtocols(request_id) => {
// Note: No need to call `self.remove_pending_outbound_response`, // Note: No need to call `self.remove_pending_outbound_response`,
// `Event::Request` was never emitted for this request and // `Event::Request` was never emitted for this request and
// thus request was never added to `pending_outbound_responses`. // thus request was never added to `pending_outbound_responses`.
self.pending_events self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent( .push_back(ToSwarm::GenerateEvent(Event::InboundFailure {
Event::InboundFailure { peer,
peer, request_id,
request_id, error: InboundFailure::UnsupportedProtocols,
error: InboundFailure::UnsupportedProtocols, }));
},
));
} }
} }
} }
@ -962,7 +937,7 @@ where
&mut self, &mut self,
_: &mut Context<'_>, _: &mut Context<'_>,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
if let Some(ev) = self.pending_events.pop_front() { if let Some(ev) = self.pending_events.pop_front() {
return Poll::Ready(ev); return Poll::Ready(ev);
} else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {

View File

@ -62,7 +62,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
let multiaddr = quote! { #prelude_path::Multiaddr }; let multiaddr = quote! { #prelude_path::Multiaddr };
let trait_to_impl = quote! { #prelude_path::NetworkBehaviour }; let trait_to_impl = quote! { #prelude_path::NetworkBehaviour };
let either_ident = quote! { #prelude_path::Either }; let either_ident = quote! { #prelude_path::Either };
let network_behaviour_action = quote! { #prelude_path::NetworkBehaviourAction }; let network_behaviour_action = quote! { #prelude_path::ToSwarm };
let connection_handler = quote! { #prelude_path::ConnectionHandler }; let connection_handler = quote! { #prelude_path::ConnectionHandler };
let proto_select_ident = quote! { #prelude_path::ConnectionHandlerSelect }; let proto_select_ident = quote! { #prelude_path::ConnectionHandlerSelect };
let peer_id = quote! { #prelude_path::PeerId }; let peer_id = quote! { #prelude_path::PeerId };

View File

@ -9,9 +9,15 @@
- Deprecate `Swarm::ban_peer_id` in favor of the new `libp2p::allow_block_list` module. - Deprecate `Swarm::ban_peer_id` in favor of the new `libp2p::allow_block_list` module.
See [PR 3590]. See [PR 3590].
- Rename `NetworkBehaviourAction` to `ToSwarm`.
A deprecated type-alias is provided to ease the transition.
The new name is meant to better indicate the message-passing relationship between `Swarm` and `NetworkBehaviour`.
See [PR XXXX].
[PR 3386]: https://github.com/libp2p/rust-libp2p/pull/3386 [PR 3386]: https://github.com/libp2p/rust-libp2p/pull/3386
[PR 3652]: https://github.com/libp2p/rust-libp2p/pull/3652 [PR 3652]: https://github.com/libp2p/rust-libp2p/pull/3652
[PR 3590]: https://github.com/libp2p/rust-libp2p/pull/3590 [PR 3590]: https://github.com/libp2p/rust-libp2p/pull/3590
[PR XXXX]: https://github.com/libp2p/rust-libp2p/pull/XXXX
# 0.42.0 # 0.42.0

View File

@ -141,7 +141,7 @@ pub trait NetworkBehaviour: 'static {
/// sent from the handler to the behaviour are invoked with /// sent from the handler to the behaviour are invoked with
/// [`NetworkBehaviour::on_connection_handler_event`], /// [`NetworkBehaviour::on_connection_handler_event`],
/// and the behaviour can send a message to the handler by making [`NetworkBehaviour::poll`] /// and the behaviour can send a message to the handler by making [`NetworkBehaviour::poll`]
/// return [`NetworkBehaviourAction::NotifyHandler`]. /// return [`ToSwarm::NotifyHandler`].
/// ///
/// Note that the handler is returned to the [`NetworkBehaviour`] on connection failure and /// Note that the handler is returned to the [`NetworkBehaviour`] on connection failure and
/// connection closing. /// connection closing.
@ -276,7 +276,7 @@ pub trait NetworkBehaviour: 'static {
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
params: &mut impl PollParameters, params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>>; ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>>;
} }
/// Parameters passed to `poll()`, that the `NetworkBehaviour` has access to. /// Parameters passed to `poll()`, that the `NetworkBehaviour` has access to.
@ -318,12 +318,14 @@ pub trait PollParameters {
fn local_peer_id(&self) -> &PeerId; fn local_peer_id(&self) -> &PeerId;
} }
/// An action that a [`NetworkBehaviour`] can trigger in the [`Swarm`] #[deprecated(note = "Use `ToSwarm` instead.")]
/// in whose context it is executing. pub type NetworkBehaviourAction<TOutEvent, TInEvent> = ToSwarm<TOutEvent, TInEvent>;
/// A command issued from a [`NetworkBehaviour`] for the [`Swarm`].
/// ///
/// [`Swarm`]: super::Swarm /// [`Swarm`]: super::Swarm
#[derive(Debug)] #[derive(Debug)]
pub enum NetworkBehaviourAction<TOutEvent, TInEvent> { pub enum ToSwarm<TOutEvent, TInEvent> {
/// Instructs the `Swarm` to return an event when it is being polled. /// Instructs the `Swarm` to return an event when it is being polled.
GenerateEvent(TOutEvent), GenerateEvent(TOutEvent),
@ -381,7 +383,7 @@ pub enum NetworkBehaviourAction<TOutEvent, TInEvent> {
/// with the given peer. /// with the given peer.
/// ///
/// Note: Closing a connection via /// Note: Closing a connection via
/// [`NetworkBehaviourAction::CloseConnection`] does not inform the /// [`ToSwarm::CloseConnection`] does not inform the
/// corresponding [`ConnectionHandler`](crate::ConnectionHandler). /// corresponding [`ConnectionHandler`](crate::ConnectionHandler).
/// Closing a connection via a [`ConnectionHandler`](crate::ConnectionHandler) can be done /// Closing a connection via a [`ConnectionHandler`](crate::ConnectionHandler) can be done
/// either in a collaborative manner across [`ConnectionHandler`](crate::ConnectionHandler)s /// either in a collaborative manner across [`ConnectionHandler`](crate::ConnectionHandler)s
@ -395,31 +397,31 @@ pub enum NetworkBehaviourAction<TOutEvent, TInEvent> {
}, },
} }
impl<TOutEvent, TInEventOld> NetworkBehaviourAction<TOutEvent, TInEventOld> { impl<TOutEvent, TInEventOld> ToSwarm<TOutEvent, TInEventOld> {
/// Map the handler event. /// Map the handler event.
pub fn map_in<TInEventNew>( pub fn map_in<TInEventNew>(
self, self,
f: impl FnOnce(TInEventOld) -> TInEventNew, f: impl FnOnce(TInEventOld) -> TInEventNew,
) -> NetworkBehaviourAction<TOutEvent, TInEventNew> { ) -> ToSwarm<TOutEvent, TInEventNew> {
match self { match self {
NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(e), ToSwarm::GenerateEvent(e) => ToSwarm::GenerateEvent(e),
NetworkBehaviourAction::Dial { opts } => NetworkBehaviourAction::Dial { opts }, ToSwarm::Dial { opts } => ToSwarm::Dial { opts },
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
peer_id, peer_id,
handler, handler,
event, event,
} => NetworkBehaviourAction::NotifyHandler { } => ToSwarm::NotifyHandler {
peer_id, peer_id,
handler, handler,
event: f(event), event: f(event),
}, },
NetworkBehaviourAction::ReportObservedAddr { address, score } => { ToSwarm::ReportObservedAddr { address, score } => {
NetworkBehaviourAction::ReportObservedAddr { address, score } ToSwarm::ReportObservedAddr { address, score }
} }
NetworkBehaviourAction::CloseConnection { ToSwarm::CloseConnection {
peer_id, peer_id,
connection, connection,
} => NetworkBehaviourAction::CloseConnection { } => ToSwarm::CloseConnection {
peer_id, peer_id,
connection, connection,
}, },
@ -427,31 +429,28 @@ impl<TOutEvent, TInEventOld> NetworkBehaviourAction<TOutEvent, TInEventOld> {
} }
} }
impl<TOutEvent, THandlerIn> NetworkBehaviourAction<TOutEvent, THandlerIn> { impl<TOutEvent, THandlerIn> ToSwarm<TOutEvent, THandlerIn> {
/// Map the event the swarm will return. /// Map the event the swarm will return.
pub fn map_out<E>( pub fn map_out<E>(self, f: impl FnOnce(TOutEvent) -> E) -> ToSwarm<E, THandlerIn> {
self,
f: impl FnOnce(TOutEvent) -> E,
) -> NetworkBehaviourAction<E, THandlerIn> {
match self { match self {
NetworkBehaviourAction::GenerateEvent(e) => NetworkBehaviourAction::GenerateEvent(f(e)), ToSwarm::GenerateEvent(e) => ToSwarm::GenerateEvent(f(e)),
NetworkBehaviourAction::Dial { opts } => NetworkBehaviourAction::Dial { opts }, ToSwarm::Dial { opts } => ToSwarm::Dial { opts },
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
peer_id, peer_id,
handler, handler,
event, event,
} => NetworkBehaviourAction::NotifyHandler { } => ToSwarm::NotifyHandler {
peer_id, peer_id,
handler, handler,
event, event,
}, },
NetworkBehaviourAction::ReportObservedAddr { address, score } => { ToSwarm::ReportObservedAddr { address, score } => {
NetworkBehaviourAction::ReportObservedAddr { address, score } ToSwarm::ReportObservedAddr { address, score }
} }
NetworkBehaviourAction::CloseConnection { ToSwarm::CloseConnection {
peer_id, peer_id,
connection, connection,
} => NetworkBehaviourAction::CloseConnection { } => ToSwarm::CloseConnection {
peer_id, peer_id,
connection, connection,
}, },

View File

@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::behaviour::{self, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use crate::behaviour::{self, NetworkBehaviour, PollParameters, ToSwarm};
use crate::connection::ConnectionId; use crate::connection::ConnectionId;
use crate::{ConnectionDenied, THandler, THandlerInEvent, THandlerOutEvent}; use crate::{ConnectionDenied, THandler, THandlerInEvent, THandlerOutEvent};
use either::Either; use either::Either;
@ -156,7 +156,7 @@ where
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
params: &mut impl PollParameters, params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
let event = match self { let event = match self {
Either::Left(behaviour) => futures::ready!(behaviour.poll(cx, params)) Either::Left(behaviour) => futures::ready!(behaviour.poll(cx, params))
.map_out(Either::Left) .map_out(Either::Left)

View File

@ -27,8 +27,8 @@ use crate::handler::{
}; };
use crate::upgrade::SendWrapper; use crate::upgrade::SendWrapper;
use crate::{ use crate::{
ConnectionDenied, NetworkBehaviour, NetworkBehaviourAction, PollParameters, THandler, ConnectionDenied, NetworkBehaviour, PollParameters, THandler, THandlerInEvent,
THandlerInEvent, THandlerOutEvent, THandlerOutEvent, ToSwarm,
}; };
use either::Either; use either::Either;
use futures::future; use futures::future;
@ -182,7 +182,7 @@ where
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
params: &mut impl PollParameters, params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
if let Some(inner) = self.inner.as_mut() { if let Some(inner) = self.inner.as_mut() {
inner.poll(cx, params) inner.poll(cx, params)
} else { } else {

View File

@ -30,7 +30,7 @@ use std::num::NonZeroU8;
/// Options to configure a dial to a known or unknown peer. /// Options to configure a dial to a known or unknown peer.
/// ///
/// Used in [`Swarm::dial`](crate::Swarm::dial) and /// Used in [`Swarm::dial`](crate::Swarm::dial) and
/// [`NetworkBehaviourAction::Dial`](crate::behaviour::NetworkBehaviourAction::Dial). /// [`ToSwarm::Dial`](crate::behaviour::ToSwarm::Dial).
/// ///
/// To construct use either of: /// To construct use either of:
/// ///

View File

@ -1,4 +1,4 @@
use crate::behaviour::{FromSwarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use crate::behaviour::{FromSwarm, NetworkBehaviour, PollParameters, ToSwarm};
use crate::connection::ConnectionId; use crate::connection::ConnectionId;
use crate::handler::{ use crate::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
@ -54,7 +54,7 @@ impl NetworkBehaviour for Behaviour {
&mut self, &mut self,
_: &mut Context<'_>, _: &mut Context<'_>,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
Poll::Pending Poll::Pending
} }

View File

@ -1,4 +1,4 @@
use crate::behaviour::{FromSwarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use crate::behaviour::{FromSwarm, NetworkBehaviour, PollParameters, ToSwarm};
use crate::connection::ConnectionId; use crate::connection::ConnectionId;
use crate::handler::{ use crate::handler::{
ConnectionEvent, ConnectionHandlerEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound, ConnectionEvent, ConnectionHandlerEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound,
@ -57,7 +57,7 @@ impl NetworkBehaviour for Behaviour {
&mut self, &mut self,
_: &mut Context<'_>, _: &mut Context<'_>,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
Poll::Pending Poll::Pending
} }

View File

@ -90,11 +90,13 @@ pub mod derive_prelude {
pub use crate::ConnectionHandlerSelect; pub use crate::ConnectionHandlerSelect;
pub use crate::DialError; pub use crate::DialError;
pub use crate::NetworkBehaviour; pub use crate::NetworkBehaviour;
#[allow(deprecated)]
pub use crate::NetworkBehaviourAction; pub use crate::NetworkBehaviourAction;
pub use crate::PollParameters; pub use crate::PollParameters;
pub use crate::THandler; pub use crate::THandler;
pub use crate::THandlerInEvent; pub use crate::THandlerInEvent;
pub use crate::THandlerOutEvent; pub use crate::THandlerOutEvent;
pub use crate::ToSwarm;
pub use either::Either; pub use either::Either;
pub use futures::prelude as futures; pub use futures::prelude as futures;
pub use libp2p_core::transport::ListenerId; pub use libp2p_core::transport::ListenerId;
@ -106,11 +108,13 @@ pub mod derive_prelude {
#[allow(deprecated)] #[allow(deprecated)]
pub use crate::connection::ConnectionLimit; pub use crate::connection::ConnectionLimit;
#[allow(deprecated)]
pub use behaviour::NetworkBehaviourAction;
pub use behaviour::{ pub use behaviour::{
AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredExternalAddr, AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredExternalAddr,
ExpiredListenAddr, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure, ExpiredListenAddr, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
ListenerClosed, ListenerError, NetworkBehaviour, NetworkBehaviourAction, NewExternalAddr, ListenerClosed, ListenerError, NetworkBehaviour, NewExternalAddr, NewListenAddr, NotifyHandler,
NewListenAddr, NotifyHandler, PollParameters, PollParameters, ToSwarm,
}; };
#[allow(deprecated)] #[allow(deprecated)]
pub use connection::pool::{ConnectionCounters, ConnectionLimits}; pub use connection::pool::{ConnectionCounters, ConnectionLimits};
@ -699,7 +703,7 @@ where
/// order in which addresses are used to connect to) as well as /// order in which addresses are used to connect to) as well as
/// how long the address is retained in the list, depending on /// how long the address is retained in the list, depending on
/// how frequently it is reported by the `NetworkBehaviour` via /// how frequently it is reported by the `NetworkBehaviour` via
/// [`NetworkBehaviourAction::ReportObservedAddr`] or explicitly /// [`ToSwarm::ReportObservedAddr`] or explicitly
/// through this method. /// through this method.
pub fn add_external_address(&mut self, a: Multiaddr, s: AddressScore) -> AddAddressResult { pub fn add_external_address(&mut self, a: Multiaddr, s: AddressScore) -> AddAddressResult {
let result = self.external_addrs.add(a.clone(), s); let result = self.external_addrs.add(a.clone(), s);
@ -1188,13 +1192,11 @@ where
fn handle_behaviour_event( fn handle_behaviour_event(
&mut self, &mut self,
event: NetworkBehaviourAction<TBehaviour::OutEvent, THandlerInEvent<TBehaviour>>, event: ToSwarm<TBehaviour::OutEvent, THandlerInEvent<TBehaviour>>,
) -> Option<SwarmEvent<TBehaviour::OutEvent, THandlerErr<TBehaviour>>> { ) -> Option<SwarmEvent<TBehaviour::OutEvent, THandlerErr<TBehaviour>>> {
match event { match event {
NetworkBehaviourAction::GenerateEvent(event) => { ToSwarm::GenerateEvent(event) => return Some(SwarmEvent::Behaviour(event)),
return Some(SwarmEvent::Behaviour(event)) ToSwarm::Dial { opts } => {
}
NetworkBehaviourAction::Dial { opts } => {
let peer_id = opts.get_or_parse_peer_id(); let peer_id = opts.get_or_parse_peer_id();
if let Ok(()) = self.dial(opts) { if let Ok(()) = self.dial(opts) {
if let Ok(Some(peer_id)) = peer_id { if let Ok(Some(peer_id)) = peer_id {
@ -1202,7 +1204,7 @@ where
} }
} }
} }
NetworkBehaviourAction::NotifyHandler { ToSwarm::NotifyHandler {
peer_id, peer_id,
handler, handler,
event, event,
@ -1221,7 +1223,7 @@ where
self.pending_event = Some((peer_id, handler, event)); self.pending_event = Some((peer_id, handler, event));
} }
NetworkBehaviourAction::ReportObservedAddr { address, score } => { ToSwarm::ReportObservedAddr { address, score } => {
// Maps the given `observed_addr`, representing an address of the local // Maps the given `observed_addr`, representing an address of the local
// node observed by a remote peer, onto the locally known listen addresses // node observed by a remote peer, onto the locally known listen addresses
// to yield one or more addresses of the local node that may be publicly // to yield one or more addresses of the local node that may be publicly
@ -1251,7 +1253,7 @@ where
self.add_external_address(addr, score); self.add_external_address(addr, score);
} }
} }
NetworkBehaviourAction::CloseConnection { ToSwarm::CloseConnection {
peer_id, peer_id,
connection, connection,
} => match connection { } => match connection {
@ -2314,7 +2316,7 @@ mod tests {
/// Establishes multiple connections between two peers, /// Establishes multiple connections between two peers,
/// after which one peer disconnects the other /// after which one peer disconnects the other
/// using [`NetworkBehaviourAction::CloseConnection`] returned by a [`NetworkBehaviour`]. /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`].
/// ///
/// The test expects both behaviours to be notified via calls to [`NetworkBehaviour::on_swarm_event`] /// The test expects both behaviours to be notified via calls to [`NetworkBehaviour::on_swarm_event`]
/// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`] /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`]
@ -2352,12 +2354,14 @@ mod tests {
if reconnected { if reconnected {
return Poll::Ready(()); return Poll::Ready(());
} }
swarm2.behaviour.inner().next_action.replace( swarm2
NetworkBehaviourAction::CloseConnection { .behaviour
.inner()
.next_action
.replace(ToSwarm::CloseConnection {
peer_id: swarm1_id, peer_id: swarm1_id,
connection: CloseConnection::All, connection: CloseConnection::All,
}, });
);
state = State::Disconnecting; state = State::Disconnecting;
continue; continue;
} }
@ -2382,7 +2386,7 @@ mod tests {
/// Establishes multiple connections between two peers, /// Establishes multiple connections between two peers,
/// after which one peer closes a single connection /// after which one peer closes a single connection
/// using [`NetworkBehaviourAction::CloseConnection`] returned by a [`NetworkBehaviour`]. /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`].
/// ///
/// The test expects both behaviours to be notified via calls to [`NetworkBehaviour::on_swarm_event`] /// The test expects both behaviours to be notified via calls to [`NetworkBehaviour::on_swarm_event`]
/// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`] /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`]
@ -2421,7 +2425,7 @@ mod tests {
let conn_id = let conn_id =
swarm2.behaviour.on_connection_established[num_connections / 2].1; swarm2.behaviour.on_connection_established[num_connections / 2].1;
swarm2.behaviour.inner().next_action.replace( swarm2.behaviour.inner().next_action.replace(
NetworkBehaviourAction::CloseConnection { ToSwarm::CloseConnection {
peer_id: swarm1_id, peer_id: swarm1_id,
connection: CloseConnection::One(conn_id), connection: CloseConnection::One(conn_id),
}, },

View File

@ -23,8 +23,8 @@ use crate::behaviour::{
FromSwarm, ListenerClosed, ListenerError, NewExternalAddr, NewListenAddr, NewListener, FromSwarm, ListenerClosed, ListenerError, NewExternalAddr, NewListenAddr, NewListener,
}; };
use crate::{ use crate::{
ConnectionDenied, ConnectionHandler, ConnectionId, NetworkBehaviour, NetworkBehaviourAction, ConnectionDenied, ConnectionHandler, ConnectionId, NetworkBehaviour, PollParameters, THandler,
PollParameters, THandler, THandlerInEvent, THandlerOutEvent, THandlerInEvent, THandlerOutEvent, ToSwarm,
}; };
use libp2p_core::{multiaddr::Multiaddr, transport::ListenerId, ConnectedPoint, Endpoint}; use libp2p_core::{multiaddr::Multiaddr, transport::ListenerId, ConnectedPoint, Endpoint};
use libp2p_identity::PeerId; use libp2p_identity::PeerId;
@ -48,7 +48,7 @@ where
/// The next action to return from `poll`. /// The next action to return from `poll`.
/// ///
/// An action is only returned once. /// An action is only returned once.
pub next_action: Option<NetworkBehaviourAction<TOutEvent, THandler::InEvent>>, pub next_action: Option<ToSwarm<TOutEvent, THandler::InEvent>>,
} }
impl<THandler, TOutEvent> MockBehaviour<THandler, TOutEvent> impl<THandler, TOutEvent> MockBehaviour<THandler, TOutEvent>
@ -114,7 +114,7 @@ where
&mut self, &mut self,
_: &mut Context, _: &mut Context,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
self.next_action.take().map_or(Poll::Pending, Poll::Ready) self.next_action.take().map_or(Poll::Pending, Poll::Ready)
} }
@ -579,7 +579,7 @@ where
&mut self, &mut self,
cx: &mut Context, cx: &mut Context,
args: &mut impl PollParameters, args: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
self.poll += 1; self.poll += 1;
self.inner.poll(cx, args) self.inner.poll(cx, args)
} }

View File

@ -457,7 +457,7 @@ fn multiple_behaviour_attributes() {
#[test] #[test]
fn custom_out_event_no_type_parameters() { fn custom_out_event_no_type_parameters() {
use libp2p_identity::PeerId; use libp2p_identity::PeerId;
use libp2p_swarm::{ConnectionId, NetworkBehaviourAction, PollParameters}; use libp2p_swarm::{ConnectionId, PollParameters, ToSwarm};
use std::task::Context; use std::task::Context;
use std::task::Poll; use std::task::Poll;
@ -502,7 +502,7 @@ fn custom_out_event_no_type_parameters() {
&mut self, &mut self,
_ctx: &mut Context, _ctx: &mut Context,
_: &mut impl PollParameters, _: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, THandlerInEvent<Self>>> { ) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
Poll::Pending Poll::Pending
} }