mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-04-25 03:02:12 +00:00
[swarm] MultiHandler: Respect inbound timeouts and upgrade versions. (#1786)
* Respect inbound timeouts and upgrade versions. * Update CHANGELOG
This commit is contained in:
parent
6528114fe8
commit
6ed92ab4be
@ -1,5 +1,8 @@
|
|||||||
# 0.22.1 [unreleased]
|
# 0.22.1 [unreleased]
|
||||||
|
|
||||||
|
- Respect inbound timeouts and upgrade versions in the `MultiHandler`.
|
||||||
|
[PR 1786](https://github.com/libp2p/rust-libp2p/pull/1786).
|
||||||
|
|
||||||
- Instead of iterating each inbound and outbound substream upgrade looking for
|
- Instead of iterating each inbound and outbound substream upgrade looking for
|
||||||
one to make progress, use a `FuturesUnordered` for both pending inbound and
|
one to make progress, use a `FuturesUnordered` for both pending inbound and
|
||||||
pending outbound upgrades. As a result only those upgrades are polled that are
|
pending outbound upgrades. As a result only those upgrades are polled that are
|
||||||
|
@ -37,18 +37,20 @@ use crate::upgrade::{
|
|||||||
};
|
};
|
||||||
use futures::{future::BoxFuture, prelude::*};
|
use futures::{future::BoxFuture, prelude::*};
|
||||||
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
|
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
|
||||||
use libp2p_core::upgrade::{ProtocolName, UpgradeError, NegotiationError, ProtocolError};
|
use libp2p_core::upgrade::{self, ProtocolName, UpgradeError, NegotiationError, ProtocolError};
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use std::{
|
use std::{
|
||||||
|
cmp,
|
||||||
collections::{HashMap, HashSet},
|
collections::{HashMap, HashSet},
|
||||||
error,
|
error,
|
||||||
fmt,
|
fmt,
|
||||||
hash::Hash,
|
hash::Hash,
|
||||||
iter::{self, FromIterator},
|
iter::{self, FromIterator},
|
||||||
task::{Context, Poll}
|
task::{Context, Poll},
|
||||||
|
time::Duration
|
||||||
};
|
};
|
||||||
|
|
||||||
/// A [`ProtocolsHandler`] for multiple other `ProtocolsHandler`s.
|
/// A [`ProtocolsHandler`] for multiple `ProtocolsHandler`s of the same type.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct MultiHandler<K, H> {
|
pub struct MultiHandler<K, H> {
|
||||||
handlers: HashMap<K, H>
|
handlers: HashMap<K, H>
|
||||||
@ -74,6 +76,9 @@ where
|
|||||||
/// Create and populate a `MultiHandler` from the given handler iterator.
|
/// Create and populate a `MultiHandler` from the given handler iterator.
|
||||||
///
|
///
|
||||||
/// It is an error for any two protocols handlers to share the same protocol name.
|
/// It is an error for any two protocols handlers to share the same protocol name.
|
||||||
|
///
|
||||||
|
/// > **Note**: All handlers should use the same [`upgrade::Version`] for
|
||||||
|
/// > the inbound and outbound [`SubstreamProtocol`]s.
|
||||||
pub fn try_from_iter<I>(iter: I) -> Result<Self, DuplicateProtonameError>
|
pub fn try_from_iter<I>(iter: I) -> Result<Self, DuplicateProtonameError>
|
||||||
where
|
where
|
||||||
I: IntoIterator<Item = (K, H)>
|
I: IntoIterator<Item = (K, H)>
|
||||||
@ -100,17 +105,34 @@ where
|
|||||||
type OutboundOpenInfo = (K, <H as ProtocolsHandler>::OutboundOpenInfo);
|
type OutboundOpenInfo = (K, <H as ProtocolsHandler>::OutboundOpenInfo);
|
||||||
|
|
||||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
||||||
let (upgrade, info) = self.handlers.iter()
|
let (upgrade, info, timeout, version) = self.handlers.iter()
|
||||||
.map(|(k, h)| {
|
.map(|(k, h)| {
|
||||||
let (_, u, i) = h.listen_protocol().into_upgrade();
|
let p = h.listen_protocol();
|
||||||
(k.clone(), (u, i))
|
let t = *p.timeout();
|
||||||
|
let (v, u, i) = p.into_upgrade();
|
||||||
|
(k.clone(), (v, u, i, t))
|
||||||
})
|
})
|
||||||
.fold((Upgrade::new(), Info::new()), |(mut upg, mut inf), (k, (u, i))| {
|
.fold((Upgrade::new(), Info::new(), Duration::from_secs(0), None),
|
||||||
upg.upgrades.push((k.clone(), u));
|
|(mut upg, mut inf, mut timeout, mut version), (k, (v, u, i, t))| {
|
||||||
inf.infos.push((k, i));
|
upg.upgrades.push((k.clone(), u));
|
||||||
(upg, inf)
|
inf.infos.push((k, i));
|
||||||
});
|
timeout = cmp::max(timeout, t);
|
||||||
|
version = version.map_or(Some(v), |vv|
|
||||||
|
if v != vv {
|
||||||
|
// Different upgrade (i.e. protocol negotiation) protocol
|
||||||
|
// versions are usually incompatible and not negotiated
|
||||||
|
// themselves, so a protocol upgrade may fail.
|
||||||
|
log::warn!("Differing upgrade versions. Defaulting to V1.");
|
||||||
|
Some(upgrade::Version::V1)
|
||||||
|
} else {
|
||||||
|
Some(v)
|
||||||
|
});
|
||||||
|
(upg, inf, timeout, version)
|
||||||
|
}
|
||||||
|
);
|
||||||
SubstreamProtocol::new(upgrade, info)
|
SubstreamProtocol::new(upgrade, info)
|
||||||
|
.with_timeout(timeout)
|
||||||
|
.with_upgrade_protocol(version.unwrap_or(upgrade::Version::V1))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn inject_fully_negotiated_outbound (
|
fn inject_fully_negotiated_outbound (
|
||||||
@ -293,6 +315,9 @@ where
|
|||||||
/// Create and populate an `IntoMultiHandler` from the given iterator.
|
/// Create and populate an `IntoMultiHandler` from the given iterator.
|
||||||
///
|
///
|
||||||
/// It is an error for any two protocols handlers to share the same protocol name.
|
/// It is an error for any two protocols handlers to share the same protocol name.
|
||||||
|
///
|
||||||
|
/// > **Note**: All handlers should use the same [`upgrade::Version`] for
|
||||||
|
/// > the inbound and outbound [`SubstreamProtocol`]s.
|
||||||
pub fn try_from_iter<I>(iter: I) -> Result<Self, DuplicateProtonameError>
|
pub fn try_from_iter<I>(iter: I) -> Result<Self, DuplicateProtonameError>
|
||||||
where
|
where
|
||||||
I: IntoIterator<Item = (K, H)>
|
I: IntoIterator<Item = (K, H)>
|
||||||
|
Loading…
x
Reference in New Issue
Block a user