protocols/plaintext: Move to stable futures and use unsigned varints (#1306)

* protocols/plaintext: Move to stable futures and use unsigned varints

The plaintext 2.0 specification requires to use unsigned varints for
frame length delimiting instead of fixed 4 byte integer frame length
delimiting. This commit aligns the implementation with the
specification.

* protocols/secio: Fix doc comment BytesMut -> Vec<u8>

* protocols/plaintext: Add quick check smoke test

* protocols/plaintext: Rework imports and log levels

* protocols/plaintext: Use BytesMut instead of Vec<u8>

* protocols/plaintext: Use BoxFuture
This commit is contained in:
Max Inden
2019-11-19 11:15:35 +01:00
committed by Toralf Wittner
parent 3dd07fcc3a
commit be73b90345
6 changed files with 251 additions and 128 deletions

View File

@ -1,3 +1,7 @@
# Next Version
- Use varints instead of fixed sized (4 byte) integers to delimit plaintext 2.0 messages to align implementation with the specification.
# Version 0.13.0 (2019-11-05) # Version 0.13.0 (2019-11-05)
- Reworked the transport upgrade API. See https://github.com/libp2p/rust-libp2p/pull/1240 for more information. - Reworked the transport upgrade API. See https://github.com/libp2p/rust-libp2p/pull/1240 for more information.

View File

@ -10,11 +10,18 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
[dependencies] [dependencies]
futures-preview = "0.3.0-alpha.18"
libp2p-core = { version = "0.13.0", path = "../../core" }
bytes = "0.4.12" bytes = "0.4.12"
futures = "0.3.1"
futures_codec = "0.3.1"
libp2p-core = { version = "0.13.0", path = "../../core" }
log = "0.4.8" log = "0.4.8"
void = "1.0.2"
tokio-io = "0.1.12"
protobuf = "2.8.1" protobuf = "2.8.1"
rw-stream-sink = { version = "0.1.1", path = "../../misc/rw-stream-sink" } rw-stream-sink = { version = "0.1.1", path = "../../misc/rw-stream-sink" }
unsigned-varint = { version = "0.2.3", features = ["futures-codec"] }
void = "1.0.2"
[dev-dependencies]
env_logger = "0.7.1"
quickcheck = "0.9.0"
rand = "0.7"
futures-timer = "2.0"

View File

@ -18,21 +18,18 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::PlainText2Config;
use crate::error::PlainTextError;
use crate::pb::structs::Exchange;
use bytes::BytesMut; use bytes::BytesMut;
use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use futures::prelude::*;
use futures::Future; use futures_codec::Framed;
use futures::future;
use futures::sink::Sink;
use futures::stream::Stream;
use libp2p_core::{PublicKey, PeerId}; use libp2p_core::{PublicKey, PeerId};
use log::{debug, trace}; use log::{debug, trace};
use crate::pb::structs::Exchange;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited;
use tokio_io::codec::length_delimited::Framed;
use protobuf::Message; use protobuf::Message;
use crate::error::PlainTextError; use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use crate::PlainText2Config; use unsigned_varint::codec::UviBytes;
struct HandshakeContext<T> { struct HandshakeContext<T> {
config: PlainText2Config, config: PlainText2Config,
@ -68,7 +65,9 @@ impl HandshakeContext<Local> {
}) })
} }
fn with_remote(self, exchange_bytes: BytesMut) -> Result<HandshakeContext<Remote>, PlainTextError> { fn with_remote(self, exchange_bytes: BytesMut)
-> Result<HandshakeContext<Remote>, PlainTextError>
{
let mut prop = match protobuf::parse_from_bytes::<Exchange>(&exchange_bytes) { let mut prop = match protobuf::parse_from_bytes::<Exchange>(&exchange_bytes) {
Ok(prop) => prop, Ok(prop) => prop,
Err(e) => { Err(e) => {
@ -95,7 +94,7 @@ impl HandshakeContext<Local> {
// Check the validity of the remote's `Exchange`. // Check the validity of the remote's `Exchange`.
if peer_id != public_key.clone().into_peer_id() { if peer_id != public_key.clone().into_peer_id() {
debug!("The remote's `PeerId` of the exchange isn't consist with the remote public key"); debug!("the remote's `PeerId` isn't consistent with the remote's public key");
return Err(PlainTextError::InvalidPeerId) return Err(PlainTextError::InvalidPeerId)
} }
@ -109,36 +108,23 @@ impl HandshakeContext<Local> {
} }
} }
pub fn handshake<S>(socket: S, config: PlainText2Config) pub async fn handshake<S>(socket: S, config: PlainText2Config)
-> impl Future<Item = (Framed<S, BytesMut>, Remote), Error = PlainTextError> -> Result<(Framed<S, UviBytes<BytesMut>>, Remote), PlainTextError>
where where
S: AsyncRead + AsyncWrite + Send, S: AsyncRead + AsyncWrite + Send + Unpin,
{ {
let socket = length_delimited::Builder::new() // The handshake messages all start with a variable-length integer indicating the size.
.big_endian() let mut socket = Framed::new(socket, UviBytes::default());
.length_field_length(4)
.new_framed(socket);
future::ok::<_, PlainTextError>(())
.and_then(|_| {
trace!("starting handshake"); trace!("starting handshake");
Ok(HandshakeContext::new(config)?) let context = HandshakeContext::new(config)?;
})
// Send our local `Exchange`.
.and_then(|context| {
trace!("sending exchange to remote"); trace!("sending exchange to remote");
socket.send(BytesMut::from(context.state.exchange_bytes.clone())) socket.send(BytesMut::from(context.state.exchange_bytes.clone())).await?;
.from_err()
.map(|s| (s, context))
})
// Receive the remote's `Exchange`.
.and_then(move |(socket, context)| {
trace!("receiving the remote's exchange"); trace!("receiving the remote's exchange");
socket.into_future() let context = match socket.next().await {
.map_err(|(e, _)| e.into()) Some(p) => context.with_remote(p?)?,
.and_then(move |(prop_raw, socket)| {
let context = match prop_raw {
Some(p) => context.with_remote(p)?,
None => { None => {
debug!("unexpected eof while waiting for remote's exchange"); debug!("unexpected eof while waiting for remote's exchange");
let err = IoError::new(IoErrorKind::BrokenPipe, "unexpected eof"); let err = IoError::new(IoErrorKind::BrokenPipe, "unexpected eof");
@ -148,6 +134,4 @@ where
trace!("received exchange from remote; pubkey = {:?}", context.state.public_key); trace!("received exchange from remote; pubkey = {:?}", context.state.public_key);
Ok((socket, context.state)) Ok((socket, context.state))
})
})
} }

View File

@ -18,16 +18,29 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use futures::future::{self, Ready};
use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, upgrade::Negotiated};
use std::iter;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited::Framed;
use crate::error::PlainTextError; use crate::error::PlainTextError;
use void::Void;
use futures::future::FutureResult;
use crate::handshake::Remote; use crate::handshake::Remote;
use bytes::BytesMut;
use futures::future::{self, Ready};
use futures::prelude::*;
use futures::{future::BoxFuture, Sink, Stream};
use futures_codec::Framed;
use libp2p_core::{
identity,
InboundUpgrade,
OutboundUpgrade,
UpgradeInfo,
upgrade::Negotiated,
PeerId,
PublicKey,
};
use log::debug;
use rw_stream_sink::RwStreamSink;
use std::{io, iter, pin::Pin, task::{Context, Poll}};
use unsigned_varint::codec::UviBytes;
use void::Void;
mod error; mod error;
mod handshake; mod handshake;
mod pb; mod pb;
@ -108,144 +121,138 @@ impl UpgradeInfo for PlainText2Config {
impl<C> InboundUpgrade<C> for PlainText2Config impl<C> InboundUpgrade<C> for PlainText2Config
where where
C: AsyncRead + AsyncWrite + Send + 'static C: AsyncRead + AsyncWrite + Send + Unpin + 'static
{ {
type Output = (PeerId, PlainTextOutput<Negotiated<C>>); type Output = (PeerId, PlainTextOutput<Negotiated<C>>);
type Error = PlainTextError; type Error = PlainTextError;
type Future = Box<dyn Future<Item = Self::Output, Error = Self::Error> + Send>; type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn upgrade_inbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future { fn upgrade_inbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future {
Box::new(self.handshake(socket)) Box::pin(self.handshake(socket))
} }
} }
impl<C> OutboundUpgrade<C> for PlainText2Config impl<C> OutboundUpgrade<C> for PlainText2Config
where where
C: AsyncRead + AsyncWrite + Send + 'static C: AsyncRead + AsyncWrite + Send + Unpin + 'static
{ {
type Output = (PeerId, PlainTextOutput<Negotiated<C>>); type Output = (PeerId, PlainTextOutput<Negotiated<C>>);
type Error = PlainTextError; type Error = PlainTextError;
type Future = Box<dyn Future<Item = Self::Output, Error = Self::Error> + Send>; type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn upgrade_outbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future { fn upgrade_outbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future {
Box::new(self.handshake(socket)) Box::pin(self.handshake(socket))
} }
} }
impl PlainText2Config { impl PlainText2Config {
fn handshake<T>(self, socket: T) -> impl Future<Item = (PeerId, PlainTextOutput<T>), Error = PlainTextError> async fn handshake<T>(self, socket: T) -> Result<(PeerId, PlainTextOutput<T>), PlainTextError>
where where
T: AsyncRead + AsyncWrite + Send + 'static T: AsyncRead + AsyncWrite + Send + Unpin + 'static
{ {
debug!("Starting plaintext upgrade"); debug!("Starting plaintext upgrade");
PlainTextMiddleware::handshake(socket, self) let (stream_sink, remote) = PlainTextMiddleware::handshake(socket, self).await?;
.map(|(stream_sink, remote)| {
let mapped = stream_sink.map_err(map_err as fn(_) -> _); let mapped = stream_sink.map_err(map_err as fn(_) -> _);
( Ok((
remote.peer_id, remote.peer_id,
PlainTextOutput { PlainTextOutput {
stream: RwStreamSink::new(mapped), stream: RwStreamSink::new(mapped),
remote_key: remote.public_key, remote_key: remote.public_key,
} }
) ))
})
} }
} }
#[inline]
fn map_err(err: io::Error) -> io::Error { fn map_err(err: io::Error) -> io::Error {
debug!("error during plaintext handshake {:?}", err); debug!("error during plaintext handshake {:?}", err);
io::Error::new(io::ErrorKind::InvalidData, err) io::Error::new(io::ErrorKind::InvalidData, err)
} }
pub struct PlainTextMiddleware<S> { pub struct PlainTextMiddleware<S> {
inner: Framed<S, BytesMut>, inner: Framed<S, UviBytes<BytesMut>>,
} }
impl<S> PlainTextMiddleware<S> impl<S> PlainTextMiddleware<S>
where where
S: AsyncRead + AsyncWrite + Send, S: AsyncRead + AsyncWrite + Send + Unpin,
{ {
fn handshake(socket: S, config: PlainText2Config) async fn handshake(socket: S, config: PlainText2Config)
-> impl Future<Item = (PlainTextMiddleware<S>, Remote), Error = PlainTextError> -> Result<(PlainTextMiddleware<S>, Remote), PlainTextError>
{ {
handshake::handshake(socket, config).map(|(inner, remote)| { let (inner, remote) = handshake::handshake(socket, config).await?;
(PlainTextMiddleware { inner }, remote) Ok((PlainTextMiddleware { inner }, remote))
})
} }
} }
impl<S> Sink for PlainTextMiddleware<S> impl<S> Sink<BytesMut> for PlainTextMiddleware<S>
where where
S: AsyncRead + AsyncWrite, S: AsyncRead + AsyncWrite + Unpin,
{ {
type SinkItem = BytesMut; type Error = io::Error;
type SinkError = io::Error;
#[inline] fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { Sink::poll_ready(Pin::new(&mut self.inner), cx)
self.inner.start_send(item)
} }
#[inline] fn start_send(mut self: Pin<&mut Self>, item: BytesMut) -> Result<(), Self::Error> {
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { Sink::start_send(Pin::new(&mut self.inner), item)
self.inner.poll_complete()
} }
#[inline] fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
fn close(&mut self) -> Poll<(), Self::SinkError> { Sink::poll_flush(Pin::new(&mut self.inner), cx)
self.inner.close() }
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Sink::poll_close(Pin::new(&mut self.inner), cx)
} }
} }
impl<S> Stream for PlainTextMiddleware<S> impl<S> Stream for PlainTextMiddleware<S>
where where
S: AsyncRead + AsyncWrite, S: AsyncRead + AsyncWrite + Unpin,
{ {
type Item = BytesMut; type Item = Result<BytesMut, io::Error>;
type Error = io::Error;
#[inline] fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { Stream::poll_next(Pin::new(&mut self.inner), cx)
self.inner.poll()
} }
} }
/// Output of the plaintext protocol. /// Output of the plaintext protocol.
pub struct PlainTextOutput<S> pub struct PlainTextOutput<S>
where where
S: AsyncRead + AsyncWrite, S: AsyncRead + AsyncWrite + Unpin,
{ {
/// The plaintext stream. /// The plaintext stream.
pub stream: RwStreamSink<StreamMapErr<PlainTextMiddleware<S>, fn(io::Error) -> io::Error>>, pub stream: RwStreamSink<futures::stream::MapErr<PlainTextMiddleware<S>, fn(io::Error) -> io::Error>>,
/// The public key of the remote. /// The public key of the remote.
pub remote_key: PublicKey, pub remote_key: PublicKey,
} }
impl<S: AsyncRead + AsyncWrite> std::io::Read for PlainTextOutput<S> { impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for PlainTextOutput<S> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> { fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8])
self.stream.read(buf) -> Poll<Result<usize, io::Error>>
{
AsyncRead::poll_read(Pin::new(&mut self.stream), cx, buf)
} }
} }
impl<S: AsyncRead + AsyncWrite> AsyncRead for PlainTextOutput<S> { impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for PlainTextOutput<S> {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8])
self.stream.prepare_uninitialized_buffer(buf) -> Poll<Result<usize, io::Error>>
} {
AsyncWrite::poll_write(Pin::new(&mut self.stream), cx, buf)
} }
impl<S: AsyncRead + AsyncWrite> std::io::Write for PlainTextOutput<S> { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context)
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { -> Poll<Result<(), io::Error>>
self.stream.write(buf) {
AsyncWrite::poll_flush(Pin::new(&mut self.stream), cx)
} }
fn flush(&mut self) -> std::io::Result<()> { fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context)
self.stream.flush() -> Poll<Result<(), io::Error>>
} {
} AsyncWrite::poll_close(Pin::new(&mut self.stream), cx)
impl<S: AsyncRead + AsyncWrite> AsyncWrite for PlainTextOutput<S> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.stream.shutdown()
} }
} }

