mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-04-25 19:02:13 +00:00
Add Throttled
to libp2p-request-response. (#1696)
* Use a single exchange instead of two one_shots. * Add `Throttled` to libp2p-request-response. Wraps the existing `RequestResponse` behaviour and applies strict limits to the number of inbound and outbound requests per peer. The wrapper is opt-in and if not used, the protocol behaviour of `RequestResponse` does not change. This PR also does not introduce an extra protocol, hence the limits applied need to be known a priori for all nodes which is not always possible or desirable. As mentioned in #1687 I think that we should eventually augment the protocol with metadata which allows a more dynamic exchange of requests and responses. This PR also replaces the two oneshot channels with a single one from the scambio crate which saves one allocation per request/response. If not desirable because the crate has seen less testing the first commit could be reverted. * Fix rustdoc error. * Remove some leftovers from development. * Add docs to `NetworBehaviourAction::{map_in,map_out}`. * Apply suggestions from code review Co-authored-by: Roman Borschel <romanb@users.noreply.github.com> * Add `ping_protocol_throttled` test. * Add another test. * Revert "Use a single exchange instead of two one_shots." This reverts commit e34e1297d411298f6c69e238aa6c96e0b795d989. # Conflicts: # protocols/request-response/Cargo.toml # protocols/request-response/src/handler/protocol.rs * Update CHANGELOG. * Update CHANGELOG. Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>
This commit is contained in:
parent
b595972961
commit
30de0b4d64
@ -1,5 +1,8 @@
|
|||||||
# 0.2.0
|
# 0.2.0 // unreleased
|
||||||
|
|
||||||
|
- Added `RequestResponse::throttled` to wrap the behaviour into one that
|
||||||
|
enforces limits on inbound and outbound requests per peer. The limits
|
||||||
|
have to be known upfront by all nodes.
|
||||||
- Bump `libp2p-core` and `libp2p-swarm` dependencies.
|
- Bump `libp2p-core` and `libp2p-swarm` dependencies.
|
||||||
|
|
||||||
# 0.1.1
|
# 0.1.1
|
||||||
|
@ -14,6 +14,9 @@ async-trait = "0.1"
|
|||||||
futures = "0.3.1"
|
futures = "0.3.1"
|
||||||
libp2p-core = { version = "0.21.0", path = "../../core" }
|
libp2p-core = { version = "0.21.0", path = "../../core" }
|
||||||
libp2p-swarm = { version = "0.21.0", path = "../../swarm" }
|
libp2p-swarm = { version = "0.21.0", path = "../../swarm" }
|
||||||
|
log = "0.4.11"
|
||||||
|
lru = "0.6"
|
||||||
|
rand = "0.7"
|
||||||
smallvec = "1.4"
|
smallvec = "1.4"
|
||||||
wasm-timer = "0.2"
|
wasm-timer = "0.2"
|
||||||
|
|
||||||
|
@ -110,6 +110,7 @@ where
|
|||||||
|
|
||||||
/// The events emitted by the [`RequestResponseHandler`].
|
/// The events emitted by the [`RequestResponseHandler`].
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
|
#[derive(Debug)]
|
||||||
pub enum RequestResponseHandlerEvent<TCodec>
|
pub enum RequestResponseHandlerEvent<TCodec>
|
||||||
where
|
where
|
||||||
TCodec: RequestResponseCodec
|
TCodec: RequestResponseCodec
|
||||||
|
@ -26,17 +26,9 @@
|
|||||||
use crate::RequestId;
|
use crate::RequestId;
|
||||||
use crate::codec::RequestResponseCodec;
|
use crate::codec::RequestResponseCodec;
|
||||||
|
|
||||||
use futures::{
|
use futures::{channel::oneshot, future::BoxFuture, prelude::*};
|
||||||
channel::oneshot,
|
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
|
||||||
future::BoxFuture,
|
use libp2p_swarm::NegotiatedSubstream;
|
||||||
prelude::*,
|
|
||||||
};
|
|
||||||
use libp2p_core::{
|
|
||||||
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
|
|
||||||
};
|
|
||||||
use libp2p_swarm::{
|
|
||||||
NegotiatedSubstream,
|
|
||||||
};
|
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
|
@ -70,9 +70,11 @@
|
|||||||
|
|
||||||
pub mod codec;
|
pub mod codec;
|
||||||
pub mod handler;
|
pub mod handler;
|
||||||
|
pub mod throttled;
|
||||||
|
|
||||||
pub use codec::{RequestResponseCodec, ProtocolName};
|
pub use codec::{RequestResponseCodec, ProtocolName};
|
||||||
pub use handler::ProtocolSupport;
|
pub use handler::ProtocolSupport;
|
||||||
|
pub use throttled::Throttled;
|
||||||
|
|
||||||
use futures::{
|
use futures::{
|
||||||
channel::oneshot,
|
channel::oneshot,
|
||||||
@ -309,6 +311,11 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wrap this behaviour in [`Throttled`] to limit the number of concurrent requests per peer.
|
||||||
|
pub fn throttled(self) -> Throttled<TCodec> {
|
||||||
|
Throttled::new(self)
|
||||||
|
}
|
||||||
|
|
||||||
/// Initiates sending a request.
|
/// Initiates sending a request.
|
||||||
///
|
///
|
||||||
/// If the targeted peer is currently not connected, a dialing
|
/// If the targeted peer is currently not connected, a dialing
|
||||||
@ -604,4 +611,3 @@ struct Connection {
|
|||||||
id: ConnectionId,
|
id: ConnectionId,
|
||||||
address: Option<Multiaddr>,
|
address: Option<Multiaddr>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
302
protocols/request-response/src/throttled.rs
Normal file
302
protocols/request-response/src/throttled.rs
Normal file
@ -0,0 +1,302 @@
|
|||||||
|
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||||
|
// copy of this software and associated documentation files (the "Software"),
|
||||||
|
// to deal in the Software without restriction, including without limitation
|
||||||
|
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||||
|
// and/or sell copies of the Software, and to permit persons to whom the
|
||||||
|
// Software is furnished to do so, subject to the following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included in
|
||||||
|
// all copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
use crate::handler::{RequestProtocol, RequestResponseHandler, RequestResponseHandlerEvent};
|
||||||
|
use libp2p_core::{ConnectedPoint, connection::ConnectionId, Multiaddr, PeerId};
|
||||||
|
use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
|
||||||
|
use lru::LruCache;
|
||||||
|
use std::{collections::{HashMap, VecDeque}, task::{Context, Poll}};
|
||||||
|
use std::{cmp::min, num::NonZeroU16};
|
||||||
|
use super::{
|
||||||
|
RequestId,
|
||||||
|
RequestResponse,
|
||||||
|
RequestResponseCodec,
|
||||||
|
RequestResponseEvent,
|
||||||
|
ResponseChannel
|
||||||
|
};
|
||||||
|
|
||||||
|
/// A wrapper around [`RequestResponse`] which adds request limits per peer.
|
||||||
|
///
|
||||||
|
/// Each peer is assigned a default limit of concurrent requests and
|
||||||
|
/// responses, which can be overriden per peer.
|
||||||
|
///
|
||||||
|
/// It is not possible to send more requests than configured and receiving
|
||||||
|
/// more is reported as an error event. Since `libp2p-request-response` is
|
||||||
|
/// not its own protocol, there is no way to communicate limits to peers,
|
||||||
|
/// hence nodes must have pre-established knowledge about each other's limits.
|
||||||
|
pub struct Throttled<C: RequestResponseCodec> {
|
||||||
|
/// A random id used for logging.
|
||||||
|
id: u32,
|
||||||
|
/// The wrapped behaviour.
|
||||||
|
behaviour: RequestResponse<C>,
|
||||||
|
/// Limits per peer.
|
||||||
|
limits: HashMap<PeerId, Limit>,
|
||||||
|
/// After disconnects we keep limits around to prevent circumventing
|
||||||
|
/// them by successive reconnects.
|
||||||
|
previous: LruCache<PeerId, Limit>,
|
||||||
|
/// The default limit applied to all peers unless overriden.
|
||||||
|
default: Limit,
|
||||||
|
/// Pending events to report in `Throttled::poll`.
|
||||||
|
events: VecDeque<Event<C::Request, C::Response>>
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A `Limit` of inbound and outbound requests.
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
struct Limit {
|
||||||
|
/// The remaining number of outbound requests that can be send.
|
||||||
|
send_budget: u16,
|
||||||
|
/// The remaining number of inbound requests that can be received.
|
||||||
|
recv_budget: u16,
|
||||||
|
/// The original limit which applies to inbound and outbound requests.
|
||||||
|
maximum: NonZeroU16
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Limit {
|
||||||
|
fn default() -> Self {
|
||||||
|
let maximum = NonZeroU16::new(1).expect("1 > 0");
|
||||||
|
Limit {
|
||||||
|
send_budget: maximum.get(),
|
||||||
|
recv_budget: maximum.get(),
|
||||||
|
maximum
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A Wrapper around [`RequestResponseEvent`].
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum Event<Req, Res> {
|
||||||
|
/// A regular request-response event.
|
||||||
|
Event(RequestResponseEvent<Req, Res>),
|
||||||
|
/// We received more inbound requests than allowed.
|
||||||
|
TooManyInboundRequests(PeerId),
|
||||||
|
/// When previously reaching the send limit of a peer,
|
||||||
|
/// this event is eventually emitted when sending is
|
||||||
|
/// allowed to resume.
|
||||||
|
ResumeSending(PeerId)
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<C: RequestResponseCodec + Clone> Throttled<C> {
|
||||||
|
/// Wrap an existing `RequestResponse` behaviour and apply send/recv limits.
|
||||||
|
pub fn new(behaviour: RequestResponse<C>) -> Self {
|
||||||
|
Throttled {
|
||||||
|
id: rand::random(),
|
||||||
|
behaviour,
|
||||||
|
limits: HashMap::new(),
|
||||||
|
previous: LruCache::new(2048),
|
||||||
|
default: Limit::default(),
|
||||||
|
events: VecDeque::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the current default limit applied to all peers.
|
||||||
|
pub fn default_limit(&self) -> u16 {
|
||||||
|
self.default.maximum.get()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Override the global default limit.
|
||||||
|
///
|
||||||
|
/// See [`Throttled::set_limit`] to override limits for individual peers.
|
||||||
|
pub fn set_default_limit(&mut self, limit: NonZeroU16) {
|
||||||
|
log::trace!("{:08x}: new default limit: {:?}", self.id, limit);
|
||||||
|
self.default = Limit {
|
||||||
|
send_budget: limit.get(),
|
||||||
|
recv_budget: limit.get(),
|
||||||
|
maximum: limit
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Specify the send and receive limit for a single peer.
|
||||||
|
pub fn set_limit(&mut self, id: &PeerId, limit: NonZeroU16) {
|
||||||
|
log::trace!("{:08x}: new limit for {}: {:?}", self.id, id, limit);
|
||||||
|
self.previous.pop(id);
|
||||||
|
self.limits.insert(id.clone(), Limit {
|
||||||
|
send_budget: limit.get(),
|
||||||
|
recv_budget: limit.get(),
|
||||||
|
maximum: limit
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Has the limit of outbound requests been reached for the given peer?
|
||||||
|
pub fn can_send(&mut self, id: &PeerId) -> bool {
|
||||||
|
self.limits.get(id).map(|l| l.send_budget > 0).unwrap_or(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a request to a peer.
|
||||||
|
///
|
||||||
|
/// If the limit of outbound requests has been reached, the request is
|
||||||
|
/// returned. Sending more outbound requests should only be attempted
|
||||||
|
/// once [`Event::ResumeSending`] has been received from [`NetworkBehaviour::poll`].
|
||||||
|
pub fn send_request(&mut self, id: &PeerId, req: C::Request) -> Result<RequestId, C::Request> {
|
||||||
|
log::trace!("{:08x}: sending request to {}", self.id, id);
|
||||||
|
|
||||||
|
// Getting the limit is somewhat complicated due to the connection state.
|
||||||
|
// Applications may try to send a request to a peer we have never been connected
|
||||||
|
// to, or a peer we have previously been connected to. In the first case, the
|
||||||
|
// default limit applies and in the latter, the cached limit from the previous
|
||||||
|
// connection (if still available).
|
||||||
|
let mut limit =
|
||||||
|
if let Some(limit) = self.limits.get_mut(id) {
|
||||||
|
limit
|
||||||
|
} else {
|
||||||
|
let limit = self.previous.pop(id).unwrap_or_else(|| self.default.clone());
|
||||||
|
self.limits.entry(id.clone()).or_insert(limit)
|
||||||
|
};
|
||||||
|
|
||||||
|
if limit.send_budget == 0 {
|
||||||
|
log::trace!("{:08x}: no budget to send request to {}", self.id, id);
|
||||||
|
return Err(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
limit.send_budget -= 1;
|
||||||
|
|
||||||
|
Ok(self.behaviour.send_request(id, req))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Answer an inbound request with a response.
|
||||||
|
///
|
||||||
|
/// See [`RequestResponse::send_response`] for details.
|
||||||
|
pub fn send_response(&mut self, ch: ResponseChannel<C::Response>, rs: C::Response) {
|
||||||
|
if let Some(limit) = self.limits.get_mut(&ch.peer) {
|
||||||
|
limit.recv_budget += 1;
|
||||||
|
debug_assert!(limit.recv_budget <= limit.maximum.get())
|
||||||
|
}
|
||||||
|
self.behaviour.send_response(ch, rs)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add a known peer address.
|
||||||
|
///
|
||||||
|
/// See [`RequestResponse::add_address`] for details.
|
||||||
|
pub fn add_address(&mut self, id: &PeerId, ma: Multiaddr) {
|
||||||
|
self.behaviour.add_address(id, ma)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove a previously added peer address.
|
||||||
|
///
|
||||||
|
/// See [`RequestResponse::remove_address`] for details.
|
||||||
|
pub fn remove_address(&mut self, id: &PeerId, ma: &Multiaddr) {
|
||||||
|
self.behaviour.remove_address(id, ma)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Are we connected to the given peer?
|
||||||
|
///
|
||||||
|
/// See [`RequestResponse::is_connected`] for details.
|
||||||
|
pub fn is_connected(&self, id: &PeerId) -> bool {
|
||||||
|
self.behaviour.is_connected(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Are we waiting for a response to the given request?
|
||||||
|
///
|
||||||
|
/// See [`RequestResponse::is_pending`] for details.
|
||||||
|
pub fn is_pending(&self, id: &RequestId) -> bool {
|
||||||
|
self.behaviour.is_pending(id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<C> NetworkBehaviour for Throttled<C>
|
||||||
|
where
|
||||||
|
C: RequestResponseCodec + Send + Clone + 'static
|
||||||
|
{
|
||||||
|
type ProtocolsHandler = RequestResponseHandler<C>;
|
||||||
|
type OutEvent = Event<C::Request, C::Response>;
|
||||||
|
|
||||||
|
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
||||||
|
self.behaviour.new_handler()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
|
||||||
|
self.behaviour.addresses_of_peer(peer)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_connection_established(&mut self, p: &PeerId, id: &ConnectionId, end: &ConnectedPoint) {
|
||||||
|
self.behaviour.inject_connection_established(p, id, end)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_connection_closed(&mut self, p: &PeerId, id: &ConnectionId, end: &ConnectedPoint) {
|
||||||
|
self.behaviour.inject_connection_closed(p, id, end);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_connected(&mut self, peer: &PeerId) {
|
||||||
|
log::trace!("{:08x}: connected to {}", self.id, peer);
|
||||||
|
self.behaviour.inject_connected(peer);
|
||||||
|
// The limit may have been added by [`Throttled::send_request`] already.
|
||||||
|
if !self.limits.contains_key(peer) {
|
||||||
|
let limit = self.previous.pop(peer).unwrap_or_else(|| self.default.clone());
|
||||||
|
self.limits.insert(peer.clone(), limit);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_disconnected(&mut self, peer: &PeerId) {
|
||||||
|
log::trace!("{:08x}: disconnected from {}", self.id, peer);
|
||||||
|
self.behaviour.inject_disconnected(peer);
|
||||||
|
// Save the limit in case the peer reconnects soon.
|
||||||
|
if let Some(limit) = self.limits.remove(peer) {
|
||||||
|
self.previous.put(peer.clone(), limit);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_dial_failure(&mut self, peer: &PeerId) {
|
||||||
|
self.behaviour.inject_dial_failure(peer)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn inject_event(&mut self, p: PeerId, i: ConnectionId, e: RequestResponseHandlerEvent<C>) {
|
||||||
|
match e {
|
||||||
|
// Cases where an outbound request has been resolved.
|
||||||
|
| RequestResponseHandlerEvent::Response {..}
|
||||||
|
| RequestResponseHandlerEvent::OutboundTimeout (_)
|
||||||
|
| RequestResponseHandlerEvent::OutboundUnsupportedProtocols (_) =>
|
||||||
|
if let Some(limit) = self.limits.get_mut(&p) {
|
||||||
|
if limit.send_budget == 0 {
|
||||||
|
log::trace!("{:08x}: sending to peer {} can resume", self.id, p);
|
||||||
|
self.events.push_back(Event::ResumeSending(p.clone()))
|
||||||
|
}
|
||||||
|
limit.send_budget = min(limit.send_budget + 1, limit.maximum.get())
|
||||||
|
}
|
||||||
|
// A new inbound request.
|
||||||
|
| RequestResponseHandlerEvent::Request {..} =>
|
||||||
|
if let Some(limit) = self.limits.get_mut(&p) {
|
||||||
|
if limit.recv_budget == 0 {
|
||||||
|
log::error!("{:08x}: peer {} exceeds its budget", self.id, p);
|
||||||
|
return self.events.push_back(Event::TooManyInboundRequests(p))
|
||||||
|
}
|
||||||
|
limit.recv_budget -= 1
|
||||||
|
}
|
||||||
|
// The inbound request has expired so grant more budget to receive another one.
|
||||||
|
| RequestResponseHandlerEvent::InboundTimeout =>
|
||||||
|
if let Some(limit) = self.limits.get_mut(&p) {
|
||||||
|
limit.recv_budget = min(limit.recv_budget + 1, limit.maximum.get())
|
||||||
|
}
|
||||||
|
// Nothing to do here ...
|
||||||
|
| RequestResponseHandlerEvent::InboundUnsupportedProtocols => {}
|
||||||
|
}
|
||||||
|
self.behaviour.inject_event(p, i, e)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll(&mut self, cx: &mut Context<'_>, p: &mut impl PollParameters)
|
||||||
|
-> Poll<NetworkBehaviourAction<RequestProtocol<C>, Self::OutEvent>>
|
||||||
|
{
|
||||||
|
if let Some(ev) = self.events.pop_front() {
|
||||||
|
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev))
|
||||||
|
} else if self.events.capacity() > super::EMPTY_QUEUE_SHRINK_THRESHOLD {
|
||||||
|
self.events.shrink_to_fit()
|
||||||
|
}
|
||||||
|
|
||||||
|
self.behaviour.poll(cx, p).map(|a| a.map_out(Event::Event))
|
||||||
|
}
|
||||||
|
}
|
@ -35,13 +35,11 @@ use libp2p_swarm::Swarm;
|
|||||||
use libp2p_tcp::TcpConfig;
|
use libp2p_tcp::TcpConfig;
|
||||||
use futures::{prelude::*, channel::mpsc};
|
use futures::{prelude::*, channel::mpsc};
|
||||||
use rand::{self, Rng};
|
use rand::{self, Rng};
|
||||||
use std::{io, iter};
|
use std::{collections::HashSet, io, iter, num::NonZeroU16};
|
||||||
|
|
||||||
/// Exercises a simple ping protocol.
|
/// Exercises a simple ping protocol.
|
||||||
#[test]
|
#[test]
|
||||||
fn ping_protocol() {
|
fn ping_protocol() {
|
||||||
let num_pings: u8 = rand::thread_rng().gen_range(1, 100);
|
|
||||||
|
|
||||||
let ping = Ping("ping".to_string().into_bytes());
|
let ping = Ping("ping".to_string().into_bytes());
|
||||||
let pong = Pong("pong".to_string().into_bytes());
|
let pong = Pong("pong".to_string().into_bytes());
|
||||||
|
|
||||||
@ -85,6 +83,8 @@ fn ping_protocol() {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let num_pings: u8 = rand::thread_rng().gen_range(1, 100);
|
||||||
|
|
||||||
let peer2 = async move {
|
let peer2 = async move {
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
let addr = rx.next().await.unwrap();
|
let addr = rx.next().await.unwrap();
|
||||||
@ -116,6 +116,202 @@ fn ping_protocol() {
|
|||||||
let () = async_std::task::block_on(peer2);
|
let () = async_std::task::block_on(peer2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Like `ping_protocol`, but throttling concurrent requests.
|
||||||
|
#[test]
|
||||||
|
fn ping_protocol_throttled() {
|
||||||
|
let ping = Ping("ping".to_string().into_bytes());
|
||||||
|
let pong = Pong("pong".to_string().into_bytes());
|
||||||
|
|
||||||
|
let protocols = iter::once((PingProtocol(), ProtocolSupport::Full));
|
||||||
|
let cfg = RequestResponseConfig::default();
|
||||||
|
|
||||||
|
let (peer1_id, trans) = mk_transport();
|
||||||
|
let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()).throttled();
|
||||||
|
let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone());
|
||||||
|
|
||||||
|
let (peer2_id, trans) = mk_transport();
|
||||||
|
let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg).throttled();
|
||||||
|
let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone());
|
||||||
|
|
||||||
|
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
|
||||||
|
|
||||||
|
let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
|
||||||
|
Swarm::listen_on(&mut swarm1, addr).unwrap();
|
||||||
|
|
||||||
|
let expected_ping = ping.clone();
|
||||||
|
let expected_pong = pong.clone();
|
||||||
|
|
||||||
|
let limit: u16 = rand::thread_rng().gen_range(1, 10);
|
||||||
|
swarm1.set_default_limit(NonZeroU16::new(limit).unwrap());
|
||||||
|
swarm2.set_default_limit(NonZeroU16::new(limit).unwrap());
|
||||||
|
|
||||||
|
let peer1 = async move {
|
||||||
|
while let Some(_) = swarm1.next().now_or_never() {}
|
||||||
|
|
||||||
|
let l = Swarm::listeners(&swarm1).next().unwrap();
|
||||||
|
tx.send(l.clone()).await.unwrap();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match swarm1.next().await {
|
||||||
|
throttled::Event::Event(RequestResponseEvent::Message {
|
||||||
|
peer,
|
||||||
|
message: RequestResponseMessage::Request { request, channel }
|
||||||
|
}) => {
|
||||||
|
assert_eq!(&request, &expected_ping);
|
||||||
|
assert_eq!(&peer, &peer2_id);
|
||||||
|
swarm1.send_response(channel, pong.clone());
|
||||||
|
},
|
||||||
|
e => panic!("Peer1: Unexpected event: {:?}", e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let num_pings: u8 = rand::thread_rng().gen_range(1, 100);
|
||||||
|
|
||||||
|
let peer2 = async move {
|
||||||
|
let mut count = 0;
|
||||||
|
let addr = rx.next().await.unwrap();
|
||||||
|
swarm2.add_address(&peer1_id, addr.clone());
|
||||||
|
let mut blocked = false;
|
||||||
|
let mut req_ids = HashSet::new();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
if !blocked {
|
||||||
|
while let Some(id) = swarm2.send_request(&peer1_id, ping.clone()).ok() {
|
||||||
|
req_ids.insert(id);
|
||||||
|
}
|
||||||
|
blocked = true;
|
||||||
|
}
|
||||||
|
match swarm2.next().await {
|
||||||
|
throttled::Event::ResumeSending(peer) => {
|
||||||
|
assert_eq!(peer, peer1_id);
|
||||||
|
blocked = false
|
||||||
|
}
|
||||||
|
throttled::Event::Event(RequestResponseEvent::Message {
|
||||||
|
peer,
|
||||||
|
message: RequestResponseMessage::Response { request_id, response }
|
||||||
|
}) => {
|
||||||
|
count += 1;
|
||||||
|
assert_eq!(&response, &expected_pong);
|
||||||
|
assert_eq!(&peer, &peer1_id);
|
||||||
|
assert!(req_ids.remove(&request_id));
|
||||||
|
if count >= num_pings {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
e => panic!("Peer2: Unexpected event: {:?}", e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
async_std::task::spawn(Box::pin(peer1));
|
||||||
|
let () = async_std::task::block_on(peer2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn ping_protocol_limit_violation() {
|
||||||
|
let ping = Ping("ping".to_string().into_bytes());
|
||||||
|
let pong = Pong("pong".to_string().into_bytes());
|
||||||
|
|
||||||
|
let protocols = iter::once((PingProtocol(), ProtocolSupport::Full));
|
||||||
|
let cfg = RequestResponseConfig::default();
|
||||||
|
|
||||||
|
let (peer1_id, trans) = mk_transport();
|
||||||
|
let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()).throttled();
|
||||||
|
let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone());
|
||||||
|
|
||||||
|
let (peer2_id, trans) = mk_transport();
|
||||||
|
let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg).throttled();
|
||||||
|
let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone());
|
||||||
|
|
||||||
|
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
|
||||||
|
|
||||||
|
let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
|
||||||
|
Swarm::listen_on(&mut swarm1, addr).unwrap();
|
||||||
|
|
||||||
|
let expected_ping = ping.clone();
|
||||||
|
let expected_pong = pong.clone();
|
||||||
|
|
||||||
|
swarm2.set_default_limit(NonZeroU16::new(2).unwrap());
|
||||||
|
|
||||||
|
let peer1 = async move {
|
||||||
|
while let Some(_) = swarm1.next().now_or_never() {}
|
||||||
|
|
||||||
|
let l = Swarm::listeners(&swarm1).next().unwrap();
|
||||||
|
tx.send(l.clone()).await.unwrap();
|
||||||
|
|
||||||
|
let mut pending_responses = Vec::new();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match swarm1.next().await {
|
||||||
|
throttled::Event::Event(RequestResponseEvent::Message {
|
||||||
|
peer,
|
||||||
|
message: RequestResponseMessage::Request { request, channel }
|
||||||
|
}) => {
|
||||||
|
assert_eq!(&request, &expected_ping);
|
||||||
|
assert_eq!(&peer, &peer2_id);
|
||||||
|
pending_responses.push((channel, pong.clone()));
|
||||||
|
},
|
||||||
|
throttled::Event::TooManyInboundRequests(p) => {
|
||||||
|
assert_eq!(p, peer2_id);
|
||||||
|
break
|
||||||
|
}
|
||||||
|
e => panic!("Peer1: Unexpected event: {:?}", e)
|
||||||
|
}
|
||||||
|
if pending_responses.len() >= 2 {
|
||||||
|
for (channel, pong) in pending_responses.drain(..) {
|
||||||
|
swarm1.send_response(channel, pong)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let num_pings: u8 = rand::thread_rng().gen_range(1, 100);
|
||||||
|
|
||||||
|
let peer2 = async move {
|
||||||
|
let mut count = 0;
|
||||||
|
let addr = rx.next().await.unwrap();
|
||||||
|
swarm2.add_address(&peer1_id, addr.clone());
|
||||||
|
let mut blocked = false;
|
||||||
|
let mut req_ids = HashSet::new();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
if !blocked {
|
||||||
|
while let Some(id) = swarm2.send_request(&peer1_id, ping.clone()).ok() {
|
||||||
|
req_ids.insert(id);
|
||||||
|
}
|
||||||
|
blocked = true;
|
||||||
|
}
|
||||||
|
match swarm2.next().await {
|
||||||
|
throttled::Event::ResumeSending(peer) => {
|
||||||
|
assert_eq!(peer, peer1_id);
|
||||||
|
blocked = false
|
||||||
|
}
|
||||||
|
throttled::Event::Event(RequestResponseEvent::Message {
|
||||||
|
peer,
|
||||||
|
message: RequestResponseMessage::Response { request_id, response }
|
||||||
|
}) => {
|
||||||
|
count += 1;
|
||||||
|
assert_eq!(&response, &expected_pong);
|
||||||
|
assert_eq!(&peer, &peer1_id);
|
||||||
|
assert!(req_ids.remove(&request_id));
|
||||||
|
if count >= num_pings {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throttled::Event::Event(RequestResponseEvent::OutboundFailure { error, .. }) => {
|
||||||
|
assert!(matches!(error, OutboundFailure::ConnectionClosed));
|
||||||
|
break
|
||||||
|
}
|
||||||
|
e => panic!("Peer2: Unexpected event: {:?}", e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
async_std::task::spawn(Box::pin(peer1));
|
||||||
|
let () = async_std::task::block_on(peer2);
|
||||||
|
}
|
||||||
|
|
||||||
fn mk_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox), io::Error>) {
|
fn mk_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox), io::Error>) {
|
||||||
let id_keys = identity::Keypair::generate_ed25519();
|
let id_keys = identity::Keypair::generate_ed25519();
|
||||||
let peer_id = id_keys.public().into_peer_id();
|
let peer_id = id_keys.public().into_peer_id();
|
||||||
|
@ -281,6 +281,44 @@ pub enum NetworkBehaviourAction<TInEvent, TOutEvent> {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<TInEvent, TOutEvent> NetworkBehaviourAction<TInEvent, TOutEvent> {
|
||||||
|
/// Map the handler event.
|
||||||
|
pub fn map_in<E>(self, f: impl FnOnce(TInEvent) -> E) -> NetworkBehaviourAction<E, TOutEvent> {
|
||||||
|
match self {
|
||||||
|
NetworkBehaviourAction::GenerateEvent(e) =>
|
||||||
|
NetworkBehaviourAction::GenerateEvent(e),
|
||||||
|
NetworkBehaviourAction::DialAddress { address } =>
|
||||||
|
NetworkBehaviourAction::DialAddress { address },
|
||||||
|
NetworkBehaviourAction::DialPeer { peer_id, condition } =>
|
||||||
|
NetworkBehaviourAction::DialPeer { peer_id, condition },
|
||||||
|
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
|
||||||
|
NetworkBehaviourAction::NotifyHandler {
|
||||||
|
peer_id,
|
||||||
|
handler,
|
||||||
|
event: f(event)
|
||||||
|
},
|
||||||
|
NetworkBehaviourAction::ReportObservedAddr { address } =>
|
||||||
|
NetworkBehaviourAction::ReportObservedAddr { address }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Map the event the swarm will return.
|
||||||
|
pub fn map_out<E>(self, f: impl FnOnce(TOutEvent) -> E) -> NetworkBehaviourAction<TInEvent, E> {
|
||||||
|
match self {
|
||||||
|
NetworkBehaviourAction::GenerateEvent(e) =>
|
||||||
|
NetworkBehaviourAction::GenerateEvent(f(e)),
|
||||||
|
NetworkBehaviourAction::DialAddress { address } =>
|
||||||
|
NetworkBehaviourAction::DialAddress { address },
|
||||||
|
NetworkBehaviourAction::DialPeer { peer_id, condition } =>
|
||||||
|
NetworkBehaviourAction::DialPeer { peer_id, condition },
|
||||||
|
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event } =>
|
||||||
|
NetworkBehaviourAction::NotifyHandler { peer_id, handler, event },
|
||||||
|
NetworkBehaviourAction::ReportObservedAddr { address } =>
|
||||||
|
NetworkBehaviourAction::ReportObservedAddr { address }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// The options w.r.t. which connection handlers to notify of an event.
|
/// The options w.r.t. which connection handlers to notify of an event.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub enum NotifyHandler {
|
pub enum NotifyHandler {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user