mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-07-01 10:41:33 +00:00
[request-response] Refine success & error reporting for inbound requests. (#1867)
* Refine error reporting for inbound request handling. At the moment one can neither get confirmation when a response has been sent on the underlying transport, nor is one aware of response omissions. The latter was originally intended as a feature for support of one-way protocols, which seems like a bad idea in hindsight. The lack of notification for sent responses may prohibit implementation of some request-response protocols that need to ensure a happens-before relation between sending a response and a subsequent request, besides uses for collecting statistics. Even with these changes, there is no active notification for failed inbound requests as a result of connections unexpectedly closing, as is the case for outbound requests. Instead, for pending inbound requests this scenario can be identified if necessary by the absense of both `InboundFailure` and `ResponseSent` events for a particular previously received request. Interest in this situation is not expected to be common and would otherwise require explicitly tracking all inbound requests in the `RequestResponse` behaviour, which would be a pity. `RequestResponse::send_response` now also synchronously returns an error if the inbound upgrade handling the request has been aborted, due to timeout or closing of the connection, giving more options for graceful error handling for inbound requests. As an aside, the `Throttled` wrapper now no longer emits inbound or outbound error events occurring in the context of sending credit requests or responses. This is in addition to not emitting `ResponseSent` events for ACK responses of credit grants. * Update protocols/request-response/src/lib.rs Co-authored-by: Max Inden <mail@max-inden.de> * Address some minor clippy warnings. (#1868) * Track pending credit request IDs. In order to avoid emitting events relating to credit grants or acks on the public API. The public API should only emit events relating to the actual requests and responses sent by client code. * Small cleanup * Cleanup * Update versions and changelogs. * Unreleased Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
@ -25,9 +25,11 @@
|
||||
|
||||
# Version 0.32.0 [unreleased]
|
||||
|
||||
- Update `libp2p-request-response`.
|
||||
|
||||
- Update to `libp2p-mdns-0.26`.
|
||||
|
||||
- Update `libp2p-websocket`.
|
||||
- Update `libp2p-websocket` minimum patch version.
|
||||
|
||||
# Version 0.31.2 [2020-12-02]
|
||||
|
||||
|
@ -72,7 +72,7 @@ libp2p-noise = { version = "0.27.0", path = "protocols/noise", optional = true }
|
||||
libp2p-ping = { version = "0.25.0", path = "protocols/ping", optional = true }
|
||||
libp2p-plaintext = { version = "0.25.0", path = "protocols/plaintext", optional = true }
|
||||
libp2p-pnet = { version = "0.19.2", path = "protocols/pnet", optional = true }
|
||||
libp2p-request-response = { version = "0.6.0", path = "protocols/request-response", optional = true }
|
||||
libp2p-request-response = { version = "0.7.0", path = "protocols/request-response", optional = true }
|
||||
libp2p-swarm = { version = "0.25.0", path = "swarm" }
|
||||
libp2p-uds = { version = "0.25.0", path = "transports/uds", optional = true }
|
||||
libp2p-wasm-ext = { version = "0.25.0", path = "transports/wasm-ext", optional = true }
|
||||
|
@ -1,3 +1,11 @@
|
||||
# 0.7.0 [unreleased]
|
||||
|
||||
- Refine emitted events for inbound requests, introducing
|
||||
the `ResponseSent` event and the `ResponseOmission`
|
||||
inbound failures. This effectively removes previous
|
||||
support for one-way protocols without responses.
|
||||
[PR 1867](https://github.com/libp2p/rust-libp2p/pull/1867).
|
||||
|
||||
# 0.6.0 [2020-11-25]
|
||||
|
||||
- Update `libp2p-swarm` and `libp2p-core`.
|
||||
|
@ -2,7 +2,7 @@
|
||||
name = "libp2p-request-response"
|
||||
edition = "2018"
|
||||
description = "Generic Request/Response Protocols"
|
||||
version = "0.6.0"
|
||||
version = "0.7.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
license = "MIT"
|
||||
repository = "https://github.com/libp2p/rust-libp2p"
|
||||
|
@ -119,22 +119,29 @@ pub enum RequestResponseHandlerEvent<TCodec>
|
||||
where
|
||||
TCodec: RequestResponseCodec
|
||||
{
|
||||
/// An inbound request.
|
||||
/// A request has been received.
|
||||
Request {
|
||||
request_id: RequestId,
|
||||
request: TCodec::Request,
|
||||
sender: oneshot::Sender<TCodec::Response>
|
||||
},
|
||||
/// An inbound response.
|
||||
/// A response has been received.
|
||||
Response {
|
||||
request_id: RequestId,
|
||||
response: TCodec::Response
|
||||
},
|
||||
/// An outbound upgrade (i.e. request) timed out.
|
||||
/// A response to an inbound request has been sent.
|
||||
ResponseSent(RequestId),
|
||||
/// A response to an inbound request was omitted as a result
|
||||
/// of dropping the response `sender` of an inbound `Request`.
|
||||
ResponseOmission(RequestId),
|
||||
/// An outbound request timed out while sending the request
|
||||
/// or waiting for the response.
|
||||
OutboundTimeout(RequestId),
|
||||
/// An outbound request failed to negotiate a mutually supported protocol.
|
||||
OutboundUnsupportedProtocols(RequestId),
|
||||
/// An inbound request timed out.
|
||||
/// An inbound request timed out while waiting for the request
|
||||
/// or sending the response.
|
||||
InboundTimeout(RequestId),
|
||||
/// An inbound request failed to negotiate a mutually supported protocol.
|
||||
InboundUnsupportedProtocols(RequestId),
|
||||
@ -187,9 +194,16 @@ where
|
||||
|
||||
fn inject_fully_negotiated_inbound(
|
||||
&mut self,
|
||||
(): (),
|
||||
_: RequestId
|
||||
sent: bool,
|
||||
request_id: RequestId
|
||||
) {
|
||||
if sent {
|
||||
self.pending_events.push_back(
|
||||
RequestResponseHandlerEvent::ResponseSent(request_id))
|
||||
} else {
|
||||
self.pending_events.push_back(
|
||||
RequestResponseHandlerEvent::ResponseOmission(request_id))
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_fully_negotiated_outbound(
|
||||
|
@ -93,7 +93,7 @@ impl<TCodec> InboundUpgrade<NegotiatedSubstream> for ResponseProtocol<TCodec>
|
||||
where
|
||||
TCodec: RequestResponseCodec + Send + 'static,
|
||||
{
|
||||
type Output = ();
|
||||
type Output = bool;
|
||||
type Error = io::Error;
|
||||
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
|
||||
|
||||
@ -105,10 +105,12 @@ where
|
||||
if let Ok(response) = self.response_receiver.await {
|
||||
let write = self.codec.write_response(&protocol, &mut io, response);
|
||||
write.await?;
|
||||
} else {
|
||||
return Ok(false)
|
||||
}
|
||||
}
|
||||
io.close().await?;
|
||||
Ok(())
|
||||
Ok(true)
|
||||
}.boxed()
|
||||
}
|
||||
}
|
||||
|
@ -46,18 +46,6 @@
|
||||
//! For that purpose, [`RequestResponseCodec::Protocol`] is typically
|
||||
//! instantiated with a sum type.
|
||||
//!
|
||||
//! ## One-Way Protocols
|
||||
//!
|
||||
//! The implementation supports one-way protocols that do not
|
||||
//! have responses. In these cases the [`RequestResponseCodec::Response`] can
|
||||
//! be defined as `()` and [`RequestResponseCodec::read_response`] as well as
|
||||
//! [`RequestResponseCodec::write_response`] given the obvious implementations.
|
||||
//! Note that `RequestResponseMessage::Response` will still be emitted,
|
||||
//! immediately after the request has been sent, since `RequestResponseCodec::read_response`
|
||||
//! will not actually read anything from the given I/O stream.
|
||||
//! [`RequestResponse::send_response`] need not be called for one-way protocols,
|
||||
//! i.e. the [`ResponseChannel`] may just be dropped.
|
||||
//!
|
||||
//! ## Limited Protocol Support
|
||||
//!
|
||||
//! It is possible to only support inbound or outbound requests for
|
||||
@ -115,9 +103,11 @@ pub enum RequestResponseMessage<TRequest, TResponse, TChannelResponse = TRespons
|
||||
request_id: RequestId,
|
||||
/// The request message.
|
||||
request: TRequest,
|
||||
/// The sender of the request who is awaiting a response.
|
||||
/// The channel waiting for the response.
|
||||
///
|
||||
/// See [`RequestResponse::send_response`].
|
||||
/// If this channel is dropped instead of being used to send a response
|
||||
/// via [`RequestResponse::send_response`], a [`RequestResponseEvent::InboundFailure`]
|
||||
/// with [`InboundFailure::ResponseOmission`] is emitted.
|
||||
channel: ResponseChannel<TChannelResponse>,
|
||||
},
|
||||
/// A response message.
|
||||
@ -151,6 +141,14 @@ pub enum RequestResponseEvent<TRequest, TResponse, TChannelResponse = TResponse>
|
||||
error: OutboundFailure,
|
||||
},
|
||||
/// An inbound request failed.
|
||||
///
|
||||
/// > **Note**: The case whereby a connection on which a response is sent
|
||||
/// > closes after [`RequestResponse::send_response`] already succeeded
|
||||
/// > but before the response could be sent on the connection is reflected
|
||||
/// > by there being no [`RequestResponseEvent::ResponseSent`] event.
|
||||
/// > Code interested in ensuring a response has been successfully
|
||||
/// > handed to the transport layer, e.g. before continuing with the next
|
||||
/// > step in a multi-step protocol, should listen to these events.
|
||||
InboundFailure {
|
||||
/// The peer from whom the request was received.
|
||||
peer: PeerId,
|
||||
@ -159,6 +157,16 @@ pub enum RequestResponseEvent<TRequest, TResponse, TChannelResponse = TResponse>
|
||||
/// The error that occurred.
|
||||
error: InboundFailure,
|
||||
},
|
||||
/// A response to an inbound request has been sent.
|
||||
///
|
||||
/// When this event is received, the response has been flushed on
|
||||
/// the underlying transport connection.
|
||||
ResponseSent {
|
||||
/// The peer to whom the response was sent.
|
||||
peer: PeerId,
|
||||
/// The ID of the inbound request whose response was sent.
|
||||
request_id: RequestId,
|
||||
},
|
||||
}
|
||||
|
||||
/// Possible failures occurring in the context of sending
|
||||
@ -186,14 +194,17 @@ pub enum OutboundFailure {
|
||||
#[derive(Debug)]
|
||||
pub enum InboundFailure {
|
||||
/// The inbound request timed out, either while reading the
|
||||
/// incoming request or before a response is sent, i.e. if
|
||||
/// incoming request or before a response is sent, e.g. if
|
||||
/// [`RequestResponse::send_response`] is not called in a
|
||||
/// timely manner.
|
||||
Timeout,
|
||||
/// The local peer supports none of the requested protocols.
|
||||
/// The local peer supports none of the protocols requested
|
||||
/// by the remote.
|
||||
UnsupportedProtocols,
|
||||
/// The connection closed before a response was delivered.
|
||||
ConnectionClosed,
|
||||
/// The local peer failed to respond to an inbound request
|
||||
/// due to the [`ResponseChannel`] being dropped instead of
|
||||
/// being passed to [`RequestResponse::send_response`].
|
||||
ResponseOmission,
|
||||
}
|
||||
|
||||
/// A channel for sending a response to an inbound request.
|
||||
@ -379,17 +390,18 @@ where
|
||||
|
||||
/// Initiates sending a response to an inbound request.
|
||||
///
|
||||
/// If the `ResponseChannel` is already closed due to a timeout,
|
||||
/// the response is discarded and eventually [`RequestResponseEvent::InboundFailure`]
|
||||
/// is emitted by `RequestResponse::poll`.
|
||||
/// If the `ResponseChannel` is already closed due to a timeout or
|
||||
/// the connection being closed, the response is returned as an `Err`
|
||||
/// for further handling. Once the response has been successfully sent
|
||||
/// on the corresponding connection, [`RequestResponseEvent::ResponseSent`]
|
||||
/// is emitted.
|
||||
///
|
||||
/// The provided `ResponseChannel` is obtained from a
|
||||
/// The provided `ResponseChannel` is obtained from an inbound
|
||||
/// [`RequestResponseMessage::Request`].
|
||||
pub fn send_response(&mut self, ch: ResponseChannel<TCodec::Response>, rs: TCodec::Response) {
|
||||
// Fails only if the inbound upgrade timed out waiting for the response,
|
||||
// in which case the handler emits `RequestResponseHandlerEvent::InboundTimeout`
|
||||
// which in turn results in `RequestResponseEvent::InboundFailure`.
|
||||
let _ = ch.sender.send(rs);
|
||||
pub fn send_response(&mut self, ch: ResponseChannel<TCodec::Response>, rs: TCodec::Response)
|
||||
-> Result<(), TCodec::Response>
|
||||
{
|
||||
ch.sender.send(rs)
|
||||
}
|
||||
|
||||
/// Adds a known address for a peer that can be used for
|
||||
@ -577,6 +589,20 @@ where
|
||||
RequestResponseEvent::Message { peer, message }
|
||||
));
|
||||
}
|
||||
RequestResponseHandlerEvent::ResponseSent(request_id) => {
|
||||
self.pending_events.push_back(
|
||||
NetworkBehaviourAction::GenerateEvent(
|
||||
RequestResponseEvent::ResponseSent { peer, request_id }));
|
||||
}
|
||||
RequestResponseHandlerEvent::ResponseOmission(request_id) => {
|
||||
self.pending_events.push_back(
|
||||
NetworkBehaviourAction::GenerateEvent(
|
||||
RequestResponseEvent::InboundFailure {
|
||||
peer,
|
||||
request_id,
|
||||
error: InboundFailure::ResponseOmission
|
||||
}));
|
||||
}
|
||||
RequestResponseHandlerEvent::OutboundTimeout(request_id) => {
|
||||
if let Some((peer, _conn)) = self.pending_responses.remove(&request_id) {
|
||||
self.pending_events.push_back(
|
||||
|
@ -42,7 +42,7 @@ use futures::ready;
|
||||
use libp2p_core::{ConnectedPoint, connection::ConnectionId, Multiaddr, PeerId};
|
||||
use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
|
||||
use lru::LruCache;
|
||||
use std::{collections::{HashMap, VecDeque}, task::{Context, Poll}};
|
||||
use std::{collections::{HashMap, HashSet, VecDeque}, task::{Context, Poll}};
|
||||
use std::{cmp::max, num::NonZeroU16};
|
||||
use super::{
|
||||
ProtocolSupport,
|
||||
@ -75,21 +75,20 @@ where
|
||||
limit_overrides: HashMap<PeerId, Limit>,
|
||||
/// Pending events to report in `Throttled::poll`.
|
||||
events: VecDeque<Event<C::Request, C::Response, Message<C::Response>>>,
|
||||
/// Current outbound credit grants in flight.
|
||||
credit_messages: HashMap<PeerId, Credit>,
|
||||
/// The current credit ID.
|
||||
credit_id: u64
|
||||
next_grant_id: u64
|
||||
}
|
||||
|
||||
/// Credit information that is sent to remote peers.
|
||||
/// Information about a credit grant that is sent to remote peers.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
struct Credit {
|
||||
/// A credit ID. Used to deduplicate retransmitted credit messages.
|
||||
id: u64,
|
||||
struct Grant {
|
||||
/// The grant ID. Used to deduplicate retransmitted credit grants.
|
||||
id: GrantId,
|
||||
/// The ID of the outbound credit grant message.
|
||||
request: RequestId,
|
||||
/// The number of requests the remote is allowed to send.
|
||||
amount: u16
|
||||
/// The credit given in this grant, i.e. the number of additional
|
||||
/// requests the remote is allowed to send.
|
||||
credit: u16
|
||||
}
|
||||
|
||||
/// Max. number of inbound requests that can be received.
|
||||
@ -130,28 +129,81 @@ impl Limit {
|
||||
}
|
||||
}
|
||||
|
||||
type GrantId = u64;
|
||||
|
||||
/// Information related to the current send budget with a peer.
|
||||
#[derive(Clone, Debug)]
|
||||
struct SendBudget {
|
||||
/// The last received credit grant.
|
||||
grant: Option<GrantId>,
|
||||
/// The remaining credit for requests to send.
|
||||
remaining: u16,
|
||||
/// Credit grant requests received and acknowledged where the outcome
|
||||
/// of the acknowledgement (i.e. response sent) is still undetermined.
|
||||
/// Used to avoid emitting events for successful (`ResponseSent`) or failed
|
||||
/// acknowledgements.
|
||||
received: HashSet<RequestId>,
|
||||
}
|
||||
|
||||
/// Information related to the current receive budget with a peer.
|
||||
#[derive(Clone, Debug)]
|
||||
struct RecvBudget {
|
||||
/// The grant currently given to the remote but yet to be acknowledged.
|
||||
///
|
||||
/// Set to `Some` when a new grant is sent to the remote, followed
|
||||
/// by `None` when an acknowledgment or a request is received. The
|
||||
/// latter is seen as an implicit acknowledgement.
|
||||
grant: Option<Grant>,
|
||||
/// The limit for new credit grants when the `remaining` credit is
|
||||
/// exhausted.
|
||||
limit: Limit,
|
||||
/// The remaining credit for requests to receive.
|
||||
remaining: u16,
|
||||
/// Credit grants sent whose outcome is still undetermined.
|
||||
/// Used to avoid emitting events for failed credit grants.
|
||||
///
|
||||
/// > **Note**: While receiving an inbound request is an implicit
|
||||
/// > acknowledgement for the last sent `grant`, the outcome of
|
||||
/// > the outbound request remains undetermined until a success or
|
||||
/// > failure event is received for that request or the corresponding
|
||||
/// > connection closes.
|
||||
sent: HashSet<RequestId>,
|
||||
}
|
||||
|
||||
/// Budget information about a peer.
|
||||
#[derive(Clone, Debug)]
|
||||
struct PeerInfo {
|
||||
/// Limit that applies to this peer.
|
||||
limit: Limit,
|
||||
/// Remaining number of outbound requests that can be sent.
|
||||
send_budget: u16,
|
||||
/// Remaining number of inbound requests that can be received.
|
||||
recv_budget: u16,
|
||||
/// The ID of the credit message that granted the current `send_budget`.
|
||||
send_budget_id: Option<u64>
|
||||
send_budget: SendBudget,
|
||||
recv_budget: RecvBudget,
|
||||
}
|
||||
|
||||
impl PeerInfo {
|
||||
fn new(limit: Limit) -> Self {
|
||||
fn new(recv_limit: Limit) -> Self {
|
||||
PeerInfo {
|
||||
limit,
|
||||
send_budget: 1,
|
||||
recv_budget: 1,
|
||||
send_budget_id: None
|
||||
send_budget: SendBudget {
|
||||
grant: None,
|
||||
remaining: 1,
|
||||
received: HashSet::new(),
|
||||
},
|
||||
recv_budget: RecvBudget {
|
||||
grant: None,
|
||||
limit: recv_limit,
|
||||
remaining: 1,
|
||||
sent: HashSet::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn into_disconnected(mut self) -> Self {
|
||||
self.send_budget.received = HashSet::new();
|
||||
self.send_budget.remaining = 1;
|
||||
self.recv_budget.sent = HashSet::new();
|
||||
self.recv_budget.remaining = max(1, self.recv_budget.remaining);
|
||||
// Since we potentially reset the remaining receive budget,
|
||||
// we forget about the potentially still unacknowledged last grant.
|
||||
self.recv_budget.grant = None;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> Throttled<C>
|
||||
@ -180,8 +232,7 @@ where
|
||||
default_limit: Limit::new(NonZeroU16::new(1).expect("1 > 0")),
|
||||
limit_overrides: HashMap::new(),
|
||||
events: VecDeque::new(),
|
||||
credit_messages: HashMap::new(),
|
||||
credit_id: 0
|
||||
next_grant_id: 0
|
||||
}
|
||||
}
|
||||
|
||||
@ -195,9 +246,9 @@ where
|
||||
pub fn override_receive_limit(&mut self, p: &PeerId, limit: NonZeroU16) {
|
||||
log::debug!("{:08x}: override limit for {}: {:?}", self.id, p, limit);
|
||||
if let Some(info) = self.peer_info.get_mut(p) {
|
||||
info.limit.set(limit)
|
||||
info.recv_budget.limit.set(limit)
|
||||
} else if let Some(info) = self.offline_peer_info.get_mut(p) {
|
||||
info.limit.set(limit)
|
||||
info.recv_budget.limit.set(limit)
|
||||
}
|
||||
self.limit_overrides.insert(p.clone(), Limit::new(limit));
|
||||
}
|
||||
@ -210,7 +261,7 @@ where
|
||||
|
||||
/// Has the limit of outbound requests been reached for the given peer?
|
||||
pub fn can_send(&mut self, p: &PeerId) -> bool {
|
||||
self.peer_info.get(p).map(|i| i.send_budget > 0).unwrap_or(true)
|
||||
self.peer_info.get(p).map(|i| i.send_budget.remaining > 0).unwrap_or(true)
|
||||
}
|
||||
|
||||
/// Send a request to a peer.
|
||||
@ -219,33 +270,32 @@ where
|
||||
/// returned. Sending more outbound requests should only be attempted
|
||||
/// once [`Event::ResumeSending`] has been received from [`NetworkBehaviour::poll`].
|
||||
pub fn send_request(&mut self, p: &PeerId, req: C::Request) -> Result<RequestId, C::Request> {
|
||||
let info =
|
||||
if let Some(info) = self.peer_info.get_mut(p) {
|
||||
info
|
||||
} else if let Some(info) = self.offline_peer_info.pop(p) {
|
||||
if info.recv_budget > 1 {
|
||||
self.send_credit(p, info.recv_budget - 1)
|
||||
let connected = &mut self.peer_info;
|
||||
let disconnected = &mut self.offline_peer_info;
|
||||
let remaining =
|
||||
if let Some(info) = connected.get_mut(p).or_else(|| disconnected.get_mut(p)) {
|
||||
if info.send_budget.remaining == 0 {
|
||||
log::trace!("{:08x}: no more budget to send another request to {}", self.id, p);
|
||||
return Err(req)
|
||||
}
|
||||
self.peer_info.entry(p.clone()).or_insert(info)
|
||||
info.send_budget.remaining -= 1;
|
||||
info.send_budget.remaining
|
||||
} else {
|
||||
let limit = self.limit_overrides.get(p).copied().unwrap_or(self.default_limit);
|
||||
self.peer_info.entry(p.clone()).or_insert(PeerInfo::new(limit))
|
||||
let mut info = PeerInfo::new(limit);
|
||||
info.send_budget.remaining -= 1;
|
||||
let remaining = info.send_budget.remaining;
|
||||
self.offline_peer_info.put(p.clone(), info);
|
||||
remaining
|
||||
};
|
||||
|
||||
if info.send_budget == 0 {
|
||||
log::trace!("{:08x}: no more budget to send another request to {}", self.id, p);
|
||||
return Err(req)
|
||||
}
|
||||
|
||||
info.send_budget -= 1;
|
||||
|
||||
let rid = self.behaviour.send_request(p, Message::request(req));
|
||||
|
||||
log::trace! { "{:08x}: sending request {} to {} (send budget = {})",
|
||||
log::trace! { "{:08x}: sending request {} to {} (budget remaining = {})",
|
||||
self.id,
|
||||
rid,
|
||||
p,
|
||||
info.send_budget + 1
|
||||
remaining
|
||||
};
|
||||
|
||||
Ok(rid)
|
||||
@ -254,16 +304,21 @@ where
|
||||
/// Answer an inbound request with a response.
|
||||
///
|
||||
/// See [`RequestResponse::send_response`] for details.
|
||||
pub fn send_response(&mut self, ch: ResponseChannel<Message<C::Response>>, res: C::Response) {
|
||||
pub fn send_response(&mut self, ch: ResponseChannel<Message<C::Response>>, res: C::Response)
|
||||
-> Result<(), C::Response>
|
||||
{
|
||||
log::trace!("{:08x}: sending response {} to peer {}", self.id, ch.request_id(), &ch.peer);
|
||||
if let Some(info) = self.peer_info.get_mut(&ch.peer) {
|
||||
if info.recv_budget == 0 { // need to send more credit to the remote peer
|
||||
let crd = info.limit.switch();
|
||||
info.recv_budget = info.limit.max_recv.get();
|
||||
self.send_credit(&ch.peer, crd)
|
||||
if info.recv_budget.remaining == 0 { // need to send more credit to the remote peer
|
||||
let crd = info.recv_budget.limit.switch();
|
||||
info.recv_budget.remaining = info.recv_budget.limit.max_recv.get();
|
||||
self.send_credit(&ch.peer, crd);
|
||||
}
|
||||
}
|
||||
self.behaviour.send_response(ch, Message::response(res))
|
||||
match self.behaviour.send_response(ch, Message::response(res)) {
|
||||
Ok(()) => Ok(()),
|
||||
Err(m) => Err(m.into_parts().1.expect("Missing response data.")),
|
||||
}
|
||||
}
|
||||
|
||||
/// Add a known peer address.
|
||||
@ -295,19 +350,16 @@ where
|
||||
}
|
||||
|
||||
/// Send a credit grant to the given peer.
|
||||
fn send_credit(&mut self, p: &PeerId, amount: u16) {
|
||||
let cid = self.next_credit_id();
|
||||
let rid = self.behaviour.send_request(p, Message::credit(amount, cid));
|
||||
log::trace!("{:08x}: sending {} as credit {} to {}", self.id, amount, cid, p);
|
||||
let credit = Credit { id: cid, request: rid, amount };
|
||||
self.credit_messages.insert(p.clone(), credit);
|
||||
}
|
||||
|
||||
/// Create a new credit message ID.
|
||||
fn next_credit_id(&mut self) -> u64 {
|
||||
let n = self.credit_id;
|
||||
self.credit_id += 1;
|
||||
n
|
||||
fn send_credit(&mut self, p: &PeerId, credit: u16) {
|
||||
if let Some(info) = self.peer_info.get_mut(p) {
|
||||
let cid = self.next_grant_id;
|
||||
self.next_grant_id += 1;
|
||||
let rid = self.behaviour.send_request(p, Message::credit(credit, cid));
|
||||
log::trace!("{:08x}: sending {} credit as grant {} to {}", self.id, credit, cid, p);
|
||||
let grant = Grant { id: cid, request: rid, credit };
|
||||
info.recv_budget.grant = Some(grant);
|
||||
info.recv_budget.sent.insert(rid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -346,15 +398,15 @@ where
|
||||
|
||||
fn inject_connection_closed(&mut self, peer: &PeerId, id: &ConnectionId, end: &ConnectedPoint) {
|
||||
self.behaviour.inject_connection_closed(peer, id, end);
|
||||
if self.is_connected(peer) {
|
||||
if let Some(credit) = self.credit_messages.get_mut(peer) {
|
||||
if let Some(info) = self.peer_info.get_mut(peer) {
|
||||
if let Some(grant) = &mut info.recv_budget.grant {
|
||||
log::debug! { "{:08x}: resending credit grant {} to {} after connection closed",
|
||||
self.id,
|
||||
credit.id,
|
||||
grant.id,
|
||||
peer
|
||||
};
|
||||
let msg = Message::credit(credit.amount, credit.id);
|
||||
credit.request = self.behaviour.send_request(peer, msg)
|
||||
let msg = Message::credit(grant.credit, grant.id);
|
||||
grant.request = self.behaviour.send_request(peer, msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -364,28 +416,24 @@ where
|
||||
self.behaviour.inject_connected(p);
|
||||
// The limit may have been added by `Throttled::send_request` already.
|
||||
if !self.peer_info.contains_key(p) {
|
||||
let info =
|
||||
if let Some(info) = self.offline_peer_info.pop(p) {
|
||||
if info.recv_budget > 1 {
|
||||
self.send_credit(p, info.recv_budget - 1)
|
||||
}
|
||||
info
|
||||
} else {
|
||||
let limit = self.limit_overrides.get(p).copied().unwrap_or(self.default_limit);
|
||||
PeerInfo::new(limit)
|
||||
};
|
||||
self.peer_info.insert(p.clone(), info);
|
||||
if let Some(info) = self.offline_peer_info.pop(p) {
|
||||
let recv_budget = info.recv_budget.remaining;
|
||||
self.peer_info.insert(p.clone(), info);
|
||||
if recv_budget > 1 {
|
||||
self.send_credit(p, recv_budget - 1);
|
||||
}
|
||||
} else {
|
||||
let limit = self.limit_overrides.get(p).copied().unwrap_or(self.default_limit);
|
||||
self.peer_info.insert(p.clone(), PeerInfo::new(limit));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_disconnected(&mut self, p: &PeerId) {
|
||||
log::trace!("{:08x}: disconnected from {}", self.id, p);
|
||||
if let Some(mut info) = self.peer_info.remove(p) {
|
||||
info.send_budget = 1;
|
||||
info.recv_budget = max(1, info.recv_budget);
|
||||
self.offline_peer_info.put(p.clone(), info);
|
||||
if let Some(info) = self.peer_info.remove(p) {
|
||||
self.offline_peer_info.put(p.clone(), info.into_disconnected());
|
||||
}
|
||||
self.credit_messages.remove(p);
|
||||
self.behaviour.inject_disconnected(p)
|
||||
}
|
||||
|
||||
@ -413,11 +461,14 @@ where
|
||||
| RequestResponseMessage::Response { request_id, response } =>
|
||||
match &response.header().typ {
|
||||
| Some(Type::Ack) => {
|
||||
if let Some(id) = self.credit_messages.get(&peer).map(|c| c.id) {
|
||||
if Some(id) == response.header().ident {
|
||||
log::trace!("{:08x}: received ack {} from {}", self.id, id, peer);
|
||||
self.credit_messages.remove(&peer);
|
||||
if let Some(info) = self.peer_info.get_mut(&peer) {
|
||||
if let Some(id) = info.recv_budget.grant.as_ref().map(|c| c.id) {
|
||||
if Some(id) == response.header().ident {
|
||||
log::trace!("{:08x}: received ack {} from {}", self.id, id, peer);
|
||||
info.recv_budget.grant = None;
|
||||
}
|
||||
}
|
||||
info.recv_budget.sent.remove(&request_id);
|
||||
}
|
||||
continue
|
||||
}
|
||||
@ -464,15 +515,23 @@ where
|
||||
id,
|
||||
peer
|
||||
};
|
||||
if info.send_budget_id < Some(id) {
|
||||
if info.send_budget == 0 && credit > 0 {
|
||||
if info.send_budget.grant < Some(id) {
|
||||
if info.send_budget.remaining == 0 && credit > 0 {
|
||||
log::trace!("{:08x}: sending to peer {} can resume", self.id, peer);
|
||||
self.events.push_back(Event::ResumeSending(peer.clone()))
|
||||
}
|
||||
info.send_budget += credit;
|
||||
info.send_budget_id = Some(id)
|
||||
info.send_budget.remaining += credit;
|
||||
info.send_budget.grant = Some(id);
|
||||
}
|
||||
match self.behaviour.send_response(channel, Message::ack(id)) {
|
||||
Err(_) => log::debug! {
|
||||
"{:08x}: Failed to send ack for credit grant {}.",
|
||||
self.id, id
|
||||
},
|
||||
Ok(()) => {
|
||||
info.send_budget.received.insert(request_id);
|
||||
}
|
||||
}
|
||||
self.behaviour.send_response(channel, Message::ack(id))
|
||||
}
|
||||
continue
|
||||
}
|
||||
@ -481,18 +540,18 @@ where
|
||||
log::trace! { "{:08x}: received request {} (recv. budget = {})",
|
||||
self.id,
|
||||
request_id,
|
||||
info.recv_budget
|
||||
info.recv_budget.remaining
|
||||
};
|
||||
if info.recv_budget == 0 {
|
||||
if info.recv_budget.remaining == 0 {
|
||||
log::debug!("{:08x}: peer {} exceeds its budget", self.id, peer);
|
||||
self.events.push_back(Event::TooManyInboundRequests(peer.clone()));
|
||||
continue
|
||||
}
|
||||
info.recv_budget -= 1;
|
||||
info.recv_budget.remaining -= 1;
|
||||
// We consider a request as proof that our credit grant has
|
||||
// reached the peer. Usually, an ACK has already been
|
||||
// received.
|
||||
self.credit_messages.remove(&peer);
|
||||
info.recv_budget.grant = None;
|
||||
}
|
||||
if let Some(rq) = request.into_parts().1 {
|
||||
RequestResponseMessage::Request { request_id, request: rq, channel }
|
||||
@ -524,16 +583,25 @@ where
|
||||
request_id,
|
||||
error
|
||||
}) => {
|
||||
if let Some(credit) = self.credit_messages.get_mut(&peer) {
|
||||
if credit.request == request_id {
|
||||
log::debug! { "{:08x}: failed to send {} as credit {} to {}; retrying...",
|
||||
self.id,
|
||||
credit.amount,
|
||||
credit.id,
|
||||
peer
|
||||
};
|
||||
let msg = Message::credit(credit.amount, credit.id);
|
||||
credit.request = self.behaviour.send_request(&peer, msg)
|
||||
if let Some(info) = self.peer_info.get_mut(&peer) {
|
||||
if let Some(grant) = info.recv_budget.grant.as_mut() {
|
||||
if grant.request == request_id {
|
||||
log::debug! {
|
||||
"{:08x}: failed to send {} as credit {} to {}; retrying...",
|
||||
self.id,
|
||||
grant.credit,
|
||||
grant.id,
|
||||
peer
|
||||
};
|
||||
let msg = Message::credit(grant.credit, grant.id);
|
||||
grant.request = self.behaviour.send_request(&peer, msg);
|
||||
}
|
||||
}
|
||||
|
||||
// If the outbound failure was for a credit message, don't report it on
|
||||
// the public API and retry the sending.
|
||||
if info.recv_budget.sent.remove(&request_id) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
let event = RequestResponseEvent::OutboundFailure { peer, request_id, error };
|
||||
@ -544,9 +612,39 @@ where
|
||||
request_id,
|
||||
error
|
||||
}) => {
|
||||
// If the inbound failure occurred in the context of responding to a
|
||||
// credit grant, don't report it on the public API.
|
||||
if let Some(info) = self.peer_info.get_mut(&peer) {
|
||||
if info.send_budget.received.remove(&request_id) {
|
||||
log::debug! {
|
||||
"{:08}: failed to acknowledge credit grant from {}: {:?}",
|
||||
self.id, peer, error
|
||||
};
|
||||
continue
|
||||
}
|
||||
}
|
||||
let event = RequestResponseEvent::InboundFailure { peer, request_id, error };
|
||||
NetworkBehaviourAction::GenerateEvent(Event::Event(event))
|
||||
}
|
||||
| NetworkBehaviourAction::GenerateEvent(RequestResponseEvent::ResponseSent {
|
||||
peer,
|
||||
request_id
|
||||
}) => {
|
||||
// If this event is for an ACK response that was sent for
|
||||
// the last received credit grant, skip it.
|
||||
if let Some(info) = self.peer_info.get_mut(&peer) {
|
||||
if info.send_budget.received.remove(&request_id) {
|
||||
log::trace! {
|
||||
"{:08}: successfully sent ACK for credit grant {:?}.",
|
||||
self.id,
|
||||
info.send_budget.grant,
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
NetworkBehaviourAction::GenerateEvent(Event::Event(
|
||||
RequestResponseEvent::ResponseSent { peer, request_id }))
|
||||
}
|
||||
| NetworkBehaviourAction::DialAddress { address } =>
|
||||
NetworkBehaviourAction::DialAddress { address },
|
||||
| NetworkBehaviourAction::DialPeer { peer_id, condition } =>
|
||||
|
@ -77,8 +77,13 @@ fn ping_protocol() {
|
||||
} => {
|
||||
assert_eq!(&request, &expected_ping);
|
||||
assert_eq!(&peer, &peer2_id);
|
||||
swarm1.send_response(channel, pong.clone());
|
||||
swarm1.send_response(channel, pong.clone()).unwrap();
|
||||
},
|
||||
RequestResponseEvent::ResponseSent {
|
||||
peer, ..
|
||||
} => {
|
||||
assert_eq!(&peer, &peer2_id);
|
||||
}
|
||||
e => panic!("Peer1: Unexpected event: {:?}", e)
|
||||
}
|
||||
}
|
||||
@ -159,8 +164,13 @@ fn ping_protocol_throttled() {
|
||||
}) => {
|
||||
assert_eq!(&request, &expected_ping);
|
||||
assert_eq!(&peer, &peer2_id);
|
||||
swarm1.send_response(channel, pong.clone());
|
||||
swarm1.send_response(channel, pong.clone()).unwrap();
|
||||
},
|
||||
throttled::Event::Event(RequestResponseEvent::ResponseSent {
|
||||
peer, ..
|
||||
}) => {
|
||||
assert_eq!(&peer, &peer2_id);
|
||||
}
|
||||
e => panic!("Peer1: Unexpected event: {:?}", e)
|
||||
}
|
||||
if i % 31 == 0 {
|
||||
|
Reference in New Issue
Block a user