From 2dd188e8970abafdc5dfbb7bb1eaa597bb1e1303 Mon Sep 17 00:00:00 2001 From: Nick Loadholtes Date: Mon, 19 Dec 2022 06:41:38 -0500 Subject: [PATCH] feat(floodsub): make use of `prost-codec` (#3224) This patch addresses #2500 for the `libp2p-floodsub` crate. For this PR the existing code was upgraded to use `Framed` with the `prost_codec::Codec` as the standard codec for handling the RPC message format serialization/deserialization. --- protocols/floodsub/CHANGELOG.md | 4 ++ protocols/floodsub/Cargo.toml | 2 + protocols/floodsub/src/protocol.rs | 75 +++++++++++++++++------------- 3 files changed, 48 insertions(+), 33 deletions(-) diff --git a/protocols/floodsub/CHANGELOG.md b/protocols/floodsub/CHANGELOG.md index 564476d9..7b7293e6 100644 --- a/protocols/floodsub/CHANGELOG.md +++ b/protocols/floodsub/CHANGELOG.md @@ -2,6 +2,10 @@ - Update to `libp2p-swarm` `v0.42.0`. +- Read and write protocols messages via `prost-codec`. See [PR 3224]. + +[pr 3224]: https://github.com/libp2p/rust-libp2p/pull/3224 + # 0.41.0 - Update to `libp2p-core` `v0.38.0`. diff --git a/protocols/floodsub/Cargo.toml b/protocols/floodsub/Cargo.toml index 5716da7d..778f949b 100644 --- a/protocols/floodsub/Cargo.toml +++ b/protocols/floodsub/Cargo.toml @@ -11,6 +11,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] +asynchronous-codec = "0.6" cuckoofilter = "0.5.0" fnv = "1.0" futures = "0.3.1" @@ -18,6 +19,7 @@ libp2p-core = { version = "0.38.0", path = "../../core" } libp2p-swarm = { version = "0.42.0", path = "../../swarm" } log = "0.4" prost = "0.11" +prost-codec = { version = "0.3", path = "../../misc/prost-codec" } rand = "0.8" smallvec = "1.6.1" thiserror = "1.0.37" diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index 104e92e4..fe3f2859 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -20,14 +20,19 @@ use crate::rpc_proto; use crate::topic::Topic; +use asynchronous_codec::Framed; use futures::{ io::{AsyncRead, AsyncWrite}, - AsyncWriteExt, Future, + Future, }; -use libp2p_core::{upgrade, InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo}; -use prost::Message; +use futures::{SinkExt, StreamExt}; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo}; use std::{io, iter, pin::Pin}; +const MAX_MESSAGE_LEN_BYTES: usize = 2048; + +const PROTOCOL_NAME: &[u8] = b"/floodsub/1.0.0"; + /// Implementation of `ConnectionUpgrade` for the floodsub protocol. #[derive(Debug, Clone, Default)] pub struct FloodsubProtocol {} @@ -44,7 +49,7 @@ impl UpgradeInfo for FloodsubProtocol { type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { - iter::once(b"/floodsub/1.0.0") + iter::once(PROTOCOL_NAME) } } @@ -53,19 +58,27 @@ where TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, { type Output = FloodsubRpc; - type Error = FloodsubDecodeError; + type Error = FloodsubError; type Future = Pin> + Send>>; - fn upgrade_inbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future { Box::pin(async move { - let packet = upgrade::read_length_prefixed(&mut socket, 2048).await?; - let rpc = rpc_proto::Rpc::decode(&packet[..]).map_err(DecodeError)?; + let mut framed = Framed::new( + socket, + prost_codec::Codec::::new(MAX_MESSAGE_LEN_BYTES), + ); + + let rpc = framed + .next() + .await + .ok_or_else(|| FloodsubError::ReadError(io::ErrorKind::UnexpectedEof.into()))? + .map_err(CodecError)?; let mut messages = Vec::with_capacity(rpc.publish.len()); for publish in rpc.publish.into_iter() { messages.push(FloodsubMessage { source: PeerId::from_bytes(&publish.from.unwrap_or_default()) - .map_err(|_| FloodsubDecodeError::InvalidPeerId)?, + .map_err(|_| FloodsubError::InvalidPeerId)?, data: publish.data.unwrap_or_default(), sequence_number: publish.seqno.unwrap_or_default(), topics: publish.topic_ids.into_iter().map(Topic::new).collect(), @@ -93,21 +106,21 @@ where /// Reach attempt interrupt errors. #[derive(thiserror::Error, Debug)] -pub enum FloodsubDecodeError { - /// Error when reading the packet from the socket. - #[error("Failed to read from socket")] - ReadError(#[from] io::Error), - /// Error when decoding the raw buffer into a protobuf. - #[error("Failed to decode protobuf")] - ProtobufError(#[from] DecodeError), +pub enum FloodsubError { /// Error when parsing the `PeerId` in the message. #[error("Failed to decode PeerId from message")] InvalidPeerId, + /// Error when decoding the raw buffer into a protobuf. + #[error("Failed to decode protobuf")] + ProtobufError(#[from] CodecError), + /// Error when reading the packet from the socket. + #[error("Failed to read from socket")] + ReadError(#[from] io::Error), } #[derive(thiserror::Error, Debug)] #[error(transparent)] -pub struct DecodeError(prost::DecodeError); +pub struct CodecError(#[from] prost_codec::Error); /// An RPC received by the floodsub system. #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -123,7 +136,7 @@ impl UpgradeInfo for FloodsubRpc { type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { - iter::once(b"/floodsub/1.0.0") + iter::once(PROTOCOL_NAME) } } @@ -132,16 +145,17 @@ where TSocket: AsyncWrite + AsyncRead + Send + Unpin + 'static, { type Output = (); - type Error = io::Error; + type Error = CodecError; type Future = Pin> + Send>>; - fn upgrade_outbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future { Box::pin(async move { - let bytes = self.into_bytes(); - - upgrade::write_length_prefixed(&mut socket, bytes).await?; - socket.close().await?; - + let mut framed = Framed::new( + socket, + prost_codec::Codec::::new(MAX_MESSAGE_LEN_BYTES), + ); + framed.send(self.into_rpc()).await?; + framed.close().await?; Ok(()) }) } @@ -149,8 +163,8 @@ where impl FloodsubRpc { /// Turns this `FloodsubRpc` into a message that can be sent to a substream. - fn into_bytes(self) -> Vec { - let rpc = rpc_proto::Rpc { + fn into_rpc(self) -> rpc_proto::Rpc { + rpc_proto::Rpc { publish: self .messages .into_iter() @@ -170,12 +184,7 @@ impl FloodsubRpc { topic_id: Some(topic.topic.into()), }) .collect(), - }; - - let mut buf = Vec::with_capacity(rpc.encoded_len()); - rpc.encode(&mut buf) - .expect("Vec provides capacity as needed"); - buf + } } }