diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 08be7686..c33ac7b6 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -8,7 +8,11 @@ - Manually implement `Debug` for `RequestResponseHandlerEvent` and `RequestProtocol`. See [PR 2183]. +- Remove `RequestResponse::throttled` and the `throttled` module. + See [PR 2236]. + [PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183 +[PR 2236]: https://github.com/libp2p/rust-libp2p/pull/2236 # 0.12.0 [2021-07-12] diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 6f71ec9f..288ee60b 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -17,7 +17,6 @@ libp2p-core = { version = "0.30.0", path = "../../core", default-features = fals libp2p-swarm = { version = "0.31.0", path = "../../swarm" } log = "0.4.11" lru = "0.6" -minicbor = { version = "0.11", features = ["std", "derive"] } rand = "0.7" smallvec = "1.6.1" unsigned-varint = { version = "0.7", features = ["std", "futures"] } diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index ef3913c1..3df2b40b 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -58,11 +58,9 @@ pub mod codec; pub mod handler; -pub mod throttled; pub use codec::{ProtocolName, RequestResponseCodec}; pub use handler::ProtocolSupport; -pub use throttled::Throttled; use futures::channel::oneshot; use handler::{RequestProtocol, RequestResponseHandler, RequestResponseHandlerEvent}; @@ -248,11 +246,6 @@ impl ResponseChannel { pub fn is_open(&self) -> bool { !self.sender.is_canceled() } - - /// Get the ID of the inbound request waiting for a response. - pub(crate) fn request_id(&self) -> RequestId { - self.request_id - } } /// The ID of an inbound or outbound request. @@ -369,19 +362,6 @@ where } } - /// Creates a `RequestResponse` which limits requests per peer. - /// - /// The behaviour is wrapped in [`Throttled`] and detects the limits - /// per peer at runtime which are then enforced. - pub fn throttled(c: TCodec, protos: I, cfg: RequestResponseConfig) -> Throttled - where - I: IntoIterator, - TCodec: Send, - TCodec::Protocol: Sync, - { - Throttled::new(c, protos, cfg) - } - /// Initiates sending a request. /// /// If the targeted peer is currently not connected, a dialing diff --git a/protocols/request-response/src/throttled.rs b/protocols/request-response/src/throttled.rs deleted file mode 100644 index 2b8693bc..00000000 --- a/protocols/request-response/src/throttled.rs +++ /dev/null @@ -1,790 +0,0 @@ -// Copyright 2020 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! Limit the number of requests peers can send to each other. -//! -//! Each peer is assigned a budget for sending and a budget for receiving -//! requests. Initially a peer assumes it has a send budget of 1. When its -//! budget has been used up its remote peer will send a credit message which -//! informs it how many more requests it can send before it needs to wait for -//! the next credit message. Credit messages which error or time out are -//! retried until they have reached the peer which is assumed once a -//! corresponding ack or a new request has been received from the peer. -//! -//! The `Throttled` behaviour wraps an existing `RequestResponse` behaviour -//! and uses a codec implementation that sends ordinary requests and responses -//! as well as a special credit message to which an ack message is expected -//! as a response. It does so by putting a small CBOR encoded header in front -//! of each message the inner codec produces. - -mod codec; - -use super::{ - ProtocolSupport, RequestId, RequestResponse, RequestResponseCodec, RequestResponseConfig, - RequestResponseEvent, RequestResponseMessage, -}; -use crate::handler::{RequestResponseHandler, RequestResponseHandlerEvent}; -use codec::{Codec, Message, ProtocolWrapper, Type}; -use futures::ready; -use libp2p_core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId}; -use libp2p_swarm::{ - DialError, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters, -}; -use lru::LruCache; -use std::{cmp::max, num::NonZeroU16}; -use std::{ - collections::{HashMap, HashSet, VecDeque}, - task::{Context, Poll}, -}; - -pub type ResponseChannel = super::ResponseChannel>; - -/// A wrapper around [`RequestResponse`] which adds request limits per peer. -pub struct Throttled -where - C: RequestResponseCodec + Clone + Send + 'static, - C::Protocol: Sync, -{ - /// A random id used for logging. - id: u32, - /// The wrapped behaviour. - behaviour: RequestResponse>, - /// Information per peer. - peer_info: HashMap, - /// Information about previously connected peers. - offline_peer_info: LruCache, - /// The default limit applies to all peers unless overriden. - default_limit: Limit, - /// Permanent limit overrides per peer. - limit_overrides: HashMap, - /// Pending events to report in `Throttled::poll`. - events: VecDeque>>, - /// The current credit ID. - next_grant_id: u64, -} - -/// Information about a credit grant that is sent to remote peers. -#[derive(Clone, Copy, Debug)] -struct Grant { - /// The grant ID. Used to deduplicate retransmitted credit grants. - id: GrantId, - /// The ID of the outbound credit grant message. - request: RequestId, - /// The credit given in this grant, i.e. the number of additional - /// requests the remote is allowed to send. - credit: u16, -} - -/// Max. number of inbound requests that can be received. -#[derive(Clone, Copy, Debug)] -struct Limit { - /// The current receive limit. - max_recv: NonZeroU16, - /// The next receive limit which becomes active after - /// the current limit has been reached. - next_max: NonZeroU16, -} - -impl Limit { - /// Create a new limit. - fn new(max: NonZeroU16) -> Self { - // The max. limit provided will be effective after the initial request - // from a peer which is always allowed has been answered. Values greater - // than 1 would prevent sending the credit grant, leading to a stalling - // sender so we must not use `max` right away. - Limit { - max_recv: NonZeroU16::new(1).expect("1 > 0"), - next_max: max, - } - } - - /// Set a new limit. - /// - /// The new limit becomes effective when all current inbound - /// requests have been processed and replied to. - fn set(&mut self, next: NonZeroU16) { - self.next_max = next - } - - /// Activate the new limit. - fn switch(&mut self) -> u16 { - self.max_recv = self.next_max; - self.max_recv.get() - } -} - -type GrantId = u64; - -/// Information related to the current send budget with a peer. -#[derive(Clone, Debug)] -struct SendBudget { - /// The last received credit grant. - grant: Option, - /// The remaining credit for requests to send. - remaining: u16, - /// Credit grant requests received and acknowledged where the outcome - /// of the acknowledgement (i.e. response sent) is still undetermined. - /// Used to avoid emitting events for successful (`ResponseSent`) or failed - /// acknowledgements. - received: HashSet, -} - -/// Information related to the current receive budget with a peer. -#[derive(Clone, Debug)] -struct RecvBudget { - /// The grant currently given to the remote but yet to be acknowledged. - /// - /// Set to `Some` when a new grant is sent to the remote, followed - /// by `None` when an acknowledgment or a request is received. The - /// latter is seen as an implicit acknowledgement. - grant: Option, - /// The limit for new credit grants when the `remaining` credit is - /// exhausted. - limit: Limit, - /// The remaining credit for requests to receive. - remaining: u16, - /// Credit grants sent whose outcome is still undetermined. - /// Used to avoid emitting events for failed credit grants. - /// - /// > **Note**: While receiving an inbound request is an implicit - /// > acknowledgement for the last sent `grant`, the outcome of - /// > the outbound request remains undetermined until a success or - /// > failure event is received for that request or the corresponding - /// > connection closes. - sent: HashSet, -} - -/// Budget information about a peer. -#[derive(Clone, Debug)] -struct PeerInfo { - send_budget: SendBudget, - recv_budget: RecvBudget, -} - -impl PeerInfo { - fn new(recv_limit: Limit) -> Self { - PeerInfo { - send_budget: SendBudget { - grant: None, - remaining: 1, - received: HashSet::new(), - }, - recv_budget: RecvBudget { - grant: None, - limit: recv_limit, - remaining: 1, - sent: HashSet::new(), - }, - } - } - - fn into_disconnected(mut self) -> Self { - self.send_budget.received = HashSet::new(); - self.send_budget.remaining = 1; - self.recv_budget.sent = HashSet::new(); - self.recv_budget.remaining = max(1, self.recv_budget.remaining); - // Since we potentially reset the remaining receive budget, - // we forget about the potentially still unacknowledged last grant. - self.recv_budget.grant = None; - self - } -} - -impl Throttled -where - C: RequestResponseCodec + Send + Clone, - C::Protocol: Sync, -{ - /// Create a new throttled request-response behaviour. - pub fn new(c: C, protos: I, cfg: RequestResponseConfig) -> Self - where - I: IntoIterator, - C: Send, - C::Protocol: Sync, - { - let protos = protos - .into_iter() - .map(|(p, ps)| (ProtocolWrapper::new(b"/t/1", p), ps)); - Throttled::from(RequestResponse::new(Codec::new(c, 8192), protos, cfg)) - } - - /// Wrap an existing `RequestResponse` behaviour and apply send/recv limits. - pub fn from(behaviour: RequestResponse>) -> Self { - Throttled { - id: rand::random(), - behaviour, - peer_info: HashMap::new(), - offline_peer_info: LruCache::new(8192), - default_limit: Limit::new(NonZeroU16::new(1).expect("1 > 0")), - limit_overrides: HashMap::new(), - events: VecDeque::new(), - next_grant_id: 0, - } - } - - /// Set the global default receive limit per peer. - pub fn set_receive_limit(&mut self, limit: NonZeroU16) { - log::trace!("{:08x}: new default limit: {:?}", self.id, limit); - self.default_limit = Limit::new(limit) - } - - /// Override the receive limit of a single peer. - pub fn override_receive_limit(&mut self, p: &PeerId, limit: NonZeroU16) { - log::debug!("{:08x}: override limit for {}: {:?}", self.id, p, limit); - if let Some(info) = self.peer_info.get_mut(p) { - info.recv_budget.limit.set(limit) - } else if let Some(info) = self.offline_peer_info.get_mut(p) { - info.recv_budget.limit.set(limit) - } - self.limit_overrides.insert(*p, Limit::new(limit)); - } - - /// Remove any limit overrides for the given peer. - pub fn remove_override(&mut self, p: &PeerId) { - log::trace!("{:08x}: removing limit override for {}", self.id, p); - self.limit_overrides.remove(p); - } - - /// Has the limit of outbound requests been reached for the given peer? - pub fn can_send(&mut self, p: &PeerId) -> bool { - self.peer_info - .get(p) - .map(|i| i.send_budget.remaining > 0) - .unwrap_or(true) - } - - /// Send a request to a peer. - /// - /// If the limit of outbound requests has been reached, the request is - /// returned. Sending more outbound requests should only be attempted - /// once [`Event::ResumeSending`] has been received from [`NetworkBehaviour::poll`]. - pub fn send_request(&mut self, p: &PeerId, req: C::Request) -> Result { - let connected = &mut self.peer_info; - let disconnected = &mut self.offline_peer_info; - let remaining = if let Some(info) = connected.get_mut(p).or_else(|| disconnected.get_mut(p)) - { - if info.send_budget.remaining == 0 { - log::trace!( - "{:08x}: no more budget to send another request to {}", - self.id, - p - ); - return Err(req); - } - info.send_budget.remaining -= 1; - info.send_budget.remaining - } else { - let limit = self - .limit_overrides - .get(p) - .copied() - .unwrap_or(self.default_limit); - let mut info = PeerInfo::new(limit); - info.send_budget.remaining -= 1; - let remaining = info.send_budget.remaining; - self.offline_peer_info.put(*p, info); - remaining - }; - - let rid = self.behaviour.send_request(p, Message::request(req)); - - log::trace! { "{:08x}: sending request {} to {} (budget remaining = {})", - self.id, - rid, - p, - remaining - }; - - Ok(rid) - } - - /// Answer an inbound request with a response. - /// - /// See [`RequestResponse::send_response`] for details. - pub fn send_response( - &mut self, - ch: ResponseChannel, - res: C::Response, - ) -> Result<(), C::Response> { - log::trace!( - "{:08x}: sending response {} to peer {}", - self.id, - ch.request_id(), - &ch.peer - ); - if let Some(info) = self.peer_info.get_mut(&ch.peer) { - if info.recv_budget.remaining == 0 { - // need to send more credit to the remote peer - let crd = info.recv_budget.limit.switch(); - info.recv_budget.remaining = info.recv_budget.limit.max_recv.get(); - self.send_credit(&ch.peer, crd); - } - } - match self.behaviour.send_response(ch, Message::response(res)) { - Ok(()) => Ok(()), - Err(m) => Err(m.into_parts().1.expect("Missing response data.")), - } - } - - /// Add a known peer address. - /// - /// See [`RequestResponse::add_address`] for details. - pub fn add_address(&mut self, p: &PeerId, a: Multiaddr) { - self.behaviour.add_address(p, a) - } - - /// Remove a previously added peer address. - /// - /// See [`RequestResponse::remove_address`] for details. - pub fn remove_address(&mut self, p: &PeerId, a: &Multiaddr) { - self.behaviour.remove_address(p, a) - } - - /// Are we connected to the given peer? - /// - /// See [`RequestResponse::is_connected`] for details. - pub fn is_connected(&self, p: &PeerId) -> bool { - self.behaviour.is_connected(p) - } - - /// Are we waiting for a response to the given request? - /// - /// See [`RequestResponse::is_pending_outbound`] for details. - pub fn is_pending_outbound(&self, p: &PeerId, r: &RequestId) -> bool { - self.behaviour.is_pending_outbound(p, r) - } - - /// Is the remote waiting for the local node to respond to the given - /// request? - /// - /// See [`RequestResponse::is_pending_inbound`] for details. - pub fn is_pending_inbound(&self, p: &PeerId, r: &RequestId) -> bool { - self.behaviour.is_pending_inbound(p, r) - } - - /// Send a credit grant to the given peer. - fn send_credit(&mut self, p: &PeerId, credit: u16) { - if let Some(info) = self.peer_info.get_mut(p) { - let cid = self.next_grant_id; - self.next_grant_id += 1; - let rid = self.behaviour.send_request(p, Message::credit(credit, cid)); - log::trace!( - "{:08x}: sending {} credit as grant {} to {}", - self.id, - credit, - cid, - p - ); - let grant = Grant { - id: cid, - request: rid, - credit, - }; - info.recv_budget.grant = Some(grant); - info.recv_budget.sent.insert(rid); - } - } -} - -/// A Wrapper around [`RequestResponseEvent`]. -#[derive(Debug)] -pub enum Event { - /// A regular request-response event. - Event(RequestResponseEvent), - /// We received more inbound requests than allowed. - TooManyInboundRequests(PeerId), - /// When previously reaching the send limit of a peer, - /// this event is eventually emitted when sending is - /// allowed to resume. - ResumeSending(PeerId), -} - -impl NetworkBehaviour for Throttled -where - C: RequestResponseCodec + Send + Clone + 'static, - C::Protocol: Sync, -{ - type ProtocolsHandler = RequestResponseHandler>; - type OutEvent = Event>; - - fn new_handler(&mut self) -> Self::ProtocolsHandler { - self.behaviour.new_handler() - } - - fn addresses_of_peer(&mut self, p: &PeerId) -> Vec { - self.behaviour.addresses_of_peer(p) - } - - fn inject_connection_established( - &mut self, - p: &PeerId, - id: &ConnectionId, - end: &ConnectedPoint, - ) { - self.behaviour.inject_connection_established(p, id, end) - } - - fn inject_connection_closed( - &mut self, - peer: &PeerId, - id: &ConnectionId, - end: &ConnectedPoint, - handler: ::Handler, - ) { - self.behaviour - .inject_connection_closed(peer, id, end, handler); - if let Some(info) = self.peer_info.get_mut(peer) { - if let Some(grant) = &mut info.recv_budget.grant { - log::debug! { "{:08x}: resending credit grant {} to {} after connection closed", - self.id, - grant.id, - peer - }; - let msg = Message::credit(grant.credit, grant.id); - grant.request = self.behaviour.send_request(peer, msg) - } - } - } - - fn inject_connected(&mut self, p: &PeerId) { - log::trace!("{:08x}: connected to {}", self.id, p); - self.behaviour.inject_connected(p); - // The limit may have been added by `Throttled::send_request` already. - if !self.peer_info.contains_key(p) { - if let Some(info) = self.offline_peer_info.pop(p) { - let recv_budget = info.recv_budget.remaining; - self.peer_info.insert(*p, info); - if recv_budget > 1 { - self.send_credit(p, recv_budget - 1); - } - } else { - let limit = self - .limit_overrides - .get(p) - .copied() - .unwrap_or(self.default_limit); - self.peer_info.insert(*p, PeerInfo::new(limit)); - } - } - } - - fn inject_disconnected(&mut self, p: &PeerId) { - log::trace!("{:08x}: disconnected from {}", self.id, p); - if let Some(info) = self.peer_info.remove(p) { - self.offline_peer_info.put(*p, info.into_disconnected()); - } - self.behaviour.inject_disconnected(p) - } - - fn inject_dial_failure( - &mut self, - p: &PeerId, - handler: Self::ProtocolsHandler, - error: DialError, - ) { - self.behaviour.inject_dial_failure(p, handler, error) - } - - fn inject_event( - &mut self, - p: PeerId, - i: ConnectionId, - e: RequestResponseHandlerEvent>, - ) { - self.behaviour.inject_event(p, i, e) - } - - fn poll( - &mut self, - cx: &mut Context<'_>, - params: &mut impl PollParameters, - ) -> Poll> { - loop { - if let Some(ev) = self.events.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); - } else if self.events.capacity() > super::EMPTY_QUEUE_SHRINK_THRESHOLD { - self.events.shrink_to_fit() - } - - let event = match ready!(self.behaviour.poll(cx, params)) { - NetworkBehaviourAction::GenerateEvent(RequestResponseEvent::Message { - peer, - message, - }) => { - let message = match message { - RequestResponseMessage::Response { - request_id, - response, - } => match &response.header().typ { - Some(Type::Ack) => { - if let Some(info) = self.peer_info.get_mut(&peer) { - if let Some(id) = info.recv_budget.grant.as_ref().map(|c| c.id) - { - if Some(id) == response.header().ident { - log::trace!( - "{:08x}: received ack {} from {}", - self.id, - id, - peer - ); - info.recv_budget.grant = None; - } - } - info.recv_budget.sent.remove(&request_id); - } - continue; - } - Some(Type::Response) => { - log::trace!( - "{:08x}: received response {} from {}", - self.id, - request_id, - peer - ); - if let Some(rs) = response.into_parts().1 { - RequestResponseMessage::Response { - request_id, - response: rs, - } - } else { - log::error! { "{:08x}: missing data for response {} from peer {}", - self.id, - request_id, - peer - } - continue; - } - } - ty => { - log::trace! { - "{:08x}: unknown message type: {:?} from {}; expected response or credit", - self.id, - ty, - peer - }; - continue; - } - }, - RequestResponseMessage::Request { - request_id, - request, - channel, - } => match &request.header().typ { - Some(Type::Credit) => { - if let Some(info) = self.peer_info.get_mut(&peer) { - let id = if let Some(n) = request.header().ident { - n - } else { - log::warn! { "{:08x}: missing credit id in message from {}", - self.id, - peer - } - continue; - }; - let credit = request.header().credit.unwrap_or(0); - log::trace! { "{:08x}: received {} additional credit {} from {}", - self.id, - credit, - id, - peer - }; - if info.send_budget.grant < Some(id) { - if info.send_budget.remaining == 0 && credit > 0 { - log::trace!( - "{:08x}: sending to peer {} can resume", - self.id, - peer - ); - self.events.push_back(Event::ResumeSending(peer)) - } - info.send_budget.remaining += credit; - info.send_budget.grant = Some(id); - } - // Note: Failing to send a response to a credit grant is - // handled along with other inbound failures further below. - let _ = self.behaviour.send_response(channel, Message::ack(id)); - info.send_budget.received.insert(request_id); - } - continue; - } - Some(Type::Request) => { - if let Some(info) = self.peer_info.get_mut(&peer) { - log::trace! { "{:08x}: received request {} (recv. budget = {})", - self.id, - request_id, - info.recv_budget.remaining - }; - if info.recv_budget.remaining == 0 { - log::debug!( - "{:08x}: peer {} exceeds its budget", - self.id, - peer - ); - self.events.push_back(Event::TooManyInboundRequests(peer)); - continue; - } - info.recv_budget.remaining -= 1; - // We consider a request as proof that our credit grant has - // reached the peer. Usually, an ACK has already been - // received. - info.recv_budget.grant = None; - } - if let Some(rq) = request.into_parts().1 { - RequestResponseMessage::Request { - request_id, - request: rq, - channel, - } - } else { - log::error! { "{:08x}: missing data for request {} from peer {}", - self.id, - request_id, - peer - } - continue; - } - } - ty => { - log::trace! { - "{:08x}: unknown message type: {:?} from {}; expected request or ack", - self.id, - ty, - peer - }; - continue; - } - }, - }; - let event = RequestResponseEvent::Message { peer, message }; - NetworkBehaviourAction::GenerateEvent(Event::Event(event)) - } - NetworkBehaviourAction::GenerateEvent(RequestResponseEvent::OutboundFailure { - peer, - request_id, - error, - }) => { - if let Some(info) = self.peer_info.get_mut(&peer) { - if let Some(grant) = info.recv_budget.grant.as_mut() { - if grant.request == request_id { - log::debug! { - "{:08x}: failed to send {} as credit {} to {}; retrying...", - self.id, - grant.credit, - grant.id, - peer - }; - let msg = Message::credit(grant.credit, grant.id); - grant.request = self.behaviour.send_request(&peer, msg); - } - } - - // If the outbound failure was for a credit message, don't report it on - // the public API and retry the sending. - if info.recv_budget.sent.remove(&request_id) { - continue; - } - } - let event = RequestResponseEvent::OutboundFailure { - peer, - request_id, - error, - }; - NetworkBehaviourAction::GenerateEvent(Event::Event(event)) - } - NetworkBehaviourAction::GenerateEvent(RequestResponseEvent::InboundFailure { - peer, - request_id, - error, - }) => { - // If the inbound failure occurred in the context of responding to a - // credit grant, don't report it on the public API. - if let Some(info) = self.peer_info.get_mut(&peer) { - if info.send_budget.received.remove(&request_id) { - log::debug! { - "{:08}: failed to acknowledge credit grant from {}: {:?}", - self.id, peer, error - }; - continue; - } - } - let event = RequestResponseEvent::InboundFailure { - peer, - request_id, - error, - }; - NetworkBehaviourAction::GenerateEvent(Event::Event(event)) - } - NetworkBehaviourAction::GenerateEvent(RequestResponseEvent::ResponseSent { - peer, - request_id, - }) => { - // If this event is for an ACK response that was sent for - // the last received credit grant, skip it. - if let Some(info) = self.peer_info.get_mut(&peer) { - if info.send_budget.received.remove(&request_id) { - log::trace! { - "{:08}: successfully sent ACK for credit grant {:?}.", - self.id, - info.send_budget.grant, - } - continue; - } - } - NetworkBehaviourAction::GenerateEvent(Event::Event( - RequestResponseEvent::ResponseSent { peer, request_id }, - )) - } - NetworkBehaviourAction::DialAddress { address, handler } => { - NetworkBehaviourAction::DialAddress { address, handler } - } - NetworkBehaviourAction::DialPeer { - peer_id, - condition, - handler, - } => NetworkBehaviourAction::DialPeer { - peer_id, - condition, - handler, - }, - NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event, - } => NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event, - }, - NetworkBehaviourAction::ReportObservedAddr { address, score } => { - NetworkBehaviourAction::ReportObservedAddr { address, score } - } - NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - } => NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - }, - }; - - return Poll::Ready(event); - } - } -} diff --git a/protocols/request-response/src/throttled/codec.rs b/protocols/request-response/src/throttled/codec.rs deleted file mode 100644 index f82c4ae3..00000000 --- a/protocols/request-response/src/throttled/codec.rs +++ /dev/null @@ -1,302 +0,0 @@ -// Copyright 2020 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use super::RequestResponseCodec; -use async_trait::async_trait; -use bytes::{Bytes, BytesMut}; -use futures::prelude::*; -use libp2p_core::ProtocolName; -use minicbor::{Decode, Encode}; -use std::io; -use unsigned_varint::{aio, io::ReadError}; - -/// A protocol header. -#[derive(Debug, Default, Clone, PartialEq, Eq, Encode, Decode)] -#[cbor(map)] -pub struct Header { - /// The type of message. - #[n(0)] - pub typ: Option, - /// The number of additional requests the remote is willing to receive. - #[n(1)] - pub credit: Option, - /// An identifier used for sending credit grants. - #[n(2)] - pub ident: Option, -} - -/// A protocol message type. -#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] -pub enum Type { - #[n(0)] - Request, - #[n(1)] - Response, - #[n(2)] - Credit, - #[n(3)] - Ack, -} - -/// A protocol message consisting of header and data. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Message { - header: Header, - data: Option, -} - -impl Message { - /// Create a new message of some type. - fn new(header: Header) -> Self { - Message { header, data: None } - } - - /// Create a request message. - pub fn request(data: T) -> Self { - let mut m = Message::new(Header { - typ: Some(Type::Request), - ..Header::default() - }); - m.data = Some(data); - m - } - - /// Create a response message. - pub fn response(data: T) -> Self { - let mut m = Message::new(Header { - typ: Some(Type::Response), - ..Header::default() - }); - m.data = Some(data); - m - } - - /// Create a credit grant. - pub fn credit(credit: u16, ident: u64) -> Self { - Message::new(Header { - typ: Some(Type::Credit), - credit: Some(credit), - ident: Some(ident), - }) - } - - /// Create an acknowledge message. - pub fn ack(ident: u64) -> Self { - Message::new(Header { - typ: Some(Type::Ack), - credit: None, - ident: Some(ident), - }) - } - - /// Access the message header. - pub fn header(&self) -> &Header { - &self.header - } - - /// Access the message data. - pub fn data(&self) -> Option<&T> { - self.data.as_ref() - } - - /// Consume this message and return header and data. - pub fn into_parts(self) -> (Header, Option) { - (self.header, self.data) - } -} - -/// A wrapper around a `ProtocolName` impl which augments the protocol name. -/// -/// The type implements `ProtocolName` itself and creates a name for a -/// request-response protocol based on the protocol name of the wrapped type. -#[derive(Debug, Clone)] -pub struct ProtocolWrapper

(P, Bytes); - -impl ProtocolWrapper

{ - pub fn new(prefix: &[u8], p: P) -> Self { - let mut full = BytesMut::from(prefix); - full.extend_from_slice(p.protocol_name()); - ProtocolWrapper(p, full.freeze()) - } -} - -impl

ProtocolName for ProtocolWrapper

{ - fn protocol_name(&self) -> &[u8] { - self.1.as_ref() - } -} - -/// A `RequestResponseCodec` wrapper that adds headers to the payload data. -#[derive(Debug, Clone)] -pub struct Codec { - /// The wrapped codec. - inner: C, - /// Encoding/decoding buffer. - buffer: Vec, - /// Max. header length. - max_header_len: u32, -} - -impl Codec { - /// Create a codec by wrapping an existing one. - pub fn new(c: C, max_header_len: u32) -> Self { - Codec { - inner: c, - buffer: Vec::new(), - max_header_len, - } - } - - /// Read and decode a request header. - async fn read_header(&mut self, io: &mut T) -> io::Result - where - T: AsyncRead + Unpin + Send, - H: for<'a> minicbor::Decode<'a>, - { - let header_len = aio::read_u32(&mut *io).await.map_err(|e| match e { - ReadError::Io(e) => e, - other => io::Error::new(io::ErrorKind::Other, other), - })?; - if header_len > self.max_header_len { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "header too large to read", - )); - } - self.buffer.resize(u32_to_usize(header_len), 0u8); - io.read_exact(&mut self.buffer).await?; - minicbor::decode(&self.buffer).map_err(|e| io::Error::new(io::ErrorKind::Other, e)) - } - - /// Encode and write a response header. - async fn write_header(&mut self, hdr: &H, io: &mut T) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - H: minicbor::Encode, - { - self.buffer.clear(); - minicbor::encode(hdr, &mut self.buffer) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - if self.buffer.len() > u32_to_usize(self.max_header_len) { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "header too large to write", - )); - } - let mut b = unsigned_varint::encode::u32_buffer(); - let header_len = unsigned_varint::encode::u32(self.buffer.len() as u32, &mut b); - io.write_all(header_len).await?; - io.write_all(&self.buffer).await - } -} - -#[async_trait] -impl RequestResponseCodec for Codec -where - C: RequestResponseCodec + Send, - C::Protocol: Sync, -{ - type Protocol = ProtocolWrapper; - type Request = Message; - type Response = Message; - - async fn read_request(&mut self, p: &Self::Protocol, io: &mut T) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let mut msg = Message::new(self.read_header(io).await?); - match msg.header.typ { - Some(Type::Request) => { - msg.data = Some(self.inner.read_request(&p.0, io).await?); - Ok(msg) - } - Some(Type::Credit) => Ok(msg), - Some(Type::Response) | Some(Type::Ack) | None => { - log::debug!( - "unexpected {:?} when expecting request or credit grant", - msg.header.typ - ); - Err(io::ErrorKind::InvalidData.into()) - } - } - } - - async fn read_response( - &mut self, - p: &Self::Protocol, - io: &mut T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let mut msg = Message::new(self.read_header(io).await?); - match msg.header.typ { - Some(Type::Response) => { - msg.data = Some(self.inner.read_response(&p.0, io).await?); - Ok(msg) - } - Some(Type::Ack) => Ok(msg), - Some(Type::Request) | Some(Type::Credit) | None => { - log::debug!( - "unexpected {:?} when expecting response or ack", - msg.header.typ - ); - Err(io::ErrorKind::InvalidData.into()) - } - } - } - - async fn write_request( - &mut self, - p: &Self::Protocol, - io: &mut T, - r: Self::Request, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - self.write_header(&r.header, io).await?; - if let Some(data) = r.data { - self.inner.write_request(&p.0, io, data).await? - } - Ok(()) - } - - async fn write_response( - &mut self, - p: &Self::Protocol, - io: &mut T, - r: Self::Response, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - self.write_header(&r.header, io).await?; - if let Some(data) = r.data { - self.inner.write_response(&p.0, io, data).await? - } - Ok(()) - } -} - -#[cfg(any(target_pointer_width = "64", target_pointer_width = "32"))] -fn u32_to_usize(n: u32) -> usize { - n as usize -} diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 884a378b..6cd6a732 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -21,7 +21,7 @@ //! Integration tests for the `RequestResponse` network behaviour. use async_trait::async_trait; -use futures::{channel::mpsc, executor::LocalPool, prelude::*, task::SpawnExt, AsyncWriteExt}; +use futures::{channel::mpsc, prelude::*, AsyncWriteExt}; use libp2p_core::{ identity, muxing::StreamMuxerBox, @@ -34,7 +34,6 @@ use libp2p_request_response::*; use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_tcp::TcpConfig; use rand::{self, Rng}; -use std::{collections::HashSet, num::NonZeroU16}; use std::{io, iter}; #[test] @@ -293,127 +292,6 @@ fn emits_inbound_connection_closed_if_channel_is_dropped() { }); } -#[test] -fn ping_protocol_throttled() { - let ping = Ping("ping".to_string().into_bytes()); - let pong = Pong("pong".to_string().into_bytes()); - - let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); - let cfg = RequestResponseConfig::default(); - - let (peer1_id, trans) = mk_transport(); - let ping_proto1 = RequestResponse::throttled(PingCodec(), protocols.clone(), cfg.clone()); - let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id); - - let (peer2_id, trans) = mk_transport(); - let ping_proto2 = RequestResponse::throttled(PingCodec(), protocols, cfg); - let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id); - - let (mut tx, mut rx) = mpsc::channel::(1); - - let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); - swarm1.listen_on(addr).unwrap(); - - let expected_ping = ping.clone(); - let expected_pong = pong.clone(); - - let limit1: u16 = rand::thread_rng().gen_range(1, 10); - let limit2: u16 = rand::thread_rng().gen_range(1, 10); - swarm1 - .behaviour_mut() - .set_receive_limit(NonZeroU16::new(limit1).unwrap()); - swarm2 - .behaviour_mut() - .set_receive_limit(NonZeroU16::new(limit2).unwrap()); - - let peer1 = async move { - for i in 1.. { - match swarm1.select_next_some().await { - SwarmEvent::NewListenAddr { address, .. } => tx.send(address).await.unwrap(), - SwarmEvent::Behaviour(throttled::Event::Event(RequestResponseEvent::Message { - peer, - message: - RequestResponseMessage::Request { - request, channel, .. - }, - })) => { - assert_eq!(&request, &expected_ping); - assert_eq!(&peer, &peer2_id); - swarm1 - .behaviour_mut() - .send_response(channel, pong.clone()) - .unwrap(); - } - SwarmEvent::Behaviour(throttled::Event::Event( - RequestResponseEvent::ResponseSent { peer, .. }, - )) => { - assert_eq!(&peer, &peer2_id); - } - SwarmEvent::Behaviour(e) => panic!("Peer1: Unexpected event: {:?}", e), - _ => {} - } - if i % 31 == 0 { - let lim = rand::thread_rng().gen_range(1, 17); - swarm1 - .behaviour_mut() - .override_receive_limit(&peer2_id, NonZeroU16::new(lim).unwrap()); - } - } - }; - - let num_pings: u16 = rand::thread_rng().gen_range(100, 1000); - - let peer2 = async move { - let mut count = 0; - let addr = rx.next().await.unwrap(); - swarm2.behaviour_mut().add_address(&peer1_id, addr.clone()); - - let mut blocked = false; - let mut req_ids = HashSet::new(); - - loop { - if !blocked { - while let Some(id) = swarm2 - .behaviour_mut() - .send_request(&peer1_id, ping.clone()) - .ok() - { - req_ids.insert(id); - } - blocked = true; - } - match swarm2.select_next_some().await { - SwarmEvent::Behaviour(throttled::Event::ResumeSending(peer)) => { - assert_eq!(peer, peer1_id); - blocked = false - } - SwarmEvent::Behaviour(throttled::Event::Event(RequestResponseEvent::Message { - peer, - message: - RequestResponseMessage::Response { - request_id, - response, - }, - })) => { - count += 1; - assert_eq!(&response, &expected_pong); - assert_eq!(&peer, &peer1_id); - assert!(req_ids.remove(&request_id)); - if count >= num_pings { - break; - } - } - SwarmEvent::Behaviour(e) => panic!("Peer2: Unexpected event: {:?}", e), - _ => {} - } - } - }; - - let mut pool = LocalPool::new(); - pool.spawner().spawn(peer1.boxed()).unwrap(); - pool.run_until(peer2); -} - fn mk_transport() -> (PeerId, transport::Boxed<(PeerId, StreamMuxerBox)>) { let id_keys = identity::Keypair::generate_ed25519(); let peer_id = id_keys.public().to_peer_id();