diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 3e10bb3e..4ddb017b 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -4,6 +4,12 @@ - Update to `libp2p-swarm` `v0.41.0`. +- Update to `prost-codec` `v0.3.0`. + +- Refactoring GossipsubCodec to use common protobuf Codec. See [PR 3070]. + +[PR 3070]: https://github.com/libp2p/rust-libp2p/pull/3070 + # 0.42.0 - Bump rand to 0.8 and quickcheck to 1. See [PR 2857]. diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 4f34aada..819805fd 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -25,9 +25,11 @@ sha2 = "0.10.0" base64 = "0.13.0" smallvec = "1.6.1" prost = "0.11" +prost-codec = { version = "0.3", path = "../../misc/prost-codec" } hex_fmt = "0.3.0" regex = "1.5.5" serde = { version = "1", optional = true, features = ["derive"] } +thiserror = "1.0" wasm-timer = "0.2.5" instant = "0.1.11" # Metrics dependencies diff --git a/protocols/gossipsub/src/error.rs b/protocols/gossipsub/src/error.rs index e469a5a6..6654db8d 100644 --- a/protocols/gossipsub/src/error.rs +++ b/protocols/gossipsub/src/error.rs @@ -22,7 +22,7 @@ use libp2p_core::identity::error::SigningError; use libp2p_core::upgrade::ProtocolError; -use std::fmt; +use thiserror::Error; /// Error associated with publishing a gossipsub message. #[derive(Debug)] @@ -87,20 +87,20 @@ impl From for PublishError { } /// Errors that can occur in the protocols handler. -#[derive(Debug)] +#[derive(Debug, Error)] pub enum GossipsubHandlerError { - /// The maximum number of inbound substreams created has been exceeded. + #[error("The maximum number of inbound substreams created has been exceeded.")] MaxInboundSubstreams, - /// The maximum number of outbound substreams created has been exceeded. + #[error("The maximum number of outbound substreams created has been exceeded.")] MaxOutboundSubstreams, - /// The message exceeds the maximum transmission size. + #[error("The message exceeds the maximum transmission size.")] MaxTransmissionSize, - /// Protocol negotiation timeout. + #[error("Protocol negotiation timeout.")] NegotiationTimeout, - /// Protocol negotiation failed. + #[error("Protocol negotiation failed.")] NegotiationProtocolError(ProtocolError), - /// IO error. - Io(std::io::Error), + #[error("Failed to encode or decode")] + Codec(#[from] prost_codec::Error), } #[derive(Debug, Clone, Copy)] @@ -136,7 +136,7 @@ impl std::error::Error for ValidationError {} impl From for GossipsubHandlerError { fn from(error: std::io::Error) -> GossipsubHandlerError { - GossipsubHandlerError::Io(error) + GossipsubHandlerError::Codec(prost_codec::Error::from(error)) } } @@ -145,18 +145,3 @@ impl From for PublishError { PublishError::TransformFailed(error) } } - -impl fmt::Display for GossipsubHandlerError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{:?}", self) - } -} - -impl std::error::Error for GossipsubHandlerError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match self { - GossipsubHandlerError::Io(io) => Some(io), - _ => None, - } - } -} diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index c6aa2bdd..ff0bd1c0 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -29,7 +29,6 @@ use crate::types::{ }; use asynchronous_codec::{Decoder, Encoder, Framed}; use byteorder::{BigEndian, ByteOrder}; -use bytes::Bytes; use bytes::BytesMut; use futures::future; use futures::prelude::*; @@ -184,17 +183,18 @@ where /* Gossip codec for the framing */ pub struct GossipsubCodec { - /// Codec to encode/decode the Unsigned varint length prefix of the frames. - length_codec: codec::UviBytes, /// Determines the level of validation performed on incoming messages. validation_mode: ValidationMode, + /// The codec to handle common encoding/decoding of protobuf messages + codec: prost_codec::Codec, } impl GossipsubCodec { - pub fn new(length_codec: codec::UviBytes, validation_mode: ValidationMode) -> Self { + pub fn new(length_codec: codec::UviBytes, validation_mode: ValidationMode) -> GossipsubCodec { + let codec = prost_codec::Codec::new(length_codec.max_len()); GossipsubCodec { - length_codec, validation_mode, + codec, } } @@ -267,16 +267,12 @@ impl Encoder for GossipsubCodec { type Item = rpc_proto::Rpc; type Error = GossipsubHandlerError; - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - let mut buf = Vec::with_capacity(item.encoded_len()); - - item.encode(&mut buf) - .expect("Buffer has sufficient capacity"); - - // length prefix the protobuf message, ensuring the max limit is not hit - self.length_codec - .encode(Bytes::from(buf), dst) - .map_err(|_| GossipsubHandlerError::MaxTransmissionSize) + fn encode( + &mut self, + item: Self::Item, + dst: &mut BytesMut, + ) -> Result<(), GossipsubHandlerError> { + Ok(self.codec.encode(item, dst)?) } } @@ -284,20 +280,12 @@ impl Decoder for GossipsubCodec { type Item = HandlerEvent; type Error = GossipsubHandlerError; - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - let packet = match self.length_codec.decode(src).map_err(|e| { - if let std::io::ErrorKind::PermissionDenied = e.kind() { - GossipsubHandlerError::MaxTransmissionSize - } else { - GossipsubHandlerError::Io(e) - } - })? { + fn decode(&mut self, src: &mut BytesMut) -> Result, GossipsubHandlerError> { + let rpc = match self.codec.decode(src)? { Some(p) => p, None => return Ok(None), }; - let rpc = rpc_proto::Rpc::decode(&packet[..]).map_err(std::io::Error::from)?; - // Store valid messages. let mut messages = Vec::with_capacity(rpc.publish.len()); // Store any invalid messages.