[core/swarm] Emit events for active connection close and fix disconnect(). (#1619)

* Emit events for active connection close and fix `disconnect()`.

The `Network` does currently not emit events for actively
closed connections, e.g. via `EstablishedConnection::close`
or `ConnectedPeer::disconnect()`. As a result, when actively
closing connections, there will be `ConnectionEstablished`
events emitted without eventually a matching `ConnectionClosed`
event. This seems undesirable and has the consequence that
the `Swarm::ban_peer_id` feature in `libp2p-swarm` does not
result in appropriate calls to `NetworkBehaviour::inject_connection_closed`
and `NetworkBehaviour::inject_disconnected`. Furthermore,
the `disconnect()` functionality in `libp2p-core` is currently
broken as it leaves the `Pool` in an inconsistent state.

This commit does the following:

  1. When connection background tasks are dropped
     (i.e. removed from the `Manager`), they
     always terminate immediately, without attempting
     an orderly close of the connection.
  2. An orderly close is sent to the background task
     of a connection as a regular command. The
     background task emits a `Closed` event
     before terminating.
  3. `Pool::disconnect()` removes all connection
     tasks for the affected peer from the `Manager`,
     i.e. without an orderly close, thereby also
     fixing the discovered state inconsistency
     due to not removing the corresponding entries
     in the `Pool` itself after removing them from
     the `Manager`.
  4. A new test is added to `libp2p-swarm` that
     exercises the ban/unban functionality and
     places assertions on the number and order
     of calls to the `NetworkBehaviour`. In that
     context some new testing utilities have
     been added to `libp2p-swarm`.

This addresses https://github.com/libp2p/rust-libp2p/issues/1584.

* Update swarm/src/lib.rs

Co-authored-by: Toralf Wittner <tw@dtex.org>

* Incorporate some review feedback.

* Adapt to changes in master.

* More verbose panic messages.

* Simplify

There is no need for a `StartClose` future.

* Fix doc links.

* Further small cleanup.

* Update CHANGELOGs and versions.

Co-authored-by: Toralf Wittner <tw@dtex.org>
This commit is contained in:
Roman Borschel
2020-08-04 11:30:09 +02:00
committed by GitHub
parent 67f1b94907
commit 8e1d4edb8b
51 changed files with 886 additions and 308 deletions

View File

@ -55,6 +55,8 @@
mod behaviour;
mod registry;
#[cfg(test)]
mod test;
mod upgrade;
pub mod protocols_handler;
@ -156,7 +158,8 @@ pub enum SwarmEvent<TBvEv, THandleErr> {
/// opened.
num_established: NonZeroU32,
},
/// A connection with the given peer has been closed.
/// A connection with the given peer has been closed,
/// possibly as a result of an error.
ConnectionClosed {
/// Identity of the peer that we have connected to.
peer_id: PeerId,
@ -164,8 +167,9 @@ pub enum SwarmEvent<TBvEv, THandleErr> {
endpoint: ConnectedPoint,
/// Number of other remaining connections to this same peer.
num_established: u32,
/// Reason for the disconnection.
cause: ConnectionError<NodeHandlerWrapperError<THandleErr>>,
/// Reason for the disconnection, if it was not a successful
/// active close.
cause: Option<ConnectionError<NodeHandlerWrapperError<THandleErr>>>,
},
/// A new connection arrived on a listener and is in the process of protocol negotiation.
///
@ -366,22 +370,19 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
me.network.remove_listener(id)
}
/// Tries to dial the given address.
///
/// Returns an error if the address is not supported.
/// Initiates a new dialing attempt to the given address.
pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> {
let handler = me.behaviour.new_handler();
me.network.dial(&addr, handler.into_node_handler_builder()).map(|_id| ())
}
/// Tries to initiate a dialing attempt to the given peer.
///
/// If a new dialing attempt has been initiated, `Ok(true)` is returned.
///
/// If no new dialing attempt has been initiated, meaning there is an ongoing
/// dialing attempt or `addresses_of_peer` reports no addresses, `Ok(false)`
/// is returned.
/// Initiates a new dialing attempt to the given peer.
pub fn dial(me: &mut Self, peer_id: &PeerId) -> Result<(), DialError> {
if me.banned_peers.contains(peer_id) {
me.behaviour.inject_dial_failure(peer_id);
return Err(DialError::Banned)
}
let self_listening = &me.listened_addrs;
let mut addrs = me.behaviour.addresses_of_peer(peer_id)
.into_iter()
@ -446,11 +447,12 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
/// Bans a peer by its peer ID.
///
/// Any incoming connection and any dialing attempt will immediately be rejected.
/// This function has no effect is the peer is already banned.
/// This function has no effect if the peer is already banned.
pub fn ban_peer_id(me: &mut Self, peer_id: PeerId) {
me.banned_peers.insert(peer_id.clone());
if let Some(c) = me.network.peer(peer_id).into_connected() {
c.disconnect();
if me.banned_peers.insert(peer_id.clone()) {
if let Some(peer) = me.network.peer(peer_id).into_connected() {
peer.disconnect();
}
}
}
@ -529,8 +531,12 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
});
}
},
Poll::Ready(NetworkEvent::ConnectionError { id, connected, error, num_established }) => {
log::debug!("Connection {:?} closed: {:?}", connected, error);
Poll::Ready(NetworkEvent::ConnectionClosed { id, connected, error, num_established }) => {
if let Some(error) = error.as_ref() {
log::debug!("Connection {:?} closed: {:?}", connected, error);
} else {
log::debug!("Connection {:?} closed (active close).", connected);
}
let info = connected.info;
let endpoint = connected.endpoint;
this.behaviour.inject_connection_closed(info.peer_id(), &id, &endpoint);
@ -776,14 +782,13 @@ enum PendingNotifyHandler {
///
/// Returns `None` if the connection is closing or the event has been
/// successfully sent, in either case the event is consumed.
fn notify_one<'a, TInEvent, TConnInfo, TPeerId>(
conn: &mut EstablishedConnection<'a, TInEvent, TConnInfo, TPeerId>,
fn notify_one<'a, TInEvent, TConnInfo>(
conn: &mut EstablishedConnection<'a, TInEvent, TConnInfo>,
event: TInEvent,
cx: &mut Context<'_>,
) -> Option<TInEvent>
where
TPeerId: Eq + std::hash::Hash + Clone,
TConnInfo: ConnectionInfo<PeerId = TPeerId>
TConnInfo: ConnectionInfo
{
match conn.poll_ready_notify_handler(cx) {
Poll::Pending => Some(event),
@ -1124,6 +1129,8 @@ where TBehaviour: NetworkBehaviour,
/// The possible failures of [`ExpandedSwarm::dial`].
#[derive(Debug)]
pub enum DialError {
/// The peer is currently banned.
Banned,
/// The configured limit for simultaneous outgoing connections
/// has been reached.
ConnectionLimit(ConnectionLimit),
@ -1136,7 +1143,8 @@ impl fmt::Display for DialError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err),
DialError::NoAddresses => write!(f, "Dial error: no addresses for peer.")
DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
DialError::Banned => write!(f, "Dial error: peer is banned.")
}
}
}
@ -1145,7 +1153,8 @@ impl error::Error for DialError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
DialError::ConnectionLimit(err) => Some(err),
DialError::NoAddresses => None
DialError::NoAddresses => None,
DialError::Banned => None
}
}
}
@ -1184,24 +1193,45 @@ impl NetworkBehaviour for DummyBehaviour {
{
Poll::Pending
}
}
#[cfg(test)]
mod tests {
use crate::{DummyBehaviour, SwarmBuilder};
use crate::protocols_handler::DummyProtocolsHandler;
use crate::test::{MockBehaviour, CallTraceBehaviour};
use futures::{future, executor};
use libp2p_core::{
PeerId,
PublicKey,
identity,
transport::dummy::{DummyStream, DummyTransport}
upgrade,
multiaddr,
transport::{self, dummy::*}
};
use libp2p_mplex::Multiplex;
use super::*;
fn get_random_id() -> PublicKey {
fn get_random_id() -> identity::PublicKey {
identity::Keypair::generate_ed25519().public()
}
fn new_test_swarm<T, O>(handler_proto: T) -> Swarm<CallTraceBehaviour<MockBehaviour<T, O>>>
where
T: ProtocolsHandler + Clone,
T::OutEvent: Clone,
O: Send + 'static
{
let keypair1 = identity::Keypair::generate_ed25519();
let pubkey1 = keypair1.public();
let transport1 = transport::MemoryTransport::default()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(keypair1))
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))
.map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); })
.boxed();
let behaviour1 = CallTraceBehaviour::new(MockBehaviour::new(handler_proto));
SwarmBuilder::new(transport1, behaviour1, pubkey1.into()).build()
}
#[test]
fn test_build_swarm() {
let id = get_random_id();
@ -1220,4 +1250,108 @@ mod tests {
let swarm = SwarmBuilder::new(transport, DummyBehaviour {}, id.into()).build();
assert!(swarm.network.incoming_limit().is_none())
}
/// Establishes a number of connections between two peers,
/// after which one peer bans the other.
///
/// The test expects both behaviours to be notified via pairs of
/// inject_connected / inject_disconnected as well as
/// inject_connection_established / inject_connection_closed calls.
#[test]
fn test_connect_disconnect_ban() {
// Since the test does not try to open any substreams, we can
// use the dummy protocols handler.
let mut handler_proto = DummyProtocolsHandler::default();
handler_proto.keep_alive = KeepAlive::Yes;
let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone());
let mut swarm2 = new_test_swarm::<_, ()>(handler_proto);
let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
Swarm::listen_on(&mut swarm1, addr1.clone().into()).unwrap();
Swarm::listen_on(&mut swarm2, addr2.clone().into()).unwrap();
// Test execution state. Connection => Disconnecting => Connecting.
enum State {
Connecting,
Disconnecting,
}
let swarm1_id = Swarm::local_peer_id(&swarm1).clone();
let mut banned = false;
let mut unbanned = false;
let num_connections = 10;
for _ in 0 .. num_connections {
Swarm::dial_addr(&mut swarm1, addr2.clone()).unwrap();
}
let mut state = State::Connecting;
executor::block_on(future::poll_fn(move |cx| {
loop {
let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
match state {
State::Connecting => {
for s in &[&swarm1, &swarm2] {
if s.behaviour.inject_connection_established.len() > 0 {
assert_eq!(s.behaviour.inject_connected.len(), 1);
} else {
assert_eq!(s.behaviour.inject_connected.len(), 0);
}
assert!(s.behaviour.inject_connection_closed.len() == 0);
assert!(s.behaviour.inject_disconnected.len() == 0);
}
if [&swarm1, &swarm2].iter().all(|s| {
s.behaviour.inject_connection_established.len() == num_connections
}) {
if banned {
return Poll::Ready(())
}
Swarm::ban_peer_id(&mut swarm2, swarm1_id.clone());
swarm1.behaviour.reset();
swarm2.behaviour.reset();
banned = true;
state = State::Disconnecting;
}
}
State::Disconnecting => {
for s in &[&swarm1, &swarm2] {
if s.behaviour.inject_connection_closed.len() < num_connections {
assert_eq!(s.behaviour.inject_disconnected.len(), 0);
} else {
assert_eq!(s.behaviour.inject_disconnected.len(), 1);
}
assert_eq!(s.behaviour.inject_connection_established.len(), 0);
assert_eq!(s.behaviour.inject_connected.len(), 0);
}
if [&swarm1, &swarm2].iter().all(|s| {
s.behaviour.inject_connection_closed.len() == num_connections
}) {
if unbanned {
return Poll::Ready(())
}
// Unban the first peer and reconnect.
Swarm::unban_peer_id(&mut swarm2, swarm1_id.clone());
swarm1.behaviour.reset();
swarm2.behaviour.reset();
unbanned = true;
for _ in 0 .. num_connections {
Swarm::dial_addr(&mut swarm2, addr1.clone()).unwrap();
}
state = State::Connecting;
}
}
}
if poll1.is_pending() && poll2.is_pending() {
return Poll::Pending
}
}
}))
}
}