mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-10 16:41:21 +00:00
core/: Redesign upgrade::{write_one, write_with_len_prefix, read_one}
(#2111)
1. Deprecating the `write_one` function Semantically, this function is a composition of `write_with_len_prefix` and `io.close()`. This represents a footgun because the `close` functionality is not obvious and only mentioned in the docs. Using this function multiple times on a single substream will produces hard to debug behaviour. 2. Deprecating `read_one` and `write_with_len_prefix` functions 3. Introducing `write_length_prefixed` and `read_length_prefixed` - These functions are symmetric and do exactly what you would expect, just writing to the socket without closing - They also have a symmetric interface (no more custom errors, just `io::Error`) Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
@ -6,6 +6,12 @@
|
||||
|
||||
- Implement `Keypair::from_protobuf_encoding` for ed25519 keys (see [PR 2090]).
|
||||
|
||||
- Deprecate `upgrade::write_one`.
|
||||
Deprecate `upgrade::write_with_len_prefix`.
|
||||
Deprecate `upgrade::read_one`.
|
||||
Introduce `upgrade::read_length_prefixed` and `upgrade::write_length_prefixed`.
|
||||
See [PR 2111](https://github.com/libp2p/rust-libp2p/pull/2111).
|
||||
|
||||
[PR 2090]: https://github.com/libp2p/rust-libp2p/pull/2090
|
||||
|
||||
# 0.28.3 [2021-04-26]
|
||||
|
@ -80,8 +80,10 @@ pub use self::{
|
||||
map::{MapInboundUpgrade, MapOutboundUpgrade, MapInboundUpgradeErr, MapOutboundUpgradeErr},
|
||||
optional::OptionalUpgrade,
|
||||
select::SelectUpgrade,
|
||||
transfer::{write_one, write_with_len_prefix, write_varint, read_one, ReadOneError, read_varint},
|
||||
transfer::{write_length_prefixed, write_varint, read_length_prefixed, read_varint},
|
||||
};
|
||||
#[allow(deprecated)]
|
||||
pub use self::transfer::ReadOneError;
|
||||
|
||||
/// Types serving as protocol names.
|
||||
///
|
||||
|
@ -28,18 +28,20 @@ use std::iter;
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// # use libp2p_core::transport::{Transport, MemoryTransport};
|
||||
/// # use libp2p_core::upgrade;
|
||||
/// # use libp2p_core::transport::{Transport, MemoryTransport, memory::Channel};
|
||||
/// # use libp2p_core::{upgrade, Negotiated};
|
||||
/// # use std::io;
|
||||
/// # use futures::AsyncWriteExt;
|
||||
/// let _transport = MemoryTransport::default()
|
||||
/// .and_then(move |out, cp| {
|
||||
/// upgrade::apply(out, upgrade::from_fn("/foo/1", move |mut sock, endpoint| async move {
|
||||
/// upgrade::apply(out, upgrade::from_fn("/foo/1", move |mut sock: Negotiated<Channel<Vec<u8>>>, endpoint| async move {
|
||||
/// if endpoint.is_dialer() {
|
||||
/// upgrade::write_one(&mut sock, "some handshake data").await?;
|
||||
/// upgrade::write_length_prefixed(&mut sock, "some handshake data").await?;
|
||||
/// sock.close().await?;
|
||||
/// } else {
|
||||
/// let handshake_data = upgrade::read_one(&mut sock, 1024).await?;
|
||||
/// let handshake_data = upgrade::read_length_prefixed(&mut sock, 1024).await?;
|
||||
/// if handshake_data != b"some handshake data" {
|
||||
/// return Err(upgrade::ReadOneError::from(io::Error::from(io::ErrorKind::Other)));
|
||||
/// return Err(io::Error::new(io::ErrorKind::Other, "bad handshake"));
|
||||
/// }
|
||||
/// }
|
||||
/// Ok(sock)
|
||||
|
@ -25,10 +25,27 @@ use std::{error, fmt, io};
|
||||
|
||||
// TODO: these methods could be on an Ext trait to AsyncWrite
|
||||
|
||||
/// Writes a message to the given socket with a length prefix appended to it. Also flushes the socket.
|
||||
///
|
||||
/// > **Note**: Prepends a variable-length prefix indicate the length of the message. This is
|
||||
/// > compatible with what [`read_length_prefixed`] expects.
|
||||
pub async fn write_length_prefixed(socket: &mut (impl AsyncWrite + Unpin), data: impl AsRef<[u8]>)
|
||||
-> Result<(), io::Error>
|
||||
{
|
||||
write_varint(socket, data.as_ref().len()).await?;
|
||||
socket.write_all(data.as_ref()).await?;
|
||||
socket.flush().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send a message to the given socket, then shuts down the writing side.
|
||||
///
|
||||
/// > **Note**: Prepends a variable-length prefix indicate the length of the message. This is
|
||||
/// > compatible with what `read_one` expects.
|
||||
///
|
||||
#[deprecated(since = "0.29.0", note = "Use `write_length_prefixed` instead. You will need to manually close the stream using `socket.close().await`.")]
|
||||
#[allow(dead_code)]
|
||||
pub async fn write_one(socket: &mut (impl AsyncWrite + Unpin), data: impl AsRef<[u8]>)
|
||||
-> Result<(), io::Error>
|
||||
{
|
||||
@ -42,6 +59,8 @@ pub async fn write_one(socket: &mut (impl AsyncWrite + Unpin), data: impl AsRef<
|
||||
///
|
||||
/// > **Note**: Prepends a variable-length prefix indicate the length of the message. This is
|
||||
/// > compatible with what `read_one` expects.
|
||||
#[deprecated(since = "0.29.0", note = "Use `write_length_prefixed` instead.")]
|
||||
#[allow(dead_code)]
|
||||
pub async fn write_with_len_prefix(socket: &mut (impl AsyncWrite + Unpin), data: impl AsRef<[u8]>)
|
||||
-> Result<(), io::Error>
|
||||
{
|
||||
@ -60,6 +79,7 @@ pub async fn write_varint(socket: &mut (impl AsyncWrite + Unpin), len: usize)
|
||||
let mut len_data = unsigned_varint::encode::usize_buffer();
|
||||
let encoded_len = unsigned_varint::encode::usize(len, &mut len_data).len();
|
||||
socket.write_all(&len_data[..encoded_len]).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -106,6 +126,27 @@ pub async fn read_varint(socket: &mut (impl AsyncRead + Unpin)) -> Result<usize,
|
||||
}
|
||||
}
|
||||
|
||||
/// Reads a length-prefixed message from the given socket.
|
||||
///
|
||||
/// The `max_size` parameter is the maximum size in bytes of the message that we accept. This is
|
||||
/// necessary in order to avoid DoS attacks where the remote sends us a message of several
|
||||
/// gigabytes.
|
||||
///
|
||||
/// > **Note**: Assumes that a variable-length prefix indicates the length of the message. This is
|
||||
/// > compatible with what [`write_length_prefixed`] does.
|
||||
pub async fn read_length_prefixed(socket: &mut (impl AsyncRead + Unpin), max_size: usize) -> io::Result<Vec<u8>>
|
||||
{
|
||||
let len = read_varint(socket).await?;
|
||||
if len > max_size {
|
||||
return Err(io::Error::new(io::ErrorKind::InvalidData, format!("Received data size ({} bytes) exceeds maximum ({} bytes)", len, max_size)))
|
||||
}
|
||||
|
||||
let mut buf = vec![0; len];
|
||||
socket.read_exact(&mut buf).await?;
|
||||
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
/// Reads a length-prefixed message from the given socket.
|
||||
///
|
||||
/// The `max_size` parameter is the maximum size in bytes of the message that we accept. This is
|
||||
@ -114,6 +155,8 @@ pub async fn read_varint(socket: &mut (impl AsyncRead + Unpin)) -> Result<usize,
|
||||
///
|
||||
/// > **Note**: Assumes that a variable-length prefix indicates the length of the message. This is
|
||||
/// > compatible with what `write_one` does.
|
||||
#[deprecated(since = "0.29.0", note = "Use `read_length_prefixed` instead.")]
|
||||
#[allow(dead_code, deprecated)]
|
||||
pub async fn read_one(socket: &mut (impl AsyncRead + Unpin), max_size: usize)
|
||||
-> Result<Vec<u8>, ReadOneError>
|
||||
{
|
||||
@ -132,6 +175,7 @@ pub async fn read_one(socket: &mut (impl AsyncRead + Unpin), max_size: usize)
|
||||
|
||||
/// Error while reading one message.
|
||||
#[derive(Debug)]
|
||||
#[deprecated(since = "0.29.0", note = "Use `read_length_prefixed` instead of `read_one` to avoid depending on this type.")]
|
||||
pub enum ReadOneError {
|
||||
/// Error on the socket.
|
||||
Io(std::io::Error),
|
||||
@ -144,12 +188,14 @@ pub enum ReadOneError {
|
||||
},
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
impl From<std::io::Error> for ReadOneError {
|
||||
fn from(err: std::io::Error) -> ReadOneError {
|
||||
ReadOneError::Io(err)
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
impl fmt::Display for ReadOneError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match *self {
|
||||
@ -159,6 +205,7 @@ impl fmt::Display for ReadOneError {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
impl error::Error for ReadOneError {
|
||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||
match *self {
|
||||
@ -173,15 +220,18 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn write_one_works() {
|
||||
fn write_length_prefixed_works() {
|
||||
let data = (0..rand::random::<usize>() % 10_000)
|
||||
.map(|_| rand::random::<u8>())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut out = vec![0; 10_000];
|
||||
futures::executor::block_on(
|
||||
write_one(&mut futures::io::Cursor::new(&mut out[..]), data.clone())
|
||||
).unwrap();
|
||||
|
||||
futures::executor::block_on(async {
|
||||
let mut socket = futures::io::Cursor::new(&mut out[..]);
|
||||
|
||||
write_length_prefixed(&mut socket, &data).await.unwrap();
|
||||
socket.close().await.unwrap();
|
||||
});
|
||||
|
||||
let (out_len, out_data) = unsigned_varint::decode::usize(&out).unwrap();
|
||||
assert_eq!(out_len, data.len());
|
||||
|
@ -2,6 +2,11 @@
|
||||
|
||||
- Update dependencies.
|
||||
|
||||
- Change `FloodsubDecodeError::ReadError` from a `upgrade::ReadOneError` to
|
||||
`std::io::Error`. See [PR 2111].
|
||||
|
||||
[PR 2111]: https://github.com/libp2p/rust-libp2p/pull/2111
|
||||
|
||||
# 0.29.0 [2021-04-13]
|
||||
|
||||
- Update `libp2p-swarm`.
|
||||
|
@ -23,7 +23,7 @@ use crate::topic::Topic;
|
||||
use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, PeerId, upgrade};
|
||||
use prost::Message;
|
||||
use std::{error, fmt, io, iter, pin::Pin};
|
||||
use futures::{Future, io::{AsyncRead, AsyncWrite}};
|
||||
use futures::{Future, io::{AsyncRead, AsyncWrite}, AsyncWriteExt};
|
||||
|
||||
/// Implementation of `ConnectionUpgrade` for the floodsub protocol.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
@ -55,7 +55,7 @@ where
|
||||
|
||||
fn upgrade_inbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future {
|
||||
Box::pin(async move {
|
||||
let packet = upgrade::read_one(&mut socket, 2048).await?;
|
||||
let packet = upgrade::read_length_prefixed(&mut socket, 2048).await?;
|
||||
let rpc = rpc_proto::Rpc::decode(&packet[..])?;
|
||||
|
||||
let mut messages = Vec::with_capacity(rpc.publish.len());
|
||||
@ -95,15 +95,15 @@ where
|
||||
#[derive(Debug)]
|
||||
pub enum FloodsubDecodeError {
|
||||
/// Error when reading the packet from the socket.
|
||||
ReadError(upgrade::ReadOneError),
|
||||
ReadError(io::Error),
|
||||
/// Error when decoding the raw buffer into a protobuf.
|
||||
ProtobufError(prost::DecodeError),
|
||||
/// Error when parsing the `PeerId` in the message.
|
||||
InvalidPeerId,
|
||||
}
|
||||
|
||||
impl From<upgrade::ReadOneError> for FloodsubDecodeError {
|
||||
fn from(err: upgrade::ReadOneError) -> Self {
|
||||
impl From<io::Error> for FloodsubDecodeError {
|
||||
fn from(err: io::Error) -> Self {
|
||||
FloodsubDecodeError::ReadError(err)
|
||||
}
|
||||
}
|
||||
@ -166,7 +166,10 @@ where
|
||||
fn upgrade_outbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future {
|
||||
Box::pin(async move {
|
||||
let bytes = self.into_bytes();
|
||||
upgrade::write_one(&mut socket, bytes).await?;
|
||||
|
||||
upgrade::write_length_prefixed(&mut socket, bytes).await?;
|
||||
socket.close().await?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
@ -186,7 +186,11 @@ where
|
||||
|
||||
let mut bytes = Vec::with_capacity(message.encoded_len());
|
||||
message.encode(&mut bytes).expect("Vec<u8> provides capacity as needed");
|
||||
upgrade::write_one(&mut io, &bytes).await
|
||||
|
||||
upgrade::write_length_prefixed(&mut io, bytes).await?;
|
||||
io.close().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn recv<T>(mut socket: T) -> io::Result<IdentifyInfo>
|
||||
@ -195,7 +199,7 @@ where
|
||||
{
|
||||
socket.close().await?;
|
||||
|
||||
let msg = upgrade::read_one(&mut socket, 4096)
|
||||
let msg = upgrade::read_length_prefixed(&mut socket, 4096)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
|
||||
.await?;
|
||||
|
||||
|
@ -27,13 +27,13 @@ use libp2p_core::{
|
||||
identity,
|
||||
muxing::StreamMuxerBox,
|
||||
transport::{self, Transport},
|
||||
upgrade::{self, read_one, write_one}
|
||||
upgrade::{self, read_length_prefixed, write_length_prefixed}
|
||||
};
|
||||
use libp2p_noise::{NoiseConfig, X25519Spec, Keypair};
|
||||
use libp2p_request_response::*;
|
||||
use libp2p_swarm::{Swarm, SwarmEvent};
|
||||
use libp2p_tcp::TcpConfig;
|
||||
use futures::{channel::mpsc, executor::LocalPool, prelude::*, task::SpawnExt};
|
||||
use futures::{channel::mpsc, executor::LocalPool, prelude::*, task::SpawnExt, AsyncWriteExt};
|
||||
use rand::{self, Rng};
|
||||
use std::{io, iter};
|
||||
use std::{collections::HashSet, num::NonZeroU16};
|
||||
@ -421,13 +421,13 @@ impl RequestResponseCodec for PingCodec {
|
||||
where
|
||||
T: AsyncRead + Unpin + Send
|
||||
{
|
||||
read_one(io, 1024)
|
||||
.map(|res| match res {
|
||||
Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)),
|
||||
Ok(vec) if vec.is_empty() => Err(io::ErrorKind::UnexpectedEof.into()),
|
||||
Ok(vec) => Ok(Ping(vec))
|
||||
})
|
||||
.await
|
||||
let vec = read_length_prefixed(io, 1024).await?;
|
||||
|
||||
if vec.is_empty() {
|
||||
return Err(io::ErrorKind::UnexpectedEof.into())
|
||||
}
|
||||
|
||||
Ok(Ping(vec))
|
||||
}
|
||||
|
||||
async fn read_response<T>(&mut self, _: &PingProtocol, io: &mut T)
|
||||
@ -435,13 +435,13 @@ impl RequestResponseCodec for PingCodec {
|
||||
where
|
||||
T: AsyncRead + Unpin + Send
|
||||
{
|
||||
read_one(io, 1024)
|
||||
.map(|res| match res {
|
||||
Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)),
|
||||
Ok(vec) if vec.is_empty() => Err(io::ErrorKind::UnexpectedEof.into()),
|
||||
Ok(vec) => Ok(Pong(vec))
|
||||
})
|
||||
.await
|
||||
let vec = read_length_prefixed(io, 1024).await?;
|
||||
|
||||
if vec.is_empty() {
|
||||
return Err(io::ErrorKind::UnexpectedEof.into())
|
||||
}
|
||||
|
||||
Ok(Pong(vec))
|
||||
}
|
||||
|
||||
async fn write_request<T>(&mut self, _: &PingProtocol, io: &mut T, Ping(data): Ping)
|
||||
@ -449,7 +449,10 @@ impl RequestResponseCodec for PingCodec {
|
||||
where
|
||||
T: AsyncWrite + Unpin + Send
|
||||
{
|
||||
write_one(io, data).await
|
||||
write_length_prefixed(io, data).await?;
|
||||
io.close().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_response<T>(&mut self, _: &PingProtocol, io: &mut T, Pong(data): Pong)
|
||||
@ -457,6 +460,9 @@ impl RequestResponseCodec for PingCodec {
|
||||
where
|
||||
T: AsyncWrite + Unpin + Send
|
||||
{
|
||||
write_one(io, data).await
|
||||
write_length_prefixed(io, data).await?;
|
||||
io.close().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user