mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-19 13:01:22 +00:00
protocols/identify: Allow at most one inbound identify push stream (#2694)
An identify push contains the whole identify information of a remote peer. Upgrading multiple inbound identify push streams is useless. Instead older streams are dropped in favor of newer streams.
This commit is contained in:
@ -1,3 +1,7 @@
|
|||||||
|
# 0.36.1 - unreleased
|
||||||
|
|
||||||
|
- Allow at most one inbound identify push stream.
|
||||||
|
|
||||||
# 0.36.0
|
# 0.36.0
|
||||||
|
|
||||||
- Update to `libp2p-core` `v0.33.0`.
|
- Update to `libp2p-core` `v0.33.0`.
|
||||||
|
@ -3,7 +3,7 @@ name = "libp2p-identify"
|
|||||||
edition = "2021"
|
edition = "2021"
|
||||||
rust-version = "1.56.1"
|
rust-version = "1.56.1"
|
||||||
description = "Nodes identifcation protocol for libp2p"
|
description = "Nodes identifcation protocol for libp2p"
|
||||||
version = "0.36.0"
|
version = "0.36.1"
|
||||||
authors = ["Parity Technologies <admin@parity.io>"]
|
authors = ["Parity Technologies <admin@parity.io>"]
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
repository = "https://github.com/libp2p/rust-libp2p"
|
repository = "https://github.com/libp2p/rust-libp2p"
|
||||||
@ -22,6 +22,7 @@ prost-codec = { version = "0.1", path = "../../misc/prost-codec" }
|
|||||||
prost = "0.10"
|
prost = "0.10"
|
||||||
smallvec = "1.6.1"
|
smallvec = "1.6.1"
|
||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
|
void = "1.0"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
async-std = "1.6.2"
|
async-std = "1.6.2"
|
||||||
|
@ -22,6 +22,7 @@ use crate::protocol::{
|
|||||||
IdentifyInfo, IdentifyProtocol, IdentifyPushProtocol, InboundPush, OutboundPush,
|
IdentifyInfo, IdentifyProtocol, IdentifyPushProtocol, InboundPush, OutboundPush,
|
||||||
ReplySubstream, UpgradeError,
|
ReplySubstream, UpgradeError,
|
||||||
};
|
};
|
||||||
|
use futures::future::BoxFuture;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use futures_timer::Delay;
|
use futures_timer::Delay;
|
||||||
use libp2p_core::either::{EitherError, EitherOutput};
|
use libp2p_core::either::{EitherError, EitherOutput};
|
||||||
@ -30,6 +31,7 @@ use libp2p_swarm::{
|
|||||||
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
|
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
|
||||||
NegotiatedSubstream, SubstreamProtocol,
|
NegotiatedSubstream, SubstreamProtocol,
|
||||||
};
|
};
|
||||||
|
use log::warn;
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
use std::{io, pin::Pin, task::Context, task::Poll, time::Duration};
|
use std::{io, pin::Pin, task::Context, task::Poll, time::Duration};
|
||||||
|
|
||||||
@ -39,6 +41,7 @@ use std::{io, pin::Pin, task::Context, task::Poll, time::Duration};
|
|||||||
/// at least one identification request to be answered by the remote before
|
/// at least one identification request to be answered by the remote before
|
||||||
/// permitting the underlying connection to be closed.
|
/// permitting the underlying connection to be closed.
|
||||||
pub struct IdentifyHandler {
|
pub struct IdentifyHandler {
|
||||||
|
inbound_identify_push: Option<BoxFuture<'static, Result<IdentifyInfo, UpgradeError>>>,
|
||||||
/// Pending events to yield.
|
/// Pending events to yield.
|
||||||
events: SmallVec<
|
events: SmallVec<
|
||||||
[ConnectionHandlerEvent<
|
[ConnectionHandlerEvent<
|
||||||
@ -80,6 +83,7 @@ impl IdentifyHandler {
|
|||||||
/// Creates a new `IdentifyHandler`.
|
/// Creates a new `IdentifyHandler`.
|
||||||
pub fn new(initial_delay: Duration, interval: Duration) -> Self {
|
pub fn new(initial_delay: Duration, interval: Duration) -> Self {
|
||||||
IdentifyHandler {
|
IdentifyHandler {
|
||||||
|
inbound_identify_push: Default::default(),
|
||||||
events: SmallVec::new(),
|
events: SmallVec::new(),
|
||||||
trigger_next_identify: Delay::new(initial_delay),
|
trigger_next_identify: Delay::new(initial_delay),
|
||||||
keep_alive: KeepAlive::Yes,
|
keep_alive: KeepAlive::Yes,
|
||||||
@ -113,9 +117,14 @@ impl ConnectionHandler for IdentifyHandler {
|
|||||||
EitherOutput::First(substream) => self.events.push(ConnectionHandlerEvent::Custom(
|
EitherOutput::First(substream) => self.events.push(ConnectionHandlerEvent::Custom(
|
||||||
IdentifyHandlerEvent::Identify(substream),
|
IdentifyHandlerEvent::Identify(substream),
|
||||||
)),
|
)),
|
||||||
EitherOutput::Second(info) => self.events.push(ConnectionHandlerEvent::Custom(
|
EitherOutput::Second(fut) => {
|
||||||
IdentifyHandlerEvent::Identified(info),
|
if self.inbound_identify_push.replace(fut).is_some() {
|
||||||
)),
|
warn!(
|
||||||
|
"New inbound identify push stream while still upgrading previous one. \
|
||||||
|
Replacing previous with new.",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -189,14 +198,30 @@ impl ConnectionHandler for IdentifyHandler {
|
|||||||
|
|
||||||
// Poll the future that fires when we need to identify the node again.
|
// Poll the future that fires when we need to identify the node again.
|
||||||
match Future::poll(Pin::new(&mut self.trigger_next_identify), cx) {
|
match Future::poll(Pin::new(&mut self.trigger_next_identify), cx) {
|
||||||
Poll::Pending => Poll::Pending,
|
Poll::Pending => {}
|
||||||
Poll::Ready(()) => {
|
Poll::Ready(()) => {
|
||||||
self.trigger_next_identify.reset(self.interval);
|
self.trigger_next_identify.reset(self.interval);
|
||||||
let ev = ConnectionHandlerEvent::OutboundSubstreamRequest {
|
let ev = ConnectionHandlerEvent::OutboundSubstreamRequest {
|
||||||
protocol: SubstreamProtocol::new(EitherUpgrade::A(IdentifyProtocol), ()),
|
protocol: SubstreamProtocol::new(EitherUpgrade::A(IdentifyProtocol), ()),
|
||||||
};
|
};
|
||||||
Poll::Ready(ev)
|
return Poll::Ready(ev);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(Poll::Ready(res)) = self
|
||||||
|
.inbound_identify_push
|
||||||
|
.as_mut()
|
||||||
|
.map(|f| f.poll_unpin(cx))
|
||||||
|
{
|
||||||
|
self.inbound_identify_push.take();
|
||||||
|
|
||||||
|
if let Ok(info) = res {
|
||||||
|
return Poll::Ready(ConnectionHandlerEvent::Custom(
|
||||||
|
IdentifyHandlerEvent::Identified(info),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Poll::Pending
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,7 @@
|
|||||||
|
|
||||||
use crate::structs_proto;
|
use crate::structs_proto;
|
||||||
use asynchronous_codec::{FramedRead, FramedWrite};
|
use asynchronous_codec::{FramedRead, FramedWrite};
|
||||||
use futures::prelude::*;
|
use futures::{future::BoxFuture, prelude::*};
|
||||||
use libp2p_core::{
|
use libp2p_core::{
|
||||||
identity, multiaddr,
|
identity, multiaddr,
|
||||||
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
|
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
|
||||||
@ -30,6 +30,7 @@ use log::trace;
|
|||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::{fmt, io, iter, pin::Pin};
|
use std::{fmt, io, iter, pin::Pin};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
use void::Void;
|
||||||
|
|
||||||
const MAX_MESSAGE_SIZE_BYTES: usize = 4096;
|
const MAX_MESSAGE_SIZE_BYTES: usize = 4096;
|
||||||
|
|
||||||
@ -143,12 +144,13 @@ impl<C> InboundUpgrade<C> for IdentifyPushProtocol<InboundPush>
|
|||||||
where
|
where
|
||||||
C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||||
{
|
{
|
||||||
type Output = IdentifyInfo;
|
type Output = BoxFuture<'static, Result<IdentifyInfo, UpgradeError>>;
|
||||||
type Error = UpgradeError;
|
type Error = Void;
|
||||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
|
type Future = future::Ready<Result<Self::Output, Self::Error>>;
|
||||||
|
|
||||||
fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
|
fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
|
||||||
recv(socket).boxed()
|
// Lazily upgrade stream, thus allowing upgrade to happen within identify's handler.
|
||||||
|
future::ok(recv(socket).boxed())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user