From 30de0b4d644dac6dbe0cf6ac8c6498ebaf04b40d Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Wed, 12 Aug 2020 16:04:54 +0200 Subject: [PATCH] Add `Throttled` to libp2p-request-response. (#1696) * Use a single exchange instead of two one_shots. * Add `Throttled` to libp2p-request-response. Wraps the existing `RequestResponse` behaviour and applies strict limits to the number of inbound and outbound requests per peer. The wrapper is opt-in and if not used, the protocol behaviour of `RequestResponse` does not change. This PR also does not introduce an extra protocol, hence the limits applied need to be known a priori for all nodes which is not always possible or desirable. As mentioned in #1687 I think that we should eventually augment the protocol with metadata which allows a more dynamic exchange of requests and responses. This PR also replaces the two oneshot channels with a single one from the scambio crate which saves one allocation per request/response. If not desirable because the crate has seen less testing the first commit could be reverted. * Fix rustdoc error. * Remove some leftovers from development. * Add docs to `NetworBehaviourAction::{map_in,map_out}`. * Apply suggestions from code review Co-authored-by: Roman Borschel * Add `ping_protocol_throttled` test. * Add another test. * Revert "Use a single exchange instead of two one_shots." This reverts commit e34e1297d411298f6c69e238aa6c96e0b795d989. # Conflicts: # protocols/request-response/Cargo.toml # protocols/request-response/src/handler/protocol.rs * Update CHANGELOG. * Update CHANGELOG. Co-authored-by: Roman Borschel --- protocols/request-response/CHANGELOG.md | 5 +- protocols/request-response/Cargo.toml | 3 + protocols/request-response/src/handler.rs | 1 + .../request-response/src/handler/protocol.rs | 14 +- protocols/request-response/src/lib.rs | 8 +- protocols/request-response/src/throttled.rs | 302 ++++++++++++++++++ protocols/request-response/tests/ping.rs | 202 +++++++++++- swarm/src/behaviour.rs | 38 +++ 8 files changed, 557 insertions(+), 16 deletions(-) create mode 100644 protocols/request-response/src/throttled.rs diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 2b5df6ac..959887d4 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -1,5 +1,8 @@ -# 0.2.0 +# 0.2.0 // unreleased +- Added `RequestResponse::throttled` to wrap the behaviour into one that + enforces limits on inbound and outbound requests per peer. The limits + have to be known upfront by all nodes. - Bump `libp2p-core` and `libp2p-swarm` dependencies. # 0.1.1 diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index aaf1b993..77b12a52 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -14,6 +14,9 @@ async-trait = "0.1" futures = "0.3.1" libp2p-core = { version = "0.21.0", path = "../../core" } libp2p-swarm = { version = "0.21.0", path = "../../swarm" } +log = "0.4.11" +lru = "0.6" +rand = "0.7" smallvec = "1.4" wasm-timer = "0.2" diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 17eff4db..b78e19fc 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -110,6 +110,7 @@ where /// The events emitted by the [`RequestResponseHandler`]. #[doc(hidden)] +#[derive(Debug)] pub enum RequestResponseHandlerEvent where TCodec: RequestResponseCodec diff --git a/protocols/request-response/src/handler/protocol.rs b/protocols/request-response/src/handler/protocol.rs index 6373e90d..bbd0b80f 100644 --- a/protocols/request-response/src/handler/protocol.rs +++ b/protocols/request-response/src/handler/protocol.rs @@ -26,17 +26,9 @@ use crate::RequestId; use crate::codec::RequestResponseCodec; -use futures::{ - channel::oneshot, - future::BoxFuture, - prelude::*, -}; -use libp2p_core::{ - upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, -}; -use libp2p_swarm::{ - NegotiatedSubstream, -}; +use futures::{channel::oneshot, future::BoxFuture, prelude::*}; +use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_swarm::NegotiatedSubstream; use smallvec::SmallVec; use std::io; diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 25b69d7a..2bd7e57d 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -70,9 +70,11 @@ pub mod codec; pub mod handler; +pub mod throttled; pub use codec::{RequestResponseCodec, ProtocolName}; pub use handler::ProtocolSupport; +pub use throttled::Throttled; use futures::{ channel::oneshot, @@ -309,6 +311,11 @@ where } } + /// Wrap this behaviour in [`Throttled`] to limit the number of concurrent requests per peer. + pub fn throttled(self) -> Throttled { + Throttled::new(self) + } + /// Initiates sending a request. /// /// If the targeted peer is currently not connected, a dialing @@ -604,4 +611,3 @@ struct Connection { id: ConnectionId, address: Option, } - diff --git a/protocols/request-response/src/throttled.rs b/protocols/request-response/src/throttled.rs new file mode 100644 index 00000000..990c8665 --- /dev/null +++ b/protocols/request-response/src/throttled.rs @@ -0,0 +1,302 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::handler::{RequestProtocol, RequestResponseHandler, RequestResponseHandlerEvent}; +use libp2p_core::{ConnectedPoint, connection::ConnectionId, Multiaddr, PeerId}; +use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use lru::LruCache; +use std::{collections::{HashMap, VecDeque}, task::{Context, Poll}}; +use std::{cmp::min, num::NonZeroU16}; +use super::{ + RequestId, + RequestResponse, + RequestResponseCodec, + RequestResponseEvent, + ResponseChannel +}; + +/// A wrapper around [`RequestResponse`] which adds request limits per peer. +/// +/// Each peer is assigned a default limit of concurrent requests and +/// responses, which can be overriden per peer. +/// +/// It is not possible to send more requests than configured and receiving +/// more is reported as an error event. Since `libp2p-request-response` is +/// not its own protocol, there is no way to communicate limits to peers, +/// hence nodes must have pre-established knowledge about each other's limits. +pub struct Throttled { + /// A random id used for logging. + id: u32, + /// The wrapped behaviour. + behaviour: RequestResponse, + /// Limits per peer. + limits: HashMap, + /// After disconnects we keep limits around to prevent circumventing + /// them by successive reconnects. + previous: LruCache, + /// The default limit applied to all peers unless overriden. + default: Limit, + /// Pending events to report in `Throttled::poll`. + events: VecDeque> +} + +/// A `Limit` of inbound and outbound requests. +#[derive(Clone, Debug)] +struct Limit { + /// The remaining number of outbound requests that can be send. + send_budget: u16, + /// The remaining number of inbound requests that can be received. + recv_budget: u16, + /// The original limit which applies to inbound and outbound requests. + maximum: NonZeroU16 +} + +impl Default for Limit { + fn default() -> Self { + let maximum = NonZeroU16::new(1).expect("1 > 0"); + Limit { + send_budget: maximum.get(), + recv_budget: maximum.get(), + maximum + } + } +} + +/// A Wrapper around [`RequestResponseEvent`]. +#[derive(Debug)] +pub enum Event { + /// A regular request-response event. + Event(RequestResponseEvent), + /// We received more inbound requests than allowed. + TooManyInboundRequests(PeerId), + /// When previously reaching the send limit of a peer, + /// this event is eventually emitted when sending is + /// allowed to resume. + ResumeSending(PeerId) +} + +impl Throttled { + /// Wrap an existing `RequestResponse` behaviour and apply send/recv limits. + pub fn new(behaviour: RequestResponse) -> Self { + Throttled { + id: rand::random(), + behaviour, + limits: HashMap::new(), + previous: LruCache::new(2048), + default: Limit::default(), + events: VecDeque::new() + } + } + + /// Get the current default limit applied to all peers. + pub fn default_limit(&self) -> u16 { + self.default.maximum.get() + } + + /// Override the global default limit. + /// + /// See [`Throttled::set_limit`] to override limits for individual peers. + pub fn set_default_limit(&mut self, limit: NonZeroU16) { + log::trace!("{:08x}: new default limit: {:?}", self.id, limit); + self.default = Limit { + send_budget: limit.get(), + recv_budget: limit.get(), + maximum: limit + } + } + + /// Specify the send and receive limit for a single peer. + pub fn set_limit(&mut self, id: &PeerId, limit: NonZeroU16) { + log::trace!("{:08x}: new limit for {}: {:?}", self.id, id, limit); + self.previous.pop(id); + self.limits.insert(id.clone(), Limit { + send_budget: limit.get(), + recv_budget: limit.get(), + maximum: limit + }); + } + + /// Has the limit of outbound requests been reached for the given peer? + pub fn can_send(&mut self, id: &PeerId) -> bool { + self.limits.get(id).map(|l| l.send_budget > 0).unwrap_or(true) + } + + /// Send a request to a peer. + /// + /// If the limit of outbound requests has been reached, the request is + /// returned. Sending more outbound requests should only be attempted + /// once [`Event::ResumeSending`] has been received from [`NetworkBehaviour::poll`]. + pub fn send_request(&mut self, id: &PeerId, req: C::Request) -> Result { + log::trace!("{:08x}: sending request to {}", self.id, id); + + // Getting the limit is somewhat complicated due to the connection state. + // Applications may try to send a request to a peer we have never been connected + // to, or a peer we have previously been connected to. In the first case, the + // default limit applies and in the latter, the cached limit from the previous + // connection (if still available). + let mut limit = + if let Some(limit) = self.limits.get_mut(id) { + limit + } else { + let limit = self.previous.pop(id).unwrap_or_else(|| self.default.clone()); + self.limits.entry(id.clone()).or_insert(limit) + }; + + if limit.send_budget == 0 { + log::trace!("{:08x}: no budget to send request to {}", self.id, id); + return Err(req) + } + + limit.send_budget -= 1; + + Ok(self.behaviour.send_request(id, req)) + } + + /// Answer an inbound request with a response. + /// + /// See [`RequestResponse::send_response`] for details. + pub fn send_response(&mut self, ch: ResponseChannel, rs: C::Response) { + if let Some(limit) = self.limits.get_mut(&ch.peer) { + limit.recv_budget += 1; + debug_assert!(limit.recv_budget <= limit.maximum.get()) + } + self.behaviour.send_response(ch, rs) + } + + /// Add a known peer address. + /// + /// See [`RequestResponse::add_address`] for details. + pub fn add_address(&mut self, id: &PeerId, ma: Multiaddr) { + self.behaviour.add_address(id, ma) + } + + /// Remove a previously added peer address. + /// + /// See [`RequestResponse::remove_address`] for details. + pub fn remove_address(&mut self, id: &PeerId, ma: &Multiaddr) { + self.behaviour.remove_address(id, ma) + } + + /// Are we connected to the given peer? + /// + /// See [`RequestResponse::is_connected`] for details. + pub fn is_connected(&self, id: &PeerId) -> bool { + self.behaviour.is_connected(id) + } + + /// Are we waiting for a response to the given request? + /// + /// See [`RequestResponse::is_pending`] for details. + pub fn is_pending(&self, id: &RequestId) -> bool { + self.behaviour.is_pending(id) + } +} + +impl NetworkBehaviour for Throttled +where + C: RequestResponseCodec + Send + Clone + 'static +{ + type ProtocolsHandler = RequestResponseHandler; + type OutEvent = Event; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + self.behaviour.new_handler() + } + + fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { + self.behaviour.addresses_of_peer(peer) + } + + fn inject_connection_established(&mut self, p: &PeerId, id: &ConnectionId, end: &ConnectedPoint) { + self.behaviour.inject_connection_established(p, id, end) + } + + fn inject_connection_closed(&mut self, p: &PeerId, id: &ConnectionId, end: &ConnectedPoint) { + self.behaviour.inject_connection_closed(p, id, end); + } + + fn inject_connected(&mut self, peer: &PeerId) { + log::trace!("{:08x}: connected to {}", self.id, peer); + self.behaviour.inject_connected(peer); + // The limit may have been added by [`Throttled::send_request`] already. + if !self.limits.contains_key(peer) { + let limit = self.previous.pop(peer).unwrap_or_else(|| self.default.clone()); + self.limits.insert(peer.clone(), limit); + } + } + + fn inject_disconnected(&mut self, peer: &PeerId) { + log::trace!("{:08x}: disconnected from {}", self.id, peer); + self.behaviour.inject_disconnected(peer); + // Save the limit in case the peer reconnects soon. + if let Some(limit) = self.limits.remove(peer) { + self.previous.put(peer.clone(), limit); + } + } + + fn inject_dial_failure(&mut self, peer: &PeerId) { + self.behaviour.inject_dial_failure(peer) + } + + fn inject_event(&mut self, p: PeerId, i: ConnectionId, e: RequestResponseHandlerEvent) { + match e { + // Cases where an outbound request has been resolved. + | RequestResponseHandlerEvent::Response {..} + | RequestResponseHandlerEvent::OutboundTimeout (_) + | RequestResponseHandlerEvent::OutboundUnsupportedProtocols (_) => + if let Some(limit) = self.limits.get_mut(&p) { + if limit.send_budget == 0 { + log::trace!("{:08x}: sending to peer {} can resume", self.id, p); + self.events.push_back(Event::ResumeSending(p.clone())) + } + limit.send_budget = min(limit.send_budget + 1, limit.maximum.get()) + } + // A new inbound request. + | RequestResponseHandlerEvent::Request {..} => + if let Some(limit) = self.limits.get_mut(&p) { + if limit.recv_budget == 0 { + log::error!("{:08x}: peer {} exceeds its budget", self.id, p); + return self.events.push_back(Event::TooManyInboundRequests(p)) + } + limit.recv_budget -= 1 + } + // The inbound request has expired so grant more budget to receive another one. + | RequestResponseHandlerEvent::InboundTimeout => + if let Some(limit) = self.limits.get_mut(&p) { + limit.recv_budget = min(limit.recv_budget + 1, limit.maximum.get()) + } + // Nothing to do here ... + | RequestResponseHandlerEvent::InboundUnsupportedProtocols => {} + } + self.behaviour.inject_event(p, i, e) + } + + fn poll(&mut self, cx: &mut Context<'_>, p: &mut impl PollParameters) + -> Poll, Self::OutEvent>> + { + if let Some(ev) = self.events.pop_front() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) + } else if self.events.capacity() > super::EMPTY_QUEUE_SHRINK_THRESHOLD { + self.events.shrink_to_fit() + } + + self.behaviour.poll(cx, p).map(|a| a.map_out(Event::Event)) + } +} diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 107a37ed..ad620898 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -35,13 +35,11 @@ use libp2p_swarm::Swarm; use libp2p_tcp::TcpConfig; use futures::{prelude::*, channel::mpsc}; use rand::{self, Rng}; -use std::{io, iter}; +use std::{collections::HashSet, io, iter, num::NonZeroU16}; /// Exercises a simple ping protocol. #[test] fn ping_protocol() { - let num_pings: u8 = rand::thread_rng().gen_range(1, 100); - let ping = Ping("ping".to_string().into_bytes()); let pong = Pong("pong".to_string().into_bytes()); @@ -85,6 +83,8 @@ fn ping_protocol() { } }; + let num_pings: u8 = rand::thread_rng().gen_range(1, 100); + let peer2 = async move { let mut count = 0; let addr = rx.next().await.unwrap(); @@ -116,6 +116,202 @@ fn ping_protocol() { let () = async_std::task::block_on(peer2); } +/// Like `ping_protocol`, but throttling concurrent requests. +#[test] +fn ping_protocol_throttled() { + let ping = Ping("ping".to_string().into_bytes()); + let pong = Pong("pong".to_string().into_bytes()); + + let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); + let cfg = RequestResponseConfig::default(); + + let (peer1_id, trans) = mk_transport(); + let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()).throttled(); + let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone()); + + let (peer2_id, trans) = mk_transport(); + let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg).throttled(); + let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone()); + + let (mut tx, mut rx) = mpsc::channel::(1); + + let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); + Swarm::listen_on(&mut swarm1, addr).unwrap(); + + let expected_ping = ping.clone(); + let expected_pong = pong.clone(); + + let limit: u16 = rand::thread_rng().gen_range(1, 10); + swarm1.set_default_limit(NonZeroU16::new(limit).unwrap()); + swarm2.set_default_limit(NonZeroU16::new(limit).unwrap()); + + let peer1 = async move { + while let Some(_) = swarm1.next().now_or_never() {} + + let l = Swarm::listeners(&swarm1).next().unwrap(); + tx.send(l.clone()).await.unwrap(); + + loop { + match swarm1.next().await { + throttled::Event::Event(RequestResponseEvent::Message { + peer, + message: RequestResponseMessage::Request { request, channel } + }) => { + assert_eq!(&request, &expected_ping); + assert_eq!(&peer, &peer2_id); + swarm1.send_response(channel, pong.clone()); + }, + e => panic!("Peer1: Unexpected event: {:?}", e) + } + } + }; + + let num_pings: u8 = rand::thread_rng().gen_range(1, 100); + + let peer2 = async move { + let mut count = 0; + let addr = rx.next().await.unwrap(); + swarm2.add_address(&peer1_id, addr.clone()); + let mut blocked = false; + let mut req_ids = HashSet::new(); + + loop { + if !blocked { + while let Some(id) = swarm2.send_request(&peer1_id, ping.clone()).ok() { + req_ids.insert(id); + } + blocked = true; + } + match swarm2.next().await { + throttled::Event::ResumeSending(peer) => { + assert_eq!(peer, peer1_id); + blocked = false + } + throttled::Event::Event(RequestResponseEvent::Message { + peer, + message: RequestResponseMessage::Response { request_id, response } + }) => { + count += 1; + assert_eq!(&response, &expected_pong); + assert_eq!(&peer, &peer1_id); + assert!(req_ids.remove(&request_id)); + if count >= num_pings { + break + } + } + e => panic!("Peer2: Unexpected event: {:?}", e) + } + } + }; + + async_std::task::spawn(Box::pin(peer1)); + let () = async_std::task::block_on(peer2); +} + +#[test] +fn ping_protocol_limit_violation() { + let ping = Ping("ping".to_string().into_bytes()); + let pong = Pong("pong".to_string().into_bytes()); + + let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); + let cfg = RequestResponseConfig::default(); + + let (peer1_id, trans) = mk_transport(); + let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()).throttled(); + let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone()); + + let (peer2_id, trans) = mk_transport(); + let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg).throttled(); + let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone()); + + let (mut tx, mut rx) = mpsc::channel::(1); + + let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); + Swarm::listen_on(&mut swarm1, addr).unwrap(); + + let expected_ping = ping.clone(); + let expected_pong = pong.clone(); + + swarm2.set_default_limit(NonZeroU16::new(2).unwrap()); + + let peer1 = async move { + while let Some(_) = swarm1.next().now_or_never() {} + + let l = Swarm::listeners(&swarm1).next().unwrap(); + tx.send(l.clone()).await.unwrap(); + + let mut pending_responses = Vec::new(); + + loop { + match swarm1.next().await { + throttled::Event::Event(RequestResponseEvent::Message { + peer, + message: RequestResponseMessage::Request { request, channel } + }) => { + assert_eq!(&request, &expected_ping); + assert_eq!(&peer, &peer2_id); + pending_responses.push((channel, pong.clone())); + }, + throttled::Event::TooManyInboundRequests(p) => { + assert_eq!(p, peer2_id); + break + } + e => panic!("Peer1: Unexpected event: {:?}", e) + } + if pending_responses.len() >= 2 { + for (channel, pong) in pending_responses.drain(..) { + swarm1.send_response(channel, pong) + } + } + } + }; + + let num_pings: u8 = rand::thread_rng().gen_range(1, 100); + + let peer2 = async move { + let mut count = 0; + let addr = rx.next().await.unwrap(); + swarm2.add_address(&peer1_id, addr.clone()); + let mut blocked = false; + let mut req_ids = HashSet::new(); + + loop { + if !blocked { + while let Some(id) = swarm2.send_request(&peer1_id, ping.clone()).ok() { + req_ids.insert(id); + } + blocked = true; + } + match swarm2.next().await { + throttled::Event::ResumeSending(peer) => { + assert_eq!(peer, peer1_id); + blocked = false + } + throttled::Event::Event(RequestResponseEvent::Message { + peer, + message: RequestResponseMessage::Response { request_id, response } + }) => { + count += 1; + assert_eq!(&response, &expected_pong); + assert_eq!(&peer, &peer1_id); + assert!(req_ids.remove(&request_id)); + if count >= num_pings { + break + } + } + throttled::Event::Event(RequestResponseEvent::OutboundFailure { error, .. }) => { + assert!(matches!(error, OutboundFailure::ConnectionClosed)); + break + } + e => panic!("Peer2: Unexpected event: {:?}", e) + } + } + }; + + async_std::task::spawn(Box::pin(peer1)); + let () = async_std::task::block_on(peer2); +} + fn mk_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox), io::Error>) { let id_keys = identity::Keypair::generate_ed25519(); let peer_id = id_keys.public().into_peer_id(); diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 93d2df09..8da3425e 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -281,6 +281,44 @@ pub enum NetworkBehaviourAction { }, } +impl NetworkBehaviourAction { + /// Map the handler event. + pub fn map_in(self, f: impl FnOnce(TInEvent) -> E) -> NetworkBehaviourAction { + match self { + NetworkBehaviourAction::GenerateEvent(e) => + NetworkBehaviourAction::GenerateEvent(e), + NetworkBehaviourAction::DialAddress { address } => + NetworkBehaviourAction::DialAddress { address }, + NetworkBehaviourAction::DialPeer { peer_id, condition } => + NetworkBehaviourAction::DialPeer { peer_id, condition }, + NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } => + NetworkBehaviourAction::NotifyHandler { + peer_id, + handler, + event: f(event) + }, + NetworkBehaviourAction::ReportObservedAddr { address } => + NetworkBehaviourAction::ReportObservedAddr { address } + } + } + + /// Map the event the swarm will return. + pub fn map_out(self, f: impl FnOnce(TOutEvent) -> E) -> NetworkBehaviourAction { + match self { + NetworkBehaviourAction::GenerateEvent(e) => + NetworkBehaviourAction::GenerateEvent(f(e)), + NetworkBehaviourAction::DialAddress { address } => + NetworkBehaviourAction::DialAddress { address }, + NetworkBehaviourAction::DialPeer { peer_id, condition } => + NetworkBehaviourAction::DialPeer { peer_id, condition }, + NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } => + NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }, + NetworkBehaviourAction::ReportObservedAddr { address } => + NetworkBehaviourAction::ReportObservedAddr { address } + } + } +} + /// The options w.r.t. which connection handlers to notify of an event. #[derive(Debug, Clone)] pub enum NotifyHandler {