mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-21 22:01:34 +00:00
protocols/gossipsub: Add Gossipsub v1.1 support
This commit upgrades the current gossipsub implementation to support the [v1.1 spec](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md). It adds a number of features, bug fixes and performance improvements. Besides support for all new 1.1 features, other improvements that are of particular note: - Improved duplicate LRU-time cache (this was previously a severe bottleneck for large message throughput topics) - Extended message validation configuration options - Arbitrary topics (users can now implement their own hashing schemes) - Improved message validation handling - Invalid messages are no longer dropped but sent to the behaviour for application-level processing (including scoring) - Support for floodsub, gossipsub v1 and gossipsub v2 - Protobuf encoding has been shifted into the behaviour. This has permitted two improvements: 1. Message size verification during publishing (report to the user if the message is too large before attempting to send). 2. Message fragmentation. If an RPC is too large it is fragmented into its sub components and sent in smaller chunks. Additional Notes The peer eXchange protocol defined in the v1.1 spec is inactive in its current form. The current implementation permits sending `PeerId` in `PRUNE` messages, however a `PeerId` is not sufficient to form a new connection to a peer. A `Signed Address Record` is required to safely transmit peer identity information. Once these are confirmed (https://github.com/libp2p/specs/pull/217) a future PR will implement these and make PX usable. Co-authored-by: Max Inden <mail@max-inden.de> Co-authored-by: Rüdiger Klaehn <rklaehn@protonmail.com> Co-authored-by: blacktemplar <blacktemplar@a1.net> Co-authored-by: Rüdiger Klaehn <rklaehn@gmail.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Roman S. Borschel <roman@parity.io> Co-authored-by: Roman Borschel <romanb@users.noreply.github.com> Co-authored-by: David Craven <david@craven.ch>
This commit is contained in:
@ -18,24 +18,56 @@
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use crate::behaviour::GossipsubRpc;
|
||||
use crate::config::ValidationMode;
|
||||
use crate::error::{GossipsubHandlerError, ValidationError};
|
||||
use crate::protocol::{GossipsubCodec, ProtocolConfig};
|
||||
use crate::types::{GossipsubRpc, PeerKind, RawGossipsubMessage};
|
||||
use futures::prelude::*;
|
||||
use futures::StreamExt;
|
||||
use futures_codec::Framed;
|
||||
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade};
|
||||
use libp2p_core::upgrade::{InboundUpgrade, NegotiationError, OutboundUpgrade, UpgradeError};
|
||||
use libp2p_swarm::protocols_handler::{
|
||||
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
|
||||
};
|
||||
use libp2p_swarm::NegotiatedSubstream;
|
||||
use log::{debug, error, trace, warn};
|
||||
use log::{error, trace, warn};
|
||||
use smallvec::SmallVec;
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
collections::VecDeque,
|
||||
io,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
time::Duration,
|
||||
};
|
||||
use wasm_timer::Instant;
|
||||
|
||||
/// The initial time (in seconds) we set the keep alive for protocol negotiations to occur.
|
||||
const INITIAL_KEEP_ALIVE: u64 = 30;
|
||||
|
||||
/// The event emitted by the Handler. This informs the behaviour of various events created
|
||||
/// by the handler.
|
||||
#[derive(Debug)]
|
||||
pub enum HandlerEvent {
|
||||
/// A GossipsubRPC message has been received. This also contains a list of invalid messages (if
|
||||
/// any) that were received.
|
||||
Message {
|
||||
/// The GossipsubRPC message excluding any invalid messages.
|
||||
rpc: GossipsubRpc,
|
||||
/// Any invalid messages that were received in the RPC, along with the associated
|
||||
/// validation error.
|
||||
invalid_messages: Vec<(RawGossipsubMessage, ValidationError)>,
|
||||
},
|
||||
/// An inbound or outbound substream has been established with the peer and this informs over
|
||||
/// which protocol. This message only occurs once per connection.
|
||||
PeerKind(PeerKind),
|
||||
}
|
||||
|
||||
/// The maximum number of substreams we accept or create before disconnecting from the peer.
|
||||
///
|
||||
/// Gossipsub is supposed to have a single long-lived inbound and outbound substream. On failure we
|
||||
/// attempt to recreate these. This imposes an upper bound of new substreams before we consider the
|
||||
/// connection faulty and disconnect. This also prevents against potential substream creation loops.
|
||||
const MAX_SUBSTREAM_CREATION: usize = 5;
|
||||
|
||||
/// Protocol Handler that manages a single long-lived substream with a peer.
|
||||
pub struct GossipsubHandler {
|
||||
@ -49,12 +81,36 @@ pub struct GossipsubHandler {
|
||||
inbound_substream: Option<InboundSubstreamState>,
|
||||
|
||||
/// Queue of values that we want to send to the remote.
|
||||
send_queue: SmallVec<[GossipsubRpc; 16]>,
|
||||
send_queue: SmallVec<[crate::rpc_proto::Rpc; 16]>,
|
||||
|
||||
/// Flag indicating that an outbound substream is being established to prevent duplicate
|
||||
/// requests.
|
||||
outbound_substream_establishing: bool,
|
||||
|
||||
/// The number of outbound substreams we have created.
|
||||
outbound_substreams_created: usize,
|
||||
|
||||
/// The number of inbound substreams that have been created by the peer.
|
||||
inbound_substreams_created: usize,
|
||||
|
||||
/// The type of peer this handler is associated to.
|
||||
peer_kind: Option<PeerKind>,
|
||||
|
||||
/// Keeps track on whether we have sent the peer kind to the behaviour.
|
||||
//
|
||||
// NOTE: Use this flag rather than checking the substream count each poll.
|
||||
peer_kind_sent: bool,
|
||||
|
||||
/// If the peer doesn't support the gossipsub protocol we do not immediately disconnect.
|
||||
/// Rather, we disable the handler and prevent any incoming or outgoing substreams from being
|
||||
/// established.
|
||||
///
|
||||
/// This value is set to true to indicate the peer doesn't support gossipsub.
|
||||
protocol_unsupported: bool,
|
||||
|
||||
/// Collection of errors from attempting an upgrade.
|
||||
upgrade_errors: VecDeque<ProtocolsHandlerUpgrErr<GossipsubHandlerError>>,
|
||||
|
||||
/// Flag determining whether to maintain the connection to the peer.
|
||||
keep_alive: KeepAlive,
|
||||
}
|
||||
@ -74,7 +130,10 @@ enum OutboundSubstreamState {
|
||||
/// Waiting for the user to send a message. The idle state for an outbound substream.
|
||||
WaitingOutput(Framed<NegotiatedSubstream, GossipsubCodec>),
|
||||
/// Waiting to send a message to the remote.
|
||||
PendingSend(Framed<NegotiatedSubstream, GossipsubCodec>, GossipsubRpc),
|
||||
PendingSend(
|
||||
Framed<NegotiatedSubstream, GossipsubCodec>,
|
||||
crate::rpc_proto::Rpc,
|
||||
),
|
||||
/// Waiting to flush the substream so that the data arrives to the remote.
|
||||
PendingFlush(Framed<NegotiatedSubstream, GossipsubCodec>),
|
||||
/// The substream is being closed. Used by either substream.
|
||||
@ -84,35 +143,46 @@ enum OutboundSubstreamState {
|
||||
}
|
||||
|
||||
impl GossipsubHandler {
|
||||
/// Builds a new `GossipsubHandler`.
|
||||
/// Builds a new [`GossipsubHandler`].
|
||||
pub fn new(
|
||||
protocol_id: impl Into<Cow<'static, [u8]>>,
|
||||
protocol_id_prefix: std::borrow::Cow<'static, str>,
|
||||
max_transmit_size: usize,
|
||||
validation_mode: ValidationMode,
|
||||
support_floodsub: bool,
|
||||
) -> Self {
|
||||
GossipsubHandler {
|
||||
listen_protocol: SubstreamProtocol::new(ProtocolConfig::new(
|
||||
protocol_id,
|
||||
max_transmit_size,
|
||||
validation_mode,
|
||||
), ()),
|
||||
listen_protocol: SubstreamProtocol::new(
|
||||
ProtocolConfig::new(
|
||||
protocol_id_prefix,
|
||||
max_transmit_size,
|
||||
validation_mode,
|
||||
support_floodsub,
|
||||
),
|
||||
(),
|
||||
),
|
||||
inbound_substream: None,
|
||||
outbound_substream: None,
|
||||
outbound_substream_establishing: false,
|
||||
outbound_substreams_created: 0,
|
||||
inbound_substreams_created: 0,
|
||||
send_queue: SmallVec::new(),
|
||||
keep_alive: KeepAlive::Yes,
|
||||
peer_kind: None,
|
||||
peer_kind_sent: false,
|
||||
protocol_unsupported: false,
|
||||
upgrade_errors: VecDeque::new(),
|
||||
keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ProtocolsHandler for GossipsubHandler {
|
||||
type InEvent = GossipsubRpc;
|
||||
type OutEvent = GossipsubRpc;
|
||||
type Error = io::Error;
|
||||
type InboundProtocol = ProtocolConfig;
|
||||
type OutboundProtocol = ProtocolConfig;
|
||||
type OutboundOpenInfo = GossipsubRpc;
|
||||
type InEvent = crate::rpc_proto::Rpc;
|
||||
type OutEvent = HandlerEvent;
|
||||
type Error = GossipsubHandlerError;
|
||||
type InboundOpenInfo = ();
|
||||
type InboundProtocol = ProtocolConfig;
|
||||
type OutboundOpenInfo = Self::InEvent;
|
||||
type OutboundProtocol = ProtocolConfig;
|
||||
|
||||
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
|
||||
self.listen_protocol.clone()
|
||||
@ -120,9 +190,21 @@ impl ProtocolsHandler for GossipsubHandler {
|
||||
|
||||
fn inject_fully_negotiated_inbound(
|
||||
&mut self,
|
||||
substream: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
|
||||
_info: Self::InboundOpenInfo
|
||||
(substream, peer_kind): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
|
||||
_info: Self::InboundOpenInfo,
|
||||
) {
|
||||
// If the peer doesn't support the protocol, reject all substreams
|
||||
if self.protocol_unsupported {
|
||||
return;
|
||||
}
|
||||
|
||||
self.inbound_substreams_created += 1;
|
||||
|
||||
// update the known kind of peer
|
||||
if self.peer_kind.is_none() {
|
||||
self.peer_kind = Some(peer_kind);
|
||||
}
|
||||
|
||||
// new inbound substream. Replace the current one, if it exists.
|
||||
trace!("New inbound substream request");
|
||||
self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream));
|
||||
@ -130,10 +212,22 @@ impl ProtocolsHandler for GossipsubHandler {
|
||||
|
||||
fn inject_fully_negotiated_outbound(
|
||||
&mut self,
|
||||
substream: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
|
||||
(substream, peer_kind): <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
|
||||
message: Self::OutboundOpenInfo,
|
||||
) {
|
||||
// If the peer doesn't support the protocol, reject all substreams
|
||||
if self.protocol_unsupported {
|
||||
return;
|
||||
}
|
||||
|
||||
self.outbound_substream_establishing = false;
|
||||
self.outbound_substreams_created += 1;
|
||||
|
||||
// update the known kind of peer
|
||||
if self.peer_kind.is_none() {
|
||||
self.peer_kind = Some(peer_kind);
|
||||
}
|
||||
|
||||
// Should never establish a new outbound substream if one already exists.
|
||||
// If this happens, an outbound message is not sent.
|
||||
if self.outbound_substream.is_some() {
|
||||
@ -145,21 +239,22 @@ impl ProtocolsHandler for GossipsubHandler {
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_event(&mut self, message: GossipsubRpc) {
|
||||
self.send_queue.push(message);
|
||||
fn inject_event(&mut self, message: crate::rpc_proto::Rpc) {
|
||||
if !self.protocol_unsupported {
|
||||
self.send_queue.push(message);
|
||||
}
|
||||
}
|
||||
|
||||
fn inject_dial_upgrade_error(
|
||||
&mut self,
|
||||
_: Self::OutboundOpenInfo,
|
||||
_: ProtocolsHandlerUpgrErr<
|
||||
e: ProtocolsHandlerUpgrErr<
|
||||
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
|
||||
>,
|
||||
) {
|
||||
self.outbound_substream_establishing = false;
|
||||
// Ignore upgrade errors for now.
|
||||
// If a peer doesn't support this protocol, this will just ignore them, but not disconnect
|
||||
// them.
|
||||
warn!("Dial upgrade error {:?}", e);
|
||||
self.upgrade_errors.push_back(e);
|
||||
}
|
||||
|
||||
fn connection_keep_alive(&self) -> KeepAlive {
|
||||
@ -177,16 +272,77 @@ impl ProtocolsHandler for GossipsubHandler {
|
||||
Self::Error,
|
||||
>,
|
||||
> {
|
||||
// Handle any upgrade errors
|
||||
if let Some(error) = self.upgrade_errors.pop_front() {
|
||||
let reported_error = match error {
|
||||
// Timeout errors get mapped to NegotiationTimeout and we close the connection.
|
||||
ProtocolsHandlerUpgrErr::Timeout | ProtocolsHandlerUpgrErr::Timer => {
|
||||
Some(GossipsubHandlerError::NegotiationTimeout)
|
||||
}
|
||||
// There was an error post negotiation, close the connection.
|
||||
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => Some(e),
|
||||
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => {
|
||||
match negotiation_error {
|
||||
NegotiationError::Failed => {
|
||||
// The protocol is not supported
|
||||
self.protocol_unsupported = true;
|
||||
if !self.peer_kind_sent {
|
||||
self.peer_kind_sent = true;
|
||||
// clear all substreams so the keep alive returns false
|
||||
self.inbound_substream = None;
|
||||
self.outbound_substream = None;
|
||||
self.keep_alive = KeepAlive::No;
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(
|
||||
HandlerEvent::PeerKind(PeerKind::NotSupported),
|
||||
));
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
NegotiationError::ProtocolError(e) => {
|
||||
Some(GossipsubHandlerError::NegotiationProtocolError(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// If there was a fatal error, close the connection.
|
||||
if let Some(error) = reported_error {
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Close(error));
|
||||
}
|
||||
}
|
||||
|
||||
if !self.peer_kind_sent {
|
||||
if let Some(peer_kind) = self.peer_kind.as_ref() {
|
||||
self.peer_kind_sent = true;
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(HandlerEvent::PeerKind(
|
||||
peer_kind.clone(),
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
if self.inbound_substreams_created > MAX_SUBSTREAM_CREATION {
|
||||
// Too many inbound substreams have been created, end the connection.
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Close(
|
||||
GossipsubHandlerError::MaxInboundSubstreams,
|
||||
));
|
||||
}
|
||||
|
||||
// determine if we need to create the stream
|
||||
if !self.send_queue.is_empty()
|
||||
&& self.outbound_substream.is_none()
|
||||
&& !self.outbound_substream_establishing
|
||||
{
|
||||
if self.outbound_substreams_created >= MAX_SUBSTREAM_CREATION {
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Close(
|
||||
GossipsubHandlerError::MaxOutboundSubstreams,
|
||||
));
|
||||
}
|
||||
let message = self.send_queue.remove(0);
|
||||
self.send_queue.shrink_to_fit();
|
||||
self.outbound_substream_establishing = true;
|
||||
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||
protocol: self.listen_protocol.clone().map_info(|()| message)
|
||||
protocol: self.listen_protocol.clone().map_info(|()| message),
|
||||
});
|
||||
}
|
||||
|
||||
@ -203,15 +359,15 @@ impl ProtocolsHandler for GossipsubHandler {
|
||||
Some(InboundSubstreamState::WaitingInput(substream));
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Custom(message));
|
||||
}
|
||||
Poll::Ready(Some(Err(e))) => {
|
||||
match e.kind() {
|
||||
std::io::ErrorKind::InvalidData => {
|
||||
// Invalid message, ignore it and reset to waiting
|
||||
warn!("Invalid message received. Error: {}", e);
|
||||
Poll::Ready(Some(Err(error))) => {
|
||||
match error {
|
||||
GossipsubHandlerError::MaxTransmissionSize => {
|
||||
warn!("Message exceeded the maximum transmission size");
|
||||
self.inbound_substream =
|
||||
Some(InboundSubstreamState::WaitingInput(substream));
|
||||
}
|
||||
_ => {
|
||||
warn!("Inbound stream error: {}", error);
|
||||
// More serious errors, close this side of the stream. If the
|
||||
// peer is still around, they will re-establish their
|
||||
// connection
|
||||
@ -222,6 +378,7 @@ impl ProtocolsHandler for GossipsubHandler {
|
||||
}
|
||||
// peer closed the stream
|
||||
Poll::Ready(None) => {
|
||||
warn!("Peer closed their outbound stream");
|
||||
self.inbound_substream =
|
||||
Some(InboundSubstreamState::Closing(substream));
|
||||
}
|
||||
@ -239,7 +396,7 @@ impl ProtocolsHandler for GossipsubHandler {
|
||||
// Don't close the connection but just drop the inbound substream.
|
||||
// In case the remote has more to send, they will open up a new
|
||||
// substream.
|
||||
debug!("Inbound substream error while closing: {:?}", e);
|
||||
warn!("Inbound substream error while closing: {:?}", e);
|
||||
}
|
||||
|
||||
self.inbound_substream = None;
|
||||
@ -265,6 +422,7 @@ impl ProtocolsHandler for GossipsubHandler {
|
||||
}
|
||||
}
|
||||
|
||||
// process outbound stream
|
||||
loop {
|
||||
match std::mem::replace(
|
||||
&mut self.outbound_substream,
|
||||
@ -291,19 +449,19 @@ impl ProtocolsHandler for GossipsubHandler {
|
||||
self.outbound_substream =
|
||||
Some(OutboundSubstreamState::PendingFlush(substream))
|
||||
}
|
||||
Err(GossipsubHandlerError::MaxTransmissionSize) => {
|
||||
error!("Message exceeded the maximum transmission size and was not sent.");
|
||||
self.outbound_substream =
|
||||
Some(OutboundSubstreamState::WaitingOutput(substream));
|
||||
}
|
||||
Err(e) => {
|
||||
if let io::ErrorKind::PermissionDenied = e.kind() {
|
||||
error!("Message over the maximum transmission limit was not sent.");
|
||||
self.outbound_substream =
|
||||
Some(OutboundSubstreamState::WaitingOutput(substream));
|
||||
} else {
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Close(e));
|
||||
}
|
||||
error!("Error sending message: {}", e);
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Close(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
debug!("Outbound substream error while sending output: {:?}", e);
|
||||
error!("Outbound substream error while sending output: {:?}", e);
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Close(e));
|
||||
}
|
||||
Poll::Pending => {
|
||||
@ -338,11 +496,14 @@ impl ProtocolsHandler for GossipsubHandler {
|
||||
break;
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
debug!("Outbound substream error while closing: {:?}", e);
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Close(io::Error::new(
|
||||
io::ErrorKind::BrokenPipe,
|
||||
"Failed to close outbound substream",
|
||||
)));
|
||||
warn!("Outbound substream error while closing: {:?}", e);
|
||||
return Poll::Ready(ProtocolsHandlerEvent::Close(
|
||||
io::Error::new(
|
||||
io::ErrorKind::BrokenPipe,
|
||||
"Failed to close outbound substream",
|
||||
)
|
||||
.into(),
|
||||
));
|
||||
}
|
||||
Poll::Pending => {
|
||||
self.outbound_substream =
|
||||
|
Reference in New Issue
Block a user