[swarm] Remove substream-specific protocol negotiation version. (#1962)

* Remove substream-specific protocol negotiation version.

Remove the option for a substream-specific multistream select protocol override.
The override at this granularity is no longer deemed useful, in particular because
it can usually not be configured for existing protocols like `libp2p-kad` and others.
There is a `Swarm`-scoped configuration for this version available since
[1858](https://github.com/libp2p/rust-libp2p/pull/1858).

* Update protocol crate versions and changelogs.

* Clean up documentation.
This commit is contained in:
Roman Borschel
2021-02-25 11:35:52 +01:00
committed by GitHub
parent 6a7576afec
commit 7dd42fcaaf
21 changed files with 80 additions and 72 deletions

View File

@ -55,7 +55,7 @@ use libp2p_core::{
ConnectedPoint,
Multiaddr,
PeerId,
upgrade::{self, UpgradeError},
upgrade::UpgradeError,
};
use std::{cmp::Ordering, error, fmt, task::Context, task::Poll, time::Duration};
use wasm_timer::Instant;
@ -242,7 +242,6 @@ pub trait ProtocolsHandler: Send + 'static {
pub struct SubstreamProtocol<TUpgrade, TInfo> {
upgrade: TUpgrade,
info: TInfo,
upgrade_protocol: upgrade::Version,
timeout: Duration,
}
@ -255,18 +254,10 @@ impl<TUpgrade, TInfo> SubstreamProtocol<TUpgrade, TInfo> {
SubstreamProtocol {
upgrade,
info,
upgrade_protocol: upgrade::Version::V1,
timeout: Duration::from_secs(10),
}
}
/// Sets the multistream-select protocol (version) to use for negotiating
/// protocols upgrades on outbound substreams.
pub fn with_upgrade_protocol(mut self, version: upgrade::Version) -> Self {
self.upgrade_protocol = version;
self
}
/// Maps a function over the protocol upgrade.
pub fn map_upgrade<U, F>(self, f: F) -> SubstreamProtocol<U, TInfo>
where
@ -275,7 +266,6 @@ impl<TUpgrade, TInfo> SubstreamProtocol<TUpgrade, TInfo> {
SubstreamProtocol {
upgrade: f(self.upgrade),
info: self.info,
upgrade_protocol: self.upgrade_protocol,
timeout: self.timeout,
}
}
@ -288,7 +278,6 @@ impl<TUpgrade, TInfo> SubstreamProtocol<TUpgrade, TInfo> {
SubstreamProtocol {
upgrade: self.upgrade,
info: f(self.info),
upgrade_protocol: self.upgrade_protocol,
timeout: self.timeout,
}
}
@ -315,8 +304,8 @@ impl<TUpgrade, TInfo> SubstreamProtocol<TUpgrade, TInfo> {
}
/// Converts the substream protocol configuration into the contained upgrade.
pub fn into_upgrade(self) -> (upgrade::Version, TUpgrade, TInfo) {
(self.upgrade_protocol, self.upgrade, self.info)
pub fn into_upgrade(self) -> (TUpgrade, TInfo) {
(self.upgrade, self.info)
}
}
@ -512,7 +501,7 @@ where T: ProtocolsHandler
}
fn inbound_protocol(&self) -> <Self::Handler as ProtocolsHandler>::InboundProtocol {
self.listen_protocol().into_upgrade().1
self.listen_protocol().into_upgrade().0
}
}

View File

