Replace protobuf crate with prost! (#1390)

* Replace protobuf crate with prost!

* Add copyright headers to build.rs files.

* kad: Fix error when mapping connection types.

Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com>

* Fix more mapping mistakes.

Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
This commit is contained in:
Toralf Wittner
2020-01-15 12:02:02 +01:00
committed by Pierre Krieger
parent 9d2df148cd
commit 680c467f7e
52 changed files with 600 additions and 5836 deletions

View File

@ -38,7 +38,7 @@ use futures::prelude::*;
use futures_codec::Framed;
use libp2p_core::{Multiaddr, PeerId};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use protobuf::{self, Message};
use prost::Message;
use std::{borrow::Cow, convert::TryFrom, time::Duration};
use std::{io, iter};
use unsigned_varint::codec;
@ -57,30 +57,26 @@ pub enum KadConnectionType {
CannotConnect = 3,
}
impl From<proto::Message_ConnectionType> for KadConnectionType {
fn from(raw: proto::Message_ConnectionType) -> KadConnectionType {
use proto::Message_ConnectionType::{
CAN_CONNECT, CANNOT_CONNECT, CONNECTED, NOT_CONNECTED
};
impl From<proto::message::ConnectionType> for KadConnectionType {
fn from(raw: proto::message::ConnectionType) -> KadConnectionType {
use proto::message::ConnectionType::*;
match raw {
NOT_CONNECTED => KadConnectionType::NotConnected,
CONNECTED => KadConnectionType::Connected,
CAN_CONNECT => KadConnectionType::CanConnect,
CANNOT_CONNECT => KadConnectionType::CannotConnect,
NotConnected => KadConnectionType::NotConnected,
Connected => KadConnectionType::Connected,
CanConnect => KadConnectionType::CanConnect,
CannotConnect => KadConnectionType::CannotConnect,
}
}
}
impl Into<proto::Message_ConnectionType> for KadConnectionType {
fn into(self) -> proto::Message_ConnectionType {
use proto::Message_ConnectionType::{
CAN_CONNECT, CANNOT_CONNECT, CONNECTED, NOT_CONNECTED
};
impl Into<proto::message::ConnectionType> for KadConnectionType {
fn into(self) -> proto::message::ConnectionType {
use proto::message::ConnectionType::*;
match self {
KadConnectionType::NotConnected => NOT_CONNECTED,
KadConnectionType::Connected => CONNECTED,
KadConnectionType::CanConnect => CAN_CONNECT,
KadConnectionType::CannotConnect => CANNOT_CONNECT,
KadConnectionType::NotConnected => NotConnected,
KadConnectionType::Connected => Connected,
KadConnectionType::CanConnect => CanConnect,
KadConnectionType::CannotConnect => CannotConnect,
}
}
}
@ -97,23 +93,25 @@ pub struct KadPeer {
}
// Builds a `KadPeer` from a corresponding protobuf message.
impl TryFrom<&mut proto::Message_Peer> for KadPeer {
impl TryFrom<proto::message::Peer> for KadPeer {
type Error = io::Error;
fn try_from(peer: &mut proto::Message_Peer) -> Result<KadPeer, Self::Error> {
fn try_from(peer: proto::message::Peer) -> Result<KadPeer, Self::Error> {
// TODO: this is in fact a CID; not sure if this should be handled in `from_bytes` or
// as a special case here
let node_id = PeerId::from_bytes(peer.get_id().to_vec())
let node_id = PeerId::from_bytes(peer.id)
.map_err(|_| invalid_data("invalid peer id"))?;
let mut addrs = Vec::with_capacity(peer.get_addrs().len());
for addr in peer.take_addrs().into_iter() {
let mut addrs = Vec::with_capacity(peer.addrs.len());
for addr in peer.addrs.into_iter() {
let as_ma = Multiaddr::try_from(addr).map_err(invalid_data)?;
addrs.push(as_ma);
}
debug_assert_eq!(addrs.len(), addrs.capacity());
let connection_ty = peer.get_connection().into();
let connection_ty = proto::message::ConnectionType::from_i32(peer.connection)
.ok_or_else(|| invalid_data("unknown connection type"))?
.into();
Ok(KadPeer {
node_id,
@ -123,15 +121,16 @@ impl TryFrom<&mut proto::Message_Peer> for KadPeer {
}
}
impl Into<proto::Message_Peer> for KadPeer {
fn into(self) -> proto::Message_Peer {
let mut out = proto::Message_Peer::new();
out.set_id(self.node_id.into_bytes());
for addr in self.multiaddrs {
out.mut_addrs().push(addr.to_vec());
impl Into<proto::message::Peer> for KadPeer {
fn into(self) -> proto::message::Peer {
proto::message::Peer {
id: self.node_id.into_bytes(),
addrs: self.multiaddrs.into_iter().map(|a| a.to_vec()).collect(),
connection: {
let ct: proto::message::ConnectionType = self.connection_ty.into();
ct as i32
}
}
out.set_connection(self.connection_ty.into());
out
}
}
@ -188,12 +187,12 @@ where
.err_into()
.with::<_, _, fn(_) -> _, _>(|response| {
let proto_struct = resp_msg_to_proto(response);
future::ready(proto_struct.write_to_bytes()
.map(io::Cursor::new)
.map_err(invalid_data))
let mut buf = Vec::with_capacity(proto_struct.encoded_len());
proto_struct.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
future::ready(Ok(io::Cursor::new(buf)))
})
.and_then::<_, fn(_) -> _>(|bytes| {
let request = match protobuf::parse_from_bytes(&bytes) {
let request = match proto::Message::decode(bytes) {
Ok(r) => r,
Err(err) => return future::ready(Err(err.into()))
};
@ -220,12 +219,12 @@ where
.err_into()
.with::<_, _, fn(_) -> _, _>(|request| {
let proto_struct = req_msg_to_proto(request);
future::ready(proto_struct.write_to_bytes()
.map(io::Cursor::new)
.map_err(invalid_data))
let mut buf = Vec::with_capacity(proto_struct.encoded_len());
proto_struct.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
future::ready(Ok(io::Cursor::new(buf)))
})
.and_then::<_, fn(_) -> _>(|bytes| {
let response = match protobuf::parse_from_bytes(&bytes) {
let response = match proto::Message::decode(bytes) {
Ok(r) => r,
Err(err) => return future::ready(Err(err.into()))
};
@ -333,46 +332,39 @@ pub enum KadResponseMsg {
/// Converts a `KadRequestMsg` into the corresponding protobuf message for sending.
fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message {
match kad_msg {
KadRequestMsg::Ping => {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::PING);
msg
}
KadRequestMsg::FindNode { key } => {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::FIND_NODE);
msg.set_key(key);
msg.set_clusterLevelRaw(10);
msg
}
KadRequestMsg::GetProviders { key } => {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::GET_PROVIDERS);
msg.set_key(key.to_vec());
msg.set_clusterLevelRaw(10);
msg
}
KadRequestMsg::AddProvider { key, provider } => {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::ADD_PROVIDER);
msg.set_clusterLevelRaw(10);
msg.set_key(key.to_vec());
msg.mut_providerPeers().push(provider.into());
msg
}
KadRequestMsg::GetValue { key } => {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::GET_VALUE);
msg.set_clusterLevelRaw(10);
msg.set_key(key.to_vec());
msg
}
KadRequestMsg::PutValue { record } => {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::PUT_VALUE);
msg.set_record(record_to_proto(record));
msg
KadRequestMsg::Ping => proto::Message {
r#type: proto::message::MessageType::Ping as i32,
.. proto::Message::default()
},
KadRequestMsg::FindNode { key } => proto::Message {
r#type: proto::message::MessageType::FindNode as i32,
key,
cluster_level_raw: 10,
.. proto::Message::default()
},
KadRequestMsg::GetProviders { key } => proto::Message {
r#type: proto::message::MessageType::GetProviders as i32,
key: key.to_vec(),
cluster_level_raw: 10,
.. proto::Message::default()
},
KadRequestMsg::AddProvider { key, provider } => proto::Message {
r#type: proto::message::MessageType::AddProvider as i32,
cluster_level_raw: 10,
key: key.to_vec(),
provider_peers: vec![provider.into()],
.. proto::Message::default()
},
KadRequestMsg::GetValue { key } => proto::Message {
r#type: proto::message::MessageType::GetValue as i32,
cluster_level_raw: 10,
key: key.to_vec(),
.. proto::Message::default()
},
KadRequestMsg::PutValue { record } => proto::Message {
r#type: proto::message::MessageType::PutValue as i32,
record: Some(record_to_proto(record)),
.. proto::Message::default()
}
}
}
@ -380,65 +372,39 @@ fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message {
/// Converts a `KadResponseMsg` into the corresponding protobuf message for sending.
fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message {
match kad_msg {
KadResponseMsg::Pong => {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::PING);
msg
}
KadResponseMsg::FindNode { closer_peers } => {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::FIND_NODE);
msg.set_clusterLevelRaw(9);
for peer in closer_peers {
msg.mut_closerPeers().push(peer.into());
}
msg
}
KadResponseMsg::GetProviders {
closer_peers,
provider_peers,
} => {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::GET_PROVIDERS);
msg.set_clusterLevelRaw(9);
for peer in closer_peers {
msg.mut_closerPeers().push(peer.into());
}
for peer in provider_peers {
msg.mut_providerPeers().push(peer.into());
}
msg
}
KadResponseMsg::GetValue {
record,
closer_peers,
} => {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::GET_VALUE);
msg.set_clusterLevelRaw(9);
for peer in closer_peers {
msg.mut_closerPeers().push(peer.into());
}
if let Some(record) = record {
msg.set_record(record_to_proto(record));
}
msg
}
KadResponseMsg::PutValue {
key,
value,
} => {
let mut msg = proto::Message::new();
msg.set_field_type(proto::Message_MessageType::PUT_VALUE);
msg.set_key(key.to_vec());
let mut record = proto::Record::new();
record.set_key(key.to_vec());
record.set_value(value);
msg.set_record(record);
msg
KadResponseMsg::Pong => proto::Message {
r#type: proto::message::MessageType::Ping as i32,
.. proto::Message::default()
},
KadResponseMsg::FindNode { closer_peers } => proto::Message {
r#type: proto::message::MessageType::FindNode as i32,
cluster_level_raw: 9,
closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
.. proto::Message::default()
},
KadResponseMsg::GetProviders { closer_peers, provider_peers } => proto::Message {
r#type: proto::message::MessageType::GetProviders as i32,
cluster_level_raw: 9,
closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
provider_peers: provider_peers.into_iter().map(KadPeer::into).collect(),
.. proto::Message::default()
},
KadResponseMsg::GetValue { record, closer_peers } => proto::Message {
r#type: proto::message::MessageType::GetValue as i32,
cluster_level_raw: 9,
closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
record: record.map(record_to_proto),
.. proto::Message::default()
},
KadResponseMsg::PutValue { key, value } => proto::Message {
r#type: proto::message::MessageType::PutValue as i32,
key: key.to_vec(),
record: Some(proto::Record {
key: key.to_vec(),
value,
.. proto::Record::default()
}),
.. proto::Message::default()
}
}
}
@ -446,44 +412,38 @@ fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message {
/// Converts a received protobuf message into a corresponding `KadRequestMsg`.
///
/// Fails if the protobuf message is not a valid and supported Kademlia request message.
fn proto_to_req_msg(mut message: proto::Message) -> Result<KadRequestMsg, io::Error> {
match message.get_field_type() {
proto::Message_MessageType::PING => Ok(KadRequestMsg::Ping),
fn proto_to_req_msg(message: proto::Message) -> Result<KadRequestMsg, io::Error> {
let msg_type = proto::message::MessageType::from_i32(message.r#type)
.ok_or_else(|| invalid_data(format!("unknown message type: {}", message.r#type)))?;
proto::Message_MessageType::PUT_VALUE => {
let record = record_from_proto(message.take_record())?;
match msg_type {
proto::message::MessageType::Ping => Ok(KadRequestMsg::Ping),
proto::message::MessageType::PutValue => {
let record = record_from_proto(message.record.unwrap_or_default())?;
Ok(KadRequestMsg::PutValue { record })
}
proto::Message_MessageType::GET_VALUE => {
let key = record::Key::from(message.take_key());
Ok(KadRequestMsg::GetValue { key })
proto::message::MessageType::GetValue => {
Ok(KadRequestMsg::GetValue { key: record::Key::from(message.key) })
}
proto::Message_MessageType::FIND_NODE => {
let key = message.take_key();
Ok(KadRequestMsg::FindNode { key })
proto::message::MessageType::FindNode => {
Ok(KadRequestMsg::FindNode { key: message.key })
}
proto::Message_MessageType::GET_PROVIDERS => {
let key = record::Key::from(message.take_key());
Ok(KadRequestMsg::GetProviders { key })
proto::message::MessageType::GetProviders => {
Ok(KadRequestMsg::GetProviders { key: record::Key::from(message.key)})
}
proto::Message_MessageType::ADD_PROVIDER => {
proto::message::MessageType::AddProvider => {
// TODO: for now we don't parse the peer properly, so it is possible that we get
// parsing errors for peers even when they are valid; we ignore these
// errors for now, but ultimately we should just error altogether
let provider = message
.mut_providerPeers()
.iter_mut()
let provider = message.provider_peers
.into_iter()
.find_map(|peer| KadPeer::try_from(peer).ok());
if let Some(provider) = provider {
let key = record::Key::from(message.take_key());
let key = record::Key::from(message.key);
Ok(KadRequestMsg::AddProvider { key, provider })
} else {
Err(invalid_data("ADD_PROVIDER message with no valid peer."))
Err(invalid_data("AddProvider message with no valid peer."))
}
}
}
@ -492,49 +452,43 @@ fn proto_to_req_msg(mut message: proto::Message) -> Result<KadRequestMsg, io::Er
/// Converts a received protobuf message into a corresponding `KadResponseMessage`.
///
/// Fails if the protobuf message is not a valid and supported Kademlia response message.
fn proto_to_resp_msg(mut message: proto::Message) -> Result<KadResponseMsg, io::Error> {
match message.get_field_type() {
proto::Message_MessageType::PING => Ok(KadResponseMsg::Pong),
fn proto_to_resp_msg(message: proto::Message) -> Result<KadResponseMsg, io::Error> {
let msg_type = proto::message::MessageType::from_i32(message.r#type)
.ok_or_else(|| invalid_data(format!("unknown message type: {}", message.r#type)))?;
proto::Message_MessageType::GET_VALUE => {
match msg_type {
proto::message::MessageType::Ping => Ok(KadResponseMsg::Pong),
proto::message::MessageType::GetValue => {
let record =
if message.has_record() {
Some(record_from_proto(message.take_record())?)
if let Some(r) = message.record {
Some(record_from_proto(r)?)
} else {
None
};
let closer_peers = message
.mut_closerPeers()
.iter_mut()
let closer_peers = message.closer_peers.into_iter()
.filter_map(|peer| KadPeer::try_from(peer).ok())
.collect::<Vec<_>>();
.collect();
Ok(KadResponseMsg::GetValue { record, closer_peers })
},
}
proto::Message_MessageType::FIND_NODE => {
let closer_peers = message
.mut_closerPeers()
.iter_mut()
proto::message::MessageType::FindNode => {
let closer_peers = message.closer_peers.into_iter()
.filter_map(|peer| KadPeer::try_from(peer).ok())
.collect::<Vec<_>>();
.collect();
Ok(KadResponseMsg::FindNode { closer_peers })
}
proto::Message_MessageType::GET_PROVIDERS => {
let closer_peers = message
.mut_closerPeers()
.iter_mut()
proto::message::MessageType::GetProviders => {
let closer_peers = message.closer_peers.into_iter()
.filter_map(|peer| KadPeer::try_from(peer).ok())
.collect::<Vec<_>>();
.collect();
let provider_peers = message
.mut_providerPeers()
.iter_mut()
let provider_peers = message.provider_peers.into_iter()
.filter_map(|peer| KadPeer::try_from(peer).ok())
.collect::<Vec<_>>();
.collect();
Ok(KadResponseMsg::GetProviders {
closer_peers,
@ -542,31 +496,30 @@ fn proto_to_resp_msg(mut message: proto::Message) -> Result<KadResponseMsg, io::
})
}
proto::Message_MessageType::PUT_VALUE => {
let key = record::Key::from(message.take_key());
if !message.has_record() {
return Err(invalid_data("received PUT_VALUE message with no record"));
}
proto::message::MessageType::PutValue => {
let key = record::Key::from(message.key);
let rec = message.record.ok_or_else(|| {
invalid_data("received PutValue message with no record")
})?;
let mut record = message.take_record();
Ok(KadResponseMsg::PutValue {
key,
value: record.take_value(),
value: rec.value
})
}
proto::Message_MessageType::ADD_PROVIDER =>
Err(invalid_data("received an unexpected ADD_PROVIDER message"))
proto::message::MessageType::AddProvider =>
Err(invalid_data("received an unexpected AddProvider message"))
}
}
fn record_from_proto(mut record: proto::Record) -> Result<Record, io::Error> {
let key = record::Key::from(record.take_key());
let value = record.take_value();
fn record_from_proto(record: proto::Record) -> Result<Record, io::Error> {
let key = record::Key::from(record.key);
let value = record.value;
let publisher =
if record.publisher.len() > 0 {
PeerId::from_bytes(record.take_publisher())
if !record.publisher.is_empty() {
PeerId::from_bytes(record.publisher)
.map(Some)
.map_err(|_| invalid_data("Invalid publisher peer ID."))?
} else {
@ -584,22 +537,22 @@ fn record_from_proto(mut record: proto::Record) -> Result<Record, io::Error> {
}
fn record_to_proto(record: Record) -> proto::Record {
let mut pb_record = proto::Record::new();
pb_record.key = record.key.to_vec();
pb_record.value = record.value;
if let Some(p) = record.publisher {
pb_record.publisher = p.into_bytes();
proto::Record {
key: record.key.to_vec(),
value: record.value,
publisher: record.publisher.map(PeerId::into_bytes).unwrap_or_default(),
ttl: record.expires
.map(|t| {
let now = Instant::now();
if t > now {
(t - now).as_secs() as u32
} else {
1 // because 0 means "does not expire"
}
})
.unwrap_or(0),
time_received: String::new()
}
if let Some(t) = record.expires {
let now = Instant::now();
if t > now {
pb_record.ttl = (t - now).as_secs() as u32;
} else {
pb_record.ttl = 1; // because 0 means "does not expire"
}
}
pb_record
}
/// Creates an `io::Error` with `io::ErrorKind::InvalidData`.