mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-24 07:11:38 +00:00
fix(gossipsub): gracefully disable handler on stream errors
Previously, we closed the entire connection upon receiving too many upgrade errors. This is unnecessarily aggressive. For example, an upgrade error may be caused by the remote dropping a stream during the initial handshake which is completely isolated from other protocols running on the same connection. Instead of closing the connection, set `KeepAlive::No`. Related: #3591. Resolves: #3690. Pull-Request: #3625.
This commit is contained in:
4
Cargo.lock
generated
4
Cargo.lock
generated
@ -2438,6 +2438,7 @@ dependencies = [
|
|||||||
"base64 0.21.0",
|
"base64 0.21.0",
|
||||||
"byteorder",
|
"byteorder",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
"either",
|
||||||
"env_logger 0.10.0",
|
"env_logger 0.10.0",
|
||||||
"fnv",
|
"fnv",
|
||||||
"futures",
|
"futures",
|
||||||
@ -2462,6 +2463,7 @@ dependencies = [
|
|||||||
"smallvec",
|
"smallvec",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"unsigned-varint",
|
"unsigned-varint",
|
||||||
|
"void",
|
||||||
"wasm-timer",
|
"wasm-timer",
|
||||||
]
|
]
|
||||||
|
|
||||||
@ -2841,7 +2843,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libp2p-swarm"
|
name = "libp2p-swarm"
|
||||||
version = "0.42.1"
|
version = "0.42.2"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-std",
|
"async-std",
|
||||||
"either",
|
"either",
|
||||||
|
@ -2,7 +2,11 @@
|
|||||||
|
|
||||||
- Fix erroneously duplicate message IDs. See [PR 3716].
|
- Fix erroneously duplicate message IDs. See [PR 3716].
|
||||||
|
|
||||||
|
- Gracefully disable handler on stream errors. Deprecate a few variants of `HandlerError`.
|
||||||
|
See [PR 3625].
|
||||||
|
|
||||||
[PR 3716]: https://github.com/libp2p/rust-libp2p/pull/3716
|
[PR 3716]: https://github.com/libp2p/rust-libp2p/pull/3716
|
||||||
|
[PR 3625]: https://github.com/libp2p/rust-libp2p/pull/3325
|
||||||
|
|
||||||
## 0.44.2
|
## 0.44.2
|
||||||
|
|
||||||
|
@ -11,7 +11,8 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
|
|||||||
categories = ["network-programming", "asynchronous"]
|
categories = ["network-programming", "asynchronous"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
libp2p-swarm = { version = "0.42.1", path = "../../swarm" }
|
either = "1.5"
|
||||||
|
libp2p-swarm = { version = "0.42.2", path = "../../swarm" }
|
||||||
libp2p-core = { version = "0.39.0", path = "../../core" }
|
libp2p-core = { version = "0.39.0", path = "../../core" }
|
||||||
libp2p-identity = { version = "0.1.2", path = "../../identity" }
|
libp2p-identity = { version = "0.1.2", path = "../../identity" }
|
||||||
bytes = "1.4"
|
bytes = "1.4"
|
||||||
@ -33,6 +34,7 @@ serde = { version = "1", optional = true, features = ["derive"] }
|
|||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
wasm-timer = "0.2.5"
|
wasm-timer = "0.2.5"
|
||||||
instant = "0.1.11"
|
instant = "0.1.11"
|
||||||
|
void = "1.0.2"
|
||||||
# Metrics dependencies
|
# Metrics dependencies
|
||||||
prometheus-client = "0.19.0"
|
prometheus-client = "0.19.0"
|
||||||
|
|
||||||
|
@ -30,16 +30,10 @@ pub type PublishError = crate::error_priv::PublishError;
|
|||||||
)]
|
)]
|
||||||
pub type SubscriptionError = crate::error_priv::SubscriptionError;
|
pub type SubscriptionError = crate::error_priv::SubscriptionError;
|
||||||
|
|
||||||
#[deprecated(
|
#[deprecated(note = "This error will no longer be emitted")]
|
||||||
since = "0.44.0",
|
|
||||||
note = "Use re-exports that omit `Gossipsub` prefix, i.e. `libp2p::gossipsub::HandlerError"
|
|
||||||
)]
|
|
||||||
pub type GossipsubHandlerError = crate::error_priv::HandlerError;
|
pub type GossipsubHandlerError = crate::error_priv::HandlerError;
|
||||||
|
|
||||||
#[deprecated(
|
#[deprecated(note = "This error will no longer be emitted")]
|
||||||
since = "0.44.0",
|
|
||||||
note = "Use `libp2p::gossipsub::HandlerError` instead, as the `error` module will become crate-private in the future."
|
|
||||||
)]
|
|
||||||
pub type HandlerError = crate::error_priv::HandlerError;
|
pub type HandlerError = crate::error_priv::HandlerError;
|
||||||
|
|
||||||
#[deprecated(
|
#[deprecated(
|
||||||
|
@ -134,12 +134,6 @@ impl std::fmt::Display for ValidationError {
|
|||||||
|
|
||||||
impl std::error::Error for ValidationError {}
|
impl std::error::Error for ValidationError {}
|
||||||
|
|
||||||
impl From<std::io::Error> for HandlerError {
|
|
||||||
fn from(error: std::io::Error) -> HandlerError {
|
|
||||||
HandlerError::Codec(quick_protobuf_codec::Error::from(error))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<std::io::Error> for PublishError {
|
impl From<std::io::Error> for PublishError {
|
||||||
fn from(error: std::io::Error) -> PublishError {
|
fn from(error: std::io::Error) -> PublishError {
|
||||||
PublishError::TransformFailed(error)
|
PublishError::TransformFailed(error)
|
||||||
|
@ -21,30 +21,26 @@
|
|||||||
use crate::protocol::{GossipsubCodec, ProtocolConfig};
|
use crate::protocol::{GossipsubCodec, ProtocolConfig};
|
||||||
use crate::rpc_proto::proto;
|
use crate::rpc_proto::proto;
|
||||||
use crate::types::{PeerKind, RawMessage, Rpc};
|
use crate::types::{PeerKind, RawMessage, Rpc};
|
||||||
use crate::{HandlerError, ValidationError};
|
use crate::ValidationError;
|
||||||
use asynchronous_codec::Framed;
|
use asynchronous_codec::Framed;
|
||||||
|
use futures::future::Either;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use instant::Instant;
|
use instant::Instant;
|
||||||
use libp2p_core::upgrade::{NegotiationError, UpgradeError};
|
use libp2p_core::upgrade::{DeniedUpgrade, NegotiationError, UpgradeError};
|
||||||
use libp2p_swarm::handler::{
|
use libp2p_swarm::handler::{
|
||||||
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr,
|
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr,
|
||||||
DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive,
|
DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive,
|
||||||
SubstreamProtocol,
|
SubstreamProtocol,
|
||||||
};
|
};
|
||||||
use libp2p_swarm::NegotiatedSubstream;
|
use libp2p_swarm::NegotiatedSubstream;
|
||||||
use log::{error, trace, warn};
|
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
use std::{
|
use std::{
|
||||||
collections::VecDeque,
|
|
||||||
io,
|
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
use void::Void;
|
||||||
/// The initial time (in seconds) we set the keep alive for protocol negotiations to occur.
|
|
||||||
const INITIAL_KEEP_ALIVE: u64 = 30;
|
|
||||||
|
|
||||||
/// The event emitted by the Handler. This informs the behaviour of various events created
|
/// The event emitted by the Handler. This informs the behaviour of various events created
|
||||||
/// by the handler.
|
/// by the handler.
|
||||||
@ -75,17 +71,23 @@ pub enum HandlerIn {
|
|||||||
LeftMesh,
|
LeftMesh,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The maximum number of substreams we accept or create before disconnecting from the peer.
|
/// The maximum number of inbound or outbound substreams attempts we allow.
|
||||||
///
|
///
|
||||||
/// Gossipsub is supposed to have a single long-lived inbound and outbound substream. On failure we
|
/// Gossipsub is supposed to have a single long-lived inbound and outbound substream. On failure we
|
||||||
/// attempt to recreate these. This imposes an upper bound of new substreams before we consider the
|
/// attempt to recreate these. This imposes an upper bound of new substreams before we consider the
|
||||||
/// connection faulty and disconnect. This also prevents against potential substream creation loops.
|
/// connection faulty and disable the handler. This also prevents against potential substream
|
||||||
const MAX_SUBSTREAM_CREATION: usize = 5;
|
/// creation loops.
|
||||||
|
const MAX_SUBSTREAM_ATTEMPTS: usize = 5;
|
||||||
|
|
||||||
|
pub enum Handler {
|
||||||
|
Enabled(EnabledHandler),
|
||||||
|
Disabled(DisabledHandler),
|
||||||
|
}
|
||||||
|
|
||||||
/// Protocol Handler that manages a single long-lived substream with a peer.
|
/// Protocol Handler that manages a single long-lived substream with a peer.
|
||||||
pub struct Handler {
|
pub struct EnabledHandler {
|
||||||
/// Upgrade configuration for the gossipsub protocol.
|
/// Upgrade configuration for the gossipsub protocol.
|
||||||
listen_protocol: SubstreamProtocol<ProtocolConfig, ()>,
|
listen_protocol: ProtocolConfig,
|
||||||
|
|
||||||
/// The single long-lived outbound substream.
|
/// The single long-lived outbound substream.
|
||||||
outbound_substream: Option<OutboundSubstreamState>,
|
outbound_substream: Option<OutboundSubstreamState>,
|
||||||
@ -100,11 +102,11 @@ pub struct Handler {
|
|||||||
/// requests.
|
/// requests.
|
||||||
outbound_substream_establishing: bool,
|
outbound_substream_establishing: bool,
|
||||||
|
|
||||||
/// The number of outbound substreams we have created.
|
/// The number of outbound substreams we have requested.
|
||||||
outbound_substreams_created: usize,
|
outbound_substream_attempts: usize,
|
||||||
|
|
||||||
/// The number of inbound substreams that have been created by the peer.
|
/// The number of inbound substreams that have been created by the peer.
|
||||||
inbound_substreams_created: usize,
|
inbound_substream_attempts: usize,
|
||||||
|
|
||||||
/// The type of peer this handler is associated to.
|
/// The type of peer this handler is associated to.
|
||||||
peer_kind: Option<PeerKind>,
|
peer_kind: Option<PeerKind>,
|
||||||
@ -114,27 +116,29 @@ pub struct Handler {
|
|||||||
// NOTE: Use this flag rather than checking the substream count each poll.
|
// NOTE: Use this flag rather than checking the substream count each poll.
|
||||||
peer_kind_sent: bool,
|
peer_kind_sent: bool,
|
||||||
|
|
||||||
/// If the peer doesn't support the gossipsub protocol we do not immediately disconnect.
|
last_io_activity: Instant,
|
||||||
/// Rather, we disable the handler and prevent any incoming or outgoing substreams from being
|
|
||||||
/// established.
|
|
||||||
///
|
|
||||||
/// This value is set to true to indicate the peer doesn't support gossipsub.
|
|
||||||
protocol_unsupported: bool,
|
|
||||||
|
|
||||||
/// The amount of time we allow idle connections before disconnecting.
|
/// The amount of time we keep an idle connection alive.
|
||||||
idle_timeout: Duration,
|
idle_timeout: Duration,
|
||||||
|
|
||||||
/// Collection of errors from attempting an upgrade.
|
|
||||||
upgrade_errors: VecDeque<ConnectionHandlerUpgrErr<HandlerError>>,
|
|
||||||
|
|
||||||
/// Flag determining whether to maintain the connection to the peer.
|
|
||||||
keep_alive: KeepAlive,
|
|
||||||
|
|
||||||
/// Keeps track of whether this connection is for a peer in the mesh. This is used to make
|
/// Keeps track of whether this connection is for a peer in the mesh. This is used to make
|
||||||
/// decisions about the keep alive state for this connection.
|
/// decisions about the keep alive state for this connection.
|
||||||
in_mesh: bool,
|
in_mesh: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub enum DisabledHandler {
|
||||||
|
/// If the peer doesn't support the gossipsub protocol we do not immediately disconnect.
|
||||||
|
/// Rather, we disable the handler and prevent any incoming or outgoing substreams from being
|
||||||
|
/// established.
|
||||||
|
ProtocolUnsupported {
|
||||||
|
/// Keeps track on whether we have sent the peer kind to the behaviour.
|
||||||
|
peer_kind_sent: bool,
|
||||||
|
},
|
||||||
|
/// The maximum number of inbound or outbound substream attempts have happened and thereby the
|
||||||
|
/// handler has been disabled.
|
||||||
|
MaxSubstreamAttempts,
|
||||||
|
}
|
||||||
|
|
||||||
/// State of the inbound substream, opened either by us or by the remote.
|
/// State of the inbound substream, opened either by us or by the remote.
|
||||||
enum InboundSubstreamState {
|
enum InboundSubstreamState {
|
||||||
/// Waiting for a message from the remote. The idle state for an inbound substream.
|
/// Waiting for a message from the remote. The idle state for an inbound substream.
|
||||||
@ -153,8 +157,6 @@ enum OutboundSubstreamState {
|
|||||||
PendingSend(Framed<NegotiatedSubstream, GossipsubCodec>, proto::RPC),
|
PendingSend(Framed<NegotiatedSubstream, GossipsubCodec>, proto::RPC),
|
||||||
/// Waiting to flush the substream so that the data arrives to the remote.
|
/// Waiting to flush the substream so that the data arrives to the remote.
|
||||||
PendingFlush(Framed<NegotiatedSubstream, GossipsubCodec>),
|
PendingFlush(Framed<NegotiatedSubstream, GossipsubCodec>),
|
||||||
/// The substream is being closed. Used by either substream.
|
|
||||||
_Closing(Framed<NegotiatedSubstream, GossipsubCodec>),
|
|
||||||
/// An error occurred during processing.
|
/// An error occurred during processing.
|
||||||
Poisoned,
|
Poisoned,
|
||||||
}
|
}
|
||||||
@ -162,120 +164,57 @@ enum OutboundSubstreamState {
|
|||||||
impl Handler {
|
impl Handler {
|
||||||
/// Builds a new [`Handler`].
|
/// Builds a new [`Handler`].
|
||||||
pub fn new(protocol_config: ProtocolConfig, idle_timeout: Duration) -> Self {
|
pub fn new(protocol_config: ProtocolConfig, idle_timeout: Duration) -> Self {
|
||||||
Handler {
|
Handler::Enabled(EnabledHandler {
|
||||||
listen_protocol: SubstreamProtocol::new(protocol_config, ()),
|
listen_protocol: protocol_config,
|
||||||
inbound_substream: None,
|
inbound_substream: None,
|
||||||
outbound_substream: None,
|
outbound_substream: None,
|
||||||
outbound_substream_establishing: false,
|
outbound_substream_establishing: false,
|
||||||
outbound_substreams_created: 0,
|
outbound_substream_attempts: 0,
|
||||||
inbound_substreams_created: 0,
|
inbound_substream_attempts: 0,
|
||||||
send_queue: SmallVec::new(),
|
send_queue: SmallVec::new(),
|
||||||
peer_kind: None,
|
peer_kind: None,
|
||||||
peer_kind_sent: false,
|
peer_kind_sent: false,
|
||||||
protocol_unsupported: false,
|
last_io_activity: Instant::now(),
|
||||||
idle_timeout,
|
idle_timeout,
|
||||||
upgrade_errors: VecDeque::new(),
|
|
||||||
keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)),
|
|
||||||
in_mesh: false,
|
in_mesh: false,
|
||||||
}
|
})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EnabledHandler {
|
||||||
fn on_fully_negotiated_inbound(
|
fn on_fully_negotiated_inbound(
|
||||||
&mut self,
|
&mut self,
|
||||||
FullyNegotiatedInbound { protocol, .. }: FullyNegotiatedInbound<
|
(substream, peer_kind): (Framed<NegotiatedSubstream, GossipsubCodec>, PeerKind),
|
||||||
<Self as ConnectionHandler>::InboundProtocol,
|
|
||||||
<Self as ConnectionHandler>::InboundOpenInfo,
|
|
||||||
>,
|
|
||||||
) {
|
) {
|
||||||
let (substream, peer_kind) = protocol;
|
|
||||||
|
|
||||||
// If the peer doesn't support the protocol, reject all substreams
|
|
||||||
if self.protocol_unsupported {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.inbound_substreams_created += 1;
|
|
||||||
|
|
||||||
// update the known kind of peer
|
// update the known kind of peer
|
||||||
if self.peer_kind.is_none() {
|
if self.peer_kind.is_none() {
|
||||||
self.peer_kind = Some(peer_kind);
|
self.peer_kind = Some(peer_kind);
|
||||||
}
|
}
|
||||||
|
|
||||||
// new inbound substream. Replace the current one, if it exists.
|
// new inbound substream. Replace the current one, if it exists.
|
||||||
trace!("New inbound substream request");
|
log::trace!("New inbound substream request");
|
||||||
self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream));
|
self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_fully_negotiated_outbound(
|
fn on_fully_negotiated_outbound(
|
||||||
&mut self,
|
&mut self,
|
||||||
FullyNegotiatedOutbound {
|
FullyNegotiatedOutbound { protocol, .. }: FullyNegotiatedOutbound<
|
||||||
protocol,
|
<Handler as ConnectionHandler>::OutboundProtocol,
|
||||||
info: message,
|
<Handler as ConnectionHandler>::OutboundOpenInfo,
|
||||||
}: FullyNegotiatedOutbound<
|
|
||||||
<Self as ConnectionHandler>::OutboundProtocol,
|
|
||||||
<Self as ConnectionHandler>::OutboundOpenInfo,
|
|
||||||
>,
|
>,
|
||||||
) {
|
) {
|
||||||
let (substream, peer_kind) = protocol;
|
let (substream, peer_kind) = protocol;
|
||||||
|
|
||||||
// If the peer doesn't support the protocol, reject all substreams
|
|
||||||
if self.protocol_unsupported {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.outbound_substream_establishing = false;
|
|
||||||
self.outbound_substreams_created += 1;
|
|
||||||
|
|
||||||
// update the known kind of peer
|
// update the known kind of peer
|
||||||
if self.peer_kind.is_none() {
|
if self.peer_kind.is_none() {
|
||||||
self.peer_kind = Some(peer_kind);
|
self.peer_kind = Some(peer_kind);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Should never establish a new outbound substream if one already exists.
|
assert!(
|
||||||
// If this happens, an outbound message is not sent.
|
self.outbound_substream.is_none(),
|
||||||
if self.outbound_substream.is_some() {
|
"Established an outbound substream with one already available"
|
||||||
warn!("Established an outbound substream with one already available");
|
);
|
||||||
// Add the message back to the send queue
|
self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(substream));
|
||||||
self.send_queue.push(message);
|
|
||||||
} else {
|
|
||||||
self.outbound_substream = Some(OutboundSubstreamState::PendingSend(substream, message));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ConnectionHandler for Handler {
|
|
||||||
type InEvent = HandlerIn;
|
|
||||||
type OutEvent = HandlerEvent;
|
|
||||||
type Error = HandlerError;
|
|
||||||
type InboundOpenInfo = ();
|
|
||||||
type InboundProtocol = ProtocolConfig;
|
|
||||||
type OutboundOpenInfo = proto::RPC;
|
|
||||||
type OutboundProtocol = ProtocolConfig;
|
|
||||||
|
|
||||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
|
||||||
self.listen_protocol.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn on_behaviour_event(&mut self, message: HandlerIn) {
|
|
||||||
if !self.protocol_unsupported {
|
|
||||||
match message {
|
|
||||||
HandlerIn::Message(m) => self.send_queue.push(m),
|
|
||||||
// If we have joined the mesh, keep the connection alive.
|
|
||||||
HandlerIn::JoinedMesh => {
|
|
||||||
self.in_mesh = true;
|
|
||||||
self.keep_alive = KeepAlive::Yes;
|
|
||||||
}
|
|
||||||
// If we have left the mesh, start the idle timer.
|
|
||||||
HandlerIn::LeftMesh => {
|
|
||||||
self.in_mesh = false;
|
|
||||||
self.keep_alive = KeepAlive::Until(Instant::now() + self.idle_timeout);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn connection_keep_alive(&self) -> KeepAlive {
|
|
||||||
self.keep_alive
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll(
|
fn poll(
|
||||||
@ -283,52 +222,12 @@ impl ConnectionHandler for Handler {
|
|||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
) -> Poll<
|
) -> Poll<
|
||||||
ConnectionHandlerEvent<
|
ConnectionHandlerEvent<
|
||||||
Self::OutboundProtocol,
|
<Handler as ConnectionHandler>::OutboundProtocol,
|
||||||
Self::OutboundOpenInfo,
|
<Handler as ConnectionHandler>::OutboundOpenInfo,
|
||||||
Self::OutEvent,
|
<Handler as ConnectionHandler>::OutEvent,
|
||||||
Self::Error,
|
<Handler as ConnectionHandler>::Error,
|
||||||
>,
|
>,
|
||||||
> {
|
> {
|
||||||
// Handle any upgrade errors
|
|
||||||
if let Some(error) = self.upgrade_errors.pop_front() {
|
|
||||||
let reported_error = match error {
|
|
||||||
// Timeout errors get mapped to NegotiationTimeout and we close the connection.
|
|
||||||
ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => {
|
|
||||||
Some(HandlerError::NegotiationTimeout)
|
|
||||||
}
|
|
||||||
// There was an error post negotiation, close the connection.
|
|
||||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => Some(e),
|
|
||||||
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => {
|
|
||||||
match negotiation_error {
|
|
||||||
NegotiationError::Failed => {
|
|
||||||
// The protocol is not supported
|
|
||||||
self.protocol_unsupported = true;
|
|
||||||
if !self.peer_kind_sent {
|
|
||||||
self.peer_kind_sent = true;
|
|
||||||
// clear all substreams so the keep alive returns false
|
|
||||||
self.inbound_substream = None;
|
|
||||||
self.outbound_substream = None;
|
|
||||||
self.keep_alive = KeepAlive::No;
|
|
||||||
return Poll::Ready(ConnectionHandlerEvent::Custom(
|
|
||||||
HandlerEvent::PeerKind(PeerKind::NotSupported),
|
|
||||||
));
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
NegotiationError::ProtocolError(e) => {
|
|
||||||
Some(HandlerError::NegotiationProtocolError(e))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// If there was a fatal error, close the connection.
|
|
||||||
if let Some(error) = reported_error {
|
|
||||||
return Poll::Ready(ConnectionHandlerEvent::Close(error));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !self.peer_kind_sent {
|
if !self.peer_kind_sent {
|
||||||
if let Some(peer_kind) = self.peer_kind.as_ref() {
|
if let Some(peer_kind) = self.peer_kind.as_ref() {
|
||||||
self.peer_kind_sent = true;
|
self.peer_kind_sent = true;
|
||||||
@ -338,28 +237,14 @@ impl ConnectionHandler for Handler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.inbound_substreams_created > MAX_SUBSTREAM_CREATION {
|
// determine if we need to create the outbound stream
|
||||||
// Too many inbound substreams have been created, end the connection.
|
|
||||||
return Poll::Ready(ConnectionHandlerEvent::Close(
|
|
||||||
HandlerError::MaxInboundSubstreams,
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
// determine if we need to create the stream
|
|
||||||
if !self.send_queue.is_empty()
|
if !self.send_queue.is_empty()
|
||||||
&& self.outbound_substream.is_none()
|
&& self.outbound_substream.is_none()
|
||||||
&& !self.outbound_substream_establishing
|
&& !self.outbound_substream_establishing
|
||||||
{
|
{
|
||||||
if self.outbound_substreams_created >= MAX_SUBSTREAM_CREATION {
|
|
||||||
return Poll::Ready(ConnectionHandlerEvent::Close(
|
|
||||||
HandlerError::MaxOutboundSubstreams,
|
|
||||||
));
|
|
||||||
}
|
|
||||||
let message = self.send_queue.remove(0);
|
|
||||||
self.send_queue.shrink_to_fit();
|
|
||||||
self.outbound_substream_establishing = true;
|
self.outbound_substream_establishing = true;
|
||||||
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
|
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
|
||||||
protocol: self.listen_protocol.clone().map_info(|()| message),
|
protocol: SubstreamProtocol::new(self.listen_protocol.clone(), ()),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -372,34 +257,22 @@ impl ConnectionHandler for Handler {
|
|||||||
Some(InboundSubstreamState::WaitingInput(mut substream)) => {
|
Some(InboundSubstreamState::WaitingInput(mut substream)) => {
|
||||||
match substream.poll_next_unpin(cx) {
|
match substream.poll_next_unpin(cx) {
|
||||||
Poll::Ready(Some(Ok(message))) => {
|
Poll::Ready(Some(Ok(message))) => {
|
||||||
if !self.in_mesh {
|
self.last_io_activity = Instant::now();
|
||||||
self.keep_alive =
|
|
||||||
KeepAlive::Until(Instant::now() + self.idle_timeout);
|
|
||||||
}
|
|
||||||
self.inbound_substream =
|
self.inbound_substream =
|
||||||
Some(InboundSubstreamState::WaitingInput(substream));
|
Some(InboundSubstreamState::WaitingInput(substream));
|
||||||
return Poll::Ready(ConnectionHandlerEvent::Custom(message));
|
return Poll::Ready(ConnectionHandlerEvent::Custom(message));
|
||||||
}
|
}
|
||||||
Poll::Ready(Some(Err(error))) => {
|
Poll::Ready(Some(Err(error))) => {
|
||||||
match error {
|
log::debug!("Failed to read from inbound stream: {error}");
|
||||||
HandlerError::MaxTransmissionSize => {
|
// Close this side of the stream. If the
|
||||||
warn!("Message exceeded the maximum transmission size");
|
// peer is still around, they will re-establish their
|
||||||
self.inbound_substream =
|
// outbound stream i.e. our inbound stream.
|
||||||
Some(InboundSubstreamState::WaitingInput(substream));
|
self.inbound_substream =
|
||||||
}
|
Some(InboundSubstreamState::Closing(substream));
|
||||||
_ => {
|
|
||||||
warn!("Inbound stream error: {}", error);
|
|
||||||
// More serious errors, close this side of the stream. If the
|
|
||||||
// peer is still around, they will re-establish their
|
|
||||||
// connection
|
|
||||||
self.inbound_substream =
|
|
||||||
Some(InboundSubstreamState::Closing(substream));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// peer closed the stream
|
// peer closed the stream
|
||||||
Poll::Ready(None) => {
|
Poll::Ready(None) => {
|
||||||
warn!("Peer closed their outbound stream");
|
log::debug!("Inbound stream closed by remote");
|
||||||
self.inbound_substream =
|
self.inbound_substream =
|
||||||
Some(InboundSubstreamState::Closing(substream));
|
Some(InboundSubstreamState::Closing(substream));
|
||||||
}
|
}
|
||||||
@ -417,12 +290,9 @@ impl ConnectionHandler for Handler {
|
|||||||
// Don't close the connection but just drop the inbound substream.
|
// Don't close the connection but just drop the inbound substream.
|
||||||
// In case the remote has more to send, they will open up a new
|
// In case the remote has more to send, they will open up a new
|
||||||
// substream.
|
// substream.
|
||||||
warn!("Inbound substream error while closing: {:?}", e);
|
log::debug!("Inbound substream error while closing: {e}");
|
||||||
}
|
}
|
||||||
self.inbound_substream = None;
|
self.inbound_substream = None;
|
||||||
if self.outbound_substream.is_none() {
|
|
||||||
self.keep_alive = KeepAlive::No;
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
@ -469,23 +339,19 @@ impl ConnectionHandler for Handler {
|
|||||||
self.outbound_substream =
|
self.outbound_substream =
|
||||||
Some(OutboundSubstreamState::PendingFlush(substream))
|
Some(OutboundSubstreamState::PendingFlush(substream))
|
||||||
}
|
}
|
||||||
Err(HandlerError::MaxTransmissionSize) => {
|
|
||||||
error!("Message exceeded the maximum transmission size and was not sent.");
|
|
||||||
self.outbound_substream =
|
|
||||||
Some(OutboundSubstreamState::WaitingOutput(substream));
|
|
||||||
}
|
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Error sending message: {}", e);
|
log::debug!("Failed to send message on outbound stream: {e}");
|
||||||
return Poll::Ready(ConnectionHandlerEvent::Close(e));
|
self.outbound_substream = None;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Poll::Ready(Err(e)) => {
|
Poll::Ready(Err(e)) => {
|
||||||
error!("Outbound substream error while sending output: {:?}", e);
|
log::debug!("Failed to send message on outbound stream: {e}");
|
||||||
return Poll::Ready(ConnectionHandlerEvent::Close(e));
|
self.outbound_substream = None;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
self.keep_alive = KeepAlive::Yes;
|
|
||||||
self.outbound_substream =
|
self.outbound_substream =
|
||||||
Some(OutboundSubstreamState::PendingSend(substream, message));
|
Some(OutboundSubstreamState::PendingSend(substream, message));
|
||||||
break;
|
break;
|
||||||
@ -495,53 +361,22 @@ impl ConnectionHandler for Handler {
|
|||||||
Some(OutboundSubstreamState::PendingFlush(mut substream)) => {
|
Some(OutboundSubstreamState::PendingFlush(mut substream)) => {
|
||||||
match Sink::poll_flush(Pin::new(&mut substream), cx) {
|
match Sink::poll_flush(Pin::new(&mut substream), cx) {
|
||||||
Poll::Ready(Ok(())) => {
|
Poll::Ready(Ok(())) => {
|
||||||
if !self.in_mesh {
|
self.last_io_activity = Instant::now();
|
||||||
// if not in the mesh, reset the idle timeout
|
|
||||||
self.keep_alive =
|
|
||||||
KeepAlive::Until(Instant::now() + self.idle_timeout);
|
|
||||||
}
|
|
||||||
self.outbound_substream =
|
self.outbound_substream =
|
||||||
Some(OutboundSubstreamState::WaitingOutput(substream))
|
Some(OutboundSubstreamState::WaitingOutput(substream))
|
||||||
}
|
}
|
||||||
Poll::Ready(Err(e)) => {
|
Poll::Ready(Err(e)) => {
|
||||||
return Poll::Ready(ConnectionHandlerEvent::Close(e))
|
log::debug!("Failed to flush outbound stream: {e}");
|
||||||
|
self.outbound_substream = None;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
self.keep_alive = KeepAlive::Yes;
|
|
||||||
self.outbound_substream =
|
self.outbound_substream =
|
||||||
Some(OutboundSubstreamState::PendingFlush(substream));
|
Some(OutboundSubstreamState::PendingFlush(substream));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Currently never used - manual shutdown may implement this in the future
|
|
||||||
Some(OutboundSubstreamState::_Closing(mut substream)) => {
|
|
||||||
match Sink::poll_close(Pin::new(&mut substream), cx) {
|
|
||||||
Poll::Ready(Ok(())) => {
|
|
||||||
self.outbound_substream = None;
|
|
||||||
if self.inbound_substream.is_none() {
|
|
||||||
self.keep_alive = KeepAlive::No;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Poll::Ready(Err(e)) => {
|
|
||||||
warn!("Outbound substream error while closing: {:?}", e);
|
|
||||||
return Poll::Ready(ConnectionHandlerEvent::Close(
|
|
||||||
io::Error::new(
|
|
||||||
io::ErrorKind::BrokenPipe,
|
|
||||||
"Failed to close outbound substream",
|
|
||||||
)
|
|
||||||
.into(),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
Poll::Pending => {
|
|
||||||
self.keep_alive = KeepAlive::No;
|
|
||||||
self.outbound_substream =
|
|
||||||
Some(OutboundSubstreamState::_Closing(substream));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None => {
|
None => {
|
||||||
self.outbound_substream = None;
|
self.outbound_substream = None;
|
||||||
break;
|
break;
|
||||||
@ -554,6 +389,92 @@ impl ConnectionHandler for Handler {
|
|||||||
|
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectionHandler for Handler {
|
||||||
|
type InEvent = HandlerIn;
|
||||||
|
type OutEvent = HandlerEvent;
|
||||||
|
type Error = Void;
|
||||||
|
type InboundOpenInfo = ();
|
||||||
|
type InboundProtocol = either::Either<ProtocolConfig, DeniedUpgrade>;
|
||||||
|
type OutboundOpenInfo = ();
|
||||||
|
type OutboundProtocol = ProtocolConfig;
|
||||||
|
|
||||||
|
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
||||||
|
match self {
|
||||||
|
Handler::Enabled(handler) => {
|
||||||
|
SubstreamProtocol::new(either::Either::Left(handler.listen_protocol.clone()), ())
|
||||||
|
}
|
||||||
|
Handler::Disabled(_) => {
|
||||||
|
SubstreamProtocol::new(either::Either::Right(DeniedUpgrade), ())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_behaviour_event(&mut self, message: HandlerIn) {
|
||||||
|
match self {
|
||||||
|
Handler::Enabled(handler) => match message {
|
||||||
|
HandlerIn::Message(m) => handler.send_queue.push(m),
|
||||||
|
HandlerIn::JoinedMesh => {
|
||||||
|
handler.in_mesh = true;
|
||||||
|
}
|
||||||
|
HandlerIn::LeftMesh => {
|
||||||
|
handler.in_mesh = false;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Handler::Disabled(_) => {
|
||||||
|
log::debug!("Handler is disabled. Dropping message {:?}", message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn connection_keep_alive(&self) -> KeepAlive {
|
||||||
|
match self {
|
||||||
|
Handler::Enabled(handler) => {
|
||||||
|
if handler.in_mesh {
|
||||||
|
return KeepAlive::Yes;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(
|
||||||
|
OutboundSubstreamState::PendingSend(_, _)
|
||||||
|
| OutboundSubstreamState::PendingFlush(_),
|
||||||
|
) = handler.outbound_substream
|
||||||
|
{
|
||||||
|
return KeepAlive::Yes;
|
||||||
|
}
|
||||||
|
|
||||||
|
KeepAlive::Until(handler.last_io_activity + handler.idle_timeout)
|
||||||
|
}
|
||||||
|
Handler::Disabled(_) => KeepAlive::No,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll(
|
||||||
|
&mut self,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
) -> Poll<
|
||||||
|
ConnectionHandlerEvent<
|
||||||
|
Self::OutboundProtocol,
|
||||||
|
Self::OutboundOpenInfo,
|
||||||
|
Self::OutEvent,
|
||||||
|
Self::Error,
|
||||||
|
>,
|
||||||
|
> {
|
||||||
|
match self {
|
||||||
|
Handler::Enabled(handler) => handler.poll(cx),
|
||||||
|
Handler::Disabled(DisabledHandler::ProtocolUnsupported { peer_kind_sent }) => {
|
||||||
|
if !*peer_kind_sent {
|
||||||
|
*peer_kind_sent = true;
|
||||||
|
return Poll::Ready(ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind(
|
||||||
|
PeerKind::NotSupported,
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
Handler::Disabled(DisabledHandler::MaxSubstreamAttempts) => Poll::Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn on_connection_event(
|
fn on_connection_event(
|
||||||
&mut self,
|
&mut self,
|
||||||
@ -564,19 +485,83 @@ impl ConnectionHandler for Handler {
|
|||||||
Self::OutboundOpenInfo,
|
Self::OutboundOpenInfo,
|
||||||
>,
|
>,
|
||||||
) {
|
) {
|
||||||
match event {
|
match self {
|
||||||
ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
|
Handler::Enabled(handler) => {
|
||||||
self.on_fully_negotiated_inbound(fully_negotiated_inbound)
|
if event.is_inbound() {
|
||||||
|
handler.inbound_substream_attempts += 1;
|
||||||
|
|
||||||
|
if handler.inbound_substream_attempts == MAX_SUBSTREAM_ATTEMPTS {
|
||||||
|
log::warn!(
|
||||||
|
"The maximum number of inbound substreams attempts has been exceeded"
|
||||||
|
);
|
||||||
|
*self = Handler::Disabled(DisabledHandler::MaxSubstreamAttempts);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if event.is_outbound() {
|
||||||
|
handler.outbound_substream_establishing = false;
|
||||||
|
|
||||||
|
handler.outbound_substream_attempts += 1;
|
||||||
|
|
||||||
|
if handler.outbound_substream_attempts == MAX_SUBSTREAM_ATTEMPTS {
|
||||||
|
log::warn!(
|
||||||
|
"The maximum number of outbound substream attempts has been exceeded"
|
||||||
|
);
|
||||||
|
*self = Handler::Disabled(DisabledHandler::MaxSubstreamAttempts);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match event {
|
||||||
|
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
|
||||||
|
protocol,
|
||||||
|
..
|
||||||
|
}) => match protocol {
|
||||||
|
Either::Left(protocol) => handler.on_fully_negotiated_inbound(protocol),
|
||||||
|
Either::Right(v) => void::unreachable(v),
|
||||||
|
},
|
||||||
|
ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
|
||||||
|
handler.on_fully_negotiated_outbound(fully_negotiated_outbound)
|
||||||
|
}
|
||||||
|
ConnectionEvent::DialUpgradeError(DialUpgradeError {
|
||||||
|
error: ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer,
|
||||||
|
..
|
||||||
|
}) => {
|
||||||
|
log::debug!("Dial upgrade error: Protocol negotiation timeout");
|
||||||
|
}
|
||||||
|
ConnectionEvent::DialUpgradeError(DialUpgradeError {
|
||||||
|
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
|
||||||
|
..
|
||||||
|
}) => void::unreachable(e),
|
||||||
|
ConnectionEvent::DialUpgradeError(DialUpgradeError {
|
||||||
|
error:
|
||||||
|
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
|
||||||
|
NegotiationError::Failed,
|
||||||
|
)),
|
||||||
|
..
|
||||||
|
}) => {
|
||||||
|
// The protocol is not supported
|
||||||
|
log::debug!(
|
||||||
|
"The remote peer does not support gossipsub on this connection"
|
||||||
|
);
|
||||||
|
*self = Handler::Disabled(DisabledHandler::ProtocolUnsupported {
|
||||||
|
peer_kind_sent: false,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
ConnectionEvent::DialUpgradeError(DialUpgradeError {
|
||||||
|
error:
|
||||||
|
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
|
||||||
|
NegotiationError::ProtocolError(e),
|
||||||
|
)),
|
||||||
|
..
|
||||||
|
}) => {
|
||||||
|
log::debug!("Protocol negotiation failed: {e}")
|
||||||
|
}
|
||||||
|
ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
|
Handler::Disabled(_) => {}
|
||||||
self.on_fully_negotiated_outbound(fully_negotiated_outbound)
|
|
||||||
}
|
|
||||||
ConnectionEvent::DialUpgradeError(DialUpgradeError { error: e, .. }) => {
|
|
||||||
self.outbound_substream_establishing = false;
|
|
||||||
warn!("Dial upgrade error {:?}", e);
|
|
||||||
self.upgrade_errors.push_back(e);
|
|
||||||
}
|
|
||||||
ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -158,9 +158,12 @@ mod types;
|
|||||||
|
|
||||||
mod rpc_proto;
|
mod rpc_proto;
|
||||||
|
|
||||||
|
#[deprecated(note = "This error will no longer be emitted")]
|
||||||
|
pub type HandlerError = error_priv::HandlerError;
|
||||||
|
|
||||||
pub use self::behaviour::{Behaviour, Event, MessageAuthenticity};
|
pub use self::behaviour::{Behaviour, Event, MessageAuthenticity};
|
||||||
pub use self::config::{Config, ConfigBuilder, ValidationMode, Version};
|
pub use self::config::{Config, ConfigBuilder, ValidationMode, Version};
|
||||||
pub use self::error_priv::{HandlerError, PublishError, SubscriptionError, ValidationError};
|
pub use self::error_priv::{PublishError, SubscriptionError, ValidationError};
|
||||||
pub use self::peer_score::{
|
pub use self::peer_score::{
|
||||||
score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds,
|
score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds,
|
||||||
TopicScoreParams,
|
TopicScoreParams,
|
||||||
|
@ -24,8 +24,8 @@ use crate::topic::TopicHash;
|
|||||||
use crate::types::{
|
use crate::types::{
|
||||||
ControlAction, MessageId, PeerInfo, PeerKind, RawMessage, Rpc, Subscription, SubscriptionAction,
|
ControlAction, MessageId, PeerInfo, PeerKind, RawMessage, Rpc, Subscription, SubscriptionAction,
|
||||||
};
|
};
|
||||||
|
use crate::ValidationError;
|
||||||
use crate::{rpc_proto::proto, Config};
|
use crate::{rpc_proto::proto, Config};
|
||||||
use crate::{HandlerError, ValidationError};
|
|
||||||
use asynchronous_codec::{Decoder, Encoder, Framed};
|
use asynchronous_codec::{Decoder, Encoder, Framed};
|
||||||
use byteorder::{BigEndian, ByteOrder};
|
use byteorder::{BigEndian, ByteOrder};
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
@ -37,6 +37,7 @@ use log::{debug, warn};
|
|||||||
use quick_protobuf::Writer;
|
use quick_protobuf::Writer;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use unsigned_varint::codec;
|
use unsigned_varint::codec;
|
||||||
|
use void::Void;
|
||||||
|
|
||||||
pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:";
|
pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:";
|
||||||
|
|
||||||
@ -147,7 +148,7 @@ where
|
|||||||
TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||||
{
|
{
|
||||||
type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
|
type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
|
||||||
type Error = HandlerError;
|
type Error = Void;
|
||||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
|
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
|
||||||
|
|
||||||
fn upgrade_inbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
|
fn upgrade_inbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
|
||||||
@ -168,7 +169,7 @@ where
|
|||||||
TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static,
|
TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static,
|
||||||
{
|
{
|
||||||
type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
|
type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
|
||||||
type Error = HandlerError;
|
type Error = Void;
|
||||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
|
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
|
||||||
|
|
||||||
fn upgrade_outbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
|
fn upgrade_outbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
|
||||||
@ -268,18 +269,18 @@ impl GossipsubCodec {
|
|||||||
|
|
||||||
impl Encoder for GossipsubCodec {
|
impl Encoder for GossipsubCodec {
|
||||||
type Item = proto::RPC;
|
type Item = proto::RPC;
|
||||||
type Error = HandlerError;
|
type Error = quick_protobuf_codec::Error;
|
||||||
|
|
||||||
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), HandlerError> {
|
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
|
||||||
Ok(self.codec.encode(item, dst)?)
|
self.codec.encode(item, dst)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Decoder for GossipsubCodec {
|
impl Decoder for GossipsubCodec {
|
||||||
type Item = HandlerEvent;
|
type Item = HandlerEvent;
|
||||||
type Error = HandlerError;
|
type Error = quick_protobuf_codec::Error;
|
||||||
|
|
||||||
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, HandlerError> {
|
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||||
let rpc = match self.codec.decode(src)? {
|
let rpc = match self.codec.decode(src)? {
|
||||||
Some(p) => p,
|
Some(p) => p,
|
||||||
None => return Ok(None),
|
None => return Ok(None),
|
||||||
|
@ -1,3 +1,9 @@
|
|||||||
|
## 0.42.2 - unreleased
|
||||||
|
|
||||||
|
- Add `ConnectionEvent::{is_outbound,is_inbound}`. See [PR 3625].
|
||||||
|
|
||||||
|
[PR 3625]: https://github.com/libp2p/rust-libp2p/pull/3625
|
||||||
|
|
||||||
## 0.42.1
|
## 0.42.1
|
||||||
|
|
||||||
- Deprecate `ConnectionLimits` in favor of `libp2p::connection_limits`.
|
- Deprecate `ConnectionLimits` in favor of `libp2p::connection_limits`.
|
||||||
|
@ -3,7 +3,7 @@ name = "libp2p-swarm"
|
|||||||
edition = "2021"
|
edition = "2021"
|
||||||
rust-version = "1.62.0"
|
rust-version = "1.62.0"
|
||||||
description = "The libp2p swarm"
|
description = "The libp2p swarm"
|
||||||
version = "0.42.1"
|
version = "0.42.2"
|
||||||
authors = ["Parity Technologies <admin@parity.io>"]
|
authors = ["Parity Technologies <admin@parity.io>"]
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
repository = "https://github.com/libp2p/rust-libp2p"
|
repository = "https://github.com/libp2p/rust-libp2p"
|
||||||
|
@ -212,6 +212,38 @@ pub enum ConnectionEvent<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IO
|
|||||||
ListenUpgradeError(ListenUpgradeError<IOI, IP>),
|
ListenUpgradeError(ListenUpgradeError<IOI, IP>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI>
|
||||||
|
ConnectionEvent<'a, IP, OP, IOI, OOI>
|
||||||
|
{
|
||||||
|
/// Whether the event concerns an outbound stream.
|
||||||
|
pub fn is_outbound(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
ConnectionEvent::DialUpgradeError(_) | ConnectionEvent::FullyNegotiatedOutbound(_) => {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
ConnectionEvent::FullyNegotiatedInbound(_)
|
||||||
|
| ConnectionEvent::AddressChange(_)
|
||||||
|
| ConnectionEvent::ListenUpgradeError(_) => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Whether the event concerns an inbound stream.
|
||||||
|
pub fn is_inbound(&self) -> bool {
|
||||||
|
// Note: This will get simpler with https://github.com/libp2p/rust-libp2p/pull/3605.
|
||||||
|
match self {
|
||||||
|
ConnectionEvent::FullyNegotiatedInbound(_)
|
||||||
|
| ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
|
||||||
|
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(_)), // Only `Select` is relevant, the others may be for other handlers too.
|
||||||
|
..
|
||||||
|
}) => true,
|
||||||
|
ConnectionEvent::FullyNegotiatedOutbound(_)
|
||||||
|
| ConnectionEvent::ListenUpgradeError(_)
|
||||||
|
| ConnectionEvent::AddressChange(_)
|
||||||
|
| ConnectionEvent::DialUpgradeError(_) => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// [`ConnectionEvent`] variant that informs the handler about
|
/// [`ConnectionEvent`] variant that informs the handler about
|
||||||
/// the output of a successful upgrade on a new inbound substream.
|
/// the output of a successful upgrade on a new inbound substream.
|
||||||
///
|
///
|
||||||
|
Reference in New Issue
Block a user