swarm/src/protocols_handler: Use FuturesUnordered in NodeHandlerWrapper (#1775)

> Futures managed by FuturesUnordered will only be polled when they
generate wake-up notifications. This reduces the required amount of work
needed to poll large numbers of futures.

https://docs.rs/futures/0.3.5/futures/stream/struct.FuturesUnordered.html

Instead of iterating each inbound and outbound upgrade looking for one
to make progress, use a `FuturesUnordered` for both pending inbound and
pending outbound upgrades. As a result only those upgrades are polled
that are ready to progress.
This commit is contained in:
Max Inden 2020-10-09 19:37:17 +02:00 committed by GitHub
parent b74285a8fb
commit 0c02a8ace7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 86 additions and 70 deletions

View File

@ -27,7 +27,7 @@
- Update `libp2p-core`, `libp2p-floodsub`, `libp2p-gossipsub`, `libp2p-mplex`,
`libp2p-noise`, `libp2p-plaintext`, `libp2p-pnet`, `libp2p-request-response`,
`libp2p-tcp`, `libp2p-websocket` and `parity-multiaddr`.
`libp2p-swarm`, `libp2p-tcp`, `libp2p-websocket` and `parity-multiaddr`.
# Version 0.28.1 [2020-09-10]

View File

@ -74,7 +74,7 @@ libp2p-ping = { version = "0.22.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.23.0", path = "protocols/plaintext", optional = true }
libp2p-pnet = { version = "0.19.2", path = "protocols/pnet", optional = true }
libp2p-request-response = { version = "0.4.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.22.0", path = "swarm" }
libp2p-swarm = { version = "0.22.1", path = "swarm" }
libp2p-uds = { version = "0.22.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.22.0", path = "transports/wasm-ext", optional = true }
libp2p-yamux = { version = "0.25.0", path = "muxers/yamux", optional = true }

View File

@ -1,3 +1,16 @@
# 0.22.1 [unreleased]
- Instead of iterating each inbound and outbound substream upgrade looking for
one to make progress, use a `FuturesUnordered` for both pending inbound and
pending outbound upgrades. As a result only those upgrades are polled that are
ready to progress.
Implementors of `InboundUpgrade` and `OutboundUpgrade` need to ensure to wake
up the underlying task once they are ready to make progress as they won't be
polled otherwise.
[PR 1775](https://github.com/libp2p/rust-libp2p/pull/1775)
# 0.22.0 [2020-09-09]
- Bump `libp2p-core` dependency.

View File

@ -2,7 +2,7 @@
name = "libp2p-swarm"
edition = "2018"
description = "The libp2p swarm"
version = "0.22.0"
version = "0.22.1"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"

View File

@ -28,6 +28,7 @@ use crate::protocols_handler::{
};
use futures::prelude::*;
use futures::stream::FuturesUnordered;
use libp2p_core::{
Multiaddr,
PeerId,
@ -41,7 +42,7 @@ use libp2p_core::{
SubstreamEndpoint,
},
muxing::StreamMuxerBox,
upgrade::{self, InboundUpgradeApply, OutboundUpgradeApply}
upgrade::{self, InboundUpgradeApply, OutboundUpgradeApply, UpgradeError}
};
use std::{error, fmt, pin::Pin, task::Context, task::Poll, time::Duration};
use wasm_timer::{Delay, Instant};
@ -76,8 +77,8 @@ where
fn into_handler(self, connected: &Connected<TConnInfo>) -> Self::Handler {
NodeHandlerWrapper {
handler: self.handler.into_handler(connected.peer_id(), &connected.endpoint),
negotiating_in: Vec::new(),
negotiating_out: Vec::new(),
negotiating_in: Default::default(),
negotiating_out: Default::default(),
queued_dial_upgrades: Vec::new(),
unique_dial_upgrade_id: 0,
shutdown: Shutdown::None,
@ -95,18 +96,15 @@ where
/// The underlying handler.
handler: TProtoHandler,
/// Futures that upgrade incoming substreams.
negotiating_in: Vec<(
negotiating_in: FuturesUnordered<SubstreamUpgrade<
TProtoHandler::InboundOpenInfo,
InboundUpgradeApply<Substream<StreamMuxerBox>, SendWrapper<TProtoHandler::InboundProtocol>>,
Delay
)>,
/// Futures that upgrade outgoing substreams. The first element of the tuple is the userdata
/// to pass back once successfully opened.
negotiating_out: Vec<(
>>,
/// Futures that upgrade outgoing substreams.
negotiating_out: FuturesUnordered<SubstreamUpgrade<
TProtoHandler::OutboundOpenInfo,
OutboundUpgradeApply<Substream<StreamMuxerBox>, SendWrapper<TProtoHandler::OutboundProtocol>>,
Delay,
)>,
>>,
/// For each outbound substream request, how to upgrade it. The first element of the tuple
/// is the unique identifier (see `unique_dial_upgrade_id`).
queued_dial_upgrades: Vec<(u64, (upgrade::Version, SendWrapper<TProtoHandler::OutboundProtocol>))>,
@ -116,6 +114,48 @@ where
shutdown: Shutdown,
}
struct SubstreamUpgrade<UserData, Upgrade> {
user_data: Option<UserData>,
timeout: Delay,
upgrade: Upgrade,
}
impl<UserData, Upgrade> Unpin for SubstreamUpgrade<UserData, Upgrade> {}
impl<UserData, Upgrade, UpgradeOutput, TUpgradeError> Future for SubstreamUpgrade<UserData, Upgrade>
where
Upgrade: Future<Output = Result<UpgradeOutput, UpgradeError<TUpgradeError>>> + Unpin,
{
type Output = (UserData, Result<UpgradeOutput, ProtocolsHandlerUpgrErr<TUpgradeError>>);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.timeout.poll_unpin(cx) {
Poll::Ready(Ok(_)) => return Poll::Ready((
self.user_data.take().expect("Future not to be polled again once ready."),
Err(ProtocolsHandlerUpgrErr::Timeout)),
),
Poll::Ready(Err(_)) => return Poll::Ready((
self.user_data.take().expect("Future not to be polled again once ready."),
Err(ProtocolsHandlerUpgrErr::Timer)),
),
Poll::Pending => {},
}
match self.upgrade.poll_unpin(cx) {
Poll::Ready(Ok(upgrade)) => Poll::Ready((
self.user_data.take().expect("Future not to be polled again once ready."),
Ok(upgrade),
)),
Poll::Ready(Err(err)) => Poll::Ready((
self.user_data.take().expect("Future not to be polled again once ready."),
Err(ProtocolsHandlerUpgrErr::Upgrade(err)),
)),
Poll::Pending => Poll::Pending,
}
}
}
/// The options for a planned connection & handler shutdown.
///
/// A shutdown is planned anew based on the the return value of
@ -195,10 +235,14 @@ where
SubstreamEndpoint::Listener => {
let protocol = self.handler.listen_protocol();
let timeout = protocol.timeout().clone();
let (_, upgrade, info) = protocol.into_upgrade();
let (_, upgrade, user_data) = protocol.into_upgrade();
let upgrade = upgrade::apply_inbound(substream, SendWrapper(upgrade));
let timeout = Delay::new(timeout);
self.negotiating_in.push((info, upgrade, timeout));
self.negotiating_in.push(SubstreamUpgrade {
user_data: Some(user_data),
timeout,
upgrade,
});
}
SubstreamEndpoint::Dialer((upgrade_id, user_data, timeout)) => {
let pos = match self
@ -216,7 +260,11 @@ where
let (_, (version, upgrade)) = self.queued_dial_upgrades.remove(pos);
let upgrade = upgrade::apply_outbound(substream, upgrade, version);
let timeout = Delay::new(timeout);
self.negotiating_out.push((user_data, upgrade, timeout));
self.negotiating_out.push(SubstreamUpgrade {
user_data: Some(user_data),
timeout,
upgrade,
});
}
}
}
@ -232,62 +280,17 @@ where
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<
Result<ConnectionHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>
> {
// Continue negotiation of newly-opened substreams on the listening side.
// We remove each element from `negotiating_in` one by one and add them back if not ready.
for n in (0..self.negotiating_in.len()).rev() {
let (info, mut in_progress, mut timeout) = self.negotiating_in.swap_remove(n);
match Future::poll(Pin::new(&mut timeout), cx) {
Poll::Ready(Ok(_)) => {
let err = ProtocolsHandlerUpgrErr::Timeout;
self.handler.inject_listen_upgrade_error(info, err);
continue
}
Poll::Ready(Err(_)) => {
let err = ProtocolsHandlerUpgrErr::Timer;
self.handler.inject_listen_upgrade_error(info, err);
continue;
}
Poll::Pending => {},
}
match Future::poll(Pin::new(&mut in_progress), cx) {
Poll::Ready(Ok(upgrade)) =>
self.handler.inject_fully_negotiated_inbound(upgrade, info),
Poll::Pending => self.negotiating_in.push((info, in_progress, timeout)),
Poll::Ready(Err(err)) => {
let err = ProtocolsHandlerUpgrErr::Upgrade(err);
self.handler.inject_listen_upgrade_error(info, err);
}
while let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) {
match res {
Ok(upgrade) => self.handler.inject_fully_negotiated_inbound(upgrade, user_data),
Err(err) => self.handler.inject_listen_upgrade_error(user_data, err),
}
}
// Continue negotiation of newly-opened substreams.
// We remove each element from `negotiating_out` one by one and add them back if not ready.
for n in (0..self.negotiating_out.len()).rev() {
let (upgr_info, mut in_progress, mut timeout) = self.negotiating_out.swap_remove(n);
match Future::poll(Pin::new(&mut timeout), cx) {
Poll::Ready(Ok(_)) => {
let err = ProtocolsHandlerUpgrErr::Timeout;
self.handler.inject_dial_upgrade_error(upgr_info, err);
continue;
},
Poll::Ready(Err(_)) => {
let err = ProtocolsHandlerUpgrErr::Timer;
self.handler.inject_dial_upgrade_error(upgr_info, err);
continue;
},
Poll::Pending => {},
}
match Future::poll(Pin::new(&mut in_progress), cx) {
Poll::Ready(Ok(upgrade)) => {
self.handler.inject_fully_negotiated_outbound(upgrade, upgr_info);
}
Poll::Pending => {
self.negotiating_out.push((upgr_info, in_progress, timeout));
}
Poll::Ready(Err(err)) => {
let err = ProtocolsHandlerUpgrErr::Upgrade(err);
self.handler.inject_dial_upgrade_error(upgr_info, err);
}
while let Poll::Ready(Some((user_data, res))) = self.negotiating_out.poll_next_unpin(cx) {
match res {
Ok(upgrade) => self.handler.inject_fully_negotiated_outbound(upgrade, user_data),
Err(err) => self.handler.inject_dial_upgrade_error(user_data, err),
}
}