protocols/kad: Limit # of inbound substreams to 32 (#2699)

* protocols/kad/: Split into outbound and inbound substreams

* protocols/kad: Limit # of inbound substreams to 32

A remote node may still send more than 32 requests in parallel by using more
than one connection or by sending more than one request per stream.

* protocols/kad: Favor new substreams over old ones waiting for reuse

When a new inbound substream comes in and the limit on total inbound substreams
has been reached, try to find an old inbound substream that is only waiting to be
reused. If one exists, replace it with the new substream. If no such old substream
exists, drop the new one.
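
The acceptance policy for a newly negotiated inbound substream can be summarized with the minimal Rust sketch below. It is illustrative only: the enum variant and the `MAX_NUM_INBOUND_SUBSTREAMS` constant mirror the handler code in the diff further down, while the free function `make_room_for_new_substream` and the bare `Vec` are hypothetical stand-ins for the handler's internal bookkeeping.

    // Hypothetical, simplified model of the handler's inbound substream bookkeeping.
    #[derive(Clone)]
    enum InboundSubstreamState {
        // `first == false` marks a stream that already served a request and is now idle,
        // i.e. merely waiting to be reused by the remote.
        WaitingMessage { first: bool },
        // Stand-in for all other states (PendingSend, PendingFlush, Closing, ...).
        Busy,
    }

    const MAX_NUM_INBOUND_SUBSTREAMS: usize = 32;

    // Returns true if the caller may register the new substream, false if it must be dropped.
    fn make_room_for_new_substream(inbound: &mut Vec<InboundSubstreamState>) -> bool {
        if inbound.len() < MAX_NUM_INBOUND_SUBSTREAMS {
            return true;
        }
        // Prefer evicting an old substream that is only waiting to be reused.
        if let Some(pos) = inbound
            .iter()
            .position(|s| matches!(s, InboundSubstreamState::WaitingMessage { first: false }))
        {
            inbound.remove(pos);
            return true;
        }
        // Every tracked substream is doing useful work; drop the new one instead.
        false
    }

    fn main() {
        // 32 idle, reusable substreams: the new one evicts one of them.
        let mut idle = vec![InboundSubstreamState::WaitingMessage { first: false }; 32];
        assert!(make_room_for_new_substream(&mut idle));

        // 32 busy substreams: nothing can be evicted, so the new one is dropped.
        let mut busy = vec![InboundSubstreamState::Busy; 32];
        assert!(!make_room_for_new_substream(&mut busy));
    }

Only substreams in `WaitingMessage { first: false }` are candidates for eviction; a substream that has not yet delivered its first request, or that is in the middle of sending a response, is never dropped in favor of a new one.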
Max Inden
2022-06-09 15:12:03 +02:00
committed by GitHub
parent 0d3787ed04
commit 5cb4886ab2
3 changed files with 334 additions and 171 deletions

protocols/kad/CHANGELOG.md

@@ -1,3 +1,9 @@
+# 0.37.1 - unreleased
+
+- Limit # of inbound streams to 32. See [PR 2699].
+
+[PR 2699]: https://github.com/libp2p/rust-libp2p/pull/2699
+
 # 0.37.0
 
 - Update to `libp2p-core` `v0.33.0`.

protocols/kad/Cargo.toml

@@ -3,7 +3,7 @@ name = "libp2p-kad"
 edition = "2021"
 rust-version = "1.56.1"
 description = "Kademlia protocol for libp2p"
-version = "0.37.0"
+version = "0.37.1"
 authors = ["Parity Technologies <admin@parity.io>"]
 license = "MIT"
 repository = "https://github.com/libp2p/rust-libp2p"

protocols/kad/src/handler.rs

@@ -39,6 +39,8 @@ use std::{
     error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll, time::Duration,
 };
 
+const MAX_NUM_INBOUND_SUBSTREAMS: usize = 32;
+
 /// A prototype from which [`KademliaHandler`]s can be constructed.
 pub struct KademliaHandlerProto<T> {
     config: KademliaHandlerConfig,
@@ -57,8 +59,8 @@ impl<T> KademliaHandlerProto<T> {
 impl<T: Clone + fmt::Debug + Send + 'static> IntoConnectionHandler for KademliaHandlerProto<T> {
     type Handler = KademliaHandler<T>;
 
-    fn into_handler(self, _: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler {
-        KademliaHandler::new(self.config, endpoint.clone())
+    fn into_handler(self, remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler {
+        KademliaHandler::new(self.config, endpoint.clone(), *remote_peer_id)
     }
 
     fn inbound_protocol(&self) -> <Self::Handler as ConnectionHandler>::InboundProtocol {
@@ -84,8 +86,11 @@ pub struct KademliaHandler<TUserData> {
     /// Next unique ID of a connection.
     next_connec_unique_id: UniqueConnecId,
 
-    /// List of active substreams with the state they are in.
-    substreams: Vec<SubstreamState<TUserData>>,
+    /// List of active outbound substreams with the state they are in.
+    outbound_substreams: Vec<OutboundSubstreamState<TUserData>>,
+
+    /// List of active inbound substreams with the state they are in.
+    inbound_substreams: Vec<InboundSubstreamState>,
 
     /// Until when to keep the connection alive.
     keep_alive: KeepAlive,
@@ -94,6 +99,9 @@ pub struct KademliaHandler<TUserData> {
     /// is associated with.
     endpoint: ConnectedPoint,
 
+    /// The [`PeerId`] of the remote.
+    remote_peer_id: PeerId,
+
     /// The current state of protocol confirmation.
     protocol_status: ProtocolStatus,
 }
@@ -125,65 +133,86 @@ pub struct KademliaHandlerConfig {
     pub idle_timeout: Duration,
 }
 
-/// State of an active substream, opened either by us or by the remote.
-enum SubstreamState<TUserData> {
+/// State of an active outbound substream.
+enum OutboundSubstreamState<TUserData> {
     /// We haven't started opening the outgoing substream yet.
     /// Contains the request we want to send, and the user data if we expect an answer.
-    OutPendingOpen(KadRequestMsg, Option<TUserData>),
+    PendingOpen(KadRequestMsg, Option<TUserData>),
     /// Waiting to send a message to the remote.
-    OutPendingSend(
+    PendingSend(
         KadOutStreamSink<NegotiatedSubstream>,
         KadRequestMsg,
         Option<TUserData>,
     ),
     /// Waiting to flush the substream so that the data arrives to the remote.
-    OutPendingFlush(KadOutStreamSink<NegotiatedSubstream>, Option<TUserData>),
+    PendingFlush(KadOutStreamSink<NegotiatedSubstream>, Option<TUserData>),
     /// Waiting for an answer back from the remote.
     // TODO: add timeout
-    OutWaitingAnswer(KadOutStreamSink<NegotiatedSubstream>, TUserData),
+    WaitingAnswer(KadOutStreamSink<NegotiatedSubstream>, TUserData),
     /// An error happened on the substream and we should report the error to the user.
-    OutReportError(KademliaHandlerQueryErr, TUserData),
+    ReportError(KademliaHandlerQueryErr, TUserData),
     /// The substream is being closed.
-    OutClosing(KadOutStreamSink<NegotiatedSubstream>),
+    Closing(KadOutStreamSink<NegotiatedSubstream>),
+}
+
+/// State of an active inbound substream.
+enum InboundSubstreamState {
     /// Waiting for a request from the remote.
-    InWaitingMessage(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>),
-    /// Waiting for the user to send a `KademliaHandlerIn` event containing the response.
-    InWaitingUser(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>),
+    WaitingMessage {
+        /// Whether it is the first message to be awaited on this stream.
+        first: bool,
+        connection_id: UniqueConnecId,
+        substream: KadInStreamSink<NegotiatedSubstream>,
+    },
+    /// Waiting for the user to send a [`KademliaHandlerIn`] event containing the response.
+    WaitingUser(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>),
     /// Waiting to send an answer back to the remote.
-    InPendingSend(
+    PendingSend(
         UniqueConnecId,
         KadInStreamSink<NegotiatedSubstream>,
         KadResponseMsg,
     ),
     /// Waiting to flush an answer back to the remote.
-    InPendingFlush(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>),
+    PendingFlush(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>),
     /// The substream is being closed.
-    InClosing(KadInStreamSink<NegotiatedSubstream>),
+    Closing(KadInStreamSink<NegotiatedSubstream>),
 }
 
-impl<TUserData> SubstreamState<TUserData> {
+impl<TUserData> OutboundSubstreamState<TUserData> {
     /// Tries to close the substream.
     ///
     /// If the substream is not ready to be closed, returns it back.
     fn try_close(&mut self, cx: &mut Context<'_>) -> Poll<()> {
         match self {
-            SubstreamState::OutPendingOpen(_, _) | SubstreamState::OutReportError(_, _) => {
-                Poll::Ready(())
-            }
-            SubstreamState::OutPendingSend(ref mut stream, _, _)
-            | SubstreamState::OutPendingFlush(ref mut stream, _)
-            | SubstreamState::OutWaitingAnswer(ref mut stream, _)
-            | SubstreamState::OutClosing(ref mut stream) => {
+            OutboundSubstreamState::PendingOpen(_, _)
+            | OutboundSubstreamState::ReportError(_, _) => Poll::Ready(()),
+            OutboundSubstreamState::PendingSend(ref mut stream, _, _)
+            | OutboundSubstreamState::PendingFlush(ref mut stream, _)
+            | OutboundSubstreamState::WaitingAnswer(ref mut stream, _)
+            | OutboundSubstreamState::Closing(ref mut stream) => {
                 match Sink::poll_close(Pin::new(stream), cx) {
                     Poll::Ready(_) => Poll::Ready(()),
                     Poll::Pending => Poll::Pending,
                 }
             }
-            SubstreamState::InWaitingMessage(_, ref mut stream)
-            | SubstreamState::InWaitingUser(_, ref mut stream)
-            | SubstreamState::InPendingSend(_, ref mut stream, _)
-            | SubstreamState::InPendingFlush(_, ref mut stream)
-            | SubstreamState::InClosing(ref mut stream) => {
+        }
+    }
+}
+
+impl InboundSubstreamState {
+    /// Tries to close the substream.
+    ///
+    /// If the substream is not ready to be closed, returns it back.
+    fn try_close(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+        match self {
+            InboundSubstreamState::WaitingMessage {
+                substream: ref mut stream,
+                ..
+            }
+            | InboundSubstreamState::WaitingUser(_, ref mut stream)
+            | InboundSubstreamState::PendingSend(_, ref mut stream, _)
+            | InboundSubstreamState::PendingFlush(_, ref mut stream)
+            | InboundSubstreamState::Closing(ref mut stream) => {
                 match Sink::poll_close(Pin::new(stream), cx) {
                     Poll::Ready(_) => Poll::Ready(()),
                     Poll::Pending => Poll::Pending,
@@ -452,14 +481,20 @@ struct UniqueConnecId(u64);
 
 impl<TUserData> KademliaHandler<TUserData> {
     /// Create a [`KademliaHandler`] using the given configuration.
-    pub fn new(config: KademliaHandlerConfig, endpoint: ConnectedPoint) -> Self {
+    pub fn new(
+        config: KademliaHandlerConfig,
+        endpoint: ConnectedPoint,
+        remote_peer_id: PeerId,
+    ) -> Self {
         let keep_alive = KeepAlive::Until(Instant::now() + config.idle_timeout);
 
         KademliaHandler {
             config,
             endpoint,
+            remote_peer_id,
             next_connec_unique_id: UniqueConnecId(0),
-            substreams: Vec::new(),
+            inbound_substreams: Default::default(),
+            outbound_substreams: Default::default(),
             keep_alive,
             protocol_status: ProtocolStatus::Unconfirmed,
         }
@@ -493,8 +528,10 @@ where
         protocol: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
         (msg, user_data): Self::OutboundOpenInfo,
     ) {
-        self.substreams
-            .push(SubstreamState::OutPendingSend(protocol, msg, user_data));
+        self.outbound_substreams
+            .push(OutboundSubstreamState::PendingSend(
+                protocol, msg, user_data,
+            ));
         if let ProtocolStatus::Unconfirmed = self.protocol_status {
             // Upon the first successfully negotiated substream, we know that the
             // remote is configured with the same protocol name and we want
@@ -515,84 +552,126 @@ where
             EitherOutput::Second(p) => void::unreachable(p),
         };
 
-        debug_assert!(self.config.allow_listening);
-        let connec_unique_id = self.next_connec_unique_id;
-        self.next_connec_unique_id.0 += 1;
-        self.substreams
-            .push(SubstreamState::InWaitingMessage(connec_unique_id, protocol));
-
         if let ProtocolStatus::Unconfirmed = self.protocol_status {
             // Upon the first successfully negotiated substream, we know that the
             // remote is configured with the same protocol name and we want
             // the behaviour to add this peer to the routing table, if possible.
             self.protocol_status = ProtocolStatus::Confirmed;
         }
+
+        if self.inbound_substreams.len() == MAX_NUM_INBOUND_SUBSTREAMS {
+            if let Some(position) = self.inbound_substreams.iter().position(|s| {
+                matches!(
+                    s,
+                    // An inbound substream waiting to be reused.
+                    InboundSubstreamState::WaitingMessage { first: false, .. }
+                )
+            }) {
+                self.inbound_substreams.remove(position);
+                log::warn!(
+                    "New inbound substream to {:?} exceeds inbound substream limit. \
+                     Removed older substream waiting to be reused.",
+                    self.remote_peer_id,
+                )
+            } else {
+                log::warn!(
+                    "New inbound substream to {:?} exceeds inbound substream limit. \
+                     No older substream waiting to be reused. Dropping new substream.",
+                    self.remote_peer_id,
+                );
+                return;
+            }
+        }
+
+        debug_assert!(self.config.allow_listening);
+        let connec_unique_id = self.next_connec_unique_id;
+        self.next_connec_unique_id.0 += 1;
+
+        self.inbound_substreams
+            .push(InboundSubstreamState::WaitingMessage {
+                first: true,
+                connection_id: connec_unique_id,
+                substream: protocol,
+            });
     }
 
     fn inject_event(&mut self, message: KademliaHandlerIn<TUserData>) {
         match message {
             KademliaHandlerIn::Reset(request_id) => {
-                let pos = self.substreams.iter().position(|state| match state {
-                    SubstreamState::InWaitingUser(conn_id, _) => {
-                        conn_id == &request_id.connec_unique_id
-                    }
-                    _ => false,
-                });
+                let pos = self
+                    .inbound_substreams
+                    .iter()
+                    .position(|state| match state {
+                        InboundSubstreamState::WaitingUser(conn_id, _) => {
+                            conn_id == &request_id.connec_unique_id
+                        }
+                        _ => false,
+                    });
                 if let Some(pos) = pos {
                     // TODO: we don't properly close down the substream
                     let waker = futures::task::noop_waker();
                     let mut cx = Context::from_waker(&waker);
-                    let _ = self.substreams.remove(pos).try_close(&mut cx);
+                    let _ = self.inbound_substreams.remove(pos).try_close(&mut cx);
                 }
             }
             KademliaHandlerIn::FindNodeReq { key, user_data } => {
                 let msg = KadRequestMsg::FindNode { key };
-                self.substreams
-                    .push(SubstreamState::OutPendingOpen(msg, Some(user_data)));
+                self.outbound_substreams
+                    .push(OutboundSubstreamState::PendingOpen(msg, Some(user_data)));
             }
             KademliaHandlerIn::FindNodeRes {
                 closer_peers,
                 request_id,
             } => {
-                let pos = self.substreams.iter().position(|state| match state {
-                    SubstreamState::InWaitingUser(ref conn_id, _) => {
-                        conn_id == &request_id.connec_unique_id
-                    }
-                    _ => false,
-                });
+                let pos = self
+                    .inbound_substreams
+                    .iter()
+                    .position(|state| match state {
+                        InboundSubstreamState::WaitingUser(ref conn_id, _) => {
+                            conn_id == &request_id.connec_unique_id
+                        }
+                        _ => false,
+                    });
                 if let Some(pos) = pos {
-                    let (conn_id, substream) = match self.substreams.remove(pos) {
-                        SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream),
+                    let (conn_id, substream) = match self.inbound_substreams.remove(pos) {
+                        InboundSubstreamState::WaitingUser(conn_id, substream) => {
+                            (conn_id, substream)
+                        }
                         _ => unreachable!(),
                     };
                     let msg = KadResponseMsg::FindNode { closer_peers };
-                    self.substreams
-                        .push(SubstreamState::InPendingSend(conn_id, substream, msg));
+                    self.inbound_substreams
+                        .push(InboundSubstreamState::PendingSend(conn_id, substream, msg));
                 }
             }
             KademliaHandlerIn::GetProvidersReq { key, user_data } => {
                 let msg = KadRequestMsg::GetProviders { key };
-                self.substreams
-                    .push(SubstreamState::OutPendingOpen(msg, Some(user_data)));
+                self.outbound_substreams
+                    .push(OutboundSubstreamState::PendingOpen(msg, Some(user_data)));
             }
             KademliaHandlerIn::GetProvidersRes {
                 closer_peers,
                 provider_peers,
                 request_id,
             } => {
-                let pos = self.substreams.iter().position(|state| match state {
-                    SubstreamState::InWaitingUser(ref conn_id, _)
-                        if conn_id == &request_id.connec_unique_id =>
-                    {
-                        true
-                    }
-                    _ => false,
-                });
+                let pos = self
+                    .inbound_substreams
+                    .iter()
+                    .position(|state| match state {
+                        InboundSubstreamState::WaitingUser(ref conn_id, _)
+                            if conn_id == &request_id.connec_unique_id =>
+                        {
+                            true
+                        }
+                        _ => false,
+                    });
                 if let Some(pos) = pos {
-                    let (conn_id, substream) = match self.substreams.remove(pos) {
-                        SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream),
+                    let (conn_id, substream) = match self.inbound_substreams.remove(pos) {
+                        InboundSubstreamState::WaitingUser(conn_id, substream) => {
+                            (conn_id, substream)
+                        }
                         _ => unreachable!(),
                     };
@@ -600,40 +679,45 @@ where
                         closer_peers,
                         provider_peers,
                     };
-                    self.substreams
-                        .push(SubstreamState::InPendingSend(conn_id, substream, msg));
+                    self.inbound_substreams
+                        .push(InboundSubstreamState::PendingSend(conn_id, substream, msg));
                 }
             }
             KademliaHandlerIn::AddProvider { key, provider } => {
                 let msg = KadRequestMsg::AddProvider { key, provider };
-                self.substreams
-                    .push(SubstreamState::OutPendingOpen(msg, None));
+                self.outbound_substreams
+                    .push(OutboundSubstreamState::PendingOpen(msg, None));
             }
             KademliaHandlerIn::GetRecord { key, user_data } => {
                 let msg = KadRequestMsg::GetValue { key };
-                self.substreams
-                    .push(SubstreamState::OutPendingOpen(msg, Some(user_data)));
+                self.outbound_substreams
+                    .push(OutboundSubstreamState::PendingOpen(msg, Some(user_data)));
             }
             KademliaHandlerIn::PutRecord { record, user_data } => {
                 let msg = KadRequestMsg::PutValue { record };
-                self.substreams
-                    .push(SubstreamState::OutPendingOpen(msg, Some(user_data)));
+                self.outbound_substreams
+                    .push(OutboundSubstreamState::PendingOpen(msg, Some(user_data)));
             }
             KademliaHandlerIn::GetRecordRes {
                 record,
                 closer_peers,
                 request_id,
             } => {
-                let pos = self.substreams.iter().position(|state| match state {
-                    SubstreamState::InWaitingUser(ref conn_id, _) => {
-                        conn_id == &request_id.connec_unique_id
-                    }
-                    _ => false,
-                });
+                let pos = self
+                    .inbound_substreams
+                    .iter()
+                    .position(|state| match state {
+                        InboundSubstreamState::WaitingUser(ref conn_id, _) => {
+                            conn_id == &request_id.connec_unique_id
+                        }
+                        _ => false,
+                    });
                 if let Some(pos) = pos {
-                    let (conn_id, substream) = match self.substreams.remove(pos) {
-                        SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream),
+                    let (conn_id, substream) = match self.inbound_substreams.remove(pos) {
+                        InboundSubstreamState::WaitingUser(conn_id, substream) => {
+                            (conn_id, substream)
+                        }
                         _ => unreachable!(),
                     };
@@ -641,8 +725,8 @@ where
                         record,
                         closer_peers,
                     };
-                    self.substreams
-                        .push(SubstreamState::InPendingSend(conn_id, substream, msg));
+                    self.inbound_substreams
+                        .push(InboundSubstreamState::PendingSend(conn_id, substream, msg));
                 }
             }
             KademliaHandlerIn::PutRecordRes {
@@ -650,24 +734,29 @@ where
                 request_id,
                 value,
             } => {
-                let pos = self.substreams.iter().position(|state| match state {
-                    SubstreamState::InWaitingUser(ref conn_id, _)
-                        if conn_id == &request_id.connec_unique_id =>
-                    {
-                        true
-                    }
-                    _ => false,
-                });
+                let pos = self
+                    .inbound_substreams
+                    .iter()
+                    .position(|state| match state {
+                        InboundSubstreamState::WaitingUser(ref conn_id, _)
+                            if conn_id == &request_id.connec_unique_id =>
+                        {
+                            true
+                        }
+                        _ => false,
+                    });
                 if let Some(pos) = pos {
-                    let (conn_id, substream) = match self.substreams.remove(pos) {
-                        SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream),
+                    let (conn_id, substream) = match self.inbound_substreams.remove(pos) {
+                        InboundSubstreamState::WaitingUser(conn_id, substream) => {
+                            (conn_id, substream)
+                        }
                         _ => unreachable!(),
                     };
                     let msg = KadResponseMsg::PutValue { key, value };
-                    self.substreams
-                        .push(SubstreamState::InPendingSend(conn_id, substream, msg));
+                    self.inbound_substreams
+                        .push(InboundSubstreamState::PendingSend(conn_id, substream, msg));
                 }
             }
         }
@@ -681,8 +770,8 @@ where
         // TODO: cache the fact that the remote doesn't support kademlia at all, so that we don't
         // continue trying
         if let Some(user_data) = user_data {
-            self.substreams
-                .push(SubstreamState::OutReportError(error.into(), user_data));
+            self.outbound_substreams
+                .push(OutboundSubstreamState::ReportError(error.into(), user_data));
         }
     }
@@ -701,7 +790,7 @@ where
                 Self::Error,
             >,
         > {
-        if self.substreams.is_empty() {
+        if self.outbound_substreams.is_empty() && self.inbound_substreams.is_empty() {
             return Poll::Pending;
         }
@@ -714,25 +803,26 @@ where
             ));
         }
 
-        // We remove each element from `substreams` one by one and add them back.
-        for n in (0..self.substreams.len()).rev() {
-            let mut substream = self.substreams.swap_remove(n);
+        // We remove each element from `outbound_substreams` one by one and add them back.
+        for n in (0..self.outbound_substreams.len()).rev() {
+            let mut substream = self.outbound_substreams.swap_remove(n);
             loop {
-                match advance_substream(substream, self.config.protocol_config.clone(), cx) {
+                match advance_outbound_substream(substream, self.config.protocol_config.clone(), cx)
+                {
                     (Some(new_state), Some(event), _) => {
-                        self.substreams.push(new_state);
+                        self.outbound_substreams.push(new_state);
                         return Poll::Ready(event);
                     }
                     (None, Some(event), _) => {
-                        if self.substreams.is_empty() {
+                        if self.outbound_substreams.is_empty() {
                             self.keep_alive =
                                 KeepAlive::Until(Instant::now() + self.config.idle_timeout);
                         }
                         return Poll::Ready(event);
                     }
                     (Some(new_state), None, false) => {
-                        self.substreams.push(new_state);
+                        self.outbound_substreams.push(new_state);
                         break;
                     }
                     (Some(new_state), None, true) => {
@@ -746,7 +836,39 @@ where
             }
         }
 
-        if self.substreams.is_empty() {
+        // We remove each element from `inbound_substreams` one by one and add them back.
+        for n in (0..self.inbound_substreams.len()).rev() {
+            let mut substream = self.inbound_substreams.swap_remove(n);
+            loop {
+                match advance_inbound_substream(substream, cx) {
+                    (Some(new_state), Some(event), _) => {
+                        self.inbound_substreams.push(new_state);
+                        return Poll::Ready(event);
+                    }
+                    (None, Some(event), _) => {
+                        if self.inbound_substreams.is_empty() {
+                            self.keep_alive =
+                                KeepAlive::Until(Instant::now() + self.config.idle_timeout);
+                        }
+                        return Poll::Ready(event);
+                    }
+                    (Some(new_state), None, false) => {
+                        self.inbound_substreams.push(new_state);
+                        break;
+                    }
+                    (Some(new_state), None, true) => {
+                        substream = new_state;
+                        continue;
+                    }
+                    (None, None, _) => {
+                        break;
+                    }
+                }
+            }
+        }
+
+        if self.outbound_substreams.is_empty() && self.inbound_substreams.is_empty() {
             // We destroyed all substreams in this function.
             self.keep_alive = KeepAlive::Until(Instant::now() + self.config.idle_timeout);
         } else {
@@ -767,16 +889,16 @@ impl Default for KademliaHandlerConfig {
     }
 }
 
-/// Advances one substream.
+/// Advances one outbound substream.
 ///
 /// Returns the new state for that substream, an event to generate, and whether the substream
 /// should be polled again.
-fn advance_substream<TUserData>(
-    state: SubstreamState<TUserData>,
+fn advance_outbound_substream<TUserData>(
+    state: OutboundSubstreamState<TUserData>,
     upgrade: KademliaProtocolConfig,
     cx: &mut Context<'_>,
 ) -> (
-    Option<SubstreamState<TUserData>>,
+    Option<OutboundSubstreamState<TUserData>>,
     Option<
         ConnectionHandlerEvent<
             KademliaProtocolConfig,
@@ -788,17 +910,17 @@ fn advance_substream<TUserData>(
     bool,
 ) {
     match state {
-        SubstreamState::OutPendingOpen(msg, user_data) => {
+        OutboundSubstreamState::PendingOpen(msg, user_data) => {
             let ev = ConnectionHandlerEvent::OutboundSubstreamRequest {
                 protocol: SubstreamProtocol::new(upgrade, (msg, user_data)),
             };
             (None, Some(ev), false)
         }
-        SubstreamState::OutPendingSend(mut substream, msg, user_data) => {
+        OutboundSubstreamState::PendingSend(mut substream, msg, user_data) => {
             match Sink::poll_ready(Pin::new(&mut substream), cx) {
                 Poll::Ready(Ok(())) => match Sink::start_send(Pin::new(&mut substream), msg) {
                     Ok(()) => (
-                        Some(SubstreamState::OutPendingFlush(substream, user_data)),
+                        Some(OutboundSubstreamState::PendingFlush(substream, user_data)),
                         None,
                         true,
                     ),
@@ -814,7 +936,9 @@ fn advance_substream<TUserData>(
                     }
                 },
                 Poll::Pending => (
-                    Some(SubstreamState::OutPendingSend(substream, msg, user_data)),
+                    Some(OutboundSubstreamState::PendingSend(
+                        substream, msg, user_data,
+                    )),
                     None,
                     false,
                 ),
@@ -830,21 +954,21 @@ fn advance_substream<TUserData>(
                 }
             }
         }
-        SubstreamState::OutPendingFlush(mut substream, user_data) => {
+        OutboundSubstreamState::PendingFlush(mut substream, user_data) => {
            match Sink::poll_flush(Pin::new(&mut substream), cx) {
                 Poll::Ready(Ok(())) => {
                     if let Some(user_data) = user_data {
                         (
-                            Some(SubstreamState::OutWaitingAnswer(substream, user_data)),
+                            Some(OutboundSubstreamState::WaitingAnswer(substream, user_data)),
                             None,
                             true,
                         )
                     } else {
-                        (Some(SubstreamState::OutClosing(substream)), None, true)
+                        (Some(OutboundSubstreamState::Closing(substream)), None, true)
                     }
                 }
                 Poll::Pending => (
-                    Some(SubstreamState::OutPendingFlush(substream, user_data)),
+                    Some(OutboundSubstreamState::PendingFlush(substream, user_data)),
                     None,
                     false,
                 ),
@@ -860,10 +984,10 @@ fn advance_substream<TUserData>(
                 }
             }
         }
-        SubstreamState::OutWaitingAnswer(mut substream, user_data) => {
+        OutboundSubstreamState::WaitingAnswer(mut substream, user_data) => {
             match Stream::poll_next(Pin::new(&mut substream), cx) {
                 Poll::Ready(Some(Ok(msg))) => {
-                    let new_state = SubstreamState::OutClosing(substream);
+                    let new_state = OutboundSubstreamState::Closing(substream);
                     let event = process_kad_response(msg, user_data);
                     (
                         Some(new_state),
@@ -872,7 +996,7 @@ fn advance_substream<TUserData>(
                     )
                 }
                 Poll::Pending => (
-                    Some(SubstreamState::OutWaitingAnswer(substream, user_data)),
+                    Some(OutboundSubstreamState::WaitingAnswer(substream, user_data)),
                     None,
                     false,
                 ),
@@ -892,86 +1016,119 @@ fn advance_substream<TUserData>(
                 }
             }
         }
-        SubstreamState::OutReportError(error, user_data) => {
+        OutboundSubstreamState::ReportError(error, user_data) => {
             let event = KademliaHandlerEvent::QueryError { error, user_data };
             (None, Some(ConnectionHandlerEvent::Custom(event)), false)
         }
-        SubstreamState::OutClosing(mut stream) => match Sink::poll_close(Pin::new(&mut stream), cx)
-        {
-            Poll::Ready(Ok(())) => (None, None, false),
-            Poll::Pending => (Some(SubstreamState::OutClosing(stream)), None, false),
-            Poll::Ready(Err(_)) => (None, None, false),
-        },
-        SubstreamState::InWaitingMessage(id, mut substream) => {
-            match Stream::poll_next(Pin::new(&mut substream), cx) {
-                Poll::Ready(Some(Ok(msg))) => {
-                    if let Ok(ev) = process_kad_request(msg, id) {
-                        (
-                            Some(SubstreamState::InWaitingUser(id, substream)),
-                            Some(ConnectionHandlerEvent::Custom(ev)),
-                            false,
-                        )
-                    } else {
-                        (Some(SubstreamState::InClosing(substream)), None, true)
-                    }
-                }
-                Poll::Pending => (
-                    Some(SubstreamState::InWaitingMessage(id, substream)),
-                    None,
-                    false,
-                ),
-                Poll::Ready(None) => {
-                    trace!("Inbound substream: EOF");
-                    (None, None, false)
-                }
-                Poll::Ready(Some(Err(e))) => {
-                    trace!("Inbound substream error: {:?}", e);
-                    (None, None, false)
-                }
+        OutboundSubstreamState::Closing(mut stream) => {
+            match Sink::poll_close(Pin::new(&mut stream), cx) {
+                Poll::Ready(Ok(())) => (None, None, false),
+                Poll::Pending => (Some(OutboundSubstreamState::Closing(stream)), None, false),
+                Poll::Ready(Err(_)) => (None, None, false),
             }
         }
-        SubstreamState::InWaitingUser(id, substream) => (
-            Some(SubstreamState::InWaitingUser(id, substream)),
+    }
+}
+
+/// Advances one inbound substream.
+///
+/// Returns the new state for that substream, an event to generate, and whether the substream
+/// should be polled again.
+fn advance_inbound_substream<TUserData>(
+    state: InboundSubstreamState,
+    cx: &mut Context<'_>,
+) -> (
+    Option<InboundSubstreamState>,
+    Option<
+        ConnectionHandlerEvent<
+            KademliaProtocolConfig,
+            (KadRequestMsg, Option<TUserData>),
+            KademliaHandlerEvent<TUserData>,
+            io::Error,
+        >,
+    >,
+    bool,
+) {
+    match state {
+        InboundSubstreamState::WaitingMessage {
+            first,
+            connection_id,
+            mut substream,
+        } => match Stream::poll_next(Pin::new(&mut substream), cx) {
+            Poll::Ready(Some(Ok(msg))) => {
+                if let Ok(ev) = process_kad_request(msg, connection_id) {
+                    (
+                        Some(InboundSubstreamState::WaitingUser(connection_id, substream)),
+                        Some(ConnectionHandlerEvent::Custom(ev)),
+                        false,
+                    )
+                } else {
+                    (Some(InboundSubstreamState::Closing(substream)), None, true)
+                }
+            }
+            Poll::Pending => (
+                Some(InboundSubstreamState::WaitingMessage {
+                    first,
+                    connection_id,
+                    substream,
+                }),
+                None,
+                false,
+            ),
+            Poll::Ready(None) => {
+                trace!("Inbound substream: EOF");
+                (None, None, false)
+            }
+            Poll::Ready(Some(Err(e))) => {
+                trace!("Inbound substream error: {:?}", e);
+                (None, None, false)
+            }
+        },
+        InboundSubstreamState::WaitingUser(id, substream) => (
+            Some(InboundSubstreamState::WaitingUser(id, substream)),
             None,
             false,
         ),
-        SubstreamState::InPendingSend(id, mut substream, msg) => {
+        InboundSubstreamState::PendingSend(id, mut substream, msg) => {
             match Sink::poll_ready(Pin::new(&mut substream), cx) {
                 Poll::Ready(Ok(())) => match Sink::start_send(Pin::new(&mut substream), msg) {
                     Ok(()) => (
-                        Some(SubstreamState::InPendingFlush(id, substream)),
+                        Some(InboundSubstreamState::PendingFlush(id, substream)),
                         None,
                         true,
                     ),
                     Err(_) => (None, None, false),
                 },
                 Poll::Pending => (
-                    Some(SubstreamState::InPendingSend(id, substream, msg)),
+                    Some(InboundSubstreamState::PendingSend(id, substream, msg)),
                    None,
                     false,
                 ),
                 Poll::Ready(Err(_)) => (None, None, false),
             }
         }
-        SubstreamState::InPendingFlush(id, mut substream) => {
+        InboundSubstreamState::PendingFlush(id, mut substream) => {
             match Sink::poll_flush(Pin::new(&mut substream), cx) {
                 Poll::Ready(Ok(())) => (
-                    Some(SubstreamState::InWaitingMessage(id, substream)),
+                    Some(InboundSubstreamState::WaitingMessage {
+                        first: false,
+                        connection_id: id,
+                        substream,
+                    }),
                     None,
                     true,
                 ),
                 Poll::Pending => (
-                    Some(SubstreamState::InPendingFlush(id, substream)),
+                    Some(InboundSubstreamState::PendingFlush(id, substream)),
                     None,
                     false,
                 ),
                 Poll::Ready(Err(_)) => (None, None, false),
             }
         }
-        SubstreamState::InClosing(mut stream) => {
+        InboundSubstreamState::Closing(mut stream) => {
             match Sink::poll_close(Pin::new(&mut stream), cx) {
                 Poll::Ready(Ok(())) => (None, None, false),
-                Poll::Pending => (Some(SubstreamState::InClosing(stream)), None, false),
+                Poll::Pending => (Some(InboundSubstreamState::Closing(stream)), None, false),
                 Poll::Ready(Err(_)) => (None, None, false),
             }
         }