diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 56d90b3c..46177efe 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -6,6 +6,12 @@ - Implement `Keypair::from_protobuf_encoding` for ed25519 keys (see [PR 2090]). +- Deprecate `upgrade::write_one`. + Deprecate `upgrade::write_with_len_prefix`. + Deprecate `upgrade::read_one`. + Introduce `upgrade::read_length_prefixed` and `upgrade::write_length_prefixed`. + See [PR 2111](https://github.com/libp2p/rust-libp2p/pull/2111). + [PR 2090]: https://github.com/libp2p/rust-libp2p/pull/2090 # 0.28.3 [2021-04-26] diff --git a/core/src/upgrade.rs b/core/src/upgrade.rs index 9798ae6c..376cbfc1 100644 --- a/core/src/upgrade.rs +++ b/core/src/upgrade.rs @@ -80,8 +80,10 @@ pub use self::{ map::{MapInboundUpgrade, MapOutboundUpgrade, MapInboundUpgradeErr, MapOutboundUpgradeErr}, optional::OptionalUpgrade, select::SelectUpgrade, - transfer::{write_one, write_with_len_prefix, write_varint, read_one, ReadOneError, read_varint}, + transfer::{write_length_prefixed, write_varint, read_length_prefixed, read_varint}, }; +#[allow(deprecated)] +pub use self::transfer::ReadOneError; /// Types serving as protocol names. /// diff --git a/core/src/upgrade/from_fn.rs b/core/src/upgrade/from_fn.rs index c6ef52c1..0c8947e5 100644 --- a/core/src/upgrade/from_fn.rs +++ b/core/src/upgrade/from_fn.rs @@ -28,18 +28,20 @@ use std::iter; /// # Example /// /// ``` -/// # use libp2p_core::transport::{Transport, MemoryTransport}; -/// # use libp2p_core::upgrade; +/// # use libp2p_core::transport::{Transport, MemoryTransport, memory::Channel}; +/// # use libp2p_core::{upgrade, Negotiated}; /// # use std::io; +/// # use futures::AsyncWriteExt; /// let _transport = MemoryTransport::default() /// .and_then(move |out, cp| { -/// upgrade::apply(out, upgrade::from_fn("/foo/1", move |mut sock, endpoint| async move { +/// upgrade::apply(out, upgrade::from_fn("/foo/1", move |mut sock: Negotiated>>, endpoint| async move { /// if endpoint.is_dialer() { -/// upgrade::write_one(&mut sock, "some handshake data").await?; +/// upgrade::write_length_prefixed(&mut sock, "some handshake data").await?; +/// sock.close().await?; /// } else { -/// let handshake_data = upgrade::read_one(&mut sock, 1024).await?; +/// let handshake_data = upgrade::read_length_prefixed(&mut sock, 1024).await?; /// if handshake_data != b"some handshake data" { -/// return Err(upgrade::ReadOneError::from(io::Error::from(io::ErrorKind::Other))); +/// return Err(io::Error::new(io::ErrorKind::Other, "bad handshake")); /// } /// } /// Ok(sock) diff --git a/core/src/upgrade/transfer.rs b/core/src/upgrade/transfer.rs index 28a9c298..500ece52 100644 --- a/core/src/upgrade/transfer.rs +++ b/core/src/upgrade/transfer.rs @@ -25,10 +25,27 @@ use std::{error, fmt, io}; // TODO: these methods could be on an Ext trait to AsyncWrite +/// Writes a message to the given socket with a length prefix appended to it. Also flushes the socket. +/// +/// > **Note**: Prepends a variable-length prefix indicate the length of the message. This is +/// > compatible with what [`read_length_prefixed`] expects. +pub async fn write_length_prefixed(socket: &mut (impl AsyncWrite + Unpin), data: impl AsRef<[u8]>) + -> Result<(), io::Error> +{ + write_varint(socket, data.as_ref().len()).await?; + socket.write_all(data.as_ref()).await?; + socket.flush().await?; + + Ok(()) +} + /// Send a message to the given socket, then shuts down the writing side. /// /// > **Note**: Prepends a variable-length prefix indicate the length of the message. This is /// > compatible with what `read_one` expects. +/// +#[deprecated(since = "0.29.0", note = "Use `write_length_prefixed` instead. You will need to manually close the stream using `socket.close().await`.")] +#[allow(dead_code)] pub async fn write_one(socket: &mut (impl AsyncWrite + Unpin), data: impl AsRef<[u8]>) -> Result<(), io::Error> { @@ -42,6 +59,8 @@ pub async fn write_one(socket: &mut (impl AsyncWrite + Unpin), data: impl AsRef< /// /// > **Note**: Prepends a variable-length prefix indicate the length of the message. This is /// > compatible with what `read_one` expects. +#[deprecated(since = "0.29.0", note = "Use `write_length_prefixed` instead.")] +#[allow(dead_code)] pub async fn write_with_len_prefix(socket: &mut (impl AsyncWrite + Unpin), data: impl AsRef<[u8]>) -> Result<(), io::Error> { @@ -60,6 +79,7 @@ pub async fn write_varint(socket: &mut (impl AsyncWrite + Unpin), len: usize) let mut len_data = unsigned_varint::encode::usize_buffer(); let encoded_len = unsigned_varint::encode::usize(len, &mut len_data).len(); socket.write_all(&len_data[..encoded_len]).await?; + Ok(()) } @@ -106,6 +126,27 @@ pub async fn read_varint(socket: &mut (impl AsyncRead + Unpin)) -> Result **Note**: Assumes that a variable-length prefix indicates the length of the message. This is +/// > compatible with what [`write_length_prefixed`] does. +pub async fn read_length_prefixed(socket: &mut (impl AsyncRead + Unpin), max_size: usize) -> io::Result> +{ + let len = read_varint(socket).await?; + if len > max_size { + return Err(io::Error::new(io::ErrorKind::InvalidData, format!("Received data size ({} bytes) exceeds maximum ({} bytes)", len, max_size))) + } + + let mut buf = vec![0; len]; + socket.read_exact(&mut buf).await?; + + Ok(buf) +} + /// Reads a length-prefixed message from the given socket. /// /// The `max_size` parameter is the maximum size in bytes of the message that we accept. This is @@ -114,6 +155,8 @@ pub async fn read_varint(socket: &mut (impl AsyncRead + Unpin)) -> Result **Note**: Assumes that a variable-length prefix indicates the length of the message. This is /// > compatible with what `write_one` does. +#[deprecated(since = "0.29.0", note = "Use `read_length_prefixed` instead.")] +#[allow(dead_code, deprecated)] pub async fn read_one(socket: &mut (impl AsyncRead + Unpin), max_size: usize) -> Result, ReadOneError> { @@ -132,6 +175,7 @@ pub async fn read_one(socket: &mut (impl AsyncRead + Unpin), max_size: usize) /// Error while reading one message. #[derive(Debug)] +#[deprecated(since = "0.29.0", note = "Use `read_length_prefixed` instead of `read_one` to avoid depending on this type.")] pub enum ReadOneError { /// Error on the socket. Io(std::io::Error), @@ -144,12 +188,14 @@ pub enum ReadOneError { }, } +#[allow(deprecated)] impl From for ReadOneError { fn from(err: std::io::Error) -> ReadOneError { ReadOneError::Io(err) } } +#[allow(deprecated)] impl fmt::Display for ReadOneError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { @@ -159,6 +205,7 @@ impl fmt::Display for ReadOneError { } } +#[allow(deprecated)] impl error::Error for ReadOneError { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match *self { @@ -173,15 +220,18 @@ mod tests { use super::*; #[test] - fn write_one_works() { + fn write_length_prefixed_works() { let data = (0..rand::random::() % 10_000) .map(|_| rand::random::()) .collect::>(); - let mut out = vec![0; 10_000]; - futures::executor::block_on( - write_one(&mut futures::io::Cursor::new(&mut out[..]), data.clone()) - ).unwrap(); + + futures::executor::block_on(async { + let mut socket = futures::io::Cursor::new(&mut out[..]); + + write_length_prefixed(&mut socket, &data).await.unwrap(); + socket.close().await.unwrap(); + }); let (out_len, out_data) = unsigned_varint::decode::usize(&out).unwrap(); assert_eq!(out_len, data.len()); diff --git a/protocols/floodsub/CHANGELOG.md b/protocols/floodsub/CHANGELOG.md index e31da085..6fe20028 100644 --- a/protocols/floodsub/CHANGELOG.md +++ b/protocols/floodsub/CHANGELOG.md @@ -2,6 +2,11 @@ - Update dependencies. +- Change `FloodsubDecodeError::ReadError` from a `upgrade::ReadOneError` to + `std::io::Error`. See [PR 2111]. + +[PR 2111]: https://github.com/libp2p/rust-libp2p/pull/2111 + # 0.29.0 [2021-04-13] - Update `libp2p-swarm`. diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index bd0b5b2a..1b942549 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -23,7 +23,7 @@ use crate::topic::Topic; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, PeerId, upgrade}; use prost::Message; use std::{error, fmt, io, iter, pin::Pin}; -use futures::{Future, io::{AsyncRead, AsyncWrite}}; +use futures::{Future, io::{AsyncRead, AsyncWrite}, AsyncWriteExt}; /// Implementation of `ConnectionUpgrade` for the floodsub protocol. #[derive(Debug, Clone, Default)] @@ -55,7 +55,7 @@ where fn upgrade_inbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future { Box::pin(async move { - let packet = upgrade::read_one(&mut socket, 2048).await?; + let packet = upgrade::read_length_prefixed(&mut socket, 2048).await?; let rpc = rpc_proto::Rpc::decode(&packet[..])?; let mut messages = Vec::with_capacity(rpc.publish.len()); @@ -95,15 +95,15 @@ where #[derive(Debug)] pub enum FloodsubDecodeError { /// Error when reading the packet from the socket. - ReadError(upgrade::ReadOneError), + ReadError(io::Error), /// Error when decoding the raw buffer into a protobuf. ProtobufError(prost::DecodeError), /// Error when parsing the `PeerId` in the message. InvalidPeerId, } -impl From for FloodsubDecodeError { - fn from(err: upgrade::ReadOneError) -> Self { +impl From for FloodsubDecodeError { + fn from(err: io::Error) -> Self { FloodsubDecodeError::ReadError(err) } } @@ -166,7 +166,10 @@ where fn upgrade_outbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future { Box::pin(async move { let bytes = self.into_bytes(); - upgrade::write_one(&mut socket, bytes).await?; + + upgrade::write_length_prefixed(&mut socket, bytes).await?; + socket.close().await?; + Ok(()) }) } diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 2c73a1d6..c3a323dd 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -186,7 +186,11 @@ where let mut bytes = Vec::with_capacity(message.encoded_len()); message.encode(&mut bytes).expect("Vec provides capacity as needed"); - upgrade::write_one(&mut io, &bytes).await + + upgrade::write_length_prefixed(&mut io, bytes).await?; + io.close().await?; + + Ok(()) } async fn recv(mut socket: T) -> io::Result @@ -195,7 +199,7 @@ where { socket.close().await?; - let msg = upgrade::read_one(&mut socket, 4096) + let msg = upgrade::read_length_prefixed(&mut socket, 4096) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) .await?; diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 6355e8b7..6b45ed0b 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -27,13 +27,13 @@ use libp2p_core::{ identity, muxing::StreamMuxerBox, transport::{self, Transport}, - upgrade::{self, read_one, write_one} + upgrade::{self, read_length_prefixed, write_length_prefixed} }; use libp2p_noise::{NoiseConfig, X25519Spec, Keypair}; use libp2p_request_response::*; use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_tcp::TcpConfig; -use futures::{channel::mpsc, executor::LocalPool, prelude::*, task::SpawnExt}; +use futures::{channel::mpsc, executor::LocalPool, prelude::*, task::SpawnExt, AsyncWriteExt}; use rand::{self, Rng}; use std::{io, iter}; use std::{collections::HashSet, num::NonZeroU16}; @@ -421,13 +421,13 @@ impl RequestResponseCodec for PingCodec { where T: AsyncRead + Unpin + Send { - read_one(io, 1024) - .map(|res| match res { - Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)), - Ok(vec) if vec.is_empty() => Err(io::ErrorKind::UnexpectedEof.into()), - Ok(vec) => Ok(Ping(vec)) - }) - .await + let vec = read_length_prefixed(io, 1024).await?; + + if vec.is_empty() { + return Err(io::ErrorKind::UnexpectedEof.into()) + } + + Ok(Ping(vec)) } async fn read_response(&mut self, _: &PingProtocol, io: &mut T) @@ -435,13 +435,13 @@ impl RequestResponseCodec for PingCodec { where T: AsyncRead + Unpin + Send { - read_one(io, 1024) - .map(|res| match res { - Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)), - Ok(vec) if vec.is_empty() => Err(io::ErrorKind::UnexpectedEof.into()), - Ok(vec) => Ok(Pong(vec)) - }) - .await + let vec = read_length_prefixed(io, 1024).await?; + + if vec.is_empty() { + return Err(io::ErrorKind::UnexpectedEof.into()) + } + + Ok(Pong(vec)) } async fn write_request(&mut self, _: &PingProtocol, io: &mut T, Ping(data): Ping) @@ -449,7 +449,10 @@ impl RequestResponseCodec for PingCodec { where T: AsyncWrite + Unpin + Send { - write_one(io, data).await + write_length_prefixed(io, data).await?; + io.close().await?; + + Ok(()) } async fn write_response(&mut self, _: &PingProtocol, io: &mut T, Pong(data): Pong) @@ -457,6 +460,9 @@ impl RequestResponseCodec for PingCodec { where T: AsyncWrite + Unpin + Send { - write_one(io, data).await + write_length_prefixed(io, data).await?; + io.close().await?; + + Ok(()) } }