View File

@ -0,0 +1,121 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::io::{AsyncWriteExt, AsyncReadExt};
use futures::stream::TryStreamExt;
use libp2p_core::{
identity,
multiaddr::Multiaddr,
transport::{Transport, ListenerEvent},
upgrade,
};
use libp2p_plaintext::PlainText2Config;
use log::debug;
use quickcheck::QuickCheck;
#[test]
fn variable_msg_length() {
let _ = env_logger::try_init();
fn prop(msg: Vec<u8>) {
let mut msg_to_send = msg.clone();
let msg_to_receive = msg;
let server_id = identity::Keypair::generate_ed25519();
let server_id_public = server_id.public();
let client_id = identity::Keypair::generate_ed25519();
let client_id_public = client_id.public();
futures::executor::block_on(async {
let server_transport = libp2p_core::transport::MemoryTransport{}.and_then(
move |output, endpoint| {
upgrade::apply(
output,
PlainText2Config{local_public_key: server_id_public},
endpoint,
libp2p_core::upgrade::Version::V1,
)
}
);
let client_transport = libp2p_core::transport::MemoryTransport{}.and_then(
move |output, endpoint| {
upgrade::apply(
output,
PlainText2Config{local_public_key: client_id_public},
endpoint,
libp2p_core::upgrade::Version::V1,
)
}
);
let server_address: Multiaddr = format!(
"/memory/{}",
std::cmp::Ord::max(1, rand::random::<u64>())
).parse().unwrap();
let mut server = server_transport.listen_on(server_address.clone()).unwrap();
// Ignore server listen address event.
let _ = server.try_next()
.await
.expect("some event")
.expect("no error")
.into_new_address()
.expect("listen address");
let client_fut = async {
debug!("dialing {:?}", server_address);
let (received_server_id, mut client_channel) = client_transport.dial(server_address).unwrap().await.unwrap();
assert_eq!(received_server_id, server_id.public().into_peer_id());
debug!("Client: writing message.");
client_channel.write_all(&mut msg_to_send).await.expect("no error");
debug!("Client: flushing channel.");
client_channel.flush().await.expect("no error");
};
let server_fut = async {
let mut server_channel = server.try_next()
.await
.expect("some event")
.map(ListenerEvent::into_upgrade)
.expect("no error")
.map(|client| client.0)
.expect("listener upgrade xyz")
.await
.map(|(_, session)| session)
.expect("no error");
let mut server_buffer = vec![0; msg_to_receive.len()];
debug!("Server: reading message.");
server_channel.read_exact(&mut server_buffer).await.expect("reading client message");
assert_eq!(server_buffer, msg_to_receive);
};
futures::future::join(server_fut, client_fut).await;
})
}
QuickCheck::new().max_tests(30).quickcheck(prop as fn(Vec<u8>))
}

View File

@ -103,7 +103,7 @@ impl Hmac {
} }
/// Takes control of `socket`. Returns an object that implements `future::Sink` and /// Takes control of `socket`. Returns an object that implements `future::Sink` and
/// `future::Stream`. The `Stream` and `Sink` produce and accept `BytesMut` objects. /// `future::Stream`. The `Stream` and `Sink` produce and accept `Vec<u8>` objects.
/// ///
/// The conversion between the stream/sink items and the socket is done with the given cipher and /// The conversion between the stream/sink items and the socket is done with the given cipher and
/// hash algorithm (which are generally decided during the handshake). /// hash algorithm (which are generally decided during the handshake).