protocols/ping: Don't force close conn if not supported by remote (#2149)

Don't close connection if ping protocol is unsupported by remote. Previously, a
failed protocol negotation for ping caused a force close of the connection. As a
result, all nodes in a network had to support ping. To allow networks where some
nodes don't support ping, we now emit `PingFailure::Unsupported` once for every
connection on which ping is not supported.

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Thomas Eizinger
2021-07-31 06:21:21 +10:00
committed by GitHub
parent ad90167042
commit 1e001a2e0e
6 changed files with 158 additions and 166 deletions

View File

@ -223,6 +223,12 @@ fn main() -> Result<(), Box<dyn Error>> {
} => { } => {
println!("ping: timeout to {}", peer.to_base58()); println!("ping: timeout to {}", peer.to_base58());
} }
PingEvent {
peer,
result: Result::Err(PingFailure::Unsupported),
} => {
println!("ping: {} does not support ping protocol", peer.to_base58());
}
PingEvent { PingEvent {
peer, peer,
result: Result::Err(PingFailure::Other { error }), result: Result::Err(PingFailure::Other { error }),

View File

@ -2,6 +2,19 @@
- Update dependencies. - Update dependencies.
- Don't close connection if ping protocol is unsupported by remote.
Previously, a failed protocol negotation for ping caused a force close of the connection.
As a result, all nodes in a network had to support ping.
To allow networks where some nodes don't support ping, we now emit
`PingFailure::Unsupported` once for every connection on which ping is not supported.
In case you want to stick with the old behavior, you need to close the connection
manually on `PingFailure::Unsupported`.
Fixes [#2109](https://github.com/libp2p/rust-libp2p/issues/2109). Also see [PR 2149].
[PR 2149]: https://github.com/libp2p/rust-libp2p/pull/2149/
# 0.30.0 [2021-07-12] # 0.30.0 [2021-07-12]
- Update dependencies. - Update dependencies.

View File

@ -21,6 +21,7 @@
use crate::protocol; use crate::protocol;
use futures::prelude::*; use futures::prelude::*;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use libp2p_core::{UpgradeError, upgrade::NegotiationError};
use libp2p_swarm::{ use libp2p_swarm::{
KeepAlive, KeepAlive,
NegotiatedSubstream, NegotiatedSubstream,
@ -140,6 +141,8 @@ pub enum PingFailure {
/// The ping timed out, i.e. no response was received within the /// The ping timed out, i.e. no response was received within the
/// configured ping timeout. /// configured ping timeout.
Timeout, Timeout,
/// The peer does not support the ping protocol.
Unsupported,
/// The ping failed for reasons other than a timeout. /// The ping failed for reasons other than a timeout.
Other { error: Box<dyn std::error::Error + Send + 'static> } Other { error: Box<dyn std::error::Error + Send + 'static> }
} }
@ -148,7 +151,8 @@ impl fmt::Display for PingFailure {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
PingFailure::Timeout => f.write_str("Ping timeout"), PingFailure::Timeout => f.write_str("Ping timeout"),
PingFailure::Other { error } => write!(f, "Ping error: {}", error) PingFailure::Other { error } => write!(f, "Ping error: {}", error),
PingFailure::Unsupported => write!(f, "Ping protocol not supported"),
} }
} }
} }
@ -157,7 +161,8 @@ impl Error for PingFailure {
fn source(&self) -> Option<&(dyn Error + 'static)> { fn source(&self) -> Option<&(dyn Error + 'static)> {
match self { match self {
PingFailure::Timeout => None, PingFailure::Timeout => None,
PingFailure::Other { error } => Some(&**error) PingFailure::Other { error } => Some(&**error),
PingFailure::Unsupported => None,
} }
} }
} }
@ -184,6 +189,21 @@ pub struct PingHandler {
/// substream, this is always a future that waits for the /// substream, this is always a future that waits for the
/// next inbound ping to be answered. /// next inbound ping to be answered.
inbound: Option<PongFuture>, inbound: Option<PongFuture>,
/// Tracks the state of our handler.
state: State
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
/// We are inactive because the other peer doesn't support ping.
Inactive {
/// Whether or not we've reported the missing support yet.
///
/// This is used to avoid repeated events being emitted for a specific connection.
reported: bool
},
/// We are actively pinging the other peer.
Active,
} }
impl PingHandler { impl PingHandler {
@ -196,6 +216,7 @@ impl PingHandler {
failures: 0, failures: 0,
outbound: None, outbound: None,
inbound: None, inbound: None,
state: State::Active,
} }
} }
} }
@ -226,12 +247,22 @@ impl ProtocolsHandler for PingHandler {
fn inject_dial_upgrade_error(&mut self, _info: (), error: ProtocolsHandlerUpgrErr<Void>) { fn inject_dial_upgrade_error(&mut self, _info: (), error: ProtocolsHandlerUpgrErr<Void>) {
self.outbound = None; // Request a new substream on the next `poll`. self.outbound = None; // Request a new substream on the next `poll`.
self.pending_errors.push_front(
match error { let error = match error {
// Note: This timeout only covers protocol negotiation. ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
ProtocolsHandlerUpgrErr::Timeout => PingFailure::Timeout, debug_assert_eq!(self.state, State::Active);
e => PingFailure::Other { error: Box::new(e) },
}) self.state = State::Inactive {
reported: false
};
return;
},
// Note: This timeout only covers protocol negotiation.
ProtocolsHandlerUpgrErr::Timeout => PingFailure::Timeout,
e => PingFailure::Other { error: Box::new(e) },
};
self.pending_errors.push_front(error);
} }
fn connection_keep_alive(&self) -> KeepAlive { fn connection_keep_alive(&self) -> KeepAlive {
@ -243,6 +274,17 @@ impl ProtocolsHandler for PingHandler {
} }
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ProtocolsHandlerEvent<protocol::Ping, (), PingResult, Self::Error>> { fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ProtocolsHandlerEvent<protocol::Ping, (), PingResult, Self::Error>> {
match self.state {
State::Inactive { reported: true } => {
return Poll::Pending // nothing to do on this connection
},
State::Inactive { reported: false } => {
self.state = State::Inactive { reported: true };
return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(PingFailure::Unsupported)));
},
State::Active => {}
}
// Respond to inbound pings. // Respond to inbound pings.
if let Some(fut) = self.inbound.as_mut() { if let Some(fut) = self.inbound.as_mut() {
match fut.poll_unpin(cx) { match fut.poll_unpin(cx) {
@ -355,4 +397,3 @@ enum PingState {
/// A ping is being sent and the response awaited. /// A ping is being sent and the response awaited.
Ping(PingFuture), Ping(PingFuture),
} }

View File

@ -31,7 +31,7 @@ use libp2p_core::{
use libp2p_mplex as mplex; use libp2p_mplex as mplex;
use libp2p_noise as noise; use libp2p_noise as noise;
use libp2p_ping::*; use libp2p_ping::*;
use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_swarm::{DummyBehaviour, KeepAlive, Swarm, SwarmEvent};
use libp2p_tcp::TcpConfig; use libp2p_tcp::TcpConfig;
use libp2p_yamux as yamux; use libp2p_yamux as yamux;
use futures::{prelude::*, channel::mpsc}; use futures::{prelude::*, channel::mpsc};
@ -83,18 +83,18 @@ fn ping_pong() {
loop { loop {
match swarm2.select_next_some().await { match swarm2.select_next_some().await {
SwarmEvent::Behaviour(PingEvent { SwarmEvent::Behaviour(PingEvent {
peer, peer,
result: Ok(PingSuccess::Ping { rtt }) result: Ok(PingSuccess::Ping { rtt })
}) => { }) => {
count2 -= 1; count2 -= 1;
if count2 == 0 { if count2 == 0 {
return (pid2.clone(), peer, rtt) return (pid2.clone(), peer, rtt)
} }
}, },
SwarmEvent::Behaviour(PingEvent { SwarmEvent::Behaviour(PingEvent {
result: Err(e), result: Err(e),
.. ..
}) => panic!("Ping failure: {:?}", e), }) => panic!("Ping failure: {:?}", e),
_ => {} _ => {}
} }
@ -189,6 +189,52 @@ fn max_failures() {
QuickCheck::new().tests(10).quickcheck(prop as fn(_,_)) QuickCheck::new().tests(10).quickcheck(prop as fn(_,_))
} }
#[test]
fn unsupported_doesnt_fail() {
let (peer1_id, trans) = mk_transport(MuxerChoice::Mplex);
let mut swarm1 = Swarm::new(trans, DummyBehaviour::with_keep_alive(KeepAlive::Yes), peer1_id.clone());
let (peer2_id, trans) = mk_transport(MuxerChoice::Mplex);
let mut swarm2 = Swarm::new(trans, Ping::new(PingConfig::new().with_keep_alive(true)), peer2_id.clone());
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
swarm1.listen_on(addr).unwrap();
async_std::task::spawn(async move {
loop {
match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr { address, .. } => tx.send(address).await.unwrap(),
_ => {}
}
}
});
let result = async_std::task::block_on(async move {
swarm2.dial_addr(rx.next().await.unwrap()).unwrap();
loop {
match swarm2.select_next_some().await {
SwarmEvent::Behaviour(PingEvent {
result: Err(PingFailure::Unsupported), ..
}) => {
swarm2.disconnect_peer_id(peer1_id).unwrap();
}
SwarmEvent::ConnectionClosed { cause: Some(e), .. } => {
break Err(e);
}
SwarmEvent::ConnectionClosed { cause: None, .. } => {
break Ok(());
}
_ => {}
}
}
});
result.expect("node with ping should not fail connection due to unsupported protocol");
}
fn mk_transport(muxer: MuxerChoice) -> ( fn mk_transport(muxer: MuxerChoice) -> (
PeerId, PeerId,

View File

@ -24,24 +24,18 @@ use futures::stream::{Stream, StreamExt};
use futures::task::Spawn; use futures::task::Spawn;
use libp2p::kad::record::store::MemoryStore; use libp2p::kad::record::store::MemoryStore;
use libp2p::NetworkBehaviour; use libp2p::NetworkBehaviour;
use libp2p_core::connection::{ConnectedPoint, ConnectionId}; use libp2p_core::connection::ConnectedPoint;
use libp2p_core::either::EitherTransport; use libp2p_core::either::EitherTransport;
use libp2p_core::multiaddr::{Multiaddr, Protocol}; use libp2p_core::multiaddr::{Multiaddr, Protocol};
use libp2p_core::transport::{MemoryTransport, Transport, TransportError}; use libp2p_core::transport::{MemoryTransport, Transport, TransportError};
use libp2p_core::upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade};
use libp2p_core::{identity, upgrade, PeerId}; use libp2p_core::{identity, upgrade, PeerId};
use libp2p_identify::{Identify, IdentifyConfig, IdentifyEvent, IdentifyInfo}; use libp2p_identify::{Identify, IdentifyConfig, IdentifyEvent, IdentifyInfo};
use libp2p_kad::{GetClosestPeersOk, Kademlia, KademliaEvent, QueryResult}; use libp2p_kad::{GetClosestPeersOk, Kademlia, KademliaEvent, QueryResult};
use libp2p_ping::{Ping, PingConfig, PingEvent}; use libp2p_ping::{Ping, PingConfig, PingEvent};
use libp2p_plaintext::PlainText2Config; use libp2p_plaintext::PlainText2Config;
use libp2p_relay::{Relay, RelayConfig}; use libp2p_relay::{Relay, RelayConfig};
use libp2p_swarm::protocols_handler::{ use libp2p_swarm::protocols_handler::KeepAlive;
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, use libp2p_swarm::{AddressRecord, DummyBehaviour, NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters, Swarm, SwarmEvent};
};
use libp2p_swarm::{
AddressRecord, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction,
NetworkBehaviourEventProcess, PollParameters, Swarm, SwarmEvent,
};
use std::iter; use std::iter;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
@ -720,7 +714,7 @@ fn inactive_connection_timeout() {
let mut relay_swarm = build_keep_alive_swarm(); let mut relay_swarm = build_keep_alive_swarm();
// Connections only kept alive by Source Node and Destination Node. // Connections only kept alive by Source Node and Destination Node.
relay_swarm.behaviour_mut().keep_alive.keep_alive = KeepAlive::No; *relay_swarm.behaviour_mut().keep_alive.keep_alive_mut() = KeepAlive::No;
let relay_peer_id = *relay_swarm.local_peer_id(); let relay_peer_id = *relay_swarm.local_peer_id();
let dst_peer_id = *dst_swarm.local_peer_id(); let dst_peer_id = *dst_swarm.local_peer_id();
@ -1183,7 +1177,7 @@ impl NetworkBehaviourEventProcess<()> for CombinedBehaviour {
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
struct CombinedKeepAliveBehaviour { struct CombinedKeepAliveBehaviour {
relay: Relay, relay: Relay,
keep_alive: KeepAliveBehaviour, keep_alive: DummyBehaviour,
} }
impl NetworkBehaviourEventProcess<()> for CombinedKeepAliveBehaviour { impl NetworkBehaviourEventProcess<()> for CombinedKeepAliveBehaviour {
@ -1310,13 +1304,13 @@ fn build_keep_alive_swarm() -> Swarm<CombinedKeepAliveBehaviour> {
let combined_behaviour = CombinedKeepAliveBehaviour { let combined_behaviour = CombinedKeepAliveBehaviour {
relay: relay_behaviour, relay: relay_behaviour,
keep_alive: KeepAliveBehaviour::default(), keep_alive: DummyBehaviour::with_keep_alive(KeepAlive::Yes),
}; };
Swarm::new(transport, combined_behaviour, local_peer_id) Swarm::new(transport, combined_behaviour, local_peer_id)
} }
fn build_keep_alive_only_swarm() -> Swarm<KeepAliveBehaviour> { fn build_keep_alive_only_swarm() -> Swarm<DummyBehaviour> {
let local_key = identity::Keypair::generate_ed25519(); let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public(); let local_public_key = local_key.public();
let plaintext = PlainText2Config { let plaintext = PlainText2Config {
@ -1332,127 +1326,7 @@ fn build_keep_alive_only_swarm() -> Swarm<KeepAliveBehaviour> {
.multiplex(libp2p_yamux::YamuxConfig::default()) .multiplex(libp2p_yamux::YamuxConfig::default())
.boxed(); .boxed();
Swarm::new(transport, KeepAliveBehaviour::default(), local_peer_id) Swarm::new(transport, DummyBehaviour::with_keep_alive(KeepAlive::Yes), local_peer_id)
}
#[derive(Clone)]
pub struct KeepAliveBehaviour {
keep_alive: KeepAlive,
}
impl Default for KeepAliveBehaviour {
fn default() -> Self {
Self {
keep_alive: KeepAlive::Yes,
}
}
}
impl libp2p_swarm::NetworkBehaviour for KeepAliveBehaviour {
type ProtocolsHandler = KeepAliveProtocolsHandler;
type OutEvent = void::Void;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
KeepAliveProtocolsHandler {
keep_alive: self.keep_alive,
}
}
fn inject_event(
&mut self,
_: PeerId,
_: ConnectionId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
void::unreachable(event);
}
fn poll(
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
Poll::Pending
}
}
/// Implementation of `ProtocolsHandler` that doesn't handle anything.
#[derive(Clone, Debug)]
pub struct KeepAliveProtocolsHandler {
pub keep_alive: KeepAlive,
}
impl ProtocolsHandler for KeepAliveProtocolsHandler {
type InEvent = Void;
type OutEvent = Void;
type Error = Void;
type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = DeniedUpgrade;
type OutboundOpenInfo = Void;
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(DeniedUpgrade, ())
}
fn inject_fully_negotiated_inbound(
&mut self,
_: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::InboundOpenInfo,
) {
}
fn inject_fully_negotiated_outbound(
&mut self,
_: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::OutboundOpenInfo,
) {
}
fn inject_event(&mut self, _: Self::InEvent) {}
fn inject_address_change(&mut self, _: &Multiaddr) {}
fn inject_dial_upgrade_error(
&mut self,
_: Self::OutboundOpenInfo,
_: ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
>,
) {
}
fn inject_listen_upgrade_error(
&mut self,
_: Self::InboundOpenInfo,
_: ProtocolsHandlerUpgrErr<
<Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Error,
>,
) {
}
fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}
fn poll(
&mut self,
_: &mut Context<'_>,
) -> Poll<
ProtocolsHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Self::Error,
>,
> {
Poll::Pending
}
} }
struct DummyPollParameters {} struct DummyPollParameters {}

View File

@ -94,13 +94,7 @@ use futures::{
executor::ThreadPoolBuilder, executor::ThreadPoolBuilder,
stream::FusedStream, stream::FusedStream,
}; };
use libp2p_core::{ use libp2p_core::{Executor, Multiaddr, Negotiated, PeerId, Transport, connection::{
Executor,
Transport,
Multiaddr,
Negotiated,
PeerId,
connection::{
ConnectionError, ConnectionError,
ConnectionId, ConnectionId,
ConnectionLimit, ConnectionLimit,
@ -110,10 +104,7 @@ use libp2p_core::{
ListenerId, ListenerId,
PendingConnectionError, PendingConnectionError,
Substream Substream
}, }, muxing::StreamMuxerBox, network::{
transport::{self, TransportError},
muxing::StreamMuxerBox,
network::{
self, self,
ConnectionLimits, ConnectionLimits,
Network, Network,
@ -121,9 +112,7 @@ use libp2p_core::{
NetworkEvent, NetworkEvent,
NetworkConfig, NetworkConfig,
peer::ConnectedPeer, peer::ConnectedPeer,
}, }, transport::{self, TransportError}, upgrade::{ProtocolName}};
upgrade::{ProtocolName},
};
use registry::{Addresses, AddressIntoIter}; use registry::{Addresses, AddressIntoIter};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::{error, fmt, io, pin::Pin, task::{Context, Poll}}; use std::{error, fmt, io, pin::Pin, task::{Context, Poll}};
@ -1144,8 +1133,29 @@ impl error::Error for DialError {
} }
/// Dummy implementation of [`NetworkBehaviour`] that doesn't do anything. /// Dummy implementation of [`NetworkBehaviour`] that doesn't do anything.
#[derive(Clone, Default)] #[derive(Clone)]
pub struct DummyBehaviour { pub struct DummyBehaviour {
keep_alive: KeepAlive
}
impl DummyBehaviour {
pub fn with_keep_alive(keep_alive: KeepAlive) -> Self {
Self {
keep_alive
}
}
pub fn keep_alive_mut(&mut self) -> &mut KeepAlive {
&mut self.keep_alive
}
}
impl Default for DummyBehaviour {
fn default() -> Self {
Self {
keep_alive: KeepAlive::No
}
}
} }
impl NetworkBehaviour for DummyBehaviour { impl NetworkBehaviour for DummyBehaviour {
@ -1153,7 +1163,9 @@ impl NetworkBehaviour for DummyBehaviour {
type OutEvent = void::Void; type OutEvent = void::Void;
fn new_handler(&mut self) -> Self::ProtocolsHandler { fn new_handler(&mut self) -> Self::ProtocolsHandler {
protocols_handler::DummyProtocolsHandler::default() protocols_handler::DummyProtocolsHandler {
keep_alive: self.keep_alive
}
} }
fn inject_event( fn inject_event(