diff --git a/CHANGELOG.md b/CHANGELOG.md index 2eadbcd6..6c49a234 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,7 +27,7 @@ - Update `libp2p-core`, `libp2p-floodsub`, `libp2p-gossipsub`, `libp2p-mplex`, `libp2p-noise`, `libp2p-plaintext`, `libp2p-pnet`, `libp2p-request-response`, - `libp2p-tcp`, `libp2p-websocket` and `parity-multiaddr`. + `libp2p-swarm`, `libp2p-tcp`, `libp2p-websocket` and `parity-multiaddr`. # Version 0.28.1 [2020-09-10] diff --git a/Cargo.toml b/Cargo.toml index b494405a..a1560841 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,7 +74,7 @@ libp2p-ping = { version = "0.22.0", path = "protocols/ping", optional = true } libp2p-plaintext = { version = "0.23.0", path = "protocols/plaintext", optional = true } libp2p-pnet = { version = "0.19.2", path = "protocols/pnet", optional = true } libp2p-request-response = { version = "0.4.0", path = "protocols/request-response", optional = true } -libp2p-swarm = { version = "0.22.0", path = "swarm" } +libp2p-swarm = { version = "0.22.1", path = "swarm" } libp2p-uds = { version = "0.22.0", path = "transports/uds", optional = true } libp2p-wasm-ext = { version = "0.22.0", path = "transports/wasm-ext", optional = true } libp2p-yamux = { version = "0.25.0", path = "muxers/yamux", optional = true } diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 926c7ecd..1ebf11b0 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,3 +1,16 @@ +# 0.22.1 [unreleased] + +- Instead of iterating each inbound and outbound substream upgrade looking for + one to make progress, use a `FuturesUnordered` for both pending inbound and + pending outbound upgrades. As a result only those upgrades are polled that are + ready to progress. + + Implementors of `InboundUpgrade` and `OutboundUpgrade` need to ensure to wake + up the underlying task once they are ready to make progress as they won't be + polled otherwise. + + [PR 1775](https://github.com/libp2p/rust-libp2p/pull/1775) + # 0.22.0 [2020-09-09] - Bump `libp2p-core` dependency. diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 94dde7da..4909e734 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-swarm" edition = "2018" description = "The libp2p swarm" -version = "0.22.0" +version = "0.22.1" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/swarm/src/protocols_handler/node_handler.rs b/swarm/src/protocols_handler/node_handler.rs index 5bf0a581..84fc7dfe 100644 --- a/swarm/src/protocols_handler/node_handler.rs +++ b/swarm/src/protocols_handler/node_handler.rs @@ -28,6 +28,7 @@ use crate::protocols_handler::{ }; use futures::prelude::*; +use futures::stream::FuturesUnordered; use libp2p_core::{ Multiaddr, PeerId, @@ -41,7 +42,7 @@ use libp2p_core::{ SubstreamEndpoint, }, muxing::StreamMuxerBox, - upgrade::{self, InboundUpgradeApply, OutboundUpgradeApply} + upgrade::{self, InboundUpgradeApply, OutboundUpgradeApply, UpgradeError} }; use std::{error, fmt, pin::Pin, task::Context, task::Poll, time::Duration}; use wasm_timer::{Delay, Instant}; @@ -76,8 +77,8 @@ where fn into_handler(self, connected: &Connected) -> Self::Handler { NodeHandlerWrapper { handler: self.handler.into_handler(connected.peer_id(), &connected.endpoint), - negotiating_in: Vec::new(), - negotiating_out: Vec::new(), + negotiating_in: Default::default(), + negotiating_out: Default::default(), queued_dial_upgrades: Vec::new(), unique_dial_upgrade_id: 0, shutdown: Shutdown::None, @@ -95,18 +96,15 @@ where /// The underlying handler. handler: TProtoHandler, /// Futures that upgrade incoming substreams. - negotiating_in: Vec<( + negotiating_in: FuturesUnordered, SendWrapper>, - Delay - )>, - /// Futures that upgrade outgoing substreams. The first element of the tuple is the userdata - /// to pass back once successfully opened. - negotiating_out: Vec<( + >>, + /// Futures that upgrade outgoing substreams. + negotiating_out: FuturesUnordered, SendWrapper>, - Delay, - )>, + >>, /// For each outbound substream request, how to upgrade it. The first element of the tuple /// is the unique identifier (see `unique_dial_upgrade_id`). queued_dial_upgrades: Vec<(u64, (upgrade::Version, SendWrapper))>, @@ -116,6 +114,48 @@ where shutdown: Shutdown, } +struct SubstreamUpgrade { + user_data: Option, + timeout: Delay, + upgrade: Upgrade, +} + +impl Unpin for SubstreamUpgrade {} + +impl Future for SubstreamUpgrade +where + Upgrade: Future>> + Unpin, +{ + type Output = (UserData, Result>); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.timeout.poll_unpin(cx) { + Poll::Ready(Ok(_)) => return Poll::Ready(( + self.user_data.take().expect("Future not to be polled again once ready."), + Err(ProtocolsHandlerUpgrErr::Timeout)), + ), + Poll::Ready(Err(_)) => return Poll::Ready(( + self.user_data.take().expect("Future not to be polled again once ready."), + Err(ProtocolsHandlerUpgrErr::Timer)), + ), + Poll::Pending => {}, + } + + match self.upgrade.poll_unpin(cx) { + Poll::Ready(Ok(upgrade)) => Poll::Ready(( + self.user_data.take().expect("Future not to be polled again once ready."), + Ok(upgrade), + )), + Poll::Ready(Err(err)) => Poll::Ready(( + self.user_data.take().expect("Future not to be polled again once ready."), + Err(ProtocolsHandlerUpgrErr::Upgrade(err)), + )), + Poll::Pending => Poll::Pending, + } + } +} + + /// The options for a planned connection & handler shutdown. /// /// A shutdown is planned anew based on the the return value of @@ -195,10 +235,14 @@ where SubstreamEndpoint::Listener => { let protocol = self.handler.listen_protocol(); let timeout = protocol.timeout().clone(); - let (_, upgrade, info) = protocol.into_upgrade(); + let (_, upgrade, user_data) = protocol.into_upgrade(); let upgrade = upgrade::apply_inbound(substream, SendWrapper(upgrade)); let timeout = Delay::new(timeout); - self.negotiating_in.push((info, upgrade, timeout)); + self.negotiating_in.push(SubstreamUpgrade { + user_data: Some(user_data), + timeout, + upgrade, + }); } SubstreamEndpoint::Dialer((upgrade_id, user_data, timeout)) => { let pos = match self @@ -216,7 +260,11 @@ where let (_, (version, upgrade)) = self.queued_dial_upgrades.remove(pos); let upgrade = upgrade::apply_outbound(substream, upgrade, version); let timeout = Delay::new(timeout); - self.negotiating_out.push((user_data, upgrade, timeout)); + self.negotiating_out.push(SubstreamUpgrade { + user_data: Some(user_data), + timeout, + upgrade, + }); } } } @@ -232,62 +280,17 @@ where fn poll(&mut self, cx: &mut Context<'_>) -> Poll< Result, Self::Error> > { - // Continue negotiation of newly-opened substreams on the listening side. - // We remove each element from `negotiating_in` one by one and add them back if not ready. - for n in (0..self.negotiating_in.len()).rev() { - let (info, mut in_progress, mut timeout) = self.negotiating_in.swap_remove(n); - match Future::poll(Pin::new(&mut timeout), cx) { - Poll::Ready(Ok(_)) => { - let err = ProtocolsHandlerUpgrErr::Timeout; - self.handler.inject_listen_upgrade_error(info, err); - continue - } - Poll::Ready(Err(_)) => { - let err = ProtocolsHandlerUpgrErr::Timer; - self.handler.inject_listen_upgrade_error(info, err); - continue; - } - Poll::Pending => {}, - } - match Future::poll(Pin::new(&mut in_progress), cx) { - Poll::Ready(Ok(upgrade)) => - self.handler.inject_fully_negotiated_inbound(upgrade, info), - Poll::Pending => self.negotiating_in.push((info, in_progress, timeout)), - Poll::Ready(Err(err)) => { - let err = ProtocolsHandlerUpgrErr::Upgrade(err); - self.handler.inject_listen_upgrade_error(info, err); - } + while let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) { + match res { + Ok(upgrade) => self.handler.inject_fully_negotiated_inbound(upgrade, user_data), + Err(err) => self.handler.inject_listen_upgrade_error(user_data, err), } } - // Continue negotiation of newly-opened substreams. - // We remove each element from `negotiating_out` one by one and add them back if not ready. - for n in (0..self.negotiating_out.len()).rev() { - let (upgr_info, mut in_progress, mut timeout) = self.negotiating_out.swap_remove(n); - match Future::poll(Pin::new(&mut timeout), cx) { - Poll::Ready(Ok(_)) => { - let err = ProtocolsHandlerUpgrErr::Timeout; - self.handler.inject_dial_upgrade_error(upgr_info, err); - continue; - }, - Poll::Ready(Err(_)) => { - let err = ProtocolsHandlerUpgrErr::Timer; - self.handler.inject_dial_upgrade_error(upgr_info, err); - continue; - }, - Poll::Pending => {}, - } - match Future::poll(Pin::new(&mut in_progress), cx) { - Poll::Ready(Ok(upgrade)) => { - self.handler.inject_fully_negotiated_outbound(upgrade, upgr_info); - } - Poll::Pending => { - self.negotiating_out.push((upgr_info, in_progress, timeout)); - } - Poll::Ready(Err(err)) => { - let err = ProtocolsHandlerUpgrErr::Upgrade(err); - self.handler.inject_dial_upgrade_error(upgr_info, err); - } + while let Poll::Ready(Some((user_data, res))) = self.negotiating_out.poll_next_unpin(cx) { + match res { + Ok(upgrade) => self.handler.inject_fully_negotiated_outbound(upgrade, user_data), + Err(err) => self.handler.inject_dial_upgrade_error(user_data, err), } }