feat(floodsub): make use of prost-codec (#3224)

This patch addresses #2500 for the `libp2p-floodsub` crate.

For this PR the existing code was upgraded to use `Framed` with the `prost_codec::Codec` as the standard codec for handling the RPC message format serialization/deserialization.
This commit is contained in:
Nick Loadholtes 2022-12-19 06:41:38 -05:00 committed by GitHub
parent 76abab9e20
commit 2dd188e897
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 33 deletions

View File

@ -2,6 +2,10 @@
- Update to `libp2p-swarm` `v0.42.0`. - Update to `libp2p-swarm` `v0.42.0`.
- Read and write protocols messages via `prost-codec`. See [PR 3224].
[pr 3224]: https://github.com/libp2p/rust-libp2p/pull/3224
# 0.41.0 # 0.41.0
- Update to `libp2p-core` `v0.38.0`. - Update to `libp2p-core` `v0.38.0`.

View File

@ -11,6 +11,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
[dependencies] [dependencies]
asynchronous-codec = "0.6"
cuckoofilter = "0.5.0" cuckoofilter = "0.5.0"
fnv = "1.0" fnv = "1.0"
futures = "0.3.1" futures = "0.3.1"
@ -18,6 +19,7 @@ libp2p-core = { version = "0.38.0", path = "../../core" }
libp2p-swarm = { version = "0.42.0", path = "../../swarm" } libp2p-swarm = { version = "0.42.0", path = "../../swarm" }
log = "0.4" log = "0.4"
prost = "0.11" prost = "0.11"
prost-codec = { version = "0.3", path = "../../misc/prost-codec" }
rand = "0.8" rand = "0.8"
smallvec = "1.6.1" smallvec = "1.6.1"
thiserror = "1.0.37" thiserror = "1.0.37"

View File

@ -20,14 +20,19 @@
use crate::rpc_proto; use crate::rpc_proto;
use crate::topic::Topic; use crate::topic::Topic;
use asynchronous_codec::Framed;
use futures::{ use futures::{
io::{AsyncRead, AsyncWrite}, io::{AsyncRead, AsyncWrite},
AsyncWriteExt, Future, Future,
}; };
use libp2p_core::{upgrade, InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo}; use futures::{SinkExt, StreamExt};
use prost::Message; use libp2p_core::{InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo};
use std::{io, iter, pin::Pin}; use std::{io, iter, pin::Pin};
const MAX_MESSAGE_LEN_BYTES: usize = 2048;
const PROTOCOL_NAME: &[u8] = b"/floodsub/1.0.0";
/// Implementation of `ConnectionUpgrade` for the floodsub protocol. /// Implementation of `ConnectionUpgrade` for the floodsub protocol.
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
pub struct FloodsubProtocol {} pub struct FloodsubProtocol {}
@ -44,7 +49,7 @@ impl UpgradeInfo for FloodsubProtocol {
type InfoIter = iter::Once<Self::Info>; type InfoIter = iter::Once<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter { fn protocol_info(&self) -> Self::InfoIter {
iter::once(b"/floodsub/1.0.0") iter::once(PROTOCOL_NAME)
} }
} }
@ -53,19 +58,27 @@ where
TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{ {
type Output = FloodsubRpc; type Output = FloodsubRpc;
type Error = FloodsubDecodeError; type Error = FloodsubError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>; type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
fn upgrade_inbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future { fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
Box::pin(async move { Box::pin(async move {
let packet = upgrade::read_length_prefixed(&mut socket, 2048).await?; let mut framed = Framed::new(
let rpc = rpc_proto::Rpc::decode(&packet[..]).map_err(DecodeError)?; socket,
prost_codec::Codec::<rpc_proto::Rpc>::new(MAX_MESSAGE_LEN_BYTES),
);
let rpc = framed
.next()
.await
.ok_or_else(|| FloodsubError::ReadError(io::ErrorKind::UnexpectedEof.into()))?
.map_err(CodecError)?;
let mut messages = Vec::with_capacity(rpc.publish.len()); let mut messages = Vec::with_capacity(rpc.publish.len());
for publish in rpc.publish.into_iter() { for publish in rpc.publish.into_iter() {
messages.push(FloodsubMessage { messages.push(FloodsubMessage {
source: PeerId::from_bytes(&publish.from.unwrap_or_default()) source: PeerId::from_bytes(&publish.from.unwrap_or_default())
.map_err(|_| FloodsubDecodeError::InvalidPeerId)?, .map_err(|_| FloodsubError::InvalidPeerId)?,
data: publish.data.unwrap_or_default(), data: publish.data.unwrap_or_default(),
sequence_number: publish.seqno.unwrap_or_default(), sequence_number: publish.seqno.unwrap_or_default(),
topics: publish.topic_ids.into_iter().map(Topic::new).collect(), topics: publish.topic_ids.into_iter().map(Topic::new).collect(),
@ -93,21 +106,21 @@ where
/// Reach attempt interrupt errors. /// Reach attempt interrupt errors.
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
pub enum FloodsubDecodeError { pub enum FloodsubError {
/// Error when reading the packet from the socket.
#[error("Failed to read from socket")]
ReadError(#[from] io::Error),
/// Error when decoding the raw buffer into a protobuf.
#[error("Failed to decode protobuf")]
ProtobufError(#[from] DecodeError),
/// Error when parsing the `PeerId` in the message. /// Error when parsing the `PeerId` in the message.
#[error("Failed to decode PeerId from message")] #[error("Failed to decode PeerId from message")]
InvalidPeerId, InvalidPeerId,
/// Error when decoding the raw buffer into a protobuf.
#[error("Failed to decode protobuf")]
ProtobufError(#[from] CodecError),
/// Error when reading the packet from the socket.
#[error("Failed to read from socket")]
ReadError(#[from] io::Error),
} }
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
#[error(transparent)] #[error(transparent)]
pub struct DecodeError(prost::DecodeError); pub struct CodecError(#[from] prost_codec::Error);
/// An RPC received by the floodsub system. /// An RPC received by the floodsub system.
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
@ -123,7 +136,7 @@ impl UpgradeInfo for FloodsubRpc {
type InfoIter = iter::Once<Self::Info>; type InfoIter = iter::Once<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter { fn protocol_info(&self) -> Self::InfoIter {
iter::once(b"/floodsub/1.0.0") iter::once(PROTOCOL_NAME)
} }
} }
@ -132,16 +145,17 @@ where
TSocket: AsyncWrite + AsyncRead + Send + Unpin + 'static, TSocket: AsyncWrite + AsyncRead + Send + Unpin + 'static,
{ {
type Output = (); type Output = ();
type Error = io::Error; type Error = CodecError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>; type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
fn upgrade_outbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future { fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
Box::pin(async move { Box::pin(async move {
let bytes = self.into_bytes(); let mut framed = Framed::new(
socket,
upgrade::write_length_prefixed(&mut socket, bytes).await?; prost_codec::Codec::<rpc_proto::Rpc>::new(MAX_MESSAGE_LEN_BYTES),
socket.close().await?; );
framed.send(self.into_rpc()).await?;
framed.close().await?;
Ok(()) Ok(())
}) })
} }
@ -149,8 +163,8 @@ where
impl FloodsubRpc { impl FloodsubRpc {
/// Turns this `FloodsubRpc` into a message that can be sent to a substream. /// Turns this `FloodsubRpc` into a message that can be sent to a substream.
fn into_bytes(self) -> Vec<u8> { fn into_rpc(self) -> rpc_proto::Rpc {
let rpc = rpc_proto::Rpc { rpc_proto::Rpc {
publish: self publish: self
.messages .messages
.into_iter() .into_iter()
@ -170,12 +184,7 @@ impl FloodsubRpc {
topic_id: Some(topic.topic.into()), topic_id: Some(topic.topic.into()),
}) })
.collect(), .collect(),
}; }
let mut buf = Vec::with_capacity(rpc.encoded_len());
rpc.encode(&mut buf)
.expect("Vec<u8> provides capacity as needed");
buf
} }
} }