swarm/: Allow disconnecting from Swarm and NetworkBehaviour (#2110)

Add `ExpandedSwarm::disconnect_peer_id` and
`NetworkBehaviourAction::CloseConnection` to close connections to a specific
peer via an `ExpandedSwarm` or `NetworkBehaviour`.

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Sergey O. Boyko 2021-07-03 00:35:51 +07:00 committed by GitHub
parent f9491e7e81
commit 4eb0659e4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 376 additions and 40 deletions

View File

@ -3202,6 +3202,9 @@ where
NetworkBehaviourAction::ReportObservedAddr { address, score } => {
NetworkBehaviourAction::ReportObservedAddr { address, score }
}
NetworkBehaviourAction::CloseConnection { peer_id, connection } => {
NetworkBehaviourAction::CloseConnection { peer_id, connection }
}
});
}

View File

@ -657,7 +657,9 @@ where
| NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event },
| NetworkBehaviourAction::ReportObservedAddr { address, score } =>
NetworkBehaviourAction::ReportObservedAddr { address, score }
NetworkBehaviourAction::ReportObservedAddr { address, score },
| NetworkBehaviourAction::CloseConnection { peer_id, connection } =>
NetworkBehaviourAction::CloseConnection { peer_id, connection }
};
return Poll::Ready(event)

View File

@ -479,6 +479,9 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
std::task::Poll::Ready(#network_behaviour_action::ReportObservedAddr { address, score }) => {
return std::task::Poll::Ready(#network_behaviour_action::ReportObservedAddr { address, score });
}
std::task::Poll::Ready(#network_behaviour_action::CloseConnection { peer_id, connection }) => {
return std::task::Poll::Ready(#network_behaviour_action::CloseConnection { peer_id, connection });
}
std::task::Poll::Pending => break,
}
}

View File

@ -15,7 +15,12 @@
See [PR 2100] for details.
- Add `ExpandedSwarm::disconnect_peer_id` and
`NetworkBehaviourAction::CloseConnection` to close connections to a specific
peer via an `ExpandedSwarm` or `NetworkBehaviour`. See [PR 2110] for details.
[PR 2100]: https://github.com/libp2p/rust-libp2p/pull/2100
[PR 2110]: https://github.com/libp2p/rust-libp2p/pull/2110/
# 0.29.0 [2021-04-13]

View File

@ -293,6 +293,23 @@ pub enum NetworkBehaviourAction<TInEvent, TOutEvent> {
/// relative to other observed addresses.
score: AddressScore,
},
/// Instructs the `Swarm` to initiate a graceful close of one or all connections
/// with the given peer.
///
/// Note: Closing a connection via
/// [`NetworkBehaviourAction::CloseConnection`] does not inform the
/// corresponding [`ProtocolsHandler`].
/// Closing a connection via a [`ProtocolsHandler`] can be done
/// either in a collaborative manner across [`ProtocolsHandler`]s
/// with [`ProtocolsHandler::connection_keep_alive`] or directly with
/// [`ProtocolsHandlerEvent::Close`](crate::ProtocolsHandlerEvent::Close).
CloseConnection {
/// The peer to disconnect.
peer_id: PeerId,
/// Whether to close a specific or all connections to the given peer.
connection: CloseConnection,
}
}
impl<TInEvent, TOutEvent> NetworkBehaviourAction<TInEvent, TOutEvent> {
@ -312,7 +329,9 @@ impl<TInEvent, TOutEvent> NetworkBehaviourAction<TInEvent, TOutEvent> {
event: f(event)
},
NetworkBehaviourAction::ReportObservedAddr { address, score } =>
NetworkBehaviourAction::ReportObservedAddr { address, score }
NetworkBehaviourAction::ReportObservedAddr { address, score },
NetworkBehaviourAction::CloseConnection { peer_id, connection } =>
NetworkBehaviourAction::CloseConnection { peer_id, connection }
}
}
@ -328,7 +347,9 @@ impl<TInEvent, TOutEvent> NetworkBehaviourAction<TInEvent, TOutEvent> {
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event },
NetworkBehaviourAction::ReportObservedAddr { address, score } =>
NetworkBehaviourAction::ReportObservedAddr { address, score }
NetworkBehaviourAction::ReportObservedAddr { address, score },
NetworkBehaviourAction::CloseConnection { peer_id, connection } =>
NetworkBehaviourAction::CloseConnection { peer_id, connection }
}
}
}
@ -373,3 +394,18 @@ impl Default for DialPeerCondition {
DialPeerCondition::Disconnected
}
}
/// The options which connections to close.
#[derive(Debug, Clone)]
pub enum CloseConnection {
/// Disconnect a particular connection.
One(ConnectionId),
/// Disconnect all connections.
All,
}
impl Default for CloseConnection {
fn default() -> Self {
CloseConnection::All
}
}

