mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-07-05 20:51:37 +00:00
Restore `RequestResponse::throttled`. (#1726)
* Restore `RequestResponse::throttled`.

  In contrast to the existing "throttled" approach, this PR adds back-pressure to the protocol without requiring all nodes to have pre-existing knowledge of each other's limits. It adds small, CBOR-encoded headers to the actual payload data. Extra credit messages communicate back to the sender how many more requests it is allowed to send.

* Remove some noise.

* Resend credit grant after connection closed.

  Should an error in some lower layer cause a connection to be closed, our previously sent credit grant may not have reached the remote peer. Therefore, pessimistically, a credit grant is resent whenever a connection is closed. The remote ignores duplicate grants.

* Remove inbound/outbound tracking per peer.

* Send ACK as response to duplicate credit grants.

* Simplify.

* Fix grammar.

* Incorporate review feedback.

  - Remove `ResponseSent`, which was a leftover from previous attempts, and issue a credit grant immediately in `send_response`.
  - Only resend credit grants after a connection is closed if we are still connected to this peer.

* Move codec/header.rs to throttled/codec.rs.

* More review suggestions.

* Generalise `ProtocolWrapper` and use a shorter prefix.

* Update protocols/request-response/src/lib.rs

  Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>

* Update protocols/request-response/src/throttled.rs

  Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>

* Update protocols/request-response/src/throttled.rs

  Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>

* Minor comment changes.

* Limit max. header size to 8 KiB.

* Always construct the initial limit with 1.

  Since honest senders always assume a send budget of 1 and wait for credit afterwards, setting the default limit to a higher value can only become effective after informing the peer about it, which means leaving `max_recv` at 1 and setting `next_max` to the desired value.

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>
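For orientation, a minimal sketch of how the restored constructor is intended to be used. Only `RequestResponse::throttled`, `Throttled`, and `ProtocolSupport` come from this change; the `build_throttled` helper, its exact trait bounds, and the assumption that `RequestResponseConfig` implements `Default` and that a user-supplied `RequestResponseCodec` is available are illustrative assumptions, not part of the diff.

    use libp2p_request_response::{
        ProtocolSupport, RequestResponse, RequestResponseCodec, RequestResponseConfig, Throttled,
    };

    /// Hypothetical helper: wrap a user-defined codec in the throttled behaviour.
    fn build_throttled<C>(codec: C, protocol: C::Protocol) -> Throttled<C>
    where
        C: RequestResponseCodec + Clone + Send + 'static,
        C::Protocol: Sync,
    {
        // Advertise the protocol for both inbound and outbound requests.
        let protocols = std::iter::once((protocol, ProtocolSupport::Full));
        // Each peer starts with a send budget of 1; further credit is granted
        // by the remote at runtime via the CBOR-encoded headers.
        RequestResponse::throttled(codec, protocols, RequestResponseConfig::default())
    }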
--- a/protocols/request-response/src/lib.rs
+++ b/protocols/request-response/src/lib.rs
@@ -70,13 +70,11 @@
 pub mod codec;
 pub mod handler;

-// Disabled until #1706 is fixed:
-// pub mod throttled;
-// pub use throttled::Throttled;
+pub mod throttled;

 pub use codec::{RequestResponseCodec, ProtocolName};
 pub use handler::ProtocolSupport;
+pub use throttled::Throttled;

 use futures::{
     channel::oneshot,
@@ -102,21 +100,25 @@ use libp2p_swarm::{
 use smallvec::SmallVec;
 use std::{
     collections::{VecDeque, HashMap},
     fmt,
     time::Duration,
+    sync::{atomic::AtomicU64, Arc},
     task::{Context, Poll}
 };

 /// An inbound request or response.
 #[derive(Debug)]
-pub enum RequestResponseMessage<TRequest, TResponse> {
+pub enum RequestResponseMessage<TRequest, TResponse, TChannelResponse = TResponse> {
     /// A request message.
     Request {
+        /// The ID of this request.
+        request_id: RequestId,
         /// The request message.
         request: TRequest,
         /// The sender of the request who is awaiting a response.
         ///
         /// See [`RequestResponse::send_response`].
-        channel: ResponseChannel<TResponse>,
+        channel: ResponseChannel<TChannelResponse>,
     },
     /// A response message.
     Response {
@@ -131,13 +133,13 @@ pub enum RequestResponseMessage<TRequest, TResponse> {

 /// The events emitted by a [`RequestResponse`] protocol.
 #[derive(Debug)]
-pub enum RequestResponseEvent<TRequest, TResponse> {
+pub enum RequestResponseEvent<TRequest, TResponse, TChannelResponse = TResponse> {
     /// An incoming message (request or response).
     Message {
         /// The peer who sent the message.
         peer: PeerId,
         /// The incoming message.
-        message: RequestResponseMessage<TRequest, TResponse>
+        message: RequestResponseMessage<TRequest, TResponse, TChannelResponse>
     },
     /// An outbound request failed.
     OutboundFailure {
@@ -152,6 +154,8 @@ pub enum RequestResponseEvent<TRequest, TResponse> {
     InboundFailure {
         /// The peer from whom the request was received.
         peer: PeerId,
+        /// The ID of the failed inbound request.
+        request_id: RequestId,
         /// The error that occurred.
         error: InboundFailure,
     },
@@ -188,6 +192,8 @@ pub enum InboundFailure {
     Timeout,
     /// The local peer supports none of the requested protocols.
     UnsupportedProtocols,
+    /// The connection closed before a response was delivered.
+    ConnectionClosed,
 }

 /// A channel for sending a response to an inbound request.
@@ -195,6 +201,7 @@ pub enum InboundFailure {
 /// See [`RequestResponse::send_response`].
 #[derive(Debug)]
 pub struct ResponseChannel<TResponse> {
+    request_id: RequestId,
     peer: PeerId,
     sender: oneshot::Sender<TResponse>,
 }
@@ -210,14 +217,23 @@ impl<TResponse> ResponseChannel<TResponse> {
     pub fn is_open(&self) -> bool {
         !self.sender.is_canceled()
     }
+
+    /// Get the ID of the inbound request waiting for a response.
+    pub(crate) fn request_id(&self) -> RequestId {
+        self.request_id
+    }
 }

-/// The (local) ID of an outgoing request.
-///
-/// See [`RequestResponse::send_request`].
+/// The ID of an inbound or outbound request.
 #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
 pub struct RequestId(u64);

+impl fmt::Display for RequestId {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
+
 /// The configuration for a `RequestResponse` protocol.
 #[derive(Debug, Clone)]
 pub struct RequestResponseConfig {
@@ -259,6 +275,8 @@ where
     outbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
     /// The next (local) request ID.
     next_request_id: RequestId,
+    /// The next (inbound) request ID.
+    next_inbound_id: Arc<AtomicU64>,
     /// The protocol configuration.
     config: RequestResponseConfig,
     /// The protocol codec for reading and writing requests and responses.
@@ -276,7 +294,7 @@ where
     /// to be established.
     pending_requests: HashMap<PeerId, SmallVec<[RequestProtocol<TCodec>; 10]>>,
     /// Responses that have not yet been received.
-    pending_responses: HashMap<RequestId, (PeerId, ConnectionId)>,
+    pending_responses: HashMap<RequestId, (PeerId, ConnectionId)>
 }

 impl<TCodec> RequestResponse<TCodec>
@@ -303,6 +321,7 @@ where
             inbound_protocols,
             outbound_protocols,
             next_request_id: RequestId(1),
+            next_inbound_id: Arc::new(AtomicU64::new(1)),
             config: cfg,
             codec,
             pending_events: VecDeque::new(),
@@ -313,11 +332,18 @@
         }
     }

-    // Disabled until #1706 is fixed.
-    // /// Wrap this behaviour in [`Throttled`] to limit the number of concurrent requests per peer.
-    // pub fn throttled(self) -> Throttled<TCodec> {
-    //     Throttled::new(self)
-    // }
+    /// Creates a `RequestResponse` which limits requests per peer.
+    ///
+    /// The behaviour is wrapped in [`Throttled`] and detects the limits
+    /// per peer at runtime which are then enforced.
+    pub fn throttled<I>(c: TCodec, protos: I, cfg: RequestResponseConfig) -> Throttled<TCodec>
+    where
+        I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>,
+        TCodec: Send,
+        TCodec::Protocol: Sync
+    {
+        Throttled::new(c, protos, cfg)
+    }

     /// Initiates sending a request.
     ///
@@ -389,13 +415,17 @@

     /// Checks whether a peer is currently connected.
     pub fn is_connected(&self, peer: &PeerId) -> bool {
-        self.connected.contains_key(peer)
+        if let Some(connections) = self.connected.get(peer) {
+            !connections.is_empty()
+        } else {
+            false
+        }
     }

     /// Checks whether an outbound request initiated by
     /// [`RequestResponse::send_request`] is still pending, i.e. waiting
     /// for a response.
-    pub fn is_pending(&self, req_id: &RequestId) -> bool {
+    pub fn is_pending_outbound(&self, req_id: &RequestId) -> bool {
         self.pending_responses.contains_key(req_id)
     }

@@ -413,6 +443,9 @@
         -> Option<RequestProtocol<TCodec>>
     {
         if let Some(connections) = self.connected.get(peer) {
+            if connections.is_empty() {
+                return Some(request)
+            }
             let ix = (request.request_id.0 as usize) % connections.len();
             let conn = connections[ix].id;
             self.pending_responses.insert(request.request_id, (peer.clone(), conn));
@@ -441,6 +474,7 @@
             self.codec.clone(),
             self.config.connection_keep_alive,
             self.config.request_timeout,
+            self.next_inbound_id.clone()
         )
     }

@@ -480,27 +514,22 @@
             }
         }

-        // Any pending responses of requests sent over this connection
-        // must be considered failed.
-        let failed = self.pending_responses.iter()
-            .filter_map(|(r, (p, c))|
-                if conn == c {
-                    Some((p.clone(), *r))
-                } else {
-                    None
-                })
-            .collect::<Vec<_>>();
+        let pending_events = &mut self.pending_events;

-        for (peer, request_id) in failed {
-            self.pending_responses.remove(&request_id);
-            self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
+        // Any pending responses of requests sent over this connection must be considered failed.
+        self.pending_responses.retain(|rid, (peer, cid)| {
+            if conn != cid {
+                return true
+            }
+            pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
                 RequestResponseEvent::OutboundFailure {
-                    peer,
-                    request_id,
+                    peer: peer.clone(),
+                    request_id: *rid,
                     error: OutboundFailure::ConnectionClosed
                 }
             ));
-        }
+            false
+        });
     }

     fn inject_disconnected(&mut self, peer: &PeerId) {
@@ -541,12 +570,12 @@
                     NetworkBehaviourAction::GenerateEvent(
                         RequestResponseEvent::Message { peer, message }));
             }
-            RequestResponseHandlerEvent::Request { request, sender } => {
-                let channel = ResponseChannel { peer: peer.clone(), sender };
-                let message = RequestResponseMessage::Request { request, channel };
-                self.pending_events.push_back(
-                    NetworkBehaviourAction::GenerateEvent(
-                        RequestResponseEvent::Message { peer, message }));
+            RequestResponseHandlerEvent::Request { request_id, request, sender } => {
+                let channel = ResponseChannel { request_id, peer: peer.clone(), sender };
+                let message = RequestResponseMessage::Request { request_id, request, channel };
+                self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
+                    RequestResponseEvent::Message { peer, message }
+                ));
             }
             RequestResponseHandlerEvent::OutboundTimeout(request_id) => {
                 if let Some((peer, _conn)) = self.pending_responses.remove(&request_id) {
@@ -559,13 +588,14 @@
                         }));
                 }
             }
-            RequestResponseHandlerEvent::InboundTimeout => {
-                self.pending_events.push_back(
-                    NetworkBehaviourAction::GenerateEvent(
-                        RequestResponseEvent::InboundFailure {
-                            peer,
-                            error: InboundFailure::Timeout,
-                        }));
+            RequestResponseHandlerEvent::InboundTimeout(request_id) => {
+                self.pending_events.push_back(
+                    NetworkBehaviourAction::GenerateEvent(
+                        RequestResponseEvent::InboundFailure {
+                            peer,
+                            request_id,
+                            error: InboundFailure::Timeout,
+                        }));
             }
             RequestResponseHandlerEvent::OutboundUnsupportedProtocols(request_id) => {
                 self.pending_events.push_back(
@@ -576,11 +606,12 @@
                             error: OutboundFailure::UnsupportedProtocols,
                         }));
             }
-            RequestResponseHandlerEvent::InboundUnsupportedProtocols => {
+            RequestResponseHandlerEvent::InboundUnsupportedProtocols(request_id) => {
                 self.pending_events.push_back(
                     NetworkBehaviourAction::GenerateEvent(
                         RequestResponseEvent::InboundFailure {
                             peer,
+                            request_id,
                             error: InboundFailure::UnsupportedProtocols,
                         }));
             }