refactor(swarm)!: remove handler from NetworkBehaviourAction::Dial (#3328)

We create the `ConnectionId` for the new connection as part of `DialOpts`. This allows `NetworkBehaviour`s to accurately track state regarding their own dial attempts.

This patch is the main enabler of https://github.com/libp2p/rust-libp2p/pull/3254. Removing the `handler` field will allow us to deprecate the `NetworkBehaviour::new_handler` function in favor of four new ones that give more control over the connection lifecycle.
This commit is contained in:
Thomas Eizinger
2023-02-14 14:09:29 +13:00
committed by GitHub
parent 9247cfa878
commit caed1fe2c7
31 changed files with 584 additions and 967 deletions

View File

@ -92,6 +92,7 @@ pub mod derive_prelude {
pub use crate::NetworkBehaviour;
pub use crate::NetworkBehaviourAction;
pub use crate::PollParameters;
pub use crate::THandlerInEvent;
pub use either::Either;
pub use futures::prelude as futures;
pub use libp2p_core::transport::ListenerId;
@ -162,7 +163,7 @@ type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ConnectionHandler;
/// Custom event that can be received by the [`ConnectionHandler`] of the
/// [`NetworkBehaviour`].
type THandlerInEvent<TBehaviour> =
pub type THandlerInEvent<TBehaviour> =
<<THandler<TBehaviour> as IntoConnectionHandler>::Handler as ConnectionHandler>::InEvent;
/// Custom event that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
@ -326,12 +327,6 @@ where
/// List of nodes for which we deny any incoming connection.
banned_peers: HashSet<PeerId>,
/// Connections for which we withhold any reporting. These belong to banned peers.
///
/// Note: Connections to a peer that are established at the time of banning that peer
/// are not added here. Instead they are simply closed.
banned_peer_connections: HashSet<ConnectionId>,
/// Pending event to be delivered to connection handlers
/// (or dropped if the peer disconnected) before the `behaviour`
/// can be polled again.
@ -502,19 +497,13 @@ where
/// swarm.dial("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap());
/// ```
pub fn dial(&mut self, opts: impl Into<DialOpts>) -> Result<(), DialError> {
let handler = self.behaviour.new_handler();
self.dial_with_handler(opts.into(), handler)
}
let dial_opts = opts.into();
fn dial_with_handler(
&mut self,
dial_opts: DialOpts,
handler: <TBehaviour as NetworkBehaviour>::ConnectionHandler,
) -> Result<(), DialError> {
let peer_id = dial_opts
.get_or_parse_peer_id()
.map_err(DialError::InvalidPeerId)?;
let condition = dial_opts.peer_condition();
let connection_id = dial_opts.connection_id();
let should_dial = match (condition, peer_id) {
(PeerCondition::Always, _) => true,
@ -530,8 +519,8 @@ where
self.behaviour
.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
handler,
error: &e,
connection_id,
}));
return Err(e);
@ -544,8 +533,8 @@ where
self.behaviour
.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id: Some(peer_id),
handler,
error: &error,
connection_id,
}));
return Err(error);
@ -572,8 +561,8 @@ where
self.behaviour
.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
handler,
error: &error,
connection_id,
}));
return Err(error);
};
@ -607,18 +596,18 @@ where
match self.pool.add_outgoing(
dials,
peer_id,
handler,
dial_opts.role_override(),
dial_opts.dial_concurrency_override(),
connection_id,
) {
Ok(_connection_id) => Ok(()),
Err((connection_limit, handler)) => {
Ok(()) => Ok(()),
Err(connection_limit) => {
let error = DialError::ConnectionLimit(connection_limit);
self.behaviour
.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
handler,
error: &error,
connection_id,
}));
Err(error)
@ -762,65 +751,68 @@ where
peer_id,
id,
endpoint,
other_established_connection_ids,
connection,
concurrent_dial_errors,
established_in,
} => {
if self.banned_peers.contains(&peer_id) {
// Mark the connection for the banned peer as banned, thus withholding any
// future events from the connection to the behaviour.
self.banned_peer_connections.insert(id);
self.pool.disconnect(peer_id);
self.pool.close_connection(connection);
return Some(SwarmEvent::BannedPeer { peer_id, endpoint });
} else {
let num_established = NonZeroU32::new(
u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
)
.expect("n + 1 is always non-zero; qed");
let non_banned_established = other_established_connection_ids
.into_iter()
.filter(|conn_id| !self.banned_peer_connections.contains(conn_id))
.count();
log::debug!(
"Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}",
peer_id,
endpoint,
num_established,
non_banned_established + 1,
);
let failed_addresses = concurrent_dial_errors
.as_ref()
.map(|es| {
es.iter()
.map(|(a, _)| a)
.cloned()
.collect::<Vec<Multiaddr>>()
})
.unwrap_or_default();
self.behaviour
.on_swarm_event(FromSwarm::ConnectionEstablished(
behaviour::ConnectionEstablished {
peer_id,
connection_id: id,
endpoint: &endpoint,
failed_addresses: &failed_addresses,
other_established: non_banned_established,
},
));
return Some(SwarmEvent::ConnectionEstablished {
peer_id,
num_established,
endpoint,
concurrent_dial_errors,
established_in,
});
}
let handler = self
.behaviour
.new_handler()
.into_handler(&peer_id, &endpoint);
let other_established_connection_ids = self
.pool
.iter_established_connections_of_peer(&peer_id)
.collect::<Vec<_>>();
let num_established = NonZeroU32::new(
u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
)
.expect("n + 1 is always non-zero; qed");
self.pool
.spawn_connection(id, peer_id, &endpoint, connection, handler);
log::debug!(
"Connection established: {:?} {:?}; Total (peer): {}.",
peer_id,
endpoint,
num_established,
);
let failed_addresses = concurrent_dial_errors
.as_ref()
.map(|es| {
es.iter()
.map(|(a, _)| a)
.cloned()
.collect::<Vec<Multiaddr>>()
})
.unwrap_or_default();
self.behaviour
.on_swarm_event(FromSwarm::ConnectionEstablished(
behaviour::ConnectionEstablished {
peer_id,
connection_id: id,
endpoint: &endpoint,
failed_addresses: &failed_addresses,
other_established: other_established_connection_ids.len(),
},
));
return Some(SwarmEvent::ConnectionEstablished {
peer_id,
num_established,
endpoint,
concurrent_dial_errors,
established_in,
});
}
PoolEvent::PendingOutboundConnectionError {
id: _,
id: connection_id,
error,
handler,
peer,
} => {
let error = error.into();
@ -828,8 +820,8 @@ where
self.behaviour
.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id: peer,
handler,
error: &error,
connection_id,
}));
if let Some(peer) = peer {
@ -844,11 +836,10 @@ where
});
}
PoolEvent::PendingInboundConnectionError {
id: _,
id,
send_back_addr,
local_addr,
error,
handler,
} => {
let error = error.into();
@ -858,7 +849,7 @@ where
local_addr: &local_addr,
send_back_addr: &send_back_addr,
error: &error,
handler,
connection_id: id,
}));
return Some(SwarmEvent::IncomingConnectionError {
local_addr,
@ -892,21 +883,15 @@ where
let endpoint = connected.endpoint;
let num_established =
u32::try_from(remaining_established_connection_ids.len()).unwrap();
let conn_was_reported = !self.banned_peer_connections.remove(&id);
if conn_was_reported {
let remaining_non_banned = remaining_established_connection_ids
.into_iter()
.filter(|conn_id| !self.banned_peer_connections.contains(conn_id))
.count();
self.behaviour
.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id: id,
endpoint: &endpoint,
handler,
remaining_established: remaining_non_banned,
}));
}
self.behaviour
.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id: id,
endpoint: &endpoint,
handler,
remaining_established: num_established as usize,
}));
return Some(SwarmEvent::ConnectionClosed {
peer_id,
endpoint,
@ -915,12 +900,8 @@ where
});
}
PoolEvent::ConnectionEvent { peer_id, id, event } => {
if self.banned_peer_connections.contains(&id) {
log::debug!("Ignoring event from banned peer: {} {:?}.", peer_id, id);
} else {
self.behaviour
.on_connection_handler_event(peer_id, id, event);
}
self.behaviour
.on_connection_handler_event(peer_id, id, event);
}
PoolEvent::AddressChange {
peer_id,
@ -928,15 +909,13 @@ where
new_endpoint,
old_endpoint,
} => {
if !self.banned_peer_connections.contains(&id) {
self.behaviour
.on_swarm_event(FromSwarm::AddressChange(AddressChange {
peer_id,
connection_id: id,
old: &old_endpoint,
new: &new_endpoint,
}));
}
self.behaviour
.on_swarm_event(FromSwarm::AddressChange(AddressChange {
peer_id,
connection_id: id,
old: &old_endpoint,
new: &new_endpoint,
}));
}
}
@ -957,30 +936,30 @@ where
local_addr,
send_back_addr,
} => {
let handler = self.behaviour.new_handler();
let connection_id = ConnectionId::next();
match self.pool.add_incoming(
upgrade,
handler,
IncomingInfo {
local_addr: &local_addr,
send_back_addr: &send_back_addr,
},
connection_id,
) {
Ok(_connection_id) => {
Ok(()) => {
return Some(SwarmEvent::IncomingConnection {
local_addr,
send_back_addr,
});
}
Err((connection_limit, handler)) => {
Err(connection_limit) => {
let error = ListenError::ConnectionLimit(connection_limit);
self.behaviour
.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
local_addr: &local_addr,
send_back_addr: &send_back_addr,
error: &error,
handler,
connection_id,
}));
log::warn!("Incoming connection rejected: {:?}", connection_limit);
}
@ -1063,15 +1042,15 @@ where
fn handle_behaviour_event(
&mut self,
event: NetworkBehaviourAction<TBehaviour::OutEvent, TBehaviour::ConnectionHandler>,
event: NetworkBehaviourAction<TBehaviour::OutEvent, THandlerInEvent<TBehaviour>>,
) -> Option<SwarmEvent<TBehaviour::OutEvent, THandlerErr<TBehaviour>>> {
match event {
NetworkBehaviourAction::GenerateEvent(event) => {
return Some(SwarmEvent::Behaviour(event))
}
NetworkBehaviourAction::Dial { opts, handler } => {
NetworkBehaviourAction::Dial { opts } => {
let peer_id = opts.get_or_parse_peer_id();
if let Ok(()) = self.dial_with_handler(opts, handler) {
if let Ok(()) = self.dial(opts) {
if let Ok(Some(peer_id)) = peer_id {
return Some(SwarmEvent::Dialing(peer_id));
}
@ -1571,7 +1550,6 @@ where
listened_addrs: HashMap::new(),
external_addrs: Addresses::default(),
banned_peers: HashSet::new(),
banned_peer_connections: HashSet::new(),
pending_event: None,
}
}
@ -1972,24 +1950,22 @@ mod tests {
{
// Setup to test that new connections of banned peers are not reported.
swarm1.dial(addr2.clone()).unwrap();
s1_expected_conns += 1;
stage = Stage::BannedDial;
}
}
Stage::BannedDial => {
if swarm2.network_info().num_peers() == 1 {
// The banned connection was established. Check that it was not reported to
// the behaviour of the banning swarm.
assert_eq!(
swarm2.behaviour.on_connection_established.len(), s2_expected_conns,
"No additional closed connections should be reported for the banned peer"
);
// The banned connection was established. Check that it was not reported to
// the behaviour of the banning swarm.
assert_eq!(
swarm2.behaviour.on_connection_established.len(),
s2_expected_conns,
"No additional closed connections should be reported for the banned peer"
);
// Setup to test that the banned connection is not reported upon closing
// even if the peer is unbanned.
swarm2.unban_peer_id(swarm1_id);
stage = Stage::Unbanned;
}
// Setup to test that the banned connection is not reported upon closing
// even if the peer is unbanned.
swarm2.unban_peer_id(swarm1_id);
stage = Stage::Unbanned;
}
Stage::Unbanned => {
if swarm2.network_info().num_peers() == 0 {
@ -1998,7 +1974,6 @@ mod tests {
swarm2.behaviour.on_connection_closed.len(), s2_expected_conns,
"No additional closed connections should be reported for the banned peer"
);
assert!(swarm2.banned_peer_connections.is_empty());
// Setup to test that a ban lifted does not affect future connections.
for _ in 0..num_connections {