View File

@ -68,7 +68,8 @@ pub use behaviour::{
NetworkBehaviourEventProcess,
PollParameters,
NotifyHandler,
DialPeerCondition
DialPeerCondition,
CloseConnection
};
pub use protocols_handler::{
IntoProtocolsHandler,
@ -463,6 +464,25 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
self.banned_peers.remove(&peer_id);
}
/// Disconnects a peer by its peer ID, closing all connections to said peer.
///
/// Returns `Ok(())` if there was one or more established connections to the peer.
///
/// Note: Closing a connection via [`ExpandedSwarm::disconnect_peer_id`] does
/// not inform the corresponding [`ProtocolsHandler`].
/// Closing a connection via a [`ProtocolsHandler`] can be done either in a
/// collaborative manner across [`ProtocolsHandler`]s
/// with [`ProtocolsHandler::connection_keep_alive`] or directly with
/// [`ProtocolsHandlerEvent::Close`].
pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
if let Some(peer) = self.network.peer(peer_id).into_connected() {
peer.disconnect();
return Ok(());
}
Err(())
}
/// Checks whether the [`Network`] has an established connection to a peer.
pub fn is_connected(&self, peer_id: &PeerId) -> bool {
self.network.is_connected(peer_id)
@ -737,6 +757,20 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
this.add_external_address(addr, score);
}
},
Poll::Ready(NetworkBehaviourAction::CloseConnection { peer_id, connection }) => {
if let Some(mut peer) = this.network.peer(peer_id).into_connected() {
match connection {
CloseConnection::One(connection_id) => {
if let Some(conn) = peer.connection(connection_id) {
conn.start_close();
}
}
CloseConnection::All => {
peer.disconnect();
}
}
}
},
}
}
}
@ -838,7 +872,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
THandler: IntoProtocolsHandler + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler::Handler:
THandler::Handler:
ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandleErr>,
THandleErr: error::Error + Send + 'static,
{
@ -1135,6 +1169,13 @@ mod tests {
use libp2p_noise as noise;
use super::*;
// Test execution state.
// Connection => Disconnecting => Connecting.
enum State {
Connecting,
Disconnecting,
}
fn new_test_swarm<T, O>(handler_proto: T) -> Swarm<CallTraceBehaviour<MockBehaviour<T, O>>>
where
T: ProtocolsHandler + Clone,
@ -1153,7 +1194,53 @@ mod tests {
SwarmBuilder::new(transport, behaviour, pubkey.into()).build()
}
/// Establishes a number of connections between two peers,
fn swarms_connected<TBehaviour>(
swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
num_connections: usize,
) -> bool
where
TBehaviour: NetworkBehaviour,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent: Clone,
{
for s in &[swarm1, swarm2] {
if s.behaviour.inject_connection_established.len() > 0 {
assert_eq!(s.behaviour.inject_connected.len(), 1);
} else {
assert_eq!(s.behaviour.inject_connected.len(), 0);
}
assert!(s.behaviour.inject_connection_closed.is_empty());
assert!(s.behaviour.inject_disconnected.is_empty());
}
[swarm1, swarm2]
.iter()
.all(|s| s.behaviour.inject_connection_established.len() == num_connections)
}
fn swarms_disconnected<TBehaviour: NetworkBehaviour>(
swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
num_connections: usize,
) -> bool
where
TBehaviour: NetworkBehaviour,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent: Clone
{
for s in &[swarm1, swarm2] {
if s.behaviour.inject_connection_closed.len() < num_connections {
assert_eq!(s.behaviour.inject_disconnected.len(), 0);
} else {
assert_eq!(s.behaviour.inject_disconnected.len(), 1);
}
assert_eq!(s.behaviour.inject_connection_established.len(), 0);
assert_eq!(s.behaviour.inject_connected.len(), 0);
}
[swarm1, swarm2]
.iter()
.all(|s| s.behaviour.inject_connection_closed.len() == num_connections)
}
/// Establishes multiple connections between two peers,
/// after which one peer bans the other.
///
/// The test expects both behaviours to be notified via pairs of
@ -1163,8 +1250,7 @@ mod tests {
fn test_connect_disconnect_ban() {
// Since the test does not try to open any substreams, we can
// use the dummy protocols handler.
let mut handler_proto = DummyProtocolsHandler::default();
handler_proto.keep_alive = KeepAlive::Yes;
let handler_proto = DummyProtocolsHandler { keep_alive: KeepAlive::Yes };
let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone());
let mut swarm2 = new_test_swarm::<_, ()>(handler_proto);
@ -1175,12 +1261,6 @@ mod tests {
swarm1.listen_on(addr1.clone().into()).unwrap();
swarm2.listen_on(addr2.clone().into()).unwrap();
// Test execution state. Connection => Disconnecting => Connecting.
enum State {
Connecting,
Disconnecting,
}
let swarm1_id = *swarm1.local_peer_id();
let mut banned = false;
@ -1188,7 +1268,7 @@ mod tests {
let num_connections = 10;
for _ in 0 .. num_connections {
for _ in 0..num_connections {
swarm1.dial_addr(addr2.clone()).unwrap();
}
let mut state = State::Connecting;
@ -1199,18 +1279,7 @@ mod tests {
let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
match state {
State::Connecting => {
for s in &[&swarm1, &swarm2] {
if s.behaviour.inject_connection_established.len() > 0 {
assert_eq!(s.behaviour.inject_connected.len(), 1);
} else {
assert_eq!(s.behaviour.inject_connected.len(), 0);
}
assert!(s.behaviour.inject_connection_closed.len() == 0);
assert!(s.behaviour.inject_disconnected.len() == 0);
}
if [&swarm1, &swarm2].iter().all(|s| {
s.behaviour.inject_connection_established.len() == num_connections
}) {
if swarms_connected(&swarm1, &swarm2, num_connections) {
if banned {
return Poll::Ready(())
}
@ -1222,18 +1291,7 @@ mod tests {
}
}
State::Disconnecting => {
for s in &[&swarm1, &swarm2] {
if s.behaviour.inject_connection_closed.len() < num_connections {
assert_eq!(s.behaviour.inject_disconnected.len(), 0);
} else {
assert_eq!(s.behaviour.inject_disconnected.len(), 1);
}
assert_eq!(s.behaviour.inject_connection_established.len(), 0);
assert_eq!(s.behaviour.inject_connected.len(), 0);
}
if [&swarm1, &swarm2].iter().all(|s| {
s.behaviour.inject_connection_closed.len() == num_connections
}) {
if swarms_disconnected(&swarm1, &swarm2, num_connections) {
if unbanned {
return Poll::Ready(())
}
@ -1242,7 +1300,7 @@ mod tests {
swarm1.behaviour.reset();
swarm2.behaviour.reset();
unbanned = true;
for _ in 0 .. num_connections {
for _ in 0..num_connections {
swarm2.dial_addr(addr1.clone()).unwrap();
}
state = State::Connecting;
@ -1256,4 +1314,231 @@ mod tests {
}
}))
}
/// Establishes multiple connections between two peers,
/// after which one peer disconnects the other using [`ExpandedSwarm::disconnect_peer_id`].
///
/// The test expects both behaviours to be notified via pairs of
/// inject_connected / inject_disconnected as well as
/// inject_connection_established / inject_connection_closed calls.
#[test]
fn test_swarm_disconnect() {
// Since the test does not try to open any substreams, we can
// use the dummy protocols handler.
let handler_proto = DummyProtocolsHandler { keep_alive: KeepAlive::Yes };
let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone());
let mut swarm2 = new_test_swarm::<_, ()>(handler_proto);
let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
swarm1.listen_on(addr1.clone().into()).unwrap();
swarm2.listen_on(addr2.clone().into()).unwrap();
let swarm1_id = *swarm1.local_peer_id();
let mut reconnected = false;
let num_connections = 10;
for _ in 0..num_connections {
swarm1.dial_addr(addr2.clone()).unwrap();
}
let mut state = State::Connecting;
executor::block_on(future::poll_fn(move |cx| {
loop {
let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
match state {
State::Connecting => {
if swarms_connected(&swarm1, &swarm2, num_connections) {
if reconnected {
return Poll::Ready(())
}
swarm2.disconnect_peer_id(swarm1_id.clone()).expect("Error disconnecting");
swarm1.behaviour.reset();
swarm2.behaviour.reset();
state = State::Disconnecting;
}
}
State::Disconnecting => {
if swarms_disconnected(&swarm1, &swarm2, num_connections) {
if reconnected {
return Poll::Ready(())
}
reconnected = true;
swarm1.behaviour.reset();
swarm2.behaviour.reset();
for _ in 0..num_connections {
swarm2.dial_addr(addr1.clone()).unwrap();
}
state = State::Connecting;
}
}
}
if poll1.is_pending() && poll2.is_pending() {
return Poll::Pending
}
}
}))
}
/// Establishes multiple connections between two peers,
/// after which one peer disconnects the other
/// using [`NetworkBehaviourAction::CloseConnection`] returned by a [`NetworkBehaviour`].
///
/// The test expects both behaviours to be notified via pairs of
/// inject_connected / inject_disconnected as well as
/// inject_connection_established / inject_connection_closed calls.
#[test]
fn test_behaviour_disconnect_all() {
// Since the test does not try to open any substreams, we can
// use the dummy protocols handler.
let handler_proto = DummyProtocolsHandler { keep_alive: KeepAlive::Yes };
let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone());
let mut swarm2 = new_test_swarm::<_, ()>(handler_proto);
let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
swarm1.listen_on(addr1.clone().into()).unwrap();
swarm2.listen_on(addr2.clone().into()).unwrap();
let swarm1_id = *swarm1.local_peer_id();
let mut reconnected = false;
let num_connections = 10;
for _ in 0..num_connections {
swarm1.dial_addr(addr2.clone()).unwrap();
}
let mut state = State::Connecting;
executor::block_on(future::poll_fn(move |cx| {
loop {
let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
match state {
State::Connecting => {
if swarms_connected(&swarm1, &swarm2, num_connections) {
if reconnected {
return Poll::Ready(())
}
swarm2
.behaviour
.inner()
.next_action
.replace(NetworkBehaviourAction::CloseConnection {
peer_id: swarm1_id.clone(),
connection: CloseConnection::All,
});
swarm1.behaviour.reset();
swarm2.behaviour.reset();
state = State::Disconnecting;
}
}
State::Disconnecting => {
if swarms_disconnected(&swarm1, &swarm2, num_connections) {
if reconnected {
return Poll::Ready(())
}
reconnected = true;
swarm1.behaviour.reset();
swarm2.behaviour.reset();
for _ in 0..num_connections {
swarm2.dial_addr(addr1.clone()).unwrap();
}
state = State::Connecting;
}
}
}
if poll1.is_pending() && poll2.is_pending() {
return Poll::Pending
}
}
}))
}
/// Establishes multiple connections between two peers,
/// after which one peer closes a single connection
/// using [`NetworkBehaviourAction::CloseConnection`] returned by a [`NetworkBehaviour`].
///
/// The test expects both behaviours to be notified via pairs of
/// inject_connected / inject_disconnected as well as
/// inject_connection_established / inject_connection_closed calls.
#[test]
fn test_behaviour_disconnect_one() {
// Since the test does not try to open any substreams, we can
// use the dummy protocols handler.
let handler_proto = DummyProtocolsHandler { keep_alive: KeepAlive::Yes };
let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone());
let mut swarm2 = new_test_swarm::<_, ()>(handler_proto);
let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
swarm1.listen_on(addr1.clone().into()).unwrap();
swarm2.listen_on(addr2.clone().into()).unwrap();
let swarm1_id = *swarm1.local_peer_id();
let num_connections = 10;
for _ in 0..num_connections {
swarm1.dial_addr(addr2.clone()).unwrap();
}
let mut state = State::Connecting;
let mut disconnected_conn_id = None;
executor::block_on(future::poll_fn(move |cx| {
loop {
let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
match state {
State::Connecting => {
if swarms_connected(&swarm1, &swarm2, num_connections) {
disconnected_conn_id = {
let conn_id = swarm2.behaviour.inject_connection_established[num_connections / 2].1;
swarm2
.behaviour
.inner()
.next_action
.replace(NetworkBehaviourAction::CloseConnection {
peer_id: swarm1_id.clone(),
connection: CloseConnection::One(conn_id),
});
Some(conn_id)
};
swarm1.behaviour.reset();
swarm2.behaviour.reset();
state = State::Disconnecting;
}
}
State::Disconnecting => {
for s in &[&swarm1, &swarm2] {
assert_eq!(s.behaviour.inject_disconnected.len(), 0);
assert_eq!(s.behaviour.inject_connection_established.len(), 0);
assert_eq!(s.behaviour.inject_connected.len(), 0);
}
if [&swarm1, &swarm2].iter().all(|s| {
s.behaviour.inject_connection_closed.len() == 1
}) {
let conn_id = swarm2.behaviour.inject_connection_closed[0].1;
assert_eq!(Some(conn_id), disconnected_conn_id);
return Poll::Ready(());
}
}
}
if poll1.is_pending() && poll2.is_pending() {
return Poll::Pending
}
}
}))
}
}

View File

@ -167,6 +167,8 @@ where
self.inject_listener_closed = Vec::new();
self.poll = 0;
}
pub fn inner(&mut self) -> &mut TInner { &mut self.inner }
}
impl<TInner> NetworkBehaviour for CallTraceBehaviour<TInner>