swarm/: Patch reporting on banned peer connections (#2350)

Don't report events of a connection to the `NetworkBehaviour`, if connection has
been established while the remote peer was banned. Among other guarantees this
upholds that `NetworkBehaviour::inject_event` is never called without a previous
`NetworkBehaviour::inject_connection_established` for said connection.

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Divma
2021-11-26 10:48:12 -05:00
committed by GitHub
parent f0000d56cd
commit fd417517ca
49 changed files with 437 additions and 193 deletions

View File

@ -97,6 +97,7 @@ use smallvec::SmallVec;
use std::collections::HashSet;
use std::num::{NonZeroU32, NonZeroU8, NonZeroUsize};
use std::{
convert::TryFrom,
error, fmt, io,
pin::Pin,
task::{Context, Poll},
@ -275,6 +276,12 @@ where
/// List of nodes for which we deny any incoming connection.
banned_peers: HashSet<PeerId>,
/// Connections for which we withhold any reporting. These belong to banned peers.
///
/// Note: Connections to a peer that are established at the time of banning that peer
/// are not added here. Instead they are simply closed.
banned_peer_connections: HashSet<ConnectionId>,
/// Pending event to be delivered to connection handlers
/// (or dropped if the peer disconnected) before the `behaviour`
/// can be polled again.
@ -540,6 +547,10 @@ where
pub fn ban_peer_id(&mut self, peer_id: PeerId) {
if self.banned_peers.insert(peer_id) {
if let Some(peer) = self.network.peer(peer_id).into_connected() {
// Note that established connections to the now banned peer are closed but not
// added to [`Swarm::banned_peer_connections`]. They have been previously reported
// as open to the behaviour and need be reported as closed once closing the
// connection finishes.
peer.disconnect();
}
}
@ -603,8 +614,12 @@ where
Poll::Pending => network_not_ready = true,
Poll::Ready(NetworkEvent::ConnectionEvent { connection, event }) => {
let peer = connection.peer_id();
let connection = connection.id();
this.behaviour.inject_event(peer, connection, event);
let conn_id = connection.id();
if this.banned_peer_connections.contains(&conn_id) {
log::debug!("Ignoring event from banned peer: {} {:?}.", peer, conn_id);
} else {
this.behaviour.inject_event(peer, conn_id, event);
}
}
Poll::Ready(NetworkEvent::AddressChange {
connection,
@ -612,22 +627,27 @@ where
old_endpoint,
}) => {
let peer = connection.peer_id();
let connection = connection.id();
this.behaviour.inject_address_change(
&peer,
&connection,
&old_endpoint,
&new_endpoint,
);
let conn_id = connection.id();
if !this.banned_peer_connections.contains(&conn_id) {
this.behaviour.inject_address_change(
&peer,
&conn_id,
&old_endpoint,
&new_endpoint,
);
}
}
Poll::Ready(NetworkEvent::ConnectionEstablished {
connection,
num_established,
other_established_connection_ids,
concurrent_dial_errors,
}) => {
let peer_id = connection.peer_id();
let endpoint = connection.endpoint().clone();
if this.banned_peers.contains(&peer_id) {
// Mark the connection for the banned peer as banned, thus withholding any
// future events from the connection to the behaviour.
this.banned_peer_connections.insert(connection.id());
this.network
.peer(peer_id)
.into_connected()
@ -635,6 +655,11 @@ where
.disconnect();
return Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint });
} else {
let num_established = NonZeroU32::new(
u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
)
.expect("n + 1 is always non-zero; qed");
log::debug!(
"Connection established: {:?} {:?}; Total (peer): {}.",
connection.peer_id(),
@ -651,7 +676,13 @@ where
&endpoint,
failed_addresses.as_ref(),
);
if num_established.get() == 1 {
// The peer is not banned, but there could be previous banned connections
// if the peer was just unbanned. Check if this is the first non-banned
// connection.
let first_non_banned = other_established_connection_ids
.into_iter()
.all(|conn_id| this.banned_peer_connections.contains(&conn_id));
if first_non_banned {
this.behaviour.inject_connected(&peer_id);
}
return Poll::Ready(SwarmEvent::ConnectionEstablished {
@ -666,7 +697,7 @@ where
id,
connected,
error,
num_established,
remaining_established_connection_ids,
handler,
}) => {
if let Some(error) = error.as_ref() {
@ -676,14 +707,26 @@ where
}
let peer_id = connected.peer_id;
let endpoint = connected.endpoint;
this.behaviour.inject_connection_closed(
&peer_id,
&id,
&endpoint,
handler.into_protocols_handler(),
);
if num_established == 0 {
this.behaviour.inject_disconnected(&peer_id);
let num_established =
u32::try_from(remaining_established_connection_ids.len()).unwrap();
let conn_was_reported = !this.banned_peer_connections.remove(&id);
if conn_was_reported {
this.behaviour.inject_connection_closed(
&peer_id,
&id,
&endpoint,
handler.into_protocols_handler(),
);
// This connection was reported as open to the behaviour. Check if this is
// the last non-banned connection for the peer.
let last_non_banned = remaining_established_connection_ids
.into_iter()
.all(|conn_id| this.banned_peer_connections.contains(&conn_id));
if last_non_banned {
this.behaviour.inject_disconnected(&peer_id)
}
}
return Poll::Ready(SwarmEvent::ConnectionClosed {
peer_id,
@ -1253,6 +1296,7 @@ where
listened_addrs: SmallVec::new(),
external_addrs: Addresses::default(),
banned_peers: HashSet::new(),
banned_peer_connections: HashSet::new(),
pending_event: None,
substream_upgrade_protocol_override: self.substream_upgrade_protocol_override,
}
@ -1450,15 +1494,6 @@ mod tests {
TBehaviour: NetworkBehaviour,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent: Clone,
{
for s in &[swarm1, swarm2] {
if s.behaviour.inject_connection_established.len() > 0 {
assert_eq!(s.behaviour.inject_connected.len(), 1);
} else {
assert_eq!(s.behaviour.inject_connected.len(), 0);
}
assert!(s.behaviour.inject_connection_closed.is_empty());
assert!(s.behaviour.inject_disconnected.is_empty());
}
[swarm1, swarm2]
.iter()
.all(|s| s.behaviour.inject_connection_established.len() == num_connections)
@ -1473,10 +1508,6 @@ mod tests {
TBehaviour: NetworkBehaviour,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent: Clone
{
for s in &[swarm1, swarm2] {
assert_eq!(s.behaviour.inject_connection_established.len(), 0);
assert_eq!(s.behaviour.inject_connected.len(), 0);
}
[swarm1, swarm2]
.iter()
.all(|s| s.behaviour.inject_connection_closed.len() == num_connections)
@ -1490,7 +1521,12 @@ mod tests {
///
/// The test expects both behaviours to be notified via pairs of
/// inject_connected / inject_disconnected as well as
/// inject_connection_established / inject_connection_closed calls.
/// inject_connection_established / inject_connection_closed calls
/// while unbanned.
///
/// While the ban is in effect, further dials occur. For these connections no
/// `inject_connected`, `inject_connection_established`, `inject_disconnected`,
/// `inject_connection_closed` calls should be registered.
#[test]
fn test_connect_disconnect_ban() {
// Since the test does not try to open any substreams, we can
@ -1505,59 +1541,103 @@ mod tests {
let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
swarm1.listen_on(addr1.clone().into()).unwrap();
swarm2.listen_on(addr2.clone().into()).unwrap();
swarm1.listen_on(addr1.clone()).unwrap();
swarm2.listen_on(addr2.clone()).unwrap();
let swarm1_id = *swarm1.local_peer_id();
let mut banned = false;
let mut unbanned = false;
enum Stage {
/// Waiting for the peers to connect. Banning has not occurred.
Connecting,
/// Ban occurred.
Banned,
// Ban is in place and a dial is ongoing.
BannedDial,
// Mid-ban dial was registered and the peer was unbanned.
Unbanned,
// There are dial attempts ongoing for the no longer banned peers.
Reconnecting,
}
let num_connections = 10;
for _ in 0..num_connections {
swarm1.dial(addr2.clone()).unwrap();
}
let mut state = State::Connecting;
executor::block_on(future::poll_fn(move |cx| {
loop {
let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
match state {
State::Connecting => {
if swarms_connected(&swarm1, &swarm2, num_connections) {
if banned {
return Poll::Ready(());
}
swarm2.ban_peer_id(swarm1_id.clone());
swarm1.behaviour.reset();
swarm2.behaviour.reset();
banned = true;
state = State::Disconnecting;
}
}
State::Disconnecting => {
if swarms_disconnected(&swarm1, &swarm2, num_connections) {
if unbanned {
return Poll::Ready(());
}
// Unban the first peer and reconnect.
swarm2.unban_peer_id(swarm1_id.clone());
swarm1.behaviour.reset();
swarm2.behaviour.reset();
unbanned = true;
for _ in 0..num_connections {
swarm2.dial(addr1.clone()).unwrap();
}
state = State::Connecting;
}
let mut s1_expected_conns = num_connections;
let mut s2_expected_conns = num_connections;
let mut stage = Stage::Connecting;
executor::block_on(future::poll_fn(move |cx| loop {
let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
match stage {
Stage::Connecting => {
if swarm1.behaviour.assert_connected(s1_expected_conns, 1)
&& swarm2.behaviour.assert_connected(s2_expected_conns, 1)
{
// Setup to test that already established connections are correctly closed
// and reported as such after the peer is banned.
swarm2.ban_peer_id(swarm1_id);
stage = Stage::Banned;
}
}
if poll1.is_pending() && poll2.is_pending() {
return Poll::Pending;
Stage::Banned => {
if swarm1.behaviour.assert_disconnected(s1_expected_conns, 1)
&& swarm2.behaviour.assert_disconnected(s2_expected_conns, 1)
{
// Setup to test that new connections of banned peers are not reported.
swarm1.dial(addr2.clone()).unwrap();
s1_expected_conns += 1;
stage = Stage::BannedDial;
}
}
Stage::BannedDial => {
if swarm2.network_info().num_peers() == 1 {
// The banned connection was established. Check that it was not reported to
// the behaviour of the banning swarm.
assert_eq!(
swarm2.behaviour.inject_connection_established.len(), s2_expected_conns,
"No additional closed connections should be reported for the banned peer"
);
// Setup to test that the banned connection is not reported upon closing
// even if the peer is unbanned.
swarm2.unban_peer_id(swarm1_id);
stage = Stage::Unbanned;
}
}
Stage::Unbanned => {
if swarm2.network_info().num_peers() == 0 {
// The banned connection has closed. Check that it was not reported.
assert_eq!(
swarm2.behaviour.inject_connection_closed.len(), s2_expected_conns,
"No additional closed connections should be reported for the banned peer"
);
assert!(swarm2.banned_peer_connections.is_empty());
// Setup to test that a ban lifted does not affect future connections.
for _ in 0..num_connections {
swarm1.dial(addr2.clone()).unwrap();
}
s1_expected_conns += num_connections;
s2_expected_conns += num_connections;
stage = Stage::Reconnecting;
}
}
Stage::Reconnecting => {
if swarm1.behaviour.inject_connection_established.len() == s1_expected_conns
&& swarm2.behaviour.assert_connected(s2_expected_conns, 2)
{
return Poll::Ready(());
}
}
}
if poll1.is_pending() && poll2.is_pending() {
return Poll::Pending;
}
}))
}
@ -1582,8 +1662,8 @@ mod tests {
let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
swarm1.listen_on(addr1.clone().into()).unwrap();
swarm2.listen_on(addr2.clone().into()).unwrap();
swarm1.listen_on(addr1.clone()).unwrap();
swarm2.listen_on(addr2.clone()).unwrap();
let swarm1_id = *swarm1.local_peer_id();
@ -1605,10 +1685,8 @@ mod tests {
return Poll::Ready(());
}
swarm2
.disconnect_peer_id(swarm1_id.clone())
.disconnect_peer_id(swarm1_id)
.expect("Error disconnecting");
swarm1.behaviour.reset();
swarm2.behaviour.reset();
state = State::Disconnecting;
}
}
@ -1618,8 +1696,6 @@ mod tests {
return Poll::Ready(());
}
reconnected = true;
swarm1.behaviour.reset();
swarm2.behaviour.reset();
for _ in 0..num_connections {
swarm2.dial(addr1.clone()).unwrap();
}
@ -1655,8 +1731,8 @@ mod tests {
let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
swarm1.listen_on(addr1.clone().into()).unwrap();
swarm2.listen_on(addr2.clone().into()).unwrap();
swarm1.listen_on(addr1.clone()).unwrap();
swarm2.listen_on(addr2.clone()).unwrap();
let swarm1_id = *swarm1.local_peer_id();
@ -1679,12 +1755,10 @@ mod tests {
}
swarm2.behaviour.inner().next_action.replace(
NetworkBehaviourAction::CloseConnection {
peer_id: swarm1_id.clone(),
peer_id: swarm1_id,
connection: CloseConnection::All,
},
);
swarm1.behaviour.reset();
swarm2.behaviour.reset();
state = State::Disconnecting;
}
}
@ -1694,8 +1768,6 @@ mod tests {
return Poll::Ready(());
}
reconnected = true;
swarm1.behaviour.reset();
swarm2.behaviour.reset();
for _ in 0..num_connections {
swarm2.dial(addr1.clone()).unwrap();
}
@ -1731,8 +1803,8 @@ mod tests {
let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
swarm1.listen_on(addr1.clone().into()).unwrap();
swarm2.listen_on(addr2.clone().into()).unwrap();
swarm1.listen_on(addr1.clone()).unwrap();
swarm2.listen_on(addr2.clone()).unwrap();
let swarm1_id = *swarm1.local_peer_id();
@ -1756,22 +1828,23 @@ mod tests {
.1;
swarm2.behaviour.inner().next_action.replace(
NetworkBehaviourAction::CloseConnection {
peer_id: swarm1_id.clone(),
peer_id: swarm1_id,
connection: CloseConnection::One(conn_id),
},
);
Some(conn_id)
};
swarm1.behaviour.reset();
swarm2.behaviour.reset();
state = State::Disconnecting;
}
}
State::Disconnecting => {
for s in &[&swarm1, &swarm2] {
assert_eq!(s.behaviour.inject_disconnected.len(), 0);
assert_eq!(s.behaviour.inject_connection_established.len(), 0);
assert_eq!(s.behaviour.inject_connected.len(), 0);
assert_eq!(
s.behaviour.inject_connection_established.len(),
num_connections
);
assert_eq!(s.behaviour.inject_connected.len(), 1);
}
if [&swarm1, &swarm2]
.iter()