diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index f962088e..21acc15c 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -1,7 +1,7 @@ # 0.3.0 [unreleased] - Add support for opt-in request-based flow-control to any - reqyest-response protocol via `RequestResponse::throttled()`. + request-response protocol via `RequestResponse::throttled()`. [PR 1726](https://github.com/libp2p/rust-libp2p/pull/1726). - Update `libp2p-swarm` and `libp2p-core`. diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index ed04eb1e..5660a6f7 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -16,6 +16,7 @@ futures = "0.3.1" libp2p-core = { version = "0.22.0", path = "../../core" } libp2p-swarm = { version = "0.22.0", path = "../../swarm" } log = "0.4.11" +lru = "0.6" minicbor = { version = "0.5", features = ["std", "derive"] } rand = "0.7" smallvec = "1.4" diff --git a/protocols/request-response/src/throttled.rs b/protocols/request-response/src/throttled.rs index 1214ce4b..3b67328f 100644 --- a/protocols/request-response/src/throttled.rs +++ b/protocols/request-response/src/throttled.rs @@ -41,8 +41,9 @@ use crate::handler::{RequestProtocol, RequestResponseHandler, RequestResponseHan use futures::ready; use libp2p_core::{ConnectedPoint, connection::ConnectionId, Multiaddr, PeerId}; use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use lru::LruCache; use std::{collections::{HashMap, VecDeque}, task::{Context, Poll}}; -use std::num::NonZeroU16; +use std::{cmp::max, num::NonZeroU16}; use super::{ ProtocolSupport, RequestId, @@ -66,6 +67,8 @@ where behaviour: RequestResponse>, /// Information per peer. peer_info: HashMap, + /// Information about previously connected peers. + offline_peer_info: LruCache, /// The default limit applies to all peers unless overriden. default_limit: Limit, /// Permanent limit overrides per peer. @@ -173,6 +176,7 @@ where id: rand::random(), behaviour, peer_info: HashMap::new(), + offline_peer_info: LruCache::new(8192), default_limit: Limit::new(NonZeroU16::new(1).expect("1 > 0")), limit_overrides: HashMap::new(), events: VecDeque::new(), @@ -192,6 +196,8 @@ where log::debug!("{:08x}: override limit for {}: {:?}", self.id, p, limit); if let Some(info) = self.peer_info.get_mut(p) { info.limit.set(limit) + } else if let Some(info) = self.offline_peer_info.get_mut(p) { + info.limit.set(limit) } self.limit_overrides.insert(p.clone(), Limit::new(limit)); } @@ -216,6 +222,11 @@ where let info = if let Some(info) = self.peer_info.get_mut(p) { info + } else if let Some(info) = self.offline_peer_info.pop(p) { + if info.recv_budget > 1 { + self.send_credit(p, info.recv_budget - 1) + } + self.peer_info.entry(p.clone()).or_insert(info) } else { let limit = self.limit_overrides.get(p).copied().unwrap_or(self.default_limit); self.peer_info.entry(p.clone()).or_insert(PeerInfo::new(limit)) @@ -249,11 +260,7 @@ where if info.recv_budget == 0 { // need to send more credit to the remote peer let crd = info.limit.switch(); info.recv_budget = info.limit.max_recv.get(); - let cid = self.next_credit_id(); - let rid = self.behaviour.send_request(&ch.peer, Message::credit(crd, cid)); - log::trace!("{:08x}: sending {} as credit {} to {}", self.id, crd, cid, ch.peer); - let credit = Credit { id: cid, request: rid, amount: crd }; - self.credit_messages.insert(ch.peer.clone(), credit); + self.send_credit(&ch.peer, crd) } } self.behaviour.send_response(ch, Message::response(res)) @@ -287,6 +294,15 @@ where self.behaviour.is_pending_outbound(p) } + /// Send a credit grant to the given peer. + fn send_credit(&mut self, p: &PeerId, amount: u16) { + let cid = self.next_credit_id(); + let rid = self.behaviour.send_request(p, Message::credit(amount, cid)); + log::trace!("{:08x}: sending {} as credit {} to {}", self.id, amount, cid, p); + let credit = Credit { id: cid, request: rid, amount }; + self.credit_messages.insert(p.clone(), credit); + } + /// Create a new credit message ID. fn next_credit_id(&mut self) -> u64 { let n = self.credit_id; @@ -348,14 +364,27 @@ where self.behaviour.inject_connected(p); // The limit may have been added by `Throttled::send_request` already. if !self.peer_info.contains_key(p) { - let limit = self.limit_overrides.get(p).copied().unwrap_or(self.default_limit); - self.peer_info.insert(p.clone(), PeerInfo::new(limit)); + let info = + if let Some(info) = self.offline_peer_info.pop(p) { + if info.recv_budget > 1 { + self.send_credit(p, info.recv_budget - 1) + } + info + } else { + let limit = self.limit_overrides.get(p).copied().unwrap_or(self.default_limit); + PeerInfo::new(limit) + }; + self.peer_info.insert(p.clone(), info); } } fn inject_disconnected(&mut self, p: &PeerId) { log::trace!("{:08x}: disconnected from {}", self.id, p); - self.peer_info.remove(p); + if let Some(mut info) = self.peer_info.remove(p) { + info.send_budget = 1; + info.recv_budget = max(1, info.recv_budget); + self.offline_peer_info.put(p.clone(), info); + } self.credit_messages.remove(p); self.behaviour.inject_disconnected(p) } diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 6735fd2e..3125a917 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -151,7 +151,7 @@ fn ping_protocol_throttled() { let l = Swarm::listeners(&swarm1).next().unwrap(); tx.send(l.clone()).await.unwrap(); - for i in 1.. { + for i in 1 .. { match swarm1.next().await { throttled::Event::Event(RequestResponseEvent::Message { peer,