feat: migrate to quick-protobuf

Instead of relying on `protoc` and buildscripts, we generate the bindings using `pb-rs` and version them within our codebase. This makes for a better IDE integration, a faster build and an easier use of `rust-libp2p` because we don't force the `protoc` dependency onto them.

Resolves #3024.

Pull-Request: #3312.
This commit is contained in:
Miguel Guarniz
2023-03-02 05:45:07 -05:00
committed by GitHub
parent 4910160bea
commit db82e0210e
141 changed files with 3662 additions and 1276 deletions

View File

@ -32,7 +32,6 @@ use std::{
use futures::StreamExt;
use log::{debug, error, trace, warn};
use prometheus_client::registry::Registry;
use prost::Message as _;
use rand::{seq::SliceRandom, thread_rng};
use libp2p_core::{
@ -64,8 +63,9 @@ use crate::types::{
Subscription, SubscriptionAction,
};
use crate::types::{PeerConnections, PeerKind, Rpc};
use crate::{rpc_proto, TopicScoreParams};
use crate::{rpc_proto::proto, TopicScoreParams};
use crate::{PublishError, SubscriptionError, ValidationError};
use quick_protobuf::{MessageWrite, Writer};
use std::{cmp::Ordering::Equal, fmt::Debug};
use wasm_timer::Interval;
@ -179,8 +179,8 @@ impl From<MessageAuthenticity> for PublishConfig {
let public_key = keypair.public();
let key_enc = public_key.to_protobuf_encoding();
let key = if key_enc.len() <= 42 {
// The public key can be inlined in [`rpc_proto::Message::from`], so we don't include it
// specifically in the [`rpc_proto::Message::key`] field.
// The public key can be inlined in [`rpc_proto::proto::::Message::from`], so we don't include it
// specifically in the [`rpc_proto::proto::Message::key`] field.
None
} else {
// Include the protobuf encoding of the public key in the message.
@ -610,7 +610,7 @@ where
.into_protobuf();
// check that the size doesn't exceed the max transmission size
if event.encoded_len() > self.config.max_transmit_size() {
if event.get_size() > self.config.max_transmit_size() {
return Err(PublishError::MessageTooLarge);
}
@ -721,7 +721,7 @@ where
}
// Send to peers we know are subscribed to the topic.
let msg_bytes = event.encoded_len();
let msg_bytes = event.get_size();
for peer_id in recipient_peers.iter() {
trace!("Sending message to peer: {:?}", peer_id);
self.send_message(*peer_id, event.clone())?;
@ -1338,7 +1338,7 @@ where
}
.into_protobuf();
let msg_bytes = message.encoded_len();
let msg_bytes = message.get_size();
if self.send_message(*peer_id, message).is_err() {
error!("Failed to send cached messages. Messages too large");
@ -2733,7 +2733,7 @@ where
}
.into_protobuf();
let msg_bytes = event.encoded_len();
let msg_bytes = event.get_size();
for peer in recipient_peers.iter() {
debug!("Sending message: {:?} to peer {:?}", msg_id, peer);
self.send_message(*peer, event.clone())?;
@ -2764,7 +2764,7 @@ where
let sequence_number: u64 = rand::random();
let signature = {
let message = rpc_proto::Message {
let message = proto::Message {
from: Some(author.clone().to_bytes()),
data: Some(data.clone()),
seqno: Some(sequence_number.to_be_bytes().to_vec()),
@ -2773,10 +2773,12 @@ where
key: None,
};
let mut buf = Vec::with_capacity(message.encoded_len());
let mut buf = Vec::with_capacity(message.get_size());
let mut writer = Writer::new(&mut buf);
message
.encode(&mut buf)
.expect("Buffer has sufficient capacity");
.write_message(&mut writer)
.expect("Encoding to succeed");
// the signature is over the bytes "libp2p-pubsub:<protobuf-message>"
let mut signature_bytes = SIGNING_PREFIX.to_vec();
@ -2875,11 +2877,7 @@ where
/// Send a [`Rpc`] message to a peer. This will wrap the message in an arc if it
/// is not already an arc.
fn send_message(
&mut self,
peer_id: PeerId,
message: rpc_proto::Rpc,
) -> Result<(), PublishError> {
fn send_message(&mut self, peer_id: PeerId, message: proto::RPC) -> Result<(), PublishError> {
// If the message is oversized, try and fragment it. If it cannot be fragmented, log an
// error and drop the message (all individual messages should be small enough to fit in the
// max_transmit_size)
@ -2899,12 +2897,12 @@ where
// If a message is too large to be sent as-is, this attempts to fragment it into smaller RPC
// messages to be sent.
fn fragment_message(&self, rpc: rpc_proto::Rpc) -> Result<Vec<rpc_proto::Rpc>, PublishError> {
if rpc.encoded_len() < self.config.max_transmit_size() {
fn fragment_message(&self, rpc: proto::RPC) -> Result<Vec<proto::RPC>, PublishError> {
if rpc.get_size() < self.config.max_transmit_size() {
return Ok(vec![rpc]);
}
let new_rpc = rpc_proto::Rpc {
let new_rpc = proto::RPC {
subscriptions: Vec::new(),
publish: Vec::new(),
control: None,
@ -2920,7 +2918,7 @@ where
// create a new RPC if the new object plus 5% of its size (for length prefix
// buffers) exceeds the max transmit size.
if rpc_list[list_index].encoded_len() + (($object_size as f64) * 1.05) as usize
if rpc_list[list_index].get_size() + (($object_size as f64) * 1.05) as usize
> self.config.max_transmit_size()
&& rpc_list[list_index] != new_rpc
{
@ -2932,7 +2930,7 @@ where
macro_rules! add_item {
($object: ident, $type: ident ) => {
let object_size = $object.encoded_len();
let object_size = $object.get_size();
if object_size + 2 > self.config.max_transmit_size() {
// This should not be possible. All received and published messages have already
@ -2960,12 +2958,12 @@ where
// handle the control messages. If all are within the max_transmit_size, send them without
// fragmenting, otherwise, fragment the control messages
let empty_control = rpc_proto::ControlMessage::default();
let empty_control = proto::ControlMessage::default();
if let Some(control) = rpc.control.as_ref() {
if control.encoded_len() + 2 > self.config.max_transmit_size() {
if control.get_size() + 2 > self.config.max_transmit_size() {
// fragment the RPC
for ihave in &control.ihave {
let len = ihave.encoded_len();
let len = ihave.get_size();
create_or_add_rpc!(len);
rpc_list
.last_mut()
@ -2976,7 +2974,7 @@ where
.push(ihave.clone());
}
for iwant in &control.iwant {
let len = iwant.encoded_len();
let len = iwant.get_size();
create_or_add_rpc!(len);
rpc_list
.last_mut()
@ -2987,7 +2985,7 @@ where
.push(iwant.clone());
}
for graft in &control.graft {
let len = graft.encoded_len();
let len = graft.get_size();
create_or_add_rpc!(len);
rpc_list
.last_mut()
@ -2998,7 +2996,7 @@ where
.push(graft.clone());
}
for prune in &control.prune {
let len = prune.encoded_len();
let len = prune.get_size();
create_or_add_rpc!(len);
rpc_list
.last_mut()
@ -3009,7 +3007,7 @@ where
.push(prune.clone());
}
} else {
let len = control.encoded_len();
let len = control.get_size();
create_or_add_rpc!(len);
rpc_list.last_mut().expect("Always an element").control = Some(control.clone());
}
@ -3769,7 +3767,7 @@ mod local_test {
// Messages over the limit should be split
while rpc_proto.encoded_len() < max_transmit_size {
while rpc_proto.get_size() < max_transmit_size {
rpc.messages.push(test_message());
rpc_proto = rpc.clone().into_protobuf();
}
@ -3786,7 +3784,7 @@ mod local_test {
// all fragmented messages should be under the limit
for message in fragmented_messages {
assert!(
message.encoded_len() < max_transmit_size,
message.get_size() < max_transmit_size,
"all messages should be less than the transmission size"
);
}
@ -3813,7 +3811,7 @@ mod local_test {
.fragment_message(rpc_proto.clone())
.expect("Messages must be valid");
if rpc_proto.encoded_len() < max_transmit_size {
if rpc_proto.get_size() < max_transmit_size {
assert_eq!(
fragmented_messages.len(),
1,
@ -3829,12 +3827,12 @@ mod local_test {
// all fragmented messages should be under the limit
for message in fragmented_messages {
assert!(
message.encoded_len() < max_transmit_size,
"all messages should be less than the transmission size: list size {} max size{}", message.encoded_len(), max_transmit_size
message.get_size() < max_transmit_size,
"all messages should be less than the transmission size: list size {} max size{}", message.get_size(), max_transmit_size
);
// ensure they can all be encoded
let mut buf = bytes::BytesMut::with_capacity(message.encoded_len());
let mut buf = bytes::BytesMut::with_capacity(message.get_size());
codec.encode(message, &mut buf).unwrap()
}
}

View File

@ -284,7 +284,7 @@ where
}
// Converts a protobuf message into a gossipsub message for reading the Gossipsub event queue.
fn proto_to_message(rpc: &crate::rpc_proto::Rpc) -> Rpc {
fn proto_to_message(rpc: &proto::RPC) -> Rpc {
// Store valid messages.
let mut messages = Vec::with_capacity(rpc.publish.len());
let rpc = rpc.clone();

View File

@ -100,7 +100,7 @@ pub enum HandlerError {
#[error("Protocol negotiation failed.")]
NegotiationProtocolError(ProtocolError),
#[error("Failed to encode or decode")]
Codec(#[from] prost_codec::Error),
Codec(#[from] quick_protobuf_codec::Error),
}
#[derive(Debug, Clone, Copy)]
@ -136,7 +136,7 @@ impl std::error::Error for ValidationError {}
impl From<std::io::Error> for HandlerError {
fn from(error: std::io::Error) -> HandlerError {
HandlerError::Codec(prost_codec::Error::from(error))
HandlerError::Codec(quick_protobuf_codec::Error::from(error))
}
}

View File

@ -0,0 +1,2 @@
// Automatically generated mod.rs
pub mod pb;

View File

@ -0,0 +1,67 @@
// Automatically generated rust module for 'compat.proto' file
#![allow(non_snake_case)]
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(unused_imports)]
#![allow(unknown_lints)]
#![allow(clippy::all)]
#![cfg_attr(rustfmt, rustfmt_skip)]
use quick_protobuf::{MessageInfo, MessageRead, MessageWrite, BytesReader, Writer, WriterBackend, Result};
use quick_protobuf::sizeofs::*;
use super::super::*;
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct Message {
pub from: Option<Vec<u8>>,
pub data: Option<Vec<u8>>,
pub seqno: Option<Vec<u8>>,
pub topic_ids: Vec<String>,
pub signature: Option<Vec<u8>>,
pub key: Option<Vec<u8>>,
}
impl<'a> MessageRead<'a> for Message {
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
let mut msg = Self::default();
while !r.is_eof() {
match r.next_tag(bytes) {
Ok(10) => msg.from = Some(r.read_bytes(bytes)?.to_owned()),
Ok(18) => msg.data = Some(r.read_bytes(bytes)?.to_owned()),
Ok(26) => msg.seqno = Some(r.read_bytes(bytes)?.to_owned()),
Ok(34) => msg.topic_ids.push(r.read_string(bytes)?.to_owned()),
Ok(42) => msg.signature = Some(r.read_bytes(bytes)?.to_owned()),
Ok(50) => msg.key = Some(r.read_bytes(bytes)?.to_owned()),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
}
Ok(msg)
}
}
impl MessageWrite for Message {
fn get_size(&self) -> usize {
0
+ self.from.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
+ self.data.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
+ self.seqno.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
+ self.topic_ids.iter().map(|s| 1 + sizeof_len((s).len())).sum::<usize>()
+ self.signature.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
+ self.key.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
}
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
if let Some(ref s) = self.from { w.write_with_tag(10, |w| w.write_bytes(&**s))?; }
if let Some(ref s) = self.data { w.write_with_tag(18, |w| w.write_bytes(&**s))?; }
if let Some(ref s) = self.seqno { w.write_with_tag(26, |w| w.write_bytes(&**s))?; }
for s in &self.topic_ids { w.write_with_tag(34, |w| w.write_string(&**s))?; }
if let Some(ref s) = self.signature { w.write_with_tag(42, |w| w.write_bytes(&**s))?; }
if let Some(ref s) = self.key { w.write_with_tag(50, |w| w.write_bytes(&**s))?; }
Ok(())
}
}

View File

@ -0,0 +1,2 @@
// Automatically generated mod.rs
pub mod pb;

View File

@ -0,0 +1,567 @@
// Automatically generated rust module for 'rpc.proto' file
#![allow(non_snake_case)]
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(unused_imports)]
#![allow(unknown_lints)]
#![allow(clippy::all)]
#![cfg_attr(rustfmt, rustfmt_skip)]
use quick_protobuf::{MessageInfo, MessageRead, MessageWrite, BytesReader, Writer, WriterBackend, Result};
use quick_protobuf::sizeofs::*;
use super::super::*;
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct RPC {
pub subscriptions: Vec<gossipsub::pb::mod_RPC::SubOpts>,
pub publish: Vec<gossipsub::pb::Message>,
pub control: Option<gossipsub::pb::ControlMessage>,
}
impl<'a> MessageRead<'a> for RPC {
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
let mut msg = Self::default();
while !r.is_eof() {
match r.next_tag(bytes) {
Ok(10) => msg.subscriptions.push(r.read_message::<gossipsub::pb::mod_RPC::SubOpts>(bytes)?),
Ok(18) => msg.publish.push(r.read_message::<gossipsub::pb::Message>(bytes)?),
Ok(26) => msg.control = Some(r.read_message::<gossipsub::pb::ControlMessage>(bytes)?),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
}
Ok(msg)
}
}
impl MessageWrite for RPC {
fn get_size(&self) -> usize {
0
+ self.subscriptions.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::<usize>()
+ self.publish.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::<usize>()
+ self.control.as_ref().map_or(0, |m| 1 + sizeof_len((m).get_size()))
}
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
for s in &self.subscriptions { w.write_with_tag(10, |w| w.write_message(s))?; }
for s in &self.publish { w.write_with_tag(18, |w| w.write_message(s))?; }
if let Some(ref s) = self.control { w.write_with_tag(26, |w| w.write_message(s))?; }
Ok(())
}
}
pub mod mod_RPC {
use super::*;
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct SubOpts {
pub subscribe: Option<bool>,
pub topic_id: Option<String>,
}
impl<'a> MessageRead<'a> for SubOpts {
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
let mut msg = Self::default();
while !r.is_eof() {
match r.next_tag(bytes) {
Ok(8) => msg.subscribe = Some(r.read_bool(bytes)?),
Ok(18) => msg.topic_id = Some(r.read_string(bytes)?.to_owned()),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
}
Ok(msg)
}
}
impl MessageWrite for SubOpts {
fn get_size(&self) -> usize {
0
+ self.subscribe.as_ref().map_or(0, |m| 1 + sizeof_varint(*(m) as u64))
+ self.topic_id.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
}
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
if let Some(ref s) = self.subscribe { w.write_with_tag(8, |w| w.write_bool(*s))?; }
if let Some(ref s) = self.topic_id { w.write_with_tag(18, |w| w.write_string(&**s))?; }
Ok(())
}
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct Message {
pub from: Option<Vec<u8>>,
pub data: Option<Vec<u8>>,
pub seqno: Option<Vec<u8>>,
pub topic: String,
pub signature: Option<Vec<u8>>,
pub key: Option<Vec<u8>>,
}
impl<'a> MessageRead<'a> for Message {
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
let mut msg = Self::default();
while !r.is_eof() {
match r.next_tag(bytes) {
Ok(10) => msg.from = Some(r.read_bytes(bytes)?.to_owned()),
Ok(18) => msg.data = Some(r.read_bytes(bytes)?.to_owned()),
Ok(26) => msg.seqno = Some(r.read_bytes(bytes)?.to_owned()),
Ok(34) => msg.topic = r.read_string(bytes)?.to_owned(),
Ok(42) => msg.signature = Some(r.read_bytes(bytes)?.to_owned()),
Ok(50) => msg.key = Some(r.read_bytes(bytes)?.to_owned()),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
}
Ok(msg)
}
}
impl MessageWrite for Message {
fn get_size(&self) -> usize {
0
+ self.from.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
+ self.data.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
+ self.seqno.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
+ 1 + sizeof_len((&self.topic).len())
+ self.signature.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
+ self.key.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
}
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
if let Some(ref s) = self.from { w.write_with_tag(10, |w| w.write_bytes(&**s))?; }
if let Some(ref s) = self.data { w.write_with_tag(18, |w| w.write_bytes(&**s))?; }
if let Some(ref s) = self.seqno { w.write_with_tag(26, |w| w.write_bytes(&**s))?; }
w.write_with_tag(34, |w| w.write_string(&**&self.topic))?;
if let Some(ref s) = self.signature { w.write_with_tag(42, |w| w.write_bytes(&**s))?; }
if let Some(ref s) = self.key { w.write_with_tag(50, |w| w.write_bytes(&**s))?; }
Ok(())
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct ControlMessage {
pub ihave: Vec<gossipsub::pb::ControlIHave>,
pub iwant: Vec<gossipsub::pb::ControlIWant>,
pub graft: Vec<gossipsub::pb::ControlGraft>,
pub prune: Vec<gossipsub::pb::ControlPrune>,
}
impl<'a> MessageRead<'a> for ControlMessage {
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
let mut msg = Self::default();
while !r.is_eof() {
match r.next_tag(bytes) {
Ok(10) => msg.ihave.push(r.read_message::<gossipsub::pb::ControlIHave>(bytes)?),
Ok(18) => msg.iwant.push(r.read_message::<gossipsub::pb::ControlIWant>(bytes)?),
Ok(26) => msg.graft.push(r.read_message::<gossipsub::pb::ControlGraft>(bytes)?),
Ok(34) => msg.prune.push(r.read_message::<gossipsub::pb::ControlPrune>(bytes)?),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
}
Ok(msg)
}
}
impl MessageWrite for ControlMessage {
fn get_size(&self) -> usize {
0
+ self.ihave.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::<usize>()
+ self.iwant.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::<usize>()
+ self.graft.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::<usize>()
+ self.prune.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::<usize>()
}
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
for s in &self.ihave { w.write_with_tag(10, |w| w.write_message(s))?; }
for s in &self.iwant { w.write_with_tag(18, |w| w.write_message(s))?; }
for s in &self.graft { w.write_with_tag(26, |w| w.write_message(s))?; }
for s in &self.prune { w.write_with_tag(34, |w| w.write_message(s))?; }
Ok(())
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct ControlIHave {
pub topic_id: Option<String>,
pub message_ids: Vec<Vec<u8>>,
}
impl<'a> MessageRead<'a> for ControlIHave {
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
let mut msg = Self::default();
while !r.is_eof() {
match r.next_tag(bytes) {
Ok(10) => msg.topic_id = Some(r.read_string(bytes)?.to_owned()),
Ok(18) => msg.message_ids.push(r.read_bytes(bytes)?.to_owned()),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
}
Ok(msg)
}
}
impl MessageWrite for ControlIHave {
fn get_size(&self) -> usize {
0
+ self.topic_id.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
+ self.message_ids.iter().map(|s| 1 + sizeof_len((s).len())).sum::<usize>()
}
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
if let Some(ref s) = self.topic_id { w.write_with_tag(10, |w| w.write_string(&**s))?; }
for s in &self.message_ids { w.write_with_tag(18, |w| w.write_bytes(&**s))?; }
Ok(())
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct ControlIWant {
pub message_ids: Vec<Vec<u8>>,
}
impl<'a> MessageRead<'a> for ControlIWant {
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
let mut msg = Self::default();
while !r.is_eof() {
match r.next_tag(bytes) {
Ok(10) => msg.message_ids.push(r.read_bytes(bytes)?.to_owned()),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
}
Ok(msg)
}
}
impl MessageWrite for ControlIWant {
fn get_size(&self) -> usize {
0
+ self.message_ids.iter().map(|s| 1 + sizeof_len((s).len())).sum::<usize>()
}
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
for s in &self.message_ids { w.write_with_tag(10, |w| w.write_bytes(&**s))?; }
Ok(())
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct ControlGraft {
pub topic_id: Option<String>,
}
impl<'a> MessageRead<'a> for ControlGraft {
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
let mut msg = Self::default();
while !r.is_eof() {
match r.next_tag(bytes) {
Ok(10) => msg.topic_id = Some(r.read_string(bytes)?.to_owned()),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
}
Ok(msg)
}
}
impl MessageWrite for ControlGraft {
fn get_size(&self) -> usize {
0
+ self.topic_id.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
}
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
if let Some(ref s) = self.topic_id { w.write_with_tag(10, |w| w.write_string(&**s))?; }
Ok(())
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct ControlPrune {
pub topic_id: Option<String>,
pub peers: Vec<gossipsub::pb::PeerInfo>,
pub backoff: Option<u64>,
}
impl<'a> MessageRead<'a> for ControlPrune {
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
let mut msg = Self::default();
while !r.is_eof() {
match r.next_tag(bytes) {
Ok(10) => msg.topic_id = Some(r.read_string(bytes)?.to_owned()),
Ok(18) => msg.peers.push(r.read_message::<gossipsub::pb::PeerInfo>(bytes)?),
Ok(24) => msg.backoff = Some(r.read_uint64(bytes)?),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
}
Ok(msg)
}
}
impl MessageWrite for ControlPrune {
fn get_size(&self) -> usize {
0
+ self.topic_id.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
+ self.peers.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::<usize>()
+ self.backoff.as_ref().map_or(0, |m| 1 + sizeof_varint(*(m) as u64))
}
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
if let Some(ref s) = self.topic_id { w.write_with_tag(10, |w| w.write_string(&**s))?; }
for s in &self.peers { w.write_with_tag(18, |w| w.write_message(s))?; }
if let Some(ref s) = self.backoff { w.write_with_tag(24, |w| w.write_uint64(*s))?; }
Ok(())
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct PeerInfo {
pub peer_id: Option<Vec<u8>>,
pub signed_peer_record: Option<Vec<u8>>,
}
impl<'a> MessageRead<'a> for PeerInfo {
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
let mut msg = Self::default();
while !r.is_eof() {
match r.next_tag(bytes) {
Ok(10) => msg.peer_id = Some(r.read_bytes(bytes)?.to_owned()),
Ok(18) => msg.signed_peer_record = Some(r.read_bytes(bytes)?.to_owned()),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
}
Ok(msg)
}
}
impl MessageWrite for PeerInfo {
fn get_size(&self) -> usize {
0
+ self.peer_id.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
+ self.signed_peer_record.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
}
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
if let Some(ref s) = self.peer_id { w.write_with_tag(10, |w| w.write_bytes(&**s))?; }
if let Some(ref s) = self.signed_peer_record { w.write_with_tag(18, |w| w.write_bytes(&**s))?; }
Ok(())
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct TopicDescriptor {
pub name: Option<String>,
pub auth: Option<gossipsub::pb::mod_TopicDescriptor::AuthOpts>,
pub enc: Option<gossipsub::pb::mod_TopicDescriptor::EncOpts>,
}
impl<'a> MessageRead<'a> for TopicDescriptor {
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
let mut msg = Self::default();
while !r.is_eof() {
match r.next_tag(bytes) {
Ok(10) => msg.name = Some(r.read_string(bytes)?.to_owned()),
Ok(18) => msg.auth = Some(r.read_message::<gossipsub::pb::mod_TopicDescriptor::AuthOpts>(bytes)?),
Ok(26) => msg.enc = Some(r.read_message::<gossipsub::pb::mod_TopicDescriptor::EncOpts>(bytes)?),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
}
Ok(msg)
}
}
impl MessageWrite for TopicDescriptor {
fn get_size(&self) -> usize {
0
+ self.name.as_ref().map_or(0, |m| 1 + sizeof_len((m).len()))
+ self.auth.as_ref().map_or(0, |m| 1 + sizeof_len((m).get_size()))
+ self.enc.as_ref().map_or(0, |m| 1 + sizeof_len((m).get_size()))
}
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
if let Some(ref s) = self.name { w.write_with_tag(10, |w| w.write_string(&**s))?; }
if let Some(ref s) = self.auth { w.write_with_tag(18, |w| w.write_message(s))?; }
if let Some(ref s) = self.enc { w.write_with_tag(26, |w| w.write_message(s))?; }
Ok(())
}
}
pub mod mod_TopicDescriptor {
use super::*;
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct AuthOpts {
pub mode: Option<gossipsub::pb::mod_TopicDescriptor::mod_AuthOpts::AuthMode>,
pub keys: Vec<Vec<u8>>,
}
impl<'a> MessageRead<'a> for AuthOpts {
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
let mut msg = Self::default();
while !r.is_eof() {
match r.next_tag(bytes) {
Ok(8) => msg.mode = Some(r.read_enum(bytes)?),
Ok(18) => msg.keys.push(r.read_bytes(bytes)?.to_owned()),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
}
Ok(msg)
}
}
impl MessageWrite for AuthOpts {
fn get_size(&self) -> usize {
0
+ self.mode.as_ref().map_or(0, |m| 1 + sizeof_varint(*(m) as u64))
+ self.keys.iter().map(|s| 1 + sizeof_len((s).len())).sum::<usize>()
}
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
if let Some(ref s) = self.mode { w.write_with_tag(8, |w| w.write_enum(*s as i32))?; }
for s in &self.keys { w.write_with_tag(18, |w| w.write_bytes(&**s))?; }
Ok(())
}
}
pub mod mod_AuthOpts {
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum AuthMode {
NONE = 0,
KEY = 1,
WOT = 2,
}
impl Default for AuthMode {
fn default() -> Self {
AuthMode::NONE
}
}
impl From<i32> for AuthMode {
fn from(i: i32) -> Self {
match i {
0 => AuthMode::NONE,
1 => AuthMode::KEY,
2 => AuthMode::WOT,
_ => Self::default(),
}
}
}
impl<'a> From<&'a str> for AuthMode {
fn from(s: &'a str) -> Self {
match s {
"NONE" => AuthMode::NONE,
"KEY" => AuthMode::KEY,
"WOT" => AuthMode::WOT,
_ => Self::default(),
}
}
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct EncOpts {
pub mode: Option<gossipsub::pb::mod_TopicDescriptor::mod_EncOpts::EncMode>,
pub key_hashes: Vec<Vec<u8>>,
}
impl<'a> MessageRead<'a> for EncOpts {
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
let mut msg = Self::default();
while !r.is_eof() {
match r.next_tag(bytes) {
Ok(8) => msg.mode = Some(r.read_enum(bytes)?),
Ok(18) => msg.key_hashes.push(r.read_bytes(bytes)?.to_owned()),
Ok(t) => { r.read_unknown(bytes, t)?; }
Err(e) => return Err(e),
}
}
Ok(msg)
}
}
impl MessageWrite for EncOpts {
fn get_size(&self) -> usize {
0
+ self.mode.as_ref().map_or(0, |m| 1 + sizeof_varint(*(m) as u64))
+ self.key_hashes.iter().map(|s| 1 + sizeof_len((s).len())).sum::<usize>()
}
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
if let Some(ref s) = self.mode { w.write_with_tag(8, |w| w.write_enum(*s as i32))?; }
for s in &self.key_hashes { w.write_with_tag(18, |w| w.write_bytes(&**s))?; }
Ok(())
}
}
pub mod mod_EncOpts {
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum EncMode {
NONE = 0,
SHAREDKEY = 1,
WOT = 2,
}
impl Default for EncMode {
fn default() -> Self {
EncMode::NONE
}
}
impl From<i32> for EncMode {
fn from(i: i32) -> Self {
match i {
0 => EncMode::NONE,
1 => EncMode::SHAREDKEY,
2 => EncMode::WOT,
_ => Self::default(),
}
}
}
impl<'a> From<&'a str> for EncMode {
fn from(s: &'a str) -> Self {
match s {
"NONE" => EncMode::NONE,
"SHAREDKEY" => EncMode::SHAREDKEY,
"WOT" => EncMode::WOT,
_ => Self::default(),
}
}
}
}
}

View File

@ -0,0 +1,3 @@
// Automatically generated mod.rs
pub mod compat;
pub mod gossipsub;

View File

@ -19,6 +19,7 @@
// DEALINGS IN THE SOFTWARE.
use crate::protocol::{GossipsubCodec, ProtocolConfig};
use crate::rpc_proto::proto;
use crate::types::{PeerKind, RawMessage, Rpc};
use crate::{HandlerError, ValidationError};
use asynchronous_codec::Framed;
@ -67,7 +68,7 @@ pub enum HandlerEvent {
#[derive(Debug)]
pub enum HandlerIn {
/// A gossipsub message to send.
Message(crate::rpc_proto::Rpc),
Message(proto::RPC),
/// The peer has joined the mesh.
JoinedMesh,
/// The peer has left the mesh.
@ -93,7 +94,7 @@ pub struct Handler {
inbound_substream: Option<InboundSubstreamState>,
/// Queue of values that we want to send to the remote.
send_queue: SmallVec<[crate::rpc_proto::Rpc; 16]>,
send_queue: SmallVec<[proto::RPC; 16]>,
/// Flag indicating that an outbound substream is being established to prevent duplicate
/// requests.
@ -149,10 +150,7 @@ enum OutboundSubstreamState {
/// Waiting for the user to send a message. The idle state for an outbound substream.
WaitingOutput(Framed<NegotiatedSubstream, GossipsubCodec>),
/// Waiting to send a message to the remote.
PendingSend(
Framed<NegotiatedSubstream, GossipsubCodec>,
crate::rpc_proto::Rpc,
),
PendingSend(Framed<NegotiatedSubstream, GossipsubCodec>, proto::RPC),
/// Waiting to flush the substream so that the data arrives to the remote.
PendingFlush(Framed<NegotiatedSubstream, GossipsubCodec>),
/// The substream is being closed. Used by either substream.
@ -251,7 +249,7 @@ impl ConnectionHandler for Handler {
type Error = HandlerError;
type InboundOpenInfo = ();
type InboundProtocol = ProtocolConfig;
type OutboundOpenInfo = crate::rpc_proto::Rpc;
type OutboundOpenInfo = proto::RPC;
type OutboundProtocol = ProtocolConfig;
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {

View File

@ -24,7 +24,7 @@ use crate::topic::TopicHash;
use crate::types::{
ControlAction, MessageId, PeerInfo, PeerKind, RawMessage, Rpc, Subscription, SubscriptionAction,
};
use crate::{rpc_proto, Config};
use crate::{rpc_proto::proto, Config};
use crate::{HandlerError, ValidationError};
use asynchronous_codec::{Decoder, Encoder, Framed};
use byteorder::{BigEndian, ByteOrder};
@ -35,7 +35,7 @@ use libp2p_core::{
identity::PublicKey, InboundUpgrade, OutboundUpgrade, PeerId, ProtocolName, UpgradeInfo,
};
use log::{debug, warn};
use prost::Message as _;
use quick_protobuf::Writer;
use std::pin::Pin;
use unsigned_varint::codec;
@ -191,12 +191,12 @@ pub struct GossipsubCodec {
/// Determines the level of validation performed on incoming messages.
validation_mode: ValidationMode,
/// The codec to handle common encoding/decoding of protobuf messages
codec: prost_codec::Codec<rpc_proto::Rpc>,
codec: quick_protobuf_codec::Codec<proto::RPC>,
}
impl GossipsubCodec {
pub fn new(length_codec: codec::UviBytes, validation_mode: ValidationMode) -> GossipsubCodec {
let codec = prost_codec::Codec::new(length_codec.max_len());
let codec = quick_protobuf_codec::Codec::new(length_codec.max_len());
GossipsubCodec {
validation_mode,
codec,
@ -206,7 +206,9 @@ impl GossipsubCodec {
/// Verifies a gossipsub message. This returns either a success or failure. All errors
/// are logged, which prevents error handling in the codec and handler. We simply drop invalid
/// messages and log warnings, rather than propagating errors through the codec.
fn verify_signature(message: &rpc_proto::Message) -> bool {
fn verify_signature(message: &proto::Message) -> bool {
use quick_protobuf::MessageWrite;
let from = match message.from.as_ref() {
Some(v) => v,
None => {
@ -258,10 +260,11 @@ impl GossipsubCodec {
let mut message_sig = message.clone();
message_sig.signature = None;
message_sig.key = None;
let mut buf = Vec::with_capacity(message_sig.encoded_len());
let mut buf = Vec::with_capacity(message_sig.get_size());
let mut writer = Writer::new(&mut buf);
message_sig
.encode(&mut buf)
.expect("Buffer has sufficient capacity");
.write_message(&mut writer)
.expect("Encoding to succeed");
let mut signature_bytes = SIGNING_PREFIX.to_vec();
signature_bytes.extend_from_slice(&buf);
public_key.verify(&signature_bytes, signature)
@ -269,7 +272,7 @@ impl GossipsubCodec {
}
impl Encoder for GossipsubCodec {
type Item = rpc_proto::Rpc;
type Item = proto::RPC;
type Error = HandlerError;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), HandlerError> {

View File

@ -17,27 +17,26 @@
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
#![allow(clippy::derive_partial_eq_without_eq)]
include!(concat!(env!("OUT_DIR"), "/gossipsub.pb.rs"));
pub mod proto {
include!("generated/mod.rs");
pub use self::gossipsub::pb::{mod_RPC::SubOpts, *};
}
#[cfg(test)]
mod test {
use crate::rpc_proto::proto::compat;
use crate::IdentTopic as Topic;
use libp2p_core::PeerId;
use prost::Message;
use quick_protobuf::{BytesReader, MessageRead, MessageWrite, Writer};
use rand::Rng;
mod compat_proto {
include!(concat!(env!("OUT_DIR"), "/compat.pb.rs"));
}
#[test]
fn test_multi_topic_message_compatibility() {
let topic1 = Topic::new("t1").hash();
let topic2 = Topic::new("t2").hash();
let new_message1 = super::Message {
let new_message1 = super::proto::Message {
from: Some(PeerId::random().to_bytes()),
data: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()),
seqno: Some(rand::thread_rng().gen::<[u8; 8]>().to_vec()),
@ -45,7 +44,7 @@ mod test {
signature: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()),
key: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()),
};
let old_message1 = compat_proto::Message {
let old_message1 = compat::pb::Message {
from: Some(PeerId::random().to_bytes()),
data: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()),
seqno: Some(rand::thread_rng().gen::<[u8; 8]>().to_vec()),
@ -53,7 +52,7 @@ mod test {
signature: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()),
key: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()),
};
let old_message2 = compat_proto::Message {
let old_message2 = compat::pb::Message {
from: Some(PeerId::random().to_bytes()),
data: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()),
seqno: Some(rand::thread_rng().gen::<[u8; 8]>().to_vec()),
@ -62,22 +61,31 @@ mod test {
key: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()),
};
let mut new_message1b = Vec::with_capacity(new_message1.encoded_len());
new_message1.encode(&mut new_message1b).unwrap();
let mut new_message1b = Vec::with_capacity(new_message1.get_size());
let mut writer = Writer::new(&mut new_message1b);
new_message1.write_message(&mut writer).unwrap();
let mut old_message1b = Vec::with_capacity(old_message1.encoded_len());
old_message1.encode(&mut old_message1b).unwrap();
let mut old_message1b = Vec::with_capacity(old_message1.get_size());
let mut writer = Writer::new(&mut old_message1b);
old_message1.write_message(&mut writer).unwrap();
let mut old_message2b = Vec::with_capacity(old_message2.encoded_len());
old_message2.encode(&mut old_message2b).unwrap();
let mut old_message2b = Vec::with_capacity(old_message2.get_size());
let mut writer = Writer::new(&mut old_message2b);
old_message2.write_message(&mut writer).unwrap();
let new_message = super::Message::decode(&old_message1b[..]).unwrap();
let mut reader = BytesReader::from_bytes(&old_message1b[..]);
let new_message =
super::proto::Message::from_reader(&mut reader, &old_message1b[..]).unwrap();
assert_eq!(new_message.topic, topic1.clone().into_string());
let new_message = super::Message::decode(&old_message2b[..]).unwrap();
let mut reader = BytesReader::from_bytes(&old_message2b[..]);
let new_message =
super::proto::Message::from_reader(&mut reader, &old_message2b[..]).unwrap();
assert_eq!(new_message.topic, topic2.into_string());
let old_message = compat_proto::Message::decode(&new_message1b[..]).unwrap();
let mut reader = BytesReader::from_bytes(&new_message1b[..]);
let old_message =
compat::pb::Message::from_reader(&mut reader, &new_message1b[..]).unwrap();
assert_eq!(old_message.topic_ids, vec![topic1.into_string()]);
}
}

View File

@ -18,10 +18,10 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::rpc_proto;
use crate::rpc_proto::proto;
use base64::prelude::*;
use prometheus_client::encoding::EncodeLabelSet;
use prost::Message;
use quick_protobuf::Writer;
use sha2::{Digest, Sha256};
use std::fmt;
@ -47,15 +47,18 @@ impl Hasher for Sha256Hash {
/// Creates a [`TopicHash`] by SHA256 hashing the topic then base64 encoding the
/// hash.
fn hash(topic_string: String) -> TopicHash {
let topic_descripter = rpc_proto::TopicDescriptor {
use quick_protobuf::MessageWrite;
let topic_descripter = proto::TopicDescriptor {
name: Some(topic_string),
auth: None,
enc: None,
};
let mut bytes = Vec::with_capacity(topic_descripter.encoded_len());
let mut bytes = Vec::with_capacity(topic_descripter.get_size());
let mut writer = Writer::new(&mut bytes);
topic_descripter
.encode(&mut bytes)
.expect("buffer is large enough");
.write_message(&mut writer)
.expect("Encoding to succeed");
let hash = BASE64_STANDARD.encode(Sha256::digest(&bytes));
TopicHash { hash }
}

View File

@ -19,15 +19,15 @@
// DEALINGS IN THE SOFTWARE.
//! A collection of types using the Gossipsub system.
use crate::rpc_proto;
use crate::TopicHash;
use libp2p_core::PeerId;
use libp2p_swarm::ConnectionId;
use prometheus_client::encoding::EncodeLabelValue;
use prost::Message as _;
use quick_protobuf::MessageWrite;
use std::fmt;
use std::fmt::Debug;
use crate::rpc_proto::proto;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
@ -136,7 +136,7 @@ pub struct RawMessage {
impl RawMessage {
/// Calculates the encoded length of this message (used for calculating metrics).
pub fn raw_protobuf_len(&self) -> usize {
let message = rpc_proto::Message {
let message = proto::Message {
from: self.source.map(|m| m.to_bytes()),
data: Some(self.data.clone()),
seqno: self.sequence_number.map(|s| s.to_be_bytes().to_vec()),
@ -144,7 +144,7 @@ impl RawMessage {
signature: self.signature.clone(),
key: self.key.clone(),
};
message.encoded_len()
message.get_size()
}
}
@ -250,19 +250,19 @@ pub struct Rpc {
impl Rpc {
/// Converts the GossipsubRPC into its protobuf format.
// A convenience function to avoid explicitly specifying types.
pub fn into_protobuf(self) -> rpc_proto::Rpc {
pub fn into_protobuf(self) -> proto::RPC {
self.into()
}
}
impl From<Rpc> for rpc_proto::Rpc {
impl From<Rpc> for proto::RPC {
/// Converts the RPC into protobuf format.
fn from(rpc: Rpc) -> Self {
// Messages
let mut publish = Vec::new();
for message in rpc.messages.into_iter() {
let message = rpc_proto::Message {
let message = proto::Message {
from: message.source.map(|m| m.to_bytes()),
data: Some(message.data),
seqno: message.sequence_number.map(|s| s.to_be_bytes().to_vec()),
@ -278,14 +278,14 @@ impl From<Rpc> for rpc_proto::Rpc {
let subscriptions = rpc
.subscriptions
.into_iter()
.map(|sub| rpc_proto::rpc::SubOpts {
.map(|sub| proto::SubOpts {
subscribe: Some(sub.action == SubscriptionAction::Subscribe),
topic_id: Some(sub.topic_hash.into_string()),
})
.collect::<Vec<_>>();
// control messages
let mut control = rpc_proto::ControlMessage {
let mut control = proto::ControlMessage {
ihave: Vec::new(),
iwant: Vec::new(),
graft: Vec::new(),
@ -301,20 +301,20 @@ impl From<Rpc> for rpc_proto::Rpc {
topic_hash,
message_ids,
} => {
let rpc_ihave = rpc_proto::ControlIHave {
let rpc_ihave = proto::ControlIHave {
topic_id: Some(topic_hash.into_string()),
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
};
control.ihave.push(rpc_ihave);
}
ControlAction::IWant { message_ids } => {
let rpc_iwant = rpc_proto::ControlIWant {
let rpc_iwant = proto::ControlIWant {
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
};
control.iwant.push(rpc_iwant);
}
ControlAction::Graft { topic_hash } => {
let rpc_graft = rpc_proto::ControlGraft {
let rpc_graft = proto::ControlGraft {
topic_id: Some(topic_hash.into_string()),
};
control.graft.push(rpc_graft);
@ -324,11 +324,11 @@ impl From<Rpc> for rpc_proto::Rpc {
peers,
backoff,
} => {
let rpc_prune = rpc_proto::ControlPrune {
let rpc_prune = proto::ControlPrune {
topic_id: Some(topic_hash.into_string()),
peers: peers
.into_iter()
.map(|info| rpc_proto::PeerInfo {
.map(|info| proto::PeerInfo {
peer_id: info.peer_id.map(|id| id.to_bytes()),
/// TODO, see https://github.com/libp2p/specs/pull/217
signed_peer_record: None,
@ -341,7 +341,7 @@ impl From<Rpc> for rpc_proto::Rpc {
}
}
rpc_proto::Rpc {
proto::RPC {
subscriptions,
publish,
control: if empty_control_msg {