swarm: Split off "keep alive" functionality from DummyConnectionHandler (#2859)

Previously, the `DummyConnectionHandler` offered a "keep alive" functionality,
i.e. it allowed users to set the value of what is returned from
`ConnectionHandler::keep_alive`. This handler is primarily used in tests or
`NetworkBehaviour`s that don't open any connections (like mDNS). In all of these
cases, it is statically known whether we want to keep connections alive. As
such, this functionality is better represented by a static
`KeepAliveConnectionHandler` that always returns `KeepAlive::Yes` and a
`DummyConnectionHandler` that always returns `KeepAlive::No`.

To follow the naming conventions described in
https://github.com/libp2p/rust-libp2p/issues/2217, we introduce a top-level
`keep_alive` and `dummy` behaviour in `libp2p-swarm` that contains both the
`NetworkBehaviour` and `ConnectionHandler` implementation for either case.
This commit is contained in:
Thomas Eizinger
2022-10-06 03:50:11 +11:00
committed by GitHub
parent da0403dc45
commit bdf9209824
26 changed files with 412 additions and 319 deletions

View File

@ -23,7 +23,7 @@ use either::Either;
use libp2p_core::connection::ConnectionId;
use libp2p_core::upgrade::{self, DeniedUpgrade};
use libp2p_core::{ConnectedPoint, PeerId};
use libp2p_swarm::handler::DummyConnectionHandler;
use libp2p_swarm::dummy;
use libp2p_swarm::handler::SendWrapper;
use libp2p_swarm::{ConnectionHandler, IntoConnectionHandler};
@ -44,7 +44,7 @@ pub enum Role {
}
impl IntoConnectionHandler for Prototype {
type Handler = Either<relayed::Handler, Either<direct::Handler, DummyConnectionHandler>>;
type Handler = Either<relayed::Handler, Either<direct::Handler, dummy::ConnectionHandler>>;
fn into_handler(self, _remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler {
match self {
@ -52,7 +52,7 @@ impl IntoConnectionHandler for Prototype {
if endpoint.is_relayed() {
Either::Left(relayed::Handler::new(endpoint.clone()))
} else {
Either::Right(Either::Right(DummyConnectionHandler::default()))
Either::Right(Either::Right(dummy::ConnectionHandler))
}
}
Self::DirectConnection {

View File

@ -31,8 +31,7 @@ use if_watch::{IfEvent, IfWatcher};
use libp2p_core::transport::ListenerId;
use libp2p_core::{Multiaddr, PeerId};
use libp2p_swarm::{
handler::DummyConnectionHandler, ConnectionHandler, NetworkBehaviour, NetworkBehaviourAction,
PollParameters,
dummy, ConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
use smallvec::SmallVec;
use std::collections::hash_map::{Entry, HashMap};
@ -120,11 +119,11 @@ where
T: Builder + Stream,
S: AsyncSocket,
{
type ConnectionHandler = DummyConnectionHandler;
type ConnectionHandler = dummy::ConnectionHandler;
type OutEvent = MdnsEvent;
fn new_handler(&mut self) -> Self::ConnectionHandler {
DummyConnectionHandler::default()
dummy::ConnectionHandler
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
@ -168,7 +167,7 @@ where
&mut self,
cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, DummyConnectionHandler>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, dummy::ConnectionHandler>> {
// Poll ifwatch.
while let Poll::Ready(event) = Pin::new(&mut self.if_watch).poll(cx) {
match event {

View File

@ -8,8 +8,11 @@
- Update to `libp2p-swarm` `v0.40.0`.
- Deprecate `Config::with_keep_alive`. See [PR 2859].
[PR 2857]: https://github.com/libp2p/rust-libp2p/pull/2857
[PR 2937]: https://github.com/libp2p/rust-libp2p/pull/2937
[PR 2859]: https://github.com/libp2p/rust-libp2p/pull/2859/
# 0.39.0

View File

@ -111,6 +111,10 @@ impl Config {
/// If the maximum number of allowed ping failures is reached, the
/// connection is always terminated as a result of [`ConnectionHandler::poll`]
/// returning an error, regardless of the keep-alive setting.
#[deprecated(
since = "0.40.0",
note = "Use `libp2p::swarm::behaviour::KeepAlive` if you need to keep connections alive unconditionally."
)]
pub fn with_keep_alive(mut self, b: bool) -> Self {
self.keep_alive = b;
self

View File

@ -30,24 +30,24 @@ use libp2p::core::{
use libp2p::mplex;
use libp2p::noise;
use libp2p::ping;
use libp2p::swarm::{DummyBehaviour, KeepAlive, Swarm, SwarmEvent};
use libp2p::swarm::{Swarm, SwarmEvent};
use libp2p::tcp::{GenTcpConfig, TcpTransport};
use libp2p::yamux;
use libp2p::NetworkBehaviour;
use libp2p_swarm::keep_alive;
use quickcheck::*;
use std::{num::NonZeroU8, time::Duration};
#[test]
fn ping_pong() {
fn prop(count: NonZeroU8, muxer: MuxerChoice) {
let cfg = ping::Config::new()
.with_keep_alive(true)
.with_interval(Duration::from_millis(10));
let cfg = ping::Config::new().with_interval(Duration::from_millis(10));
let (peer1_id, trans) = mk_transport(muxer);
let mut swarm1 = Swarm::new(trans, ping::Behaviour::new(cfg.clone()), peer1_id);
let mut swarm1 = Swarm::new(trans, Behaviour::new(cfg.clone()), peer1_id);
let (peer2_id, trans) = mk_transport(muxer);
let mut swarm2 = Swarm::new(trans, ping::Behaviour::new(cfg), peer2_id);
let mut swarm2 = Swarm::new(trans, Behaviour::new(cfg), peer2_id);
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
@ -62,16 +62,19 @@ fn ping_pong() {
loop {
match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr { address, .. } => tx.send(address).await.unwrap(),
SwarmEvent::Behaviour(ping::Event {
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
peer,
result: Ok(ping::Success::Ping { rtt }),
}) => {
})) => {
count1 -= 1;
if count1 == 0 {
return (pid1, peer, rtt);
}
}
SwarmEvent::Behaviour(ping::Event { result: Err(e), .. }) => {
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
result: Err(e),
..
})) => {
panic!("Ping failure: {:?}", e)
}
_ => {}
@ -85,16 +88,19 @@ fn ping_pong() {
loop {
match swarm2.select_next_some().await {
SwarmEvent::Behaviour(ping::Event {
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
peer,
result: Ok(ping::Success::Ping { rtt }),
}) => {
})) => {
count2 -= 1;
if count2 == 0 {
return (pid2, peer, rtt);
}
}
SwarmEvent::Behaviour(ping::Event { result: Err(e), .. }) => {
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
result: Err(e),
..
})) => {
panic!("Ping failure: {:?}", e)
}
_ => {}
@ -117,16 +123,15 @@ fn ping_pong() {
fn max_failures() {
fn prop(max_failures: NonZeroU8, muxer: MuxerChoice) {
let cfg = ping::Config::new()
.with_keep_alive(true)
.with_interval(Duration::from_millis(10))
.with_timeout(Duration::from_millis(0))
.with_max_failures(max_failures.into());
let (peer1_id, trans) = mk_transport(muxer);
let mut swarm1 = Swarm::new(trans, ping::Behaviour::new(cfg.clone()), peer1_id);
let mut swarm1 = Swarm::new(trans, Behaviour::new(cfg.clone()), peer1_id);
let (peer2_id, trans) = mk_transport(muxer);
let mut swarm2 = Swarm::new(trans, ping::Behaviour::new(cfg), peer2_id);
let mut swarm2 = Swarm::new(trans, Behaviour::new(cfg), peer2_id);
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
@ -139,13 +144,16 @@ fn max_failures() {
loop {
match swarm1.select_next_some().await {
SwarmEvent::NewListenAddr { address, .. } => tx.send(address).await.unwrap(),
SwarmEvent::Behaviour(ping::Event {
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
result: Ok(ping::Success::Ping { .. }),
..
}) => {
})) => {
count1 = 0; // there may be an occasional success
}
SwarmEvent::Behaviour(ping::Event { result: Err(_), .. }) => {
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
result: Err(_),
..
})) => {
count1 += 1;
}
SwarmEvent::ConnectionClosed { .. } => return count1,
@ -161,13 +169,16 @@ fn max_failures() {
loop {
match swarm2.select_next_some().await {
SwarmEvent::Behaviour(ping::Event {
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
result: Ok(ping::Success::Ping { .. }),
..
}) => {
})) => {
count2 = 0; // there may be an occasional success
}
SwarmEvent::Behaviour(ping::Event { result: Err(_), .. }) => {
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
result: Err(_),
..
})) => {
count2 += 1;
}
SwarmEvent::ConnectionClosed { .. } => return count2,
@ -187,18 +198,10 @@ fn max_failures() {
#[test]
fn unsupported_doesnt_fail() {
let (peer1_id, trans) = mk_transport(MuxerChoice::Mplex);
let mut swarm1 = Swarm::new(
trans,
DummyBehaviour::with_keep_alive(KeepAlive::Yes),
peer1_id,
);
let mut swarm1 = Swarm::new(trans, keep_alive::Behaviour, peer1_id);
let (peer2_id, trans) = mk_transport(MuxerChoice::Mplex);
let mut swarm2 = Swarm::new(
trans,
ping::Behaviour::new(ping::Config::new().with_keep_alive(true)),
peer2_id,
);
let mut swarm2 = Swarm::new(trans, Behaviour::default(), peer2_id);
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
@ -218,10 +221,10 @@ fn unsupported_doesnt_fail() {
loop {
match swarm2.select_next_some().await {
SwarmEvent::Behaviour(ping::Event {
SwarmEvent::Behaviour(BehaviourEvent::Ping(ping::Event {
result: Err(ping::Failure::Unsupported),
..
}) => {
})) => {
swarm2.disconnect_peer_id(peer1_id).unwrap();
}
SwarmEvent::ConnectionClosed { cause: Some(e), .. } => {
@ -265,3 +268,18 @@ impl Arbitrary for MuxerChoice {
*g.choose(&[MuxerChoice::Mplex, MuxerChoice::Yamux]).unwrap()
}
}
#[derive(NetworkBehaviour, Default)]
struct Behaviour {
keep_alive: keep_alive::Behaviour,
ping: ping::Behaviour,
}
impl Behaviour {
fn new(config: ping::Config) -> Self {
Self {
keep_alive: keep_alive::Behaviour,
ping: ping::Behaviour::new(config),
}
}
}

View File

@ -35,7 +35,7 @@ use futures::stream::StreamExt;
use libp2p_core::connection::{ConnectedPoint, ConnectionId};
use libp2p_core::{Multiaddr, PeerId};
use libp2p_swarm::dial_opts::DialOpts;
use libp2p_swarm::handler::DummyConnectionHandler;
use libp2p_swarm::dummy;
use libp2p_swarm::{
ConnectionHandlerUpgrErr, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction,
NotifyHandler, PollParameters,
@ -144,7 +144,7 @@ impl NetworkBehaviour for Client {
peer_id: &PeerId,
connection_id: &ConnectionId,
endpoint: &ConnectedPoint,
_handler: Either<handler::Handler, DummyConnectionHandler>,
_handler: Either<handler::Handler, dummy::ConnectionHandler>,
_remaining_established: usize,
) {
if !endpoint.is_relayed() {

View File

@ -31,12 +31,10 @@ use instant::Instant;
use libp2p_core::either::EitherError;
use libp2p_core::multiaddr::Protocol;
use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId};
use libp2p_swarm::handler::{
DummyConnectionHandler, InboundUpgradeSend, OutboundUpgradeSend, SendWrapper,
};
use libp2p_swarm::handler::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler,
KeepAlive, NegotiatedSubstream, SubstreamProtocol,
dummy, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr,
IntoConnectionHandler, KeepAlive, NegotiatedSubstream, SubstreamProtocol,
};
use log::debug;
use std::collections::{HashMap, VecDeque};
@ -125,7 +123,7 @@ impl Prototype {
}
impl IntoConnectionHandler for Prototype {
type Handler = Either<Handler, DummyConnectionHandler>;
type Handler = Either<Handler, dummy::ConnectionHandler>;
fn into_handler(self, remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler {
if endpoint.is_relayed() {
@ -138,7 +136,7 @@ impl IntoConnectionHandler for Prototype {
}
// Deny all substreams on relayed connection.
Either::Right(DummyConnectionHandler::default())
Either::Right(dummy::ConnectionHandler)
} else {
let mut handler = Handler {
remote_peer_id: *remote_peer_id,

View File

@ -30,9 +30,8 @@ use instant::Instant;
use libp2p_core::connection::{ConnectedPoint, ConnectionId};
use libp2p_core::multiaddr::Protocol;
use libp2p_core::PeerId;
use libp2p_swarm::handler::DummyConnectionHandler;
use libp2p_swarm::{
ConnectionHandlerUpgrErr, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler,
dummy, ConnectionHandlerUpgrErr, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler,
PollParameters,
};
use std::collections::{hash_map, HashMap, HashSet, VecDeque};
@ -234,7 +233,7 @@ impl NetworkBehaviour for Relay {
peer: &PeerId,
connection: &ConnectionId,
_: &ConnectedPoint,
_handler: Either<handler::Handler, DummyConnectionHandler>,
_handler: Either<handler::Handler, dummy::ConnectionHandler>,
_remaining_established: usize,
) {
if let hash_map::Entry::Occupied(mut peer) = self.reservations.entry(*peer) {
@ -283,7 +282,7 @@ impl NetworkBehaviour for Relay {
assert!(
!endpoint.is_relayed(),
"`DummyConnectionHandler` handles relayed connections. It \
"`dummy::ConnectionHandler` handles relayed connections. It \
denies all inbound substreams."
);
@ -410,7 +409,7 @@ impl NetworkBehaviour for Relay {
assert!(
!endpoint.is_relayed(),
"`DummyConnectionHandler` handles relayed connections. It \
"`dummy::ConnectionHandler` handles relayed connections. It \
denies all inbound substreams."
);

View File

@ -33,11 +33,11 @@ use instant::Instant;
use libp2p_core::connection::ConnectionId;
use libp2p_core::either::EitherError;
use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId};
use libp2p_swarm::handler::{DummyConnectionHandler, SendWrapper};
use libp2p_swarm::handler::SendWrapper;
use libp2p_swarm::handler::{InboundUpgradeSend, OutboundUpgradeSend};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler,
KeepAlive, NegotiatedSubstream, SubstreamProtocol,
dummy, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr,
IntoConnectionHandler, KeepAlive, NegotiatedSubstream, SubstreamProtocol,
};
use std::collections::VecDeque;
use std::fmt;
@ -342,12 +342,12 @@ pub struct Prototype {
}
impl IntoConnectionHandler for Prototype {
type Handler = Either<Handler, DummyConnectionHandler>;
type Handler = Either<Handler, dummy::ConnectionHandler>;
fn into_handler(self, _remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler {
if endpoint.is_relayed() {
// Deny all substreams on relayed connection.
Either::Right(DummyConnectionHandler::default())
Either::Right(dummy::ConnectionHandler)
} else {
Either::Left(Handler {
endpoint: endpoint.clone(),

View File

@ -23,10 +23,11 @@ use libp2p::core::identity;
use libp2p::core::PeerId;
use libp2p::multiaddr::Protocol;
use libp2p::ping;
use libp2p::swarm::SwarmEvent;
use libp2p::swarm::{keep_alive, SwarmEvent};
use libp2p::Swarm;
use libp2p::{development_transport, rendezvous, Multiaddr};
use std::time::Duration;
use void::Void;
const NAMESPACE: &str = "rendezvous";
@ -44,11 +45,8 @@ async fn main() {
development_transport(identity.clone()).await.unwrap(),
MyBehaviour {
rendezvous: rendezvous::client::Behaviour::new(identity.clone()),
ping: ping::Behaviour::new(
ping::Config::new()
.with_interval(Duration::from_secs(1))
.with_keep_alive(true),
),
ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))),
keep_alive: keep_alive::Behaviour,
},
PeerId::from(identity.public()),
);
@ -139,10 +137,17 @@ impl From<ping::Event> for MyEvent {
}
}
impl From<Void> for MyEvent {
fn from(event: Void) -> Self {
void::unreachable(event)
}
}
#[derive(libp2p::NetworkBehaviour)]
#[behaviour(event_process = false)]
#[behaviour(out_event = "MyEvent")]
struct MyBehaviour {
rendezvous: rendezvous::client::Behaviour,
ping: ping::Behaviour,
keep_alive: keep_alive::Behaviour,
}

View File

@ -23,10 +23,11 @@ use libp2p::core::identity;
use libp2p::core::PeerId;
use libp2p::identify;
use libp2p::ping;
use libp2p::swarm::{Swarm, SwarmEvent};
use libp2p::swarm::{keep_alive, Swarm, SwarmEvent};
use libp2p::{development_transport, rendezvous};
use libp2p::{Multiaddr, NetworkBehaviour};
use std::time::Duration;
use void::Void;
#[tokio::main]
async fn main() {
@ -47,11 +48,8 @@ async fn main() {
identity.public(),
)),
rendezvous: rendezvous::client::Behaviour::new(identity.clone()),
ping: ping::Behaviour::new(
ping::Config::new()
.with_interval(Duration::from_secs(1))
.with_keep_alive(true),
),
ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))),
keep_alive: keep_alive::Behaviour,
},
PeerId::from(identity.public()),
);
@ -139,6 +137,12 @@ impl From<ping::Event> for MyEvent {
}
}
impl From<Void> for MyEvent {
fn from(event: Void) -> Self {
void::unreachable(event)
}
}
#[derive(NetworkBehaviour)]
#[behaviour(event_process = false)]
#[behaviour(out_event = "MyEvent")]
@ -146,4 +150,5 @@ struct MyBehaviour {
identify: identify::Behaviour,
rendezvous: rendezvous::client::Behaviour,
ping: ping::Behaviour,
keep_alive: keep_alive::Behaviour,
}

View File

@ -23,9 +23,10 @@ use libp2p::core::identity;
use libp2p::core::PeerId;
use libp2p::identify;
use libp2p::ping;
use libp2p::swarm::{Swarm, SwarmEvent};
use libp2p::swarm::{keep_alive, Swarm, SwarmEvent};
use libp2p::NetworkBehaviour;
use libp2p::{development_transport, rendezvous};
use void::Void;
/// Examples for the rendezvous protocol:
///
@ -51,7 +52,8 @@ async fn main() {
identity.public(),
)),
rendezvous: rendezvous::server::Behaviour::new(rendezvous::server::Config::default()),
ping: ping::Behaviour::new(ping::Config::new().with_keep_alive(true)),
ping: ping::Behaviour::new(ping::Config::new()),
keep_alive: keep_alive::Behaviour,
},
PeerId::from(identity.public()),
);
@ -123,6 +125,12 @@ impl From<identify::Event> for MyEvent {
}
}
impl From<Void> for MyEvent {
fn from(event: Void) -> Self {
void::unreachable(event)
}
}
#[derive(NetworkBehaviour)]
#[behaviour(event_process = false)]
#[behaviour(out_event = "MyEvent")]
@ -130,4 +138,5 @@ struct MyBehaviour {
identify: identify::Behaviour,
rendezvous: rendezvous::server::Behaviour,
ping: ping::Behaviour,
keep_alive: keep_alive::Behaviour,
}