protocols/gossipsub: Make use of prost-codec (#3070)

This commit is contained in:
Nick Loadholtes
2022-11-04 06:12:08 -04:00
committed by GitHub
parent b528d336cd
commit 1ba9e4579d
4 changed files with 31 additions and 50 deletions

View File

@ -4,6 +4,12 @@
- Update to `libp2p-swarm` `v0.41.0`. - Update to `libp2p-swarm` `v0.41.0`.
- Update to `prost-codec` `v0.3.0`.
- Refactoring GossipsubCodec to use common protobuf Codec. See [PR 3070].
[PR 3070]: https://github.com/libp2p/rust-libp2p/pull/3070
# 0.42.0 # 0.42.0
- Bump rand to 0.8 and quickcheck to 1. See [PR 2857]. - Bump rand to 0.8 and quickcheck to 1. See [PR 2857].

View File

@ -25,9 +25,11 @@ sha2 = "0.10.0"
base64 = "0.13.0" base64 = "0.13.0"
smallvec = "1.6.1" smallvec = "1.6.1"
prost = "0.11" prost = "0.11"
prost-codec = { version = "0.3", path = "../../misc/prost-codec" }
hex_fmt = "0.3.0" hex_fmt = "0.3.0"
regex = "1.5.5" regex = "1.5.5"
serde = { version = "1", optional = true, features = ["derive"] } serde = { version = "1", optional = true, features = ["derive"] }
thiserror = "1.0"
wasm-timer = "0.2.5" wasm-timer = "0.2.5"
instant = "0.1.11" instant = "0.1.11"
# Metrics dependencies # Metrics dependencies

View File

@ -22,7 +22,7 @@
use libp2p_core::identity::error::SigningError; use libp2p_core::identity::error::SigningError;
use libp2p_core::upgrade::ProtocolError; use libp2p_core::upgrade::ProtocolError;
use std::fmt; use thiserror::Error;
/// Error associated with publishing a gossipsub message. /// Error associated with publishing a gossipsub message.
#[derive(Debug)] #[derive(Debug)]
@ -87,20 +87,20 @@ impl From<SigningError> for PublishError {
} }
/// Errors that can occur in the protocols handler. /// Errors that can occur in the protocols handler.
#[derive(Debug)] #[derive(Debug, Error)]
pub enum GossipsubHandlerError { pub enum GossipsubHandlerError {
/// The maximum number of inbound substreams created has been exceeded. #[error("The maximum number of inbound substreams created has been exceeded.")]
MaxInboundSubstreams, MaxInboundSubstreams,
/// The maximum number of outbound substreams created has been exceeded. #[error("The maximum number of outbound substreams created has been exceeded.")]
MaxOutboundSubstreams, MaxOutboundSubstreams,
/// The message exceeds the maximum transmission size. #[error("The message exceeds the maximum transmission size.")]
MaxTransmissionSize, MaxTransmissionSize,
/// Protocol negotiation timeout. #[error("Protocol negotiation timeout.")]
NegotiationTimeout, NegotiationTimeout,
/// Protocol negotiation failed. #[error("Protocol negotiation failed.")]
NegotiationProtocolError(ProtocolError), NegotiationProtocolError(ProtocolError),
/// IO error. #[error("Failed to encode or decode")]
Io(std::io::Error), Codec(#[from] prost_codec::Error),
} }
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
@ -136,7 +136,7 @@ impl std::error::Error for ValidationError {}
impl From<std::io::Error> for GossipsubHandlerError { impl From<std::io::Error> for GossipsubHandlerError {
fn from(error: std::io::Error) -> GossipsubHandlerError { fn from(error: std::io::Error) -> GossipsubHandlerError {
GossipsubHandlerError::Io(error) GossipsubHandlerError::Codec(prost_codec::Error::from(error))
} }
} }
@ -145,18 +145,3 @@ impl From<std::io::Error> for PublishError {
PublishError::TransformFailed(error) PublishError::TransformFailed(error)
} }
} }
impl fmt::Display for GossipsubHandlerError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for GossipsubHandlerError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
GossipsubHandlerError::Io(io) => Some(io),
_ => None,
}
}
}

View File

@ -29,7 +29,6 @@ use crate::types::{
}; };
use asynchronous_codec::{Decoder, Encoder, Framed}; use asynchronous_codec::{Decoder, Encoder, Framed};
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes;
use bytes::BytesMut; use bytes::BytesMut;
use futures::future; use futures::future;
use futures::prelude::*; use futures::prelude::*;
@ -184,17 +183,18 @@ where
/* Gossip codec for the framing */ /* Gossip codec for the framing */
pub struct GossipsubCodec { pub struct GossipsubCodec {
/// Codec to encode/decode the Unsigned varint length prefix of the frames.
length_codec: codec::UviBytes,
/// Determines the level of validation performed on incoming messages. /// Determines the level of validation performed on incoming messages.
validation_mode: ValidationMode, validation_mode: ValidationMode,
/// The codec to handle common encoding/decoding of protobuf messages
codec: prost_codec::Codec<rpc_proto::Rpc>,
} }
impl GossipsubCodec { impl GossipsubCodec {
pub fn new(length_codec: codec::UviBytes, validation_mode: ValidationMode) -> Self { pub fn new(length_codec: codec::UviBytes, validation_mode: ValidationMode) -> GossipsubCodec {
let codec = prost_codec::Codec::new(length_codec.max_len());
GossipsubCodec { GossipsubCodec {
length_codec,
validation_mode, validation_mode,
codec,
} }
} }
@ -267,16 +267,12 @@ impl Encoder for GossipsubCodec {
type Item = rpc_proto::Rpc; type Item = rpc_proto::Rpc;
type Error = GossipsubHandlerError; type Error = GossipsubHandlerError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(
let mut buf = Vec::with_capacity(item.encoded_len()); &mut self,
item: Self::Item,
item.encode(&mut buf) dst: &mut BytesMut,
.expect("Buffer has sufficient capacity"); ) -> Result<(), GossipsubHandlerError> {
Ok(self.codec.encode(item, dst)?)
// length prefix the protobuf message, ensuring the max limit is not hit
self.length_codec
.encode(Bytes::from(buf), dst)
.map_err(|_| GossipsubHandlerError::MaxTransmissionSize)
} }
} }
@ -284,20 +280,12 @@ impl Decoder for GossipsubCodec {
type Item = HandlerEvent; type Item = HandlerEvent;
type Error = GossipsubHandlerError; type Error = GossipsubHandlerError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, GossipsubHandlerError> {
let packet = match self.length_codec.decode(src).map_err(|e| { let rpc = match self.codec.decode(src)? {
if let std::io::ErrorKind::PermissionDenied = e.kind() {
GossipsubHandlerError::MaxTransmissionSize
} else {
GossipsubHandlerError::Io(e)
}
})? {
Some(p) => p, Some(p) => p,
None => return Ok(None), None => return Ok(None),
}; };
let rpc = rpc_proto::Rpc::decode(&packet[..]).map_err(std::io::Error::from)?;
// Store valid messages. // Store valid messages.
let mut messages = Vec::with_capacity(rpc.publish.len()); let mut messages = Vec::with_capacity(rpc.publish.len());
// Store any invalid messages. // Store any invalid messages.