@ -37,7 +37,7 @@ use crate::upgrade::{
};
use futures::{future::BoxFuture, prelude::*};
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p_core::upgrade::{self, ProtocolName, UpgradeError, NegotiationError, ProtocolError};
use libp2p_core::upgrade::{ProtocolName, UpgradeError, NegotiationError, ProtocolError};
use rand::Rng;
use std::{
cmp,
@ -76,15 +76,12 @@ where
/// Create and populate a `MultiHandler` from the given handler iterator.
///
/// It is an error for any two protocols handlers to share the same protocol name.
///
/// > **Note**: All handlers should use the same [`upgrade::Version`] for
/// > the inbound and outbound [`SubstreamProtocol`]s.
pub fn try_from_iter<I>(iter: I) -> Result<Self, DuplicateProtonameError>
where
I: IntoIterator<Item = (K, H)>
{
let m = MultiHandler { handlers: HashMap::from_iter(iter) };
uniq_proto_names(m.handlers.values().map(|h| h.listen_protocol().into_upgrade().1))?;
uniq_proto_names(m.handlers.values().map(|h| h.listen_protocol().into_upgrade().0))?;
Ok(m)
}
}
@ -105,34 +102,22 @@ where
type OutboundOpenInfo = (K, <H as ProtocolsHandler>::OutboundOpenInfo);
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
let (upgrade, info, timeout, version) = self.handlers.iter()
let (upgrade, info, timeout) = self.handlers.iter()
.map(|(key, handler)| {
let proto = handler.listen_protocol();
let timeout = *proto.timeout();
let (version, upgrade, info) = proto.into_upgrade();
(key.clone(), (version, upgrade, info, timeout))
let (upgrade, info) = proto.into_upgrade();
(key.clone(), (upgrade, info, timeout))
})
.fold((Upgrade::new(), Info::new(), Duration::from_secs(0), None),
|(mut upg, mut inf, mut timeout, mut version), (k, (v, u, i, t))| {
.fold((Upgrade::new(), Info::new(), Duration::from_secs(0)),
|(mut upg, mut inf, mut timeout), (k, (u, i, t))| {
upg.upgrades.push((k.clone(), u));
inf.infos.push((k, i));
timeout = cmp::max(timeout, t);
version = version.map_or(Some(v), |vv|
if v != vv {
// Different upgrade (i.e. protocol negotiation) protocol
// versions are usually incompatible and not negotiated
// themselves, so a protocol upgrade may fail.
log::warn!("Differing upgrade versions. Defaulting to V1.");
Some(upgrade::Version::V1)
} else {
Some(v)
});
(upg, inf, timeout, version)
(upg, inf, timeout)
}
);
SubstreamProtocol::new(upgrade, info)
.with_timeout(timeout)
.with_upgrade_protocol(version.unwrap_or(upgrade::Version::V1))
SubstreamProtocol::new(upgrade, info).with_timeout(timeout)
}
fn inject_fully_negotiated_outbound (
@ -315,9 +300,6 @@ where
/// Create and populate an `IntoMultiHandler` from the given iterator.
///
/// It is an error for any two protocols handlers to share the same protocol name.
///
/// > **Note**: All handlers should use the same [`upgrade::Version`] for
/// > the inbound and outbound [`SubstreamProtocol`]s.
pub fn try_from_iter<I>(iter: I) -> Result<Self, DuplicateProtonameError>
where
I: IntoIterator<Item = (K, H)>

View File

@ -116,7 +116,7 @@ where
>>,
/// For each outbound substream request, how to upgrade it. The first element of the tuple
/// is the unique identifier (see `unique_dial_upgrade_id`).
queued_dial_upgrades: Vec<(u64, (upgrade::Version, SendWrapper<TProtoHandler::OutboundProtocol>))>,
queued_dial_upgrades: Vec<(u64, SendWrapper<TProtoHandler::OutboundProtocol>)>,
/// Unique identifier assigned to each queued dial upgrade.
unique_dial_upgrade_id: u64,
/// The currently planned connection & handler shutdown.
@ -246,7 +246,7 @@ where
SubstreamEndpoint::Listener => {
let protocol = self.handler.listen_protocol();
let timeout = *protocol.timeout();
let (_, upgrade, user_data) = protocol.into_upgrade();
let (upgrade, user_data) = protocol.into_upgrade();
let upgrade = upgrade::apply_inbound(substream, SendWrapper(upgrade));
let timeout = Delay::new(timeout);
self.negotiating_in.push(SubstreamUpgrade {
@ -268,7 +268,8 @@ where
}
};
let (_, (mut version, upgrade)) = self.queued_dial_upgrades.remove(pos);
let (_, upgrade) = self.queued_dial_upgrades.remove(pos);
let mut version = upgrade::Version::default();
if let Some(v) = self.substream_upgrade_protocol_override {
if v != version {
log::debug!("Substream upgrade protocol override: {:?} -> {:?}", version, v);
@ -336,8 +337,8 @@ where
let id = self.unique_dial_upgrade_id;
let timeout = *protocol.timeout();
self.unique_dial_upgrade_id += 1;
let (version, upgrade, info) = protocol.into_upgrade();
self.queued_dial_upgrades.push((id, (version, SendWrapper(upgrade))));
let (upgrade, info) = protocol.into_upgrade();
self.queued_dial_upgrades.push((id, SendWrapper(upgrade)));
return Poll::Ready(Ok(
ConnectionHandlerEvent::OutboundSubstreamRequest((id, info, timeout)),
));

View File

@ -111,8 +111,8 @@ where
let proto1 = self.proto1.listen_protocol();
let proto2 = self.proto2.listen_protocol();
let timeout = *std::cmp::max(proto1.timeout(), proto2.timeout());
let (_, u1, i1) = proto1.into_upgrade();
let (_, u2, i2) = proto2.into_upgrade();
let (u1, i1) = proto1.into_upgrade();
let (u2, i2) = proto2.into_upgrade();
let choice = SelectUpgrade::new(SendWrapper(u1), SendWrapper(u2));
SubstreamProtocol::new(choice, (i1, i2)).with_timeout(timeout)
}