mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-05-20 06:31:21 +00:00
protocols/request-response: Remove throttled module (#2236)
Co-authored-by: Thomas Eizinger <thomas@eizinger.io> Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
parent
3c0f5c7848
commit
f030b15be0
@ -8,7 +8,11 @@
|
|||||||
- Manually implement `Debug` for `RequestResponseHandlerEvent` and
|
- Manually implement `Debug` for `RequestResponseHandlerEvent` and
|
||||||
`RequestProtocol`. See [PR 2183].
|
`RequestProtocol`. See [PR 2183].
|
||||||
|
|
||||||
|
- Remove `RequestResponse::throttled` and the `throttled` module.
|
||||||
|
See [PR 2236].
|
||||||
|
|
||||||
[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183
|
[PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183
|
||||||
|
[PR 2236]: https://github.com/libp2p/rust-libp2p/pull/2236
|
||||||
|
|
||||||
# 0.12.0 [2021-07-12]
|
# 0.12.0 [2021-07-12]
|
||||||
|
|
||||||
|
@ -17,7 +17,6 @@ libp2p-core = { version = "0.30.0", path = "../../core", default-features = fals
|
|||||||
libp2p-swarm = { version = "0.31.0", path = "../../swarm" }
|
libp2p-swarm = { version = "0.31.0", path = "../../swarm" }
|
||||||
log = "0.4.11"
|
log = "0.4.11"
|
||||||
lru = "0.6"
|
lru = "0.6"
|
||||||
minicbor = { version = "0.11", features = ["std", "derive"] }
|
|
||||||
rand = "0.7"
|
rand = "0.7"
|
||||||
smallvec = "1.6.1"
|
smallvec = "1.6.1"
|
||||||
unsigned-varint = { version = "0.7", features = ["std", "futures"] }
|
unsigned-varint = { version = "0.7", features = ["std", "futures"] }
|
||||||
|
@ -58,11 +58,9 @@
|
|||||||
|
|
||||||
pub mod codec;
|
pub mod codec;
|
||||||
pub mod handler;
|
pub mod handler;
|
||||||
pub mod throttled;
|
|
||||||
|
|
||||||
pub use codec::{ProtocolName, RequestResponseCodec};
|
pub use codec::{ProtocolName, RequestResponseCodec};
|
||||||
pub use handler::ProtocolSupport;
|
pub use handler::ProtocolSupport;
|
||||||
pub use throttled::Throttled;
|
|
||||||
|
|
||||||
use futures::channel::oneshot;
|
use futures::channel::oneshot;
|
||||||
use handler::{RequestProtocol, RequestResponseHandler, RequestResponseHandlerEvent};
|
use handler::{RequestProtocol, RequestResponseHandler, RequestResponseHandlerEvent};
|
||||||
@ -248,11 +246,6 @@ impl<TResponse> ResponseChannel<TResponse> {
|
|||||||
pub fn is_open(&self) -> bool {
|
pub fn is_open(&self) -> bool {
|
||||||
!self.sender.is_canceled()
|
!self.sender.is_canceled()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the ID of the inbound request waiting for a response.
|
|
||||||
pub(crate) fn request_id(&self) -> RequestId {
|
|
||||||
self.request_id
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The ID of an inbound or outbound request.
|
/// The ID of an inbound or outbound request.
|
||||||
@ -369,19 +362,6 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates a `RequestResponse` which limits requests per peer.
|
|
||||||
///
|
|
||||||
/// The behaviour is wrapped in [`Throttled`] and detects the limits
|
|
||||||
/// per peer at runtime which are then enforced.
|
|
||||||
pub fn throttled<I>(c: TCodec, protos: I, cfg: RequestResponseConfig) -> Throttled<TCodec>
|
|
||||||
where
|
|
||||||
I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>,
|
|
||||||
TCodec: Send,
|
|
||||||
TCodec::Protocol: Sync,
|
|
||||||
{
|
|
||||||
Throttled::new(c, protos, cfg)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Initiates sending a request.
|
/// Initiates sending a request.
|
||||||
///
|
///
|
||||||
/// If the targeted peer is currently not connected, a dialing
|
/// If the targeted peer is currently not connected, a dialing
|
||||||
|
@ -1,790 +0,0 @@
|
|||||||
// Copyright 2020 Parity Technologies (UK) Ltd.
|
|
||||||
//
|
|
||||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
|
||||||
// copy of this software and associated documentation files (the "Software"),
|
|
||||||
// to deal in the Software without restriction, including without limitation
|
|
||||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
||||||
// and/or sell copies of the Software, and to permit persons to whom the
|
|
||||||
// Software is furnished to do so, subject to the following conditions:
|
|
||||||
//
|
|
||||||
// The above copyright notice and this permission notice shall be included in
|
|
||||||
// all copies or substantial portions of the Software.
|
|
||||||
//
|
|
||||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
||||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
||||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
||||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
||||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
||||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
||||||
// DEALINGS IN THE SOFTWARE.
|
|
||||||
|
|
||||||
//! Limit the number of requests peers can send to each other.
|
|
||||||
//!
|
|
||||||
//! Each peer is assigned a budget for sending and a budget for receiving
|
|
||||||
//! requests. Initially a peer assumes it has a send budget of 1. When its
|
|
||||||
//! budget has been used up its remote peer will send a credit message which
|
|
||||||
//! informs it how many more requests it can send before it needs to wait for
|
|
||||||
//! the next credit message. Credit messages which error or time out are
|
|
||||||
//! retried until they have reached the peer which is assumed once a
|
|
||||||
//! corresponding ack or a new request has been received from the peer.
|
|
||||||
//!
|
|
||||||
//! The `Throttled` behaviour wraps an existing `RequestResponse` behaviour
|
|
||||||
//! and uses a codec implementation that sends ordinary requests and responses
|
|
||||||
//! as well as a special credit message to which an ack message is expected
|
|
||||||
//! as a response. It does so by putting a small CBOR encoded header in front
|
|
||||||
//! of each message the inner codec produces.
|
|
||||||
|
|
||||||
mod codec;
|
|
||||||
|
|
||||||
use super::{
|
|
||||||
ProtocolSupport, RequestId, RequestResponse, RequestResponseCodec, RequestResponseConfig,
|
|
||||||
RequestResponseEvent, RequestResponseMessage,
|
|
||||||
};
|
|
||||||
use crate::handler::{RequestResponseHandler, RequestResponseHandlerEvent};
|
|
||||||
use codec::{Codec, Message, ProtocolWrapper, Type};
|
|
||||||
use futures::ready;
|
|
||||||
use libp2p_core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId};
|
|
||||||
use libp2p_swarm::{
|
|
||||||
DialError, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
|
|
||||||
};
|
|
||||||
use lru::LruCache;
|
|
||||||
use std::{cmp::max, num::NonZeroU16};
|
|
||||||
use std::{
|
|
||||||
collections::{HashMap, HashSet, VecDeque},
|
|
||||||
task::{Context, Poll},
|
|
||||||
};
|
|
||||||
|
|
||||||
pub type ResponseChannel<R> = super::ResponseChannel<Message<R>>;
|
|
||||||
|
|
||||||
/// A wrapper around [`RequestResponse`] which adds request limits per peer.
|
|
||||||
pub struct Throttled<C>
|
|
||||||
where
|
|
||||||
C: RequestResponseCodec + Clone + Send + 'static,
|
|
||||||
C::Protocol: Sync,
|
|
||||||
{
|
|
||||||
/// A random id used for logging.
|
|
||||||
id: u32,
|
|
||||||
/// The wrapped behaviour.
|
|
||||||
behaviour: RequestResponse<Codec<C>>,
|
|
||||||
/// Information per peer.
|
|
||||||
peer_info: HashMap<PeerId, PeerInfo>,
|
|
||||||
/// Information about previously connected peers.
|
|
||||||
offline_peer_info: LruCache<PeerId, PeerInfo>,
|
|
||||||
/// The default limit applies to all peers unless overriden.
|
|
||||||
default_limit: Limit,
|
|
||||||
/// Permanent limit overrides per peer.
|
|
||||||
limit_overrides: HashMap<PeerId, Limit>,
|
|
||||||
/// Pending events to report in `Throttled::poll`.
|
|
||||||
events: VecDeque<Event<C::Request, C::Response, Message<C::Response>>>,
|
|
||||||
/// The current credit ID.
|
|
||||||
next_grant_id: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Information about a credit grant that is sent to remote peers.
|
|
||||||
#[derive(Clone, Copy, Debug)]
|
|
||||||
struct Grant {
|
|
||||||
/// The grant ID. Used to deduplicate retransmitted credit grants.
|
|
||||||
id: GrantId,
|
|
||||||
/// The ID of the outbound credit grant message.
|
|
||||||
request: RequestId,
|
|
||||||
/// The credit given in this grant, i.e. the number of additional
|
|
||||||
/// requests the remote is allowed to send.
|
|
||||||
credit: u16,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Max. number of inbound requests that can be received.
|
|
||||||
#[derive(Clone, Copy, Debug)]
|
|
||||||
struct Limit {
|
|
||||||
/// The current receive limit.
|
|
||||||
max_recv: NonZeroU16,
|
|
||||||
/// The next receive limit which becomes active after
|
|
||||||
/// the current limit has been reached.
|
|
||||||
next_max: NonZeroU16,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Limit {
|
|
||||||
/// Create a new limit.
|
|
||||||
fn new(max: NonZeroU16) -> Self {
|
|
||||||
// The max. limit provided will be effective after the initial request
|
|
||||||
// from a peer which is always allowed has been answered. Values greater
|
|
||||||
// than 1 would prevent sending the credit grant, leading to a stalling
|
|
||||||
// sender so we must not use `max` right away.
|
|
||||||
Limit {
|
|
||||||
max_recv: NonZeroU16::new(1).expect("1 > 0"),
|
|
||||||
next_max: max,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Set a new limit.
|
|
||||||
///
|
|
||||||
/// The new limit becomes effective when all current inbound
|
|
||||||
/// requests have been processed and replied to.
|
|
||||||
fn set(&mut self, next: NonZeroU16) {
|
|
||||||
self.next_max = next
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Activate the new limit.
|
|
||||||
fn switch(&mut self) -> u16 {
|
|
||||||
self.max_recv = self.next_max;
|
|
||||||
self.max_recv.get()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type GrantId = u64;
|
|
||||||
|
|
||||||
/// Information related to the current send budget with a peer.
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
struct SendBudget {
|
|
||||||
/// The last received credit grant.
|
|
||||||
grant: Option<GrantId>,
|
|
||||||
/// The remaining credit for requests to send.
|
|
||||||
remaining: u16,
|
|
||||||
/// Credit grant requests received and acknowledged where the outcome
|
|
||||||
/// of the acknowledgement (i.e. response sent) is still undetermined.
|
|
||||||
/// Used to avoid emitting events for successful (`ResponseSent`) or failed
|
|
||||||
/// acknowledgements.
|
|
||||||
received: HashSet<RequestId>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Information related to the current receive budget with a peer.
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
struct RecvBudget {
|
|
||||||
/// The grant currently given to the remote but yet to be acknowledged.
|
|
||||||
///
|
|
||||||
/// Set to `Some` when a new grant is sent to the remote, followed
|
|
||||||
/// by `None` when an acknowledgment or a request is received. The
|
|
||||||
/// latter is seen as an implicit acknowledgement.
|
|
||||||
grant: Option<Grant>,
|
|
||||||
/// The limit for new credit grants when the `remaining` credit is
|
|
||||||
/// exhausted.
|
|
||||||
limit: Limit,
|
|
||||||
/// The remaining credit for requests to receive.
|
|
||||||
remaining: u16,
|
|
||||||
/// Credit grants sent whose outcome is still undetermined.
|
|
||||||
/// Used to avoid emitting events for failed credit grants.
|
|
||||||
///
|
|
||||||
/// > **Note**: While receiving an inbound request is an implicit
|
|
||||||
/// > acknowledgement for the last sent `grant`, the outcome of
|
|
||||||
/// > the outbound request remains undetermined until a success or
|
|
||||||
/// > failure event is received for that request or the corresponding
|
|
||||||
/// > connection closes.
|
|
||||||
sent: HashSet<RequestId>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Budget information about a peer.
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
struct PeerInfo {
|
|
||||||
send_budget: SendBudget,
|
|
||||||
recv_budget: RecvBudget,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PeerInfo {
|
|
||||||
fn new(recv_limit: Limit) -> Self {
|
|
||||||
PeerInfo {
|
|
||||||
send_budget: SendBudget {
|
|
||||||
grant: None,
|
|
||||||
remaining: 1,
|
|
||||||
received: HashSet::new(),
|
|
||||||
},
|
|
||||||
recv_budget: RecvBudget {
|
|
||||||
grant: None,
|
|
||||||
limit: recv_limit,
|
|
||||||
remaining: 1,
|
|
||||||
sent: HashSet::new(),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn into_disconnected(mut self) -> Self {
|
|
||||||
self.send_budget.received = HashSet::new();
|
|
||||||
self.send_budget.remaining = 1;
|
|
||||||
self.recv_budget.sent = HashSet::new();
|
|
||||||
self.recv_budget.remaining = max(1, self.recv_budget.remaining);
|
|
||||||
// Since we potentially reset the remaining receive budget,
|
|
||||||
// we forget about the potentially still unacknowledged last grant.
|
|
||||||
self.recv_budget.grant = None;
|
|
||||||
self
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C> Throttled<C>
|
|
||||||
where
|
|
||||||
C: RequestResponseCodec + Send + Clone,
|
|
||||||
C::Protocol: Sync,
|
|
||||||
{
|
|
||||||
/// Create a new throttled request-response behaviour.
|
|
||||||
pub fn new<I>(c: C, protos: I, cfg: RequestResponseConfig) -> Self
|
|
||||||
where
|
|
||||||
I: IntoIterator<Item = (C::Protocol, ProtocolSupport)>,
|
|
||||||
C: Send,
|
|
||||||
C::Protocol: Sync,
|
|
||||||
{
|
|
||||||
let protos = protos
|
|
||||||
.into_iter()
|
|
||||||
.map(|(p, ps)| (ProtocolWrapper::new(b"/t/1", p), ps));
|
|
||||||
Throttled::from(RequestResponse::new(Codec::new(c, 8192), protos, cfg))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Wrap an existing `RequestResponse` behaviour and apply send/recv limits.
|
|
||||||
pub fn from(behaviour: RequestResponse<Codec<C>>) -> Self {
|
|
||||||
Throttled {
|
|
||||||
id: rand::random(),
|
|
||||||
behaviour,
|
|
||||||
peer_info: HashMap::new(),
|
|
||||||
offline_peer_info: LruCache::new(8192),
|
|
||||||
default_limit: Limit::new(NonZeroU16::new(1).expect("1 > 0")),
|
|
||||||
limit_overrides: HashMap::new(),
|
|
||||||
events: VecDeque::new(),
|
|
||||||
next_grant_id: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Set the global default receive limit per peer.
|
|
||||||
pub fn set_receive_limit(&mut self, limit: NonZeroU16) {
|
|
||||||
log::trace!("{:08x}: new default limit: {:?}", self.id, limit);
|
|
||||||
self.default_limit = Limit::new(limit)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Override the receive limit of a single peer.
|
|
||||||
pub fn override_receive_limit(&mut self, p: &PeerId, limit: NonZeroU16) {
|
|
||||||
log::debug!("{:08x}: override limit for {}: {:?}", self.id, p, limit);
|
|
||||||
if let Some(info) = self.peer_info.get_mut(p) {
|
|
||||||
info.recv_budget.limit.set(limit)
|
|
||||||
} else if let Some(info) = self.offline_peer_info.get_mut(p) {
|
|
||||||
info.recv_budget.limit.set(limit)
|
|
||||||
}
|
|
||||||
self.limit_overrides.insert(*p, Limit::new(limit));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Remove any limit overrides for the given peer.
|
|
||||||
pub fn remove_override(&mut self, p: &PeerId) {
|
|
||||||
log::trace!("{:08x}: removing limit override for {}", self.id, p);
|
|
||||||
self.limit_overrides.remove(p);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Has the limit of outbound requests been reached for the given peer?
|
|
||||||
pub fn can_send(&mut self, p: &PeerId) -> bool {
|
|
||||||
self.peer_info
|
|
||||||
.get(p)
|
|
||||||
.map(|i| i.send_budget.remaining > 0)
|
|
||||||
.unwrap_or(true)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Send a request to a peer.
|
|
||||||
///
|
|
||||||
/// If the limit of outbound requests has been reached, the request is
|
|
||||||
/// returned. Sending more outbound requests should only be attempted
|
|
||||||
/// once [`Event::ResumeSending`] has been received from [`NetworkBehaviour::poll`].
|
|
||||||
pub fn send_request(&mut self, p: &PeerId, req: C::Request) -> Result<RequestId, C::Request> {
|
|
||||||
let connected = &mut self.peer_info;
|
|
||||||
let disconnected = &mut self.offline_peer_info;
|
|
||||||
let remaining = if let Some(info) = connected.get_mut(p).or_else(|| disconnected.get_mut(p))
|
|
||||||
{
|
|
||||||
if info.send_budget.remaining == 0 {
|
|
||||||
log::trace!(
|
|
||||||
"{:08x}: no more budget to send another request to {}",
|
|
||||||
self.id,
|
|
||||||
p
|
|
||||||
);
|
|
||||||
return Err(req);
|
|
||||||
}
|
|
||||||
info.send_budget.remaining -= 1;
|
|
||||||
info.send_budget.remaining
|
|
||||||
} else {
|
|
||||||
let limit = self
|
|
||||||
.limit_overrides
|
|
||||||
.get(p)
|
|
||||||
.copied()
|
|
||||||
.unwrap_or(self.default_limit);
|
|
||||||
let mut info = PeerInfo::new(limit);
|
|
||||||
info.send_budget.remaining -= 1;
|
|
||||||
let remaining = info.send_budget.remaining;
|
|
||||||
self.offline_peer_info.put(*p, info);
|
|
||||||
remaining
|
|
||||||
};
|
|
||||||
|
|
||||||
let rid = self.behaviour.send_request(p, Message::request(req));
|
|
||||||
|
|
||||||
log::trace! { "{:08x}: sending request {} to {} (budget remaining = {})",
|
|
||||||
self.id,
|
|
||||||
rid,
|
|
||||||
p,
|
|
||||||
remaining
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(rid)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Answer an inbound request with a response.
|
|
||||||
///
|
|
||||||
/// See [`RequestResponse::send_response`] for details.
|
|
||||||
pub fn send_response(
|
|
||||||
&mut self,
|
|
||||||
ch: ResponseChannel<C::Response>,
|
|
||||||
res: C::Response,
|
|
||||||
) -> Result<(), C::Response> {
|
|
||||||
log::trace!(
|
|
||||||
"{:08x}: sending response {} to peer {}",
|
|
||||||
self.id,
|
|
||||||
ch.request_id(),
|
|
||||||
&ch.peer
|
|
||||||
);
|
|
||||||
if let Some(info) = self.peer_info.get_mut(&ch.peer) {
|
|
||||||
if info.recv_budget.remaining == 0 {
|
|
||||||
// need to send more credit to the remote peer
|
|
||||||
let crd = info.recv_budget.limit.switch();
|
|
||||||
info.recv_budget.remaining = info.recv_budget.limit.max_recv.get();
|
|
||||||
self.send_credit(&ch.peer, crd);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
match self.behaviour.send_response(ch, Message::response(res)) {
|
|
||||||
Ok(()) => Ok(()),
|
|
||||||
Err(m) => Err(m.into_parts().1.expect("Missing response data.")),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Add a known peer address.
|
|
||||||
///
|
|
||||||
/// See [`RequestResponse::add_address`] for details.
|
|
||||||
pub fn add_address(&mut self, p: &PeerId, a: Multiaddr) {
|
|
||||||
self.behaviour.add_address(p, a)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Remove a previously added peer address.
|
|
||||||
///
|
|
||||||
/// See [`RequestResponse::remove_address`] for details.
|
|
||||||
pub fn remove_address(&mut self, p: &PeerId, a: &Multiaddr) {
|
|
||||||
self.behaviour.remove_address(p, a)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Are we connected to the given peer?
|
|
||||||
///
|
|
||||||
/// See [`RequestResponse::is_connected`] for details.
|
|
||||||
pub fn is_connected(&self, p: &PeerId) -> bool {
|
|
||||||
self.behaviour.is_connected(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Are we waiting for a response to the given request?
|
|
||||||
///
|
|
||||||
/// See [`RequestResponse::is_pending_outbound`] for details.
|
|
||||||
pub fn is_pending_outbound(&self, p: &PeerId, r: &RequestId) -> bool {
|
|
||||||
self.behaviour.is_pending_outbound(p, r)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Is the remote waiting for the local node to respond to the given
|
|
||||||
/// request?
|
|
||||||
///
|
|
||||||
/// See [`RequestResponse::is_pending_inbound`] for details.
|
|
||||||
pub fn is_pending_inbound(&self, p: &PeerId, r: &RequestId) -> bool {
|
|
||||||
self.behaviour.is_pending_inbound(p, r)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Send a credit grant to the given peer.
|
|
||||||
fn send_credit(&mut self, p: &PeerId, credit: u16) {
|
|
||||||
if let Some(info) = self.peer_info.get_mut(p) {
|
|
||||||
let cid = self.next_grant_id;
|
|
||||||
self.next_grant_id += 1;
|
|
||||||
let rid = self.behaviour.send_request(p, Message::credit(credit, cid));
|
|
||||||
log::trace!(
|
|
||||||
"{:08x}: sending {} credit as grant {} to {}",
|
|
||||||
self.id,
|
|
||||||
credit,
|
|
||||||
cid,
|
|
||||||
p
|
|
||||||
);
|
|
||||||
let grant = Grant {
|
|
||||||
id: cid,
|
|
||||||
request: rid,
|
|
||||||
credit,
|
|
||||||
};
|
|
||||||
info.recv_budget.grant = Some(grant);
|
|
||||||
info.recv_budget.sent.insert(rid);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A Wrapper around [`RequestResponseEvent`].
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum Event<Req, Res, CRes = Res> {
|
|
||||||
/// A regular request-response event.
|
|
||||||
Event(RequestResponseEvent<Req, Res, CRes>),
|
|
||||||
/// We received more inbound requests than allowed.
|
|
||||||
TooManyInboundRequests(PeerId),
|
|
||||||
/// When previously reaching the send limit of a peer,
|
|
||||||
/// this event is eventually emitted when sending is
|
|
||||||
/// allowed to resume.
|
|
||||||
ResumeSending(PeerId),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C> NetworkBehaviour for Throttled<C>
|
|
||||||
where
|
|
||||||
C: RequestResponseCodec + Send + Clone + 'static,
|
|
||||||
C::Protocol: Sync,
|
|
||||||
{
|
|
||||||
type ProtocolsHandler = RequestResponseHandler<Codec<C>>;
|
|
||||||
type OutEvent = Event<C::Request, C::Response, Message<C::Response>>;
|
|
||||||
|
|
||||||
fn new_handler(&mut self) -> Self::ProtocolsHandler {
|
|
||||||
self.behaviour.new_handler()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn addresses_of_peer(&mut self, p: &PeerId) -> Vec<Multiaddr> {
|
|
||||||
self.behaviour.addresses_of_peer(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn inject_connection_established(
|
|
||||||
&mut self,
|
|
||||||
p: &PeerId,
|
|
||||||
id: &ConnectionId,
|
|
||||||
end: &ConnectedPoint,
|
|
||||||
) {
|
|
||||||
self.behaviour.inject_connection_established(p, id, end)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn inject_connection_closed(
|
|
||||||
&mut self,
|
|
||||||
peer: &PeerId,
|
|
||||||
id: &ConnectionId,
|
|
||||||
end: &ConnectedPoint,
|
|
||||||
handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
|
|
||||||
) {
|
|
||||||
self.behaviour
|
|
||||||
.inject_connection_closed(peer, id, end, handler);
|
|
||||||
if let Some(info) = self.peer_info.get_mut(peer) {
|
|
||||||
if let Some(grant) = &mut info.recv_budget.grant {
|
|
||||||
log::debug! { "{:08x}: resending credit grant {} to {} after connection closed",
|
|
||||||
self.id,
|
|
||||||
grant.id,
|
|
||||||
peer
|
|
||||||
};
|
|
||||||
let msg = Message::credit(grant.credit, grant.id);
|
|
||||||
grant.request = self.behaviour.send_request(peer, msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn inject_connected(&mut self, p: &PeerId) {
|
|
||||||
log::trace!("{:08x}: connected to {}", self.id, p);
|
|
||||||
self.behaviour.inject_connected(p);
|
|
||||||
// The limit may have been added by `Throttled::send_request` already.
|
|
||||||
if !self.peer_info.contains_key(p) {
|
|
||||||
if let Some(info) = self.offline_peer_info.pop(p) {
|
|
||||||
let recv_budget = info.recv_budget.remaining;
|
|
||||||
self.peer_info.insert(*p, info);
|
|
||||||
if recv_budget > 1 {
|
|
||||||
self.send_credit(p, recv_budget - 1);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
let limit = self
|
|
||||||
.limit_overrides
|
|
||||||
.get(p)
|
|
||||||
.copied()
|
|
||||||
.unwrap_or(self.default_limit);
|
|
||||||
self.peer_info.insert(*p, PeerInfo::new(limit));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn inject_disconnected(&mut self, p: &PeerId) {
|
|
||||||
log::trace!("{:08x}: disconnected from {}", self.id, p);
|
|
||||||
if let Some(info) = self.peer_info.remove(p) {
|
|
||||||
self.offline_peer_info.put(*p, info.into_disconnected());
|
|
||||||
}
|
|
||||||
self.behaviour.inject_disconnected(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn inject_dial_failure(
|
|
||||||
&mut self,
|
|
||||||
p: &PeerId,
|
|
||||||
handler: Self::ProtocolsHandler,
|
|
||||||
error: DialError,
|
|
||||||
) {
|
|
||||||
self.behaviour.inject_dial_failure(p, handler, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn inject_event(
|
|
||||||
&mut self,
|
|
||||||
p: PeerId,
|
|
||||||
i: ConnectionId,
|
|
||||||
e: RequestResponseHandlerEvent<Codec<C>>,
|
|
||||||
) {
|
|
||||||
self.behaviour.inject_event(p, i, e)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn poll(
|
|
||||||
&mut self,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
params: &mut impl PollParameters,
|
|
||||||
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
|
|
||||||
loop {
|
|
||||||
if let Some(ev) = self.events.pop_front() {
|
|
||||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
|
||||||
} else if self.events.capacity() > super::EMPTY_QUEUE_SHRINK_THRESHOLD {
|
|
||||||
self.events.shrink_to_fit()
|
|
||||||
}
|
|
||||||
|
|
||||||
let event = match ready!(self.behaviour.poll(cx, params)) {
|
|
||||||
NetworkBehaviourAction::GenerateEvent(RequestResponseEvent::Message {
|
|
||||||
peer,
|
|
||||||
message,
|
|
||||||
}) => {
|
|
||||||
let message = match message {
|
|
||||||
RequestResponseMessage::Response {
|
|
||||||
request_id,
|
|
||||||
response,
|
|
||||||
} => match &response.header().typ {
|
|
||||||
Some(Type::Ack) => {
|
|
||||||
if let Some(info) = self.peer_info.get_mut(&peer) {
|
|
||||||
if let Some(id) = info.recv_budget.grant.as_ref().map(|c| c.id)
|
|
||||||
{
|
|
||||||
if Some(id) == response.header().ident {
|
|
||||||
log::trace!(
|
|
||||||
"{:08x}: received ack {} from {}",
|
|
||||||
self.id,
|
|
||||||
id,
|
|
||||||
peer
|
|
||||||
);
|
|
||||||
info.recv_budget.grant = None;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
info.recv_budget.sent.remove(&request_id);
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Some(Type::Response) => {
|
|
||||||
log::trace!(
|
|
||||||
"{:08x}: received response {} from {}",
|
|
||||||
self.id,
|
|
||||||
request_id,
|
|
||||||
peer
|
|
||||||
);
|
|
||||||
if let Some(rs) = response.into_parts().1 {
|
|
||||||
RequestResponseMessage::Response {
|
|
||||||
request_id,
|
|
||||||
response: rs,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log::error! { "{:08x}: missing data for response {} from peer {}",
|
|
||||||
self.id,
|
|
||||||
request_id,
|
|
||||||
peer
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ty => {
|
|
||||||
log::trace! {
|
|
||||||
"{:08x}: unknown message type: {:?} from {}; expected response or credit",
|
|
||||||
self.id,
|
|
||||||
ty,
|
|
||||||
peer
|
|
||||||
};
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
RequestResponseMessage::Request {
|
|
||||||
request_id,
|
|
||||||
request,
|
|
||||||
channel,
|
|
||||||
} => match &request.header().typ {
|
|
||||||
Some(Type::Credit) => {
|
|
||||||
if let Some(info) = self.peer_info.get_mut(&peer) {
|
|
||||||
let id = if let Some(n) = request.header().ident {
|
|
||||||
n
|
|
||||||
} else {
|
|
||||||
log::warn! { "{:08x}: missing credit id in message from {}",
|
|
||||||
self.id,
|
|
||||||
peer
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
let credit = request.header().credit.unwrap_or(0);
|
|
||||||
log::trace! { "{:08x}: received {} additional credit {} from {}",
|
|
||||||
self.id,
|
|
||||||
credit,
|
|
||||||
id,
|
|
||||||
peer
|
|
||||||
};
|
|
||||||
if info.send_budget.grant < Some(id) {
|
|
||||||
if info.send_budget.remaining == 0 && credit > 0 {
|
|
||||||
log::trace!(
|
|
||||||
"{:08x}: sending to peer {} can resume",
|
|
||||||
self.id,
|
|
||||||
peer
|
|
||||||
);
|
|
||||||
self.events.push_back(Event::ResumeSending(peer))
|
|
||||||
}
|
|
||||||
info.send_budget.remaining += credit;
|
|
||||||
info.send_budget.grant = Some(id);
|
|
||||||
}
|
|
||||||
// Note: Failing to send a response to a credit grant is
|
|
||||||
// handled along with other inbound failures further below.
|
|
||||||
let _ = self.behaviour.send_response(channel, Message::ack(id));
|
|
||||||
info.send_budget.received.insert(request_id);
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Some(Type::Request) => {
|
|
||||||
if let Some(info) = self.peer_info.get_mut(&peer) {
|
|
||||||
log::trace! { "{:08x}: received request {} (recv. budget = {})",
|
|
||||||
self.id,
|
|
||||||
request_id,
|
|
||||||
info.recv_budget.remaining
|
|
||||||
};
|
|
||||||
if info.recv_budget.remaining == 0 {
|
|
||||||
log::debug!(
|
|
||||||
"{:08x}: peer {} exceeds its budget",
|
|
||||||
self.id,
|
|
||||||
peer
|
|
||||||
);
|
|
||||||
self.events.push_back(Event::TooManyInboundRequests(peer));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
info.recv_budget.remaining -= 1;
|
|
||||||
// We consider a request as proof that our credit grant has
|
|
||||||
// reached the peer. Usually, an ACK has already been
|
|
||||||
// received.
|
|
||||||
info.recv_budget.grant = None;
|
|
||||||
}
|
|
||||||
if let Some(rq) = request.into_parts().1 {
|
|
||||||
RequestResponseMessage::Request {
|
|
||||||
request_id,
|
|
||||||
request: rq,
|
|
||||||
channel,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log::error! { "{:08x}: missing data for request {} from peer {}",
|
|
||||||
self.id,
|
|
||||||
request_id,
|
|
||||||
peer
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ty => {
|
|
||||||
log::trace! {
|
|
||||||
"{:08x}: unknown message type: {:?} from {}; expected request or ack",
|
|
||||||
self.id,
|
|
||||||
ty,
|
|
||||||
peer
|
|
||||||
};
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
};
|
|
||||||
let event = RequestResponseEvent::Message { peer, message };
|
|
||||||
NetworkBehaviourAction::GenerateEvent(Event::Event(event))
|
|
||||||
}
|
|
||||||
NetworkBehaviourAction::GenerateEvent(RequestResponseEvent::OutboundFailure {
|
|
||||||
peer,
|
|
||||||
request_id,
|
|
||||||
error,
|
|
||||||
}) => {
|
|
||||||
if let Some(info) = self.peer_info.get_mut(&peer) {
|
|
||||||
if let Some(grant) = info.recv_budget.grant.as_mut() {
|
|
||||||
if grant.request == request_id {
|
|
||||||
log::debug! {
|
|
||||||
"{:08x}: failed to send {} as credit {} to {}; retrying...",
|
|
||||||
self.id,
|
|
||||||
grant.credit,
|
|
||||||
grant.id,
|
|
||||||
peer
|
|
||||||
};
|
|
||||||
let msg = Message::credit(grant.credit, grant.id);
|
|
||||||
grant.request = self.behaviour.send_request(&peer, msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the outbound failure was for a credit message, don't report it on
|
|
||||||
// the public API and retry the sending.
|
|
||||||
if info.recv_budget.sent.remove(&request_id) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let event = RequestResponseEvent::OutboundFailure {
|
|
||||||
peer,
|
|
||||||
request_id,
|
|
||||||
error,
|
|
||||||
};
|
|
||||||
NetworkBehaviourAction::GenerateEvent(Event::Event(event))
|
|
||||||
}
|
|
||||||
NetworkBehaviourAction::GenerateEvent(RequestResponseEvent::InboundFailure {
|
|
||||||
peer,
|
|
||||||
request_id,
|
|
||||||
error,
|
|
||||||
}) => {
|
|
||||||
// If the inbound failure occurred in the context of responding to a
|
|
||||||
// credit grant, don't report it on the public API.
|
|
||||||
if let Some(info) = self.peer_info.get_mut(&peer) {
|
|
||||||
if info.send_budget.received.remove(&request_id) {
|
|
||||||
log::debug! {
|
|
||||||
"{:08}: failed to acknowledge credit grant from {}: {:?}",
|
|
||||||
self.id, peer, error
|
|
||||||
};
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let event = RequestResponseEvent::InboundFailure {
|
|
||||||
peer,
|
|
||||||
request_id,
|
|
||||||
error,
|
|
||||||
};
|
|
||||||
NetworkBehaviourAction::GenerateEvent(Event::Event(event))
|
|
||||||
}
|
|
||||||
NetworkBehaviourAction::GenerateEvent(RequestResponseEvent::ResponseSent {
|
|
||||||
peer,
|
|
||||||
request_id,
|
|
||||||
}) => {
|
|
||||||
// If this event is for an ACK response that was sent for
|
|
||||||
// the last received credit grant, skip it.
|
|
||||||
if let Some(info) = self.peer_info.get_mut(&peer) {
|
|
||||||
if info.send_budget.received.remove(&request_id) {
|
|
||||||
log::trace! {
|
|
||||||
"{:08}: successfully sent ACK for credit grant {:?}.",
|
|
||||||
self.id,
|
|
||||||
info.send_budget.grant,
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
NetworkBehaviourAction::GenerateEvent(Event::Event(
|
|
||||||
RequestResponseEvent::ResponseSent { peer, request_id },
|
|
||||||
))
|
|
||||||
}
|
|
||||||
NetworkBehaviourAction::DialAddress { address, handler } => {
|
|
||||||
NetworkBehaviourAction::DialAddress { address, handler }
|
|
||||||
}
|
|
||||||
NetworkBehaviourAction::DialPeer {
|
|
||||||
peer_id,
|
|
||||||
condition,
|
|
||||||
handler,
|
|
||||||
} => NetworkBehaviourAction::DialPeer {
|
|
||||||
peer_id,
|
|
||||||
condition,
|
|
||||||
handler,
|
|
||||||
},
|
|
||||||
NetworkBehaviourAction::NotifyHandler {
|
|
||||||
peer_id,
|
|
||||||
handler,
|
|
||||||
event,
|
|
||||||
} => NetworkBehaviourAction::NotifyHandler {
|
|
||||||
peer_id,
|
|
||||||
handler,
|
|
||||||
event,
|
|
||||||
},
|
|
||||||
NetworkBehaviourAction::ReportObservedAddr { address, score } => {
|
|
||||||
NetworkBehaviourAction::ReportObservedAddr { address, score }
|
|
||||||
}
|
|
||||||
NetworkBehaviourAction::CloseConnection {
|
|
||||||
peer_id,
|
|
||||||
connection,
|
|
||||||
} => NetworkBehaviourAction::CloseConnection {
|
|
||||||
peer_id,
|
|
||||||
connection,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
return Poll::Ready(event);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,302 +0,0 @@
|
|||||||
// Copyright 2020 Parity Technologies (UK) Ltd.
|
|
||||||
//
|
|
||||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
|
||||||
// copy of this software and associated documentation files (the "Software"),
|
|
||||||
// to deal in the Software without restriction, including without limitation
|
|
||||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
||||||
// and/or sell copies of the Software, and to permit persons to whom the
|
|
||||||
// Software is furnished to do so, subject to the following conditions:
|
|
||||||
//
|
|
||||||
// The above copyright notice and this permission notice shall be included in
|
|
||||||
// all copies or substantial portions of the Software.
|
|
||||||
//
|
|
||||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
||||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
||||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
||||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
||||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
||||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
||||||
// DEALINGS IN THE SOFTWARE.
|
|
||||||
|
|
||||||
use super::RequestResponseCodec;
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use bytes::{Bytes, BytesMut};
|
|
||||||
use futures::prelude::*;
|
|
||||||
use libp2p_core::ProtocolName;
|
|
||||||
use minicbor::{Decode, Encode};
|
|
||||||
use std::io;
|
|
||||||
use unsigned_varint::{aio, io::ReadError};
|
|
||||||
|
|
||||||
/// A protocol header.
|
|
||||||
#[derive(Debug, Default, Clone, PartialEq, Eq, Encode, Decode)]
|
|
||||||
#[cbor(map)]
|
|
||||||
pub struct Header {
|
|
||||||
/// The type of message.
|
|
||||||
#[n(0)]
|
|
||||||
pub typ: Option<Type>,
|
|
||||||
/// The number of additional requests the remote is willing to receive.
|
|
||||||
#[n(1)]
|
|
||||||
pub credit: Option<u16>,
|
|
||||||
/// An identifier used for sending credit grants.
|
|
||||||
#[n(2)]
|
|
||||||
pub ident: Option<u64>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A protocol message type.
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
|
|
||||||
pub enum Type {
|
|
||||||
#[n(0)]
|
|
||||||
Request,
|
|
||||||
#[n(1)]
|
|
||||||
Response,
|
|
||||||
#[n(2)]
|
|
||||||
Credit,
|
|
||||||
#[n(3)]
|
|
||||||
Ack,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A protocol message consisting of header and data.
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
|
||||||
pub struct Message<T> {
|
|
||||||
header: Header,
|
|
||||||
data: Option<T>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Message<T> {
|
|
||||||
/// Create a new message of some type.
|
|
||||||
fn new(header: Header) -> Self {
|
|
||||||
Message { header, data: None }
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create a request message.
|
|
||||||
pub fn request(data: T) -> Self {
|
|
||||||
let mut m = Message::new(Header {
|
|
||||||
typ: Some(Type::Request),
|
|
||||||
..Header::default()
|
|
||||||
});
|
|
||||||
m.data = Some(data);
|
|
||||||
m
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create a response message.
|
|
||||||
pub fn response(data: T) -> Self {
|
|
||||||
let mut m = Message::new(Header {
|
|
||||||
typ: Some(Type::Response),
|
|
||||||
..Header::default()
|
|
||||||
});
|
|
||||||
m.data = Some(data);
|
|
||||||
m
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create a credit grant.
|
|
||||||
pub fn credit(credit: u16, ident: u64) -> Self {
|
|
||||||
Message::new(Header {
|
|
||||||
typ: Some(Type::Credit),
|
|
||||||
credit: Some(credit),
|
|
||||||
ident: Some(ident),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create an acknowledge message.
|
|
||||||
pub fn ack(ident: u64) -> Self {
|
|
||||||
Message::new(Header {
|
|
||||||
typ: Some(Type::Ack),
|
|
||||||
credit: None,
|
|
||||||
ident: Some(ident),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Access the message header.
|
|
||||||
pub fn header(&self) -> &Header {
|
|
||||||
&self.header
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Access the message data.
|
|
||||||
pub fn data(&self) -> Option<&T> {
|
|
||||||
self.data.as_ref()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Consume this message and return header and data.
|
|
||||||
pub fn into_parts(self) -> (Header, Option<T>) {
|
|
||||||
(self.header, self.data)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A wrapper around a `ProtocolName` impl which augments the protocol name.
|
|
||||||
///
|
|
||||||
/// The type implements `ProtocolName` itself and creates a name for a
|
|
||||||
/// request-response protocol based on the protocol name of the wrapped type.
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct ProtocolWrapper<P>(P, Bytes);
|
|
||||||
|
|
||||||
impl<P: ProtocolName> ProtocolWrapper<P> {
|
|
||||||
pub fn new(prefix: &[u8], p: P) -> Self {
|
|
||||||
let mut full = BytesMut::from(prefix);
|
|
||||||
full.extend_from_slice(p.protocol_name());
|
|
||||||
ProtocolWrapper(p, full.freeze())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<P> ProtocolName for ProtocolWrapper<P> {
|
|
||||||
fn protocol_name(&self) -> &[u8] {
|
|
||||||
self.1.as_ref()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A `RequestResponseCodec` wrapper that adds headers to the payload data.
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct Codec<C> {
|
|
||||||
/// The wrapped codec.
|
|
||||||
inner: C,
|
|
||||||
/// Encoding/decoding buffer.
|
|
||||||
buffer: Vec<u8>,
|
|
||||||
/// Max. header length.
|
|
||||||
max_header_len: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C> Codec<C> {
|
|
||||||
/// Create a codec by wrapping an existing one.
|
|
||||||
pub fn new(c: C, max_header_len: u32) -> Self {
|
|
||||||
Codec {
|
|
||||||
inner: c,
|
|
||||||
buffer: Vec::new(),
|
|
||||||
max_header_len,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read and decode a request header.
|
|
||||||
async fn read_header<T, H>(&mut self, io: &mut T) -> io::Result<H>
|
|
||||||
where
|
|
||||||
T: AsyncRead + Unpin + Send,
|
|
||||||
H: for<'a> minicbor::Decode<'a>,
|
|
||||||
{
|
|
||||||
let header_len = aio::read_u32(&mut *io).await.map_err(|e| match e {
|
|
||||||
ReadError::Io(e) => e,
|
|
||||||
other => io::Error::new(io::ErrorKind::Other, other),
|
|
||||||
})?;
|
|
||||||
if header_len > self.max_header_len {
|
|
||||||
return Err(io::Error::new(
|
|
||||||
io::ErrorKind::InvalidData,
|
|
||||||
"header too large to read",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
self.buffer.resize(u32_to_usize(header_len), 0u8);
|
|
||||||
io.read_exact(&mut self.buffer).await?;
|
|
||||||
minicbor::decode(&self.buffer).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Encode and write a response header.
|
|
||||||
async fn write_header<T, H>(&mut self, hdr: &H, io: &mut T) -> io::Result<()>
|
|
||||||
where
|
|
||||||
T: AsyncWrite + Unpin + Send,
|
|
||||||
H: minicbor::Encode,
|
|
||||||
{
|
|
||||||
self.buffer.clear();
|
|
||||||
minicbor::encode(hdr, &mut self.buffer)
|
|
||||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
|
|
||||||
if self.buffer.len() > u32_to_usize(self.max_header_len) {
|
|
||||||
return Err(io::Error::new(
|
|
||||||
io::ErrorKind::InvalidData,
|
|
||||||
"header too large to write",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
let mut b = unsigned_varint::encode::u32_buffer();
|
|
||||||
let header_len = unsigned_varint::encode::u32(self.buffer.len() as u32, &mut b);
|
|
||||||
io.write_all(header_len).await?;
|
|
||||||
io.write_all(&self.buffer).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<C> RequestResponseCodec for Codec<C>
|
|
||||||
where
|
|
||||||
C: RequestResponseCodec + Send,
|
|
||||||
C::Protocol: Sync,
|
|
||||||
{
|
|
||||||
type Protocol = ProtocolWrapper<C::Protocol>;
|
|
||||||
type Request = Message<C::Request>;
|
|
||||||
type Response = Message<C::Response>;
|
|
||||||
|
|
||||||
async fn read_request<T>(&mut self, p: &Self::Protocol, io: &mut T) -> io::Result<Self::Request>
|
|
||||||
where
|
|
||||||
T: AsyncRead + Unpin + Send,
|
|
||||||
{
|
|
||||||
let mut msg = Message::new(self.read_header(io).await?);
|
|
||||||
match msg.header.typ {
|
|
||||||
Some(Type::Request) => {
|
|
||||||
msg.data = Some(self.inner.read_request(&p.0, io).await?);
|
|
||||||
Ok(msg)
|
|
||||||
}
|
|
||||||
Some(Type::Credit) => Ok(msg),
|
|
||||||
Some(Type::Response) | Some(Type::Ack) | None => {
|
|
||||||
log::debug!(
|
|
||||||
"unexpected {:?} when expecting request or credit grant",
|
|
||||||
msg.header.typ
|
|
||||||
);
|
|
||||||
Err(io::ErrorKind::InvalidData.into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn read_response<T>(
|
|
||||||
&mut self,
|
|
||||||
p: &Self::Protocol,
|
|
||||||
io: &mut T,
|
|
||||||
) -> io::Result<Self::Response>
|
|
||||||
where
|
|
||||||
T: AsyncRead + Unpin + Send,
|
|
||||||
{
|
|
||||||
let mut msg = Message::new(self.read_header(io).await?);
|
|
||||||
match msg.header.typ {
|
|
||||||
Some(Type::Response) => {
|
|
||||||
msg.data = Some(self.inner.read_response(&p.0, io).await?);
|
|
||||||
Ok(msg)
|
|
||||||
}
|
|
||||||
Some(Type::Ack) => Ok(msg),
|
|
||||||
Some(Type::Request) | Some(Type::Credit) | None => {
|
|
||||||
log::debug!(
|
|
||||||
"unexpected {:?} when expecting response or ack",
|
|
||||||
msg.header.typ
|
|
||||||
);
|
|
||||||
Err(io::ErrorKind::InvalidData.into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn write_request<T>(
|
|
||||||
&mut self,
|
|
||||||
p: &Self::Protocol,
|
|
||||||
io: &mut T,
|
|
||||||
r: Self::Request,
|
|
||||||
) -> io::Result<()>
|
|
||||||
where
|
|
||||||
T: AsyncWrite + Unpin + Send,
|
|
||||||
{
|
|
||||||
self.write_header(&r.header, io).await?;
|
|
||||||
if let Some(data) = r.data {
|
|
||||||
self.inner.write_request(&p.0, io, data).await?
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn write_response<T>(
|
|
||||||
&mut self,
|
|
||||||
p: &Self::Protocol,
|
|
||||||
io: &mut T,
|
|
||||||
r: Self::Response,
|
|
||||||
) -> io::Result<()>
|
|
||||||
where
|
|
||||||
T: AsyncWrite + Unpin + Send,
|
|
||||||
{
|
|
||||||
self.write_header(&r.header, io).await?;
|
|
||||||
if let Some(data) = r.data {
|
|
||||||
self.inner.write_response(&p.0, io, data).await?
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(any(target_pointer_width = "64", target_pointer_width = "32"))]
|
|
||||||
fn u32_to_usize(n: u32) -> usize {
|
|
||||||
n as usize
|
|
||||||
}
|
|
@ -21,7 +21,7 @@
|
|||||||
//! Integration tests for the `RequestResponse` network behaviour.
|
//! Integration tests for the `RequestResponse` network behaviour.
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use futures::{channel::mpsc, executor::LocalPool, prelude::*, task::SpawnExt, AsyncWriteExt};
|
use futures::{channel::mpsc, prelude::*, AsyncWriteExt};
|
||||||
use libp2p_core::{
|
use libp2p_core::{
|
||||||
identity,
|
identity,
|
||||||
muxing::StreamMuxerBox,
|
muxing::StreamMuxerBox,
|
||||||
@ -34,7 +34,6 @@ use libp2p_request_response::*;
|
|||||||
use libp2p_swarm::{Swarm, SwarmEvent};
|
use libp2p_swarm::{Swarm, SwarmEvent};
|
||||||
use libp2p_tcp::TcpConfig;
|
use libp2p_tcp::TcpConfig;
|
||||||
use rand::{self, Rng};
|
use rand::{self, Rng};
|
||||||
use std::{collections::HashSet, num::NonZeroU16};
|
|
||||||
use std::{io, iter};
|
use std::{io, iter};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -293,127 +292,6 @@ fn emits_inbound_connection_closed_if_channel_is_dropped() {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn ping_protocol_throttled() {
|
|
||||||
let ping = Ping("ping".to_string().into_bytes());
|
|
||||||
let pong = Pong("pong".to_string().into_bytes());
|
|
||||||
|
|
||||||
let protocols = iter::once((PingProtocol(), ProtocolSupport::Full));
|
|
||||||
let cfg = RequestResponseConfig::default();
|
|
||||||
|
|
||||||
let (peer1_id, trans) = mk_transport();
|
|
||||||
let ping_proto1 = RequestResponse::throttled(PingCodec(), protocols.clone(), cfg.clone());
|
|
||||||
let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id);
|
|
||||||
|
|
||||||
let (peer2_id, trans) = mk_transport();
|
|
||||||
let ping_proto2 = RequestResponse::throttled(PingCodec(), protocols, cfg);
|
|
||||||
let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id);
|
|
||||||
|
|
||||||
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
|
|
||||||
|
|
||||||
let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
|
|
||||||
swarm1.listen_on(addr).unwrap();
|
|
||||||
|
|
||||||
let expected_ping = ping.clone();
|
|
||||||
let expected_pong = pong.clone();
|
|
||||||
|
|
||||||
let limit1: u16 = rand::thread_rng().gen_range(1, 10);
|
|
||||||
let limit2: u16 = rand::thread_rng().gen_range(1, 10);
|
|
||||||
swarm1
|
|
||||||
.behaviour_mut()
|
|
||||||
.set_receive_limit(NonZeroU16::new(limit1).unwrap());
|
|
||||||
swarm2
|
|
||||||
.behaviour_mut()
|
|
||||||
.set_receive_limit(NonZeroU16::new(limit2).unwrap());
|
|
||||||
|
|
||||||
let peer1 = async move {
|
|
||||||
for i in 1.. {
|
|
||||||
match swarm1.select_next_some().await {
|
|
||||||
SwarmEvent::NewListenAddr { address, .. } => tx.send(address).await.unwrap(),
|
|
||||||
SwarmEvent::Behaviour(throttled::Event::Event(RequestResponseEvent::Message {
|
|
||||||
peer,
|
|
||||||
message:
|
|
||||||
RequestResponseMessage::Request {
|
|
||||||
request, channel, ..
|
|
||||||
},
|
|
||||||
})) => {
|
|
||||||
assert_eq!(&request, &expected_ping);
|
|
||||||
assert_eq!(&peer, &peer2_id);
|
|
||||||
swarm1
|
|
||||||
.behaviour_mut()
|
|
||||||
.send_response(channel, pong.clone())
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
SwarmEvent::Behaviour(throttled::Event::Event(
|
|
||||||
RequestResponseEvent::ResponseSent { peer, .. },
|
|
||||||
)) => {
|
|
||||||
assert_eq!(&peer, &peer2_id);
|
|
||||||
}
|
|
||||||
SwarmEvent::Behaviour(e) => panic!("Peer1: Unexpected event: {:?}", e),
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
if i % 31 == 0 {
|
|
||||||
let lim = rand::thread_rng().gen_range(1, 17);
|
|
||||||
swarm1
|
|
||||||
.behaviour_mut()
|
|
||||||
.override_receive_limit(&peer2_id, NonZeroU16::new(lim).unwrap());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let num_pings: u16 = rand::thread_rng().gen_range(100, 1000);
|
|
||||||
|
|
||||||
let peer2 = async move {
|
|
||||||
let mut count = 0;
|
|
||||||
let addr = rx.next().await.unwrap();
|
|
||||||
swarm2.behaviour_mut().add_address(&peer1_id, addr.clone());
|
|
||||||
|
|
||||||
let mut blocked = false;
|
|
||||||
let mut req_ids = HashSet::new();
|
|
||||||
|
|
||||||
loop {
|
|
||||||
if !blocked {
|
|
||||||
while let Some(id) = swarm2
|
|
||||||
.behaviour_mut()
|
|
||||||
.send_request(&peer1_id, ping.clone())
|
|
||||||
.ok()
|
|
||||||
{
|
|
||||||
req_ids.insert(id);
|
|
||||||
}
|
|
||||||
blocked = true;
|
|
||||||
}
|
|
||||||
match swarm2.select_next_some().await {
|
|
||||||
SwarmEvent::Behaviour(throttled::Event::ResumeSending(peer)) => {
|
|
||||||
assert_eq!(peer, peer1_id);
|
|
||||||
blocked = false
|
|
||||||
}
|
|
||||||
SwarmEvent::Behaviour(throttled::Event::Event(RequestResponseEvent::Message {
|
|
||||||
peer,
|
|
||||||
message:
|
|
||||||
RequestResponseMessage::Response {
|
|
||||||
request_id,
|
|
||||||
response,
|
|
||||||
},
|
|
||||||
})) => {
|
|
||||||
count += 1;
|
|
||||||
assert_eq!(&response, &expected_pong);
|
|
||||||
assert_eq!(&peer, &peer1_id);
|
|
||||||
assert!(req_ids.remove(&request_id));
|
|
||||||
if count >= num_pings {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
SwarmEvent::Behaviour(e) => panic!("Peer2: Unexpected event: {:?}", e),
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut pool = LocalPool::new();
|
|
||||||
pool.spawner().spawn(peer1.boxed()).unwrap();
|
|
||||||
pool.run_until(peer2);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn mk_transport() -> (PeerId, transport::Boxed<(PeerId, StreamMuxerBox)>) {
|
fn mk_transport() -> (PeerId, transport::Boxed<(PeerId, StreamMuxerBox)>) {
|
||||||
let id_keys = identity::Keypair::generate_ed25519();
|
let id_keys = identity::Keypair::generate_ed25519();
|
||||||
let peer_id = id_keys.public().to_peer_id();
|
let peer_id = id_keys.public().to_peer_id();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user