Configurable multistream-select protocol. Add V1Lazy variant. (#1245)

Make the multistream-select protocol (version) configurable
on transport upgrades as well as for individual substreams.

Add a "lazy" variant of multistream-select 1.0 that delays
sending of negotiation protocol frames as much as possible
but is only safe to use under additional assumptions that
go beyond what is required by the multistream-select v1
specification.
This commit is contained in:
Roman Borschel
2019-09-23 12:04:39 +02:00
committed by GitHub
parent 8c119269d6
commit 73e7878216
29 changed files with 361 additions and 269 deletions

View File

@ -209,12 +209,12 @@ pub trait Transport {
}
/// Begins a series of protocol upgrades via an [`upgrade::Builder`].
fn upgrade(self) -> upgrade::Builder<Self>
fn upgrade(self, version: upgrade::Version) -> upgrade::Builder<Self>
where
Self: Sized,
Self::Error: 'static
{
upgrade::Builder::new(self)
upgrade::Builder::new(self, version)
}
}

View File

@ -20,6 +20,8 @@
//! Configuration of transport protocol upgrades.
pub use crate::upgrade::Version;
use crate::{
ConnectedPoint,
ConnectionInfo,
@ -68,7 +70,8 @@ use tokio_io::{AsyncRead, AsyncWrite};
///
/// [`Network`]: crate::nodes::Network
pub struct Builder<T> {
inner: T
inner: T,
version: upgrade::Version,
}
impl<T> Builder<T>
@ -77,8 +80,8 @@ where
T::Error: 'static,
{
/// Creates a `Builder` over the given (base) `Transport`.
pub fn new(transport: T) -> Builder<T> {
Builder { inner: transport }
pub fn new(inner: T, version: upgrade::Version) -> Builder<T> {
Builder { inner, version }
}
/// Upgrades the transport to perform authentication of the remote.
@ -105,11 +108,12 @@ where
U: OutboundUpgrade<C, Output = (I, D), Error = E> + Clone,
E: Error + 'static,
{
let version = self.version;
Builder::new(self.inner.and_then(move |conn, endpoint| {
Authenticate {
inner: upgrade::apply(conn, upgrade, endpoint)
inner: upgrade::apply(conn, upgrade, endpoint, version)
}
}))
}), version)
}
/// Applies an arbitrary upgrade on an authenticated, non-multiplexed
@ -133,7 +137,7 @@ where
U: OutboundUpgrade<C, Output = D, Error = E> + Clone,
E: Error + 'static,
{
Builder::new(Upgrade::new(self.inner, upgrade))
Builder::new(Upgrade::new(self.inner, upgrade), self.version)
}
/// Upgrades the transport with a (sub)stream multiplexer.
@ -158,8 +162,9 @@ where
U: OutboundUpgrade<C, Output = M, Error = E> + Clone,
E: Error + 'static,
{
let version = self.version;
self.inner.and_then(move |(i, c), endpoint| {
let upgrade = upgrade::apply(c, upgrade, endpoint);
let upgrade = upgrade::apply(c, upgrade, endpoint, version);
Multiplex { info: Some(i), upgrade }
})
}
@ -332,7 +337,7 @@ where
future::Either::A(ref mut up) => {
let (i, c) = try_ready!(self.future.poll().map_err(TransportUpgradeError::Transport));
let u = up.take().expect("DialUpgradeFuture is constructed with Either::A(Some).");
future::Either::B((Some(i), apply_outbound(c, u)))
future::Either::B((Some(i), apply_outbound(c, u, upgrade::Version::V1)))
}
future::Either::B((ref mut i, ref mut up)) => {
let d = try_ready!(up.poll().map_err(TransportUpgradeError::Upgrade));

View File

@ -20,15 +20,17 @@
use crate::ConnectedPoint;
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError};
use crate::upgrade::{ProtocolName, NegotiatedComplete};
use crate::upgrade::ProtocolName;
use futures::{future::Either, prelude::*};
use log::debug;
use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture};
use std::{iter, mem};
use tokio_io::{AsyncRead, AsyncWrite};
pub use multistream_select::Version;
/// Applies an upgrade to the inbound and outbound direction of a connection or substream.
pub fn apply<C, U>(conn: C, up: U, cp: ConnectedPoint)
pub fn apply<C, U>(conn: C, up: U, cp: ConnectedPoint, v: Version)
-> Either<InboundUpgradeApply<C, U>, OutboundUpgradeApply<C, U>>
where
C: AsyncRead + AsyncWrite,
@ -37,7 +39,7 @@ where
if cp.is_listener() {
Either::A(apply_inbound(conn, up))
} else {
Either::B(apply_outbound(conn, up))
Either::B(apply_outbound(conn, up, v))
}
}
@ -55,13 +57,13 @@ where
}
/// Tries to perform an upgrade on an outbound connection or substream.
pub fn apply_outbound<C, U>(conn: C, up: U) -> OutboundUpgradeApply<C, U>
pub fn apply_outbound<C, U>(conn: C, up: U, v: Version) -> OutboundUpgradeApply<C, U>
where
C: AsyncRead + AsyncWrite,
U: OutboundUpgrade<C>
{
let iter = up.protocol_info().into_iter().map(NameWrap as fn(_) -> NameWrap<_>);
let future = multistream_select::dialer_select_proto(conn, iter);
let future = multistream_select::dialer_select_proto(conn, iter, v);
OutboundUpgradeApply {
inner: OutboundUpgradeApplyState::Init { future, upgrade: up }
}
@ -155,11 +157,6 @@ where
future: DialerSelectFuture<C, NameWrapIter<<U::InfoIter as IntoIterator>::IntoIter>>,
upgrade: U
},
AwaitNegotiated {
io: NegotiatedComplete<C>,
upgrade: U,
protocol: U::Info
},
Upgrade {
future: U::Future
},
@ -185,24 +182,8 @@ where
return Ok(Async::NotReady)
}
};
self.inner = OutboundUpgradeApplyState::AwaitNegotiated {
io: connection.complete(),
protocol: info.0,
upgrade
};
}
OutboundUpgradeApplyState::AwaitNegotiated { mut io, protocol, upgrade } => {
let io = match io.poll()? {
Async::NotReady => {
self.inner = OutboundUpgradeApplyState::AwaitNegotiated {
io, protocol, upgrade
};
return Ok(Async::NotReady)
}
Async::Ready(io) => io
};
self.inner = OutboundUpgradeApplyState::Upgrade {
future: upgrade.upgrade_outbound(io, protocol)
future: upgrade.upgrade_outbound(connection, info.0)
};
}
OutboundUpgradeApplyState::Upgrade { mut future } => {

View File

@ -68,7 +68,7 @@ mod transfer;
use futures::future::Future;
pub use multistream_select::{Negotiated, NegotiatedComplete, NegotiationError, ProtocolError};
pub use multistream_select::{Version, Negotiated, NegotiatedComplete, NegotiationError, ProtocolError};
pub use self::{
apply::{apply, apply_inbound, apply_outbound, InboundUpgradeApply, OutboundUpgradeApply},
denied::DeniedUpgrade,

View File

@ -95,7 +95,7 @@ fn deny_incoming_connec() {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new());
Network::new(transport, local_public_key.into())
@ -105,7 +105,7 @@ fn deny_incoming_connec() {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new());
Network::new(transport, local_public_key.into())
@ -170,7 +170,7 @@ fn dial_self() {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.and_then(|(peer, mplex), _| {
@ -249,7 +249,7 @@ fn dial_self_by_id() {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new());
Network::new(transport, local_public_key.into())
@ -267,7 +267,7 @@ fn multiple_addresses_err() {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new());
Network::new(transport, local_public_key.into())

View File

@ -110,7 +110,7 @@ fn raw_swarm_simultaneous_connect() {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade()
.upgrade(upgrade::Version::V1Lazy)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.and_then(|(peer, mplex), _| {
@ -125,7 +125,7 @@ fn raw_swarm_simultaneous_connect() {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade()
.upgrade(upgrade::Version::V1Lazy)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.and_then(|(peer, mplex), _| {

View File

@ -24,7 +24,7 @@ use futures::future::Future;
use futures::stream::Stream;
use libp2p_core::identity;
use libp2p_core::transport::{Transport, MemoryTransport, ListenerEvent};
use libp2p_core::upgrade::{UpgradeInfo, Negotiated, InboundUpgrade, OutboundUpgrade};
use libp2p_core::upgrade::{self, UpgradeInfo, Negotiated, InboundUpgrade, OutboundUpgrade};
use libp2p_mplex::MplexConfig;
use libp2p_secio::SecioConfig;
use multiaddr::Multiaddr;
@ -78,7 +78,7 @@ fn upgrade_pipeline() {
let listener_keys = identity::Keypair::generate_ed25519();
let listener_id = listener_keys.public().into_peer_id();
let listener_transport = MemoryTransport::default()
.upgrade()
.upgrade(upgrade::Version::V1)
.authenticate(SecioConfig::new(listener_keys))
.apply(HelloUpgrade {})
.apply(HelloUpgrade {})
@ -93,7 +93,7 @@ fn upgrade_pipeline() {
let dialer_keys = identity::Keypair::generate_ed25519();
let dialer_id = dialer_keys.public().into_peer_id();
let dialer_transport = MemoryTransport::default()
.upgrade()
.upgrade(upgrade::Version::V1)
.authenticate(SecioConfig::new(dialer_keys))
.apply(HelloUpgrade {})
.apply(HelloUpgrade {})

View File

@ -42,18 +42,16 @@ use crate::{Negotiated, NegotiationError};
/// determined through the `size_hint` of the given iterator and thus
/// an inaccurate size estimate may result in a suboptimal choice.
///
/// > **Note**: When multiple `DialerSelectFuture`s are composed, i.e. a
/// > dialer performs multiple, nested protocol negotiations with just a
/// > single supported protocol (0-RTT negotiations), a listener that
/// > does not support one of the intermediate protocols may still process
/// > the request data associated with a supported follow-up protocol.
/// > See \[[1]\]. To avoid this behaviour, a dialer should ensure completion
/// > of the previous negotiation before starting the next negotiation,
/// > which can be accomplished by waiting for the future returned by
/// > [`Negotiated::complete`] to resolve.
///
/// [1]: https://github.com/multiformats/go-multistream/issues/20
pub fn dialer_select_proto<R, I>(inner: R, protocols: I) -> DialerSelectFuture<R, I::IntoIter>
/// Within the scope of this library, a dialer always commits to a specific
/// multistream-select protocol [`Version`], whereas a listener always supports
/// all versions supported by this library. Frictionless multistream-select
/// protocol upgrades may thus proceed by deployments with updated listeners,
/// eventually followed by deployments of dialers choosing the newer protocol.
pub fn dialer_select_proto<R, I>(
inner: R,
protocols: I,
version: Version
) -> DialerSelectFuture<R, I::IntoIter>
where
R: AsyncRead + AsyncWrite,
I: IntoIterator,
@ -62,9 +60,9 @@ where
let iter = protocols.into_iter();
// We choose between the "serial" and "parallel" strategies based on the number of protocols.
if iter.size_hint().1.map(|n| n <= 3).unwrap_or(false) {
Either::A(dialer_select_proto_serial(inner, iter))
Either::A(dialer_select_proto_serial(inner, iter, version))
} else {
Either::B(dialer_select_proto_parallel(inner, iter))
Either::B(dialer_select_proto_parallel(inner, iter, version))
}
}
@ -80,7 +78,11 @@ pub type DialerSelectFuture<R, I> = Either<DialerSelectSeq<R, I>, DialerSelectPa
/// trying the given list of supported protocols one-by-one.
///
/// This strategy is preferable if the dialer only supports a few protocols.
pub fn dialer_select_proto_serial<R, I>(inner: R, protocols: I) -> DialerSelectSeq<R, I::IntoIter>
pub fn dialer_select_proto_serial<R, I>(
inner: R,
protocols: I,
version: Version
) -> DialerSelectSeq<R, I::IntoIter>
where
R: AsyncRead + AsyncWrite,
I: IntoIterator,
@ -88,9 +90,10 @@ where
{
let protocols = protocols.into_iter().peekable();
DialerSelectSeq {
version,
protocols,
state: SeqState::SendHeader {
io: MessageIO::new(inner)
io: MessageIO::new(inner),
}
}
}
@ -104,7 +107,11 @@ where
///
/// This strategy may be beneficial if the dialer supports many protocols
/// and it is unclear whether the remote supports one of the first few.
pub fn dialer_select_proto_parallel<R, I>(inner: R, protocols: I) -> DialerSelectPar<R, I::IntoIter>
pub fn dialer_select_proto_parallel<R, I>(
inner: R,
protocols: I,
version: Version
) -> DialerSelectPar<R, I::IntoIter>
where
R: AsyncRead + AsyncWrite,
I: IntoIterator,
@ -112,6 +119,7 @@ where
{
let protocols = protocols.into_iter();
DialerSelectPar {
version,
protocols,
state: ParState::SendHeader {
io: MessageIO::new(inner)
@ -129,7 +137,8 @@ where
{
// TODO: It would be nice if eventually N = I::Item = Protocol.
protocols: iter::Peekable<I>,
state: SeqState<R, I::Item>
state: SeqState<R, I::Item>,
version: Version,
}
enum SeqState<R, N>
@ -157,7 +166,7 @@ where
loop {
match mem::replace(&mut self.state, SeqState::Done) {
SeqState::SendHeader { mut io } => {
if io.start_send(Message::Header(Version::V1))?.is_not_ready() {
if io.start_send(Message::Header(self.version))?.is_not_ready() {
self.state = SeqState::SendHeader { io };
return Ok(Async::NotReady)
}
@ -174,11 +183,16 @@ where
if self.protocols.peek().is_some() {
self.state = SeqState::FlushProtocol { io, protocol }
} else {
match self.version {
Version::V1 => self.state = SeqState::FlushProtocol { io, protocol },
Version::V1Lazy => {
debug!("Dialer: Expecting proposed protocol: {}", p);
let io = Negotiated::expecting(io.into_reader(), p);
let io = Negotiated::expecting(io.into_reader(), p, self.version);
return Ok(Async::Ready((protocol, io)))
}
}
}
}
SeqState::FlushProtocol { mut io, protocol } => {
if io.poll_complete()?.is_not_ready() {
self.state = SeqState::FlushProtocol { io, protocol };
@ -199,7 +213,7 @@ where
};
match msg {
Message::Header(Version::V1) => {
Message::Header(v) if v == self.version => {
self.state = SeqState::AwaitProtocol { io, protocol };
}
Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => {
@ -234,7 +248,8 @@ where
I::Item: AsRef<[u8]>
{
protocols: I,
state: ParState<R, I::Item>
state: ParState<R, I::Item>,
version: Version,
}
enum ParState<R, N>
@ -263,7 +278,7 @@ where
loop {
match mem::replace(&mut self.state, ParState::Done) {
ParState::SendHeader { mut io } => {
if io.start_send(Message::Header(Version::V1))?.is_not_ready() {
if io.start_send(Message::Header(self.version))?.is_not_ready() {
self.state = ParState::SendHeader { io };
return Ok(Async::NotReady)
}
@ -297,7 +312,7 @@ where
};
match &msg {
Message::Header(Version::V1) => {
Message::Header(v) if v == &self.version => {
self.state = ParState::RecvProtocols { io }
}
Message::Protocols(supported) => {
@ -319,7 +334,7 @@ where
return Ok(Async::NotReady)
}
debug!("Dialer: Expecting proposed protocol: {}", p);
let io = Negotiated::expecting(io.into_reader(), p);
let io = Negotiated::expecting(io.into_reader(), p, self.version);
return Ok(Async::Ready((protocol, io)))
}
ParState::Done => panic!("ParState::poll called after completion")

View File

@ -62,7 +62,6 @@
//! yet have sent the last negotiation message despite having settled on a protocol
//! proposed by the dialer that it supports.
//!
//!
//! This behaviour allows both the dialer and the listener to send data
//! relating to the negotiated protocol together with the last negotiation
//! message(s), which, in the case of the dialer only supporting a single
@ -79,7 +78,7 @@
//! ```no_run
//! # fn main() {
//! use bytes::Bytes;
//! use multistream_select::dialer_select_proto;
//! use multistream_select::{dialer_select_proto, Version};
//! use futures::{Future, Sink, Stream};
//! use tokio_tcp::TcpStream;
//! use tokio::runtime::current_thread::Runtime;
@ -91,7 +90,7 @@
//! .from_err()
//! .and_then(move |io| {
//! let protos = vec![b"/echo/1.0.0", b"/echo/2.5.0"];
//! dialer_select_proto(io, protos) // .map(|r| r.0)
//! dialer_select_proto(io, protos, Version::V1)
//! })
//! .map(|(protocol, _io)| protocol);
//!
@ -110,7 +109,7 @@ mod protocol;
mod tests;
pub use self::negotiated::{Negotiated, NegotiatedComplete, NegotiationError};
pub use self::protocol::ProtocolError;
pub use self::protocol::{ProtocolError, Version};
pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture};
pub use self::listener_select::{listener_select_proto, ListenerSelectFuture};

View File

@ -36,7 +36,10 @@ use crate::{Negotiated, NegotiationError};
/// computation that performs the protocol negotiation with the remote. The
/// returned `Future` resolves with the name of the negotiated protocol and
/// a [`Negotiated`] I/O stream.
pub fn listener_select_proto<R, I>(inner: R, protocols: I) -> ListenerSelectFuture<R, I::Item>
pub fn listener_select_proto<R, I>(
inner: R,
protocols: I,
) -> ListenerSelectFuture<R, I::Item>
where
R: AsyncRead + AsyncWrite,
I: IntoIterator,
@ -78,7 +81,7 @@ where
N: AsRef<[u8]>
{
RecvHeader { io: MessageIO<R> },
SendHeader { io: MessageIO<R> },
SendHeader { io: MessageIO<R>, version: Version },
RecvMessage { io: MessageIO<R> },
SendMessage {
io: MessageIO<R>,
@ -102,22 +105,8 @@ where
match mem::replace(&mut self.state, State::Done) {
State::RecvHeader { mut io } => {
match io.poll()? {
Async::Ready(Some(Message::Header(Version::V1))) => {
self.state = State::SendHeader { io }
}
Async::Ready(Some(Message::Header(Version::V2))) => {
// The V2 protocol is not yet supported and not even
// yet fully specified or implemented anywhere. For
// now we just return 'na' to force any dialer to
// fall back to V1, according to the current plans
// for the "transition period".
//
// See: https://github.com/libp2p/specs/pull/95.
self.state = State::SendMessage {
io,
message: Message::NotAvailable,
protocol: None,
}
Async::Ready(Some(Message::Header(version))) => {
self.state = State::SendHeader { io, version }
}
Async::Ready(Some(_)) => {
return Err(ProtocolError::InvalidMessage.into())
@ -132,11 +121,14 @@ where
}
}
}
State::SendHeader { mut io } => {
if io.start_send(Message::Header(Version::V1))?.is_not_ready() {
State::SendHeader { mut io, version } => {
if io.start_send(Message::Header(version))?.is_not_ready() {
return Ok(Async::NotReady)
}
self.state = State::RecvMessage { io };
self.state = match version {
Version::V1 => State::Flush { io },
Version::V1Lazy => State::RecvMessage { io },
}
}
State::RecvMessage { mut io } => {
let msg = match io.poll() {

View File

@ -70,8 +70,8 @@ impl<TInner> Negotiated<TInner> {
/// Creates a `Negotiated` in state [`State::Expecting`] that is still
/// expecting confirmation of the given `protocol`.
pub(crate) fn expecting(io: MessageReader<TInner>, protocol: Protocol) -> Self {
Negotiated { state: State::Expecting { io, protocol } }
pub(crate) fn expecting(io: MessageReader<TInner>, protocol: Protocol, version: Version) -> Self {
Negotiated { state: State::Expecting { io, protocol, version } }
}
/// Polls the `Negotiated` for completion.
@ -100,28 +100,30 @@ impl<TInner> Negotiated<TInner> {
// Read outstanding protocol negotiation messages.
loop {
match mem::replace(&mut self.state, State::Invalid) {
State::Expecting { mut io, protocol } => {
State::Expecting { mut io, protocol, version } => {
let msg = match io.poll() {
Ok(Async::Ready(Some(msg))) => msg,
Ok(Async::NotReady) => {
self.state = State::Expecting { io, protocol };
self.state = State::Expecting { io, protocol, version };
return Ok(Async::NotReady)
}
Ok(Async::Ready(None)) => {
self.state = State::Expecting { io, protocol };
self.state = State::Expecting { io, protocol, version };
return Err(ProtocolError::IoError(
io::ErrorKind::UnexpectedEof.into()).into())
}
Err(err) => {
self.state = State::Expecting { io, protocol };
self.state = State::Expecting { io, protocol, version };
return Err(err.into())
}
};
if let Message::Header(Version::V1) = &msg {
self.state = State::Expecting { io, protocol };
if let Message::Header(v) = &msg {
if v == &version {
self.state = State::Expecting { io, protocol, version };
continue
}
}
if let Message::Protocol(p) = &msg {
if p.as_ref() == protocol.as_ref() {
@ -152,7 +154,14 @@ impl<TInner> Negotiated<TInner> {
enum State<R> {
/// In this state, a `Negotiated` is still expecting to
/// receive confirmation of the protocol it as settled on.
Expecting { io: MessageReader<R>, protocol: Protocol },
Expecting {
/// The underlying I/O stream.
io: MessageReader<R>,
/// The expected protocol (i.e. name and version).
protocol: Protocol,
/// The expected multistream-select protocol version.
version: Version
},
/// In this state, a protocol has been agreed upon and may
/// only be pending the sending of the final acknowledgement,

View File

@ -50,21 +50,72 @@ const MAX_PROTOCOL_LEN: usize = 140;
/// The encoded form of a multistream-select 1.0.0 header message.
const MSG_MULTISTREAM_1_0: &[u8] = b"/multistream/1.0.0\n";
/// The encoded form of a multistream-select 2.0.0 header message.
const MSG_MULTISTREAM_2_0: &[u8] = b"/multistream/2.0.0\n";
/// The encoded form of a multistream-select 1.0.0 header message.
const MSG_MULTISTREAM_1_0_LAZY: &[u8] = b"/multistream-lazy/1\n";
/// The encoded form of a multistream-select 'na' message.
const MSG_PROTOCOL_NA: &[u8] = b"na\n";
/// The encoded form of a multistream-select 'ls' message.
const MSG_LS: &[u8] = b"ls\n";
/// The known multistream-select protocol versions.
/// Supported multistream-select protocol versions.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Version {
/// The first and currently still the only deployed version
/// of multistream-select.
/// Version 1 of the multistream-select protocol. See [1] and [2].
///
/// [1] https://github.com/libp2p/specs/blob/master/connections/README.md#protocol-negotiation
/// [2] https://github.com/multiformats/multistream-select
V1,
/// Draft: https://github.com/libp2p/specs/pull/95
V2,
/// A lazy variant of version 1 that is identical on the wire but delays
/// sending of protocol negotiation data as much as possible.
///
/// Delaying the sending of protocol negotiation data can result in
/// significantly fewer network roundtrips used for the negotiation,
/// up to 0-RTT negotiation.
///
/// 0-RTT negotiation is achieved if the dialer supports only a single
/// application protocol. In that case the dialer immedidately settles
/// on that protocol, buffering the negotiation messages to be sent
/// with the first round of application protocol data (or an attempt
/// is made to read from the `Negotiated` I/O stream).
///
/// A listener receiving a `V1Lazy` header will similarly delay sending
/// of the protocol confirmation. Though typically the listener will need
/// to read the request data before sending its response, thus triggering
/// sending of the protocol confirmation, which, in absence of additional
/// buffering on lower layers will result in at least two response frames
/// to be sent.
///
/// `V1Lazy` is specific to `rust-libp2p`: While the wire protocol
/// is identical to `V1`, delayed sending of protocol negotiation frames
/// is only safe under the following assumptions:
///
/// 1. The dialer is assumed to always send the first multistream-select
/// protocol message immediately after the multistream header, without
/// first waiting for confirmation of that header. Since the listener
/// delays sending the protocol confirmation, a deadlock situation may
/// otherwise occurs that is only resolved by a timeout. This assumption
/// is trivially satisfied if both peers support and use `V1Lazy`.
///
/// 2. When nesting multiple protocol negotiations, the listener is either
/// known to support all of the dialer's optimistically chosen protocols
/// or there is no intermediate protocol without a payload and none of
/// the protocol payloads has the potential for being mistaken for a
/// multistream-select protocol message. This avoids rare edge-cases whereby
/// the listener may not recognize upgrade boundaries and erroneously
/// process a request despite not supporting one of the intermediate
/// protocols that the dialer committed to. See [1] and [2].
///
/// [1]: https://github.com/multiformats/go-multistream/issues/20
/// [2]: https://github.com/libp2p/rust-libp2p/pull/1212
V1Lazy,
// Draft: https://github.com/libp2p/specs/pull/95
// V2,
}
impl Default for Version {
fn default() -> Self {
Version::V1
}
}
/// A protocol (name) exchanged during protocol negotiation.
@ -131,9 +182,9 @@ impl Message {
dest.put(MSG_MULTISTREAM_1_0);
Ok(())
}
Message::Header(Version::V2) => {
dest.reserve(MSG_MULTISTREAM_2_0.len());
dest.put(MSG_MULTISTREAM_2_0);
Message::Header(Version::V1Lazy) => {
dest.reserve(MSG_MULTISTREAM_1_0_LAZY.len());
dest.put(MSG_MULTISTREAM_1_0_LAZY);
Ok(())
}
Message::Protocol(p) => {
@ -170,12 +221,12 @@ impl Message {
/// Decodes a `Message` from its byte representation.
pub fn decode(mut msg: Bytes) -> Result<Message, ProtocolError> {
if msg == MSG_MULTISTREAM_1_0 {
return Ok(Message::Header(Version::V1))
if msg == MSG_MULTISTREAM_1_0_LAZY {
return Ok(Message::Header(Version::V1Lazy))
}
if msg == MSG_MULTISTREAM_2_0 {
return Ok(Message::Header(Version::V2))
if msg == MSG_MULTISTREAM_1_0 {
return Ok(Message::Header(Version::V1))
}
if msg.get(0) == Some(&b'/') && msg.last() == Some(&b'\n') && msg.len() <= MAX_PROTOCOL_LEN {

View File

@ -22,7 +22,7 @@
#![cfg(test)]
use crate::NegotiationError;
use crate::{Version, NegotiationError};
use crate::dialer_select::{dialer_select_proto_parallel, dialer_select_proto_serial};
use crate::{dialer_select_proto, listener_select_proto};
use futures::prelude::*;
@ -32,6 +32,7 @@ use tokio_io::io as nio;
#[test]
fn select_proto_basic() {
fn run(version: Version) {
let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let listener_addr = listener.local_addr().unwrap();
@ -52,7 +53,7 @@ fn select_proto_basic() {
.from_err()
.and_then(move |connec| {
let protos = vec![b"/proto3", b"/proto2"];
dialer_select_proto(connec, protos)
dialer_select_proto(connec, protos, version)
})
.and_then(|(proto, io)| {
nio::write_all(io, b"ping").from_err().map(move |(io, _)| (proto, io))
@ -72,8 +73,13 @@ fn select_proto_basic() {
assert_eq!(listener_chosen, b"/proto2");
}
run(Version::V1);
run(Version::V1Lazy);
}
#[test]
fn no_protocol_found() {
fn run(version: Version) {
let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let listener_addr = listener.local_addr().unwrap();
@ -92,7 +98,7 @@ fn no_protocol_found() {
.from_err()
.and_then(move |connec| {
let protos = vec![b"/proto3", b"/proto4"];
dialer_select_proto(connec, protos)
dialer_select_proto(connec, protos, version)
})
.and_then(|(proto, io)| io.complete().map(move |_| proto));
@ -103,8 +109,13 @@ fn no_protocol_found() {
}
}
run(Version::V1);
run(Version::V1Lazy);
}
#[test]
fn select_proto_parallel() {
fn run(version: Version) {
let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let listener_addr = listener.local_addr().unwrap();
@ -123,7 +134,7 @@ fn select_proto_parallel() {
.from_err()
.and_then(move |connec| {
let protos = vec![b"/proto3", b"/proto2"];
dialer_select_proto_parallel(connec, protos.into_iter())
dialer_select_proto_parallel(connec, protos.into_iter(), version)
})
.and_then(|(proto, io)| io.complete().map(move |_| proto));
@ -135,8 +146,13 @@ fn select_proto_parallel() {
assert_eq!(listener_chosen, b"/proto2");
}
run(Version::V1);
run(Version::V1Lazy);
}
#[test]
fn select_proto_serial() {
fn run(version: Version) {
let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let listener_addr = listener.local_addr().unwrap();
@ -155,7 +171,7 @@ fn select_proto_serial() {
.from_err()
.and_then(move |connec| {
let protos = vec![b"/proto3", b"/proto2"];
dialer_select_proto_serial(connec, protos.into_iter())
dialer_select_proto_serial(connec, protos.into_iter(), version)
})
.and_then(|(proto, io)| io.complete().map(move |_| proto));
@ -166,3 +182,7 @@ fn select_proto_serial() {
assert_eq!(dialer_chosen, b"/proto2");
assert_eq!(listener_chosen, b"/proto2");
}
run(Version::V1);
run(Version::V1Lazy);
}

View File

@ -34,7 +34,8 @@ fn async_write() {
let bg_thread = thread::spawn(move || {
let mplex = libp2p_mplex::MplexConfig::new();
let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e));
let transport = TcpConfig::new().and_then(move |c, e|
upgrade::apply(c, mplex, e, upgrade::Version::V1));
let mut listener = transport
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
@ -69,7 +70,8 @@ fn async_write() {
});
let mplex = libp2p_mplex::MplexConfig::new();
let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e));
let transport = TcpConfig::new().and_then(move |c, e|
upgrade::apply(c, mplex, e, upgrade::Version::V1));
let future = transport
.dial(rx.recv().unwrap())

View File

@ -37,7 +37,8 @@ fn client_to_server_outbound() {
let bg_thread = thread::spawn(move || {
let mplex = libp2p_mplex::MplexConfig::new();
let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e));
let transport = TcpConfig::new().and_then(move |c, e|
upgrade::apply(c, mplex, e, upgrade::Version::V1));
let mut listener = transport
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
@ -77,7 +78,8 @@ fn client_to_server_outbound() {
});
let mplex = libp2p_mplex::MplexConfig::new();
let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e));
let transport = TcpConfig::new().and_then(move |c, e|
upgrade::apply(c, mplex, e, upgrade::Version::V1));
let future = transport
.dial(rx.recv().unwrap())
@ -101,7 +103,8 @@ fn client_to_server_inbound() {
let bg_thread = thread::spawn(move || {
let mplex = libp2p_mplex::MplexConfig::new();
let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e));
let transport = TcpConfig::new().and_then(move |c, e|
upgrade::apply(c, mplex, e, upgrade::Version::V1));
let mut listener = transport
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
@ -142,7 +145,8 @@ fn client_to_server_inbound() {
});
let mplex = libp2p_mplex::MplexConfig::new();
let transport = TcpConfig::new().and_then(move |c, e| upgrade::apply(c, mplex, e));
let transport = TcpConfig::new().and_then(move |c, e|
upgrade::apply(c, mplex, e, upgrade::Version::V1));
let future = transport
.dial(rx.recv().unwrap())

View File

@ -32,7 +32,8 @@ fn deflate() {
let _ = env_logger::try_init();
fn prop(message: Vec<u8>) -> bool {
let client = TcpConfig::new().and_then(|c, e| upgrade::apply(c, DeflateConfig {}, e));
let client = TcpConfig::new().and_then(|c, e|
upgrade::apply(c, DeflateConfig {}, e, upgrade::Version::V1));
let server = client.clone();
run(server, client, message);
true

View File

@ -262,6 +262,7 @@ mod tests {
muxing::StreamMuxer,
Multiaddr,
Transport,
upgrade
};
use libp2p_tcp::TcpConfig;
use libp2p_secio::SecioConfig;
@ -282,7 +283,7 @@ mod tests {
let pubkey = id_keys.public();
let transport = TcpConfig::new()
.nodelay(true)
.upgrade()
.upgrade(upgrade::Version::V1)
.authenticate(SecioConfig::new(id_keys))
.multiplex(MplexConfig::new());
(pubkey, transport)

View File

@ -288,7 +288,7 @@ mod tests {
identity,
Transport,
transport::ListenerEvent,
upgrade::{apply_outbound, apply_inbound}
upgrade::{self, apply_outbound, apply_inbound}
};
use std::{io, sync::mpsc, thread};
@ -351,7 +351,7 @@ mod tests {
let future = transport.dial(rx.recv().unwrap())
.unwrap()
.and_then(|socket| {
apply_outbound(socket, IdentifyProtocolConfig)
apply_outbound(socket, IdentifyProtocolConfig, upgrade::Version::V1)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
})
.and_then(|RemoteInfo { info, observed_addr, .. }| {

View File

@ -34,6 +34,7 @@ use libp2p_core::{
nodes::Substream,
multiaddr::{Protocol, multiaddr},
muxing::StreamMuxerBox,
upgrade
};
use libp2p_secio::SecioConfig;
use libp2p_swarm::Swarm;
@ -63,7 +64,7 @@ fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> (u64, Vec<TestSwa
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = MemoryTransport::default()
.upgrade()
.upgrade(upgrade::Version::V1)
.authenticate(SecioConfig::new(local_key))
.multiplex(yamux::Config::default())
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))

View File

@ -36,7 +36,7 @@
//! Example:
//!
//! ```
//! use libp2p_core::{identity, Transport};
//! use libp2p_core::{identity, Transport, upgrade};
//! use libp2p_tcp::TcpConfig;
//! use libp2p_noise::{Keypair, X25519, NoiseConfig};
//!
@ -44,7 +44,7 @@
//! let id_keys = identity::Keypair::generate_ed25519();
//! let dh_keys = Keypair::<X25519>::new().into_authentic(&id_keys).unwrap();
//! let noise = NoiseConfig::xx(dh_keys).into_authenticated();
//! let builder = TcpConfig::new().upgrade().authenticate(noise);
//! let builder = TcpConfig::new().upgrade(upgrade::Version::V1).authenticate(noise);
//! // let transport = builder.multiplex(...);
//! # }
//! ```

View File

@ -35,7 +35,7 @@ fn core_upgrade_compat() {
let id_keys = identity::Keypair::generate_ed25519();
let dh_keys = Keypair::<X25519>::new().into_authentic(&id_keys).unwrap();
let noise = NoiseConfig::xx(dh_keys).into_authenticated();
let _ = TcpConfig::new().upgrade().authenticate(noise);
let _ = TcpConfig::new().upgrade(upgrade::Version::V1).authenticate(noise);
}
#[test]
@ -51,14 +51,14 @@ fn xx() {
let server_dh = Keypair::<X25519>::new().into_authentic(&server_id).unwrap();
let server_transport = TcpConfig::new()
.and_then(move |output, endpoint| {
upgrade::apply(output, NoiseConfig::xx(server_dh), endpoint)
upgrade::apply(output, NoiseConfig::xx(server_dh), endpoint, upgrade::Version::V1)
})
.and_then(move |out, _| expect_identity(out, &client_id_public));
let client_dh = Keypair::<X25519>::new().into_authentic(&client_id).unwrap();
let client_transport = TcpConfig::new()
.and_then(move |output, endpoint| {
upgrade::apply(output, NoiseConfig::xx(client_dh), endpoint)
upgrade::apply(output, NoiseConfig::xx(client_dh), endpoint, upgrade::Version::V1)
})
.and_then(move |out, _| expect_identity(out, &server_id_public));
@ -81,14 +81,14 @@ fn ix() {
let server_dh = Keypair::<X25519>::new().into_authentic(&server_id).unwrap();
let server_transport = TcpConfig::new()
.and_then(move |output, endpoint| {
upgrade::apply(output, NoiseConfig::ix(server_dh), endpoint)
upgrade::apply(output, NoiseConfig::ix(server_dh), endpoint, upgrade::Version::V1)
})
.and_then(move |out, _| expect_identity(out, &client_id_public));
let client_dh = Keypair::<X25519>::new().into_authentic(&client_id).unwrap();
let client_transport = TcpConfig::new()
.and_then(move |output, endpoint| {
upgrade::apply(output, NoiseConfig::ix(client_dh), endpoint)
upgrade::apply(output, NoiseConfig::ix(client_dh), endpoint, upgrade::Version::V1)
})
.and_then(move |out, _| expect_identity(out, &server_id_public));
@ -115,7 +115,8 @@ fn ik_xx() {
if endpoint.is_listener() {
Either::A(apply_inbound(output, NoiseConfig::ik_listener(server_dh)))
} else {
Either::B(apply_outbound(output, NoiseConfig::xx(server_dh)))
Either::B(apply_outbound(output, NoiseConfig::xx(server_dh),
upgrade::Version::V1))
}
})
.and_then(move |out, _| expect_identity(out, &client_id_public));
@ -126,7 +127,8 @@ fn ik_xx() {
.and_then(move |output, endpoint| {
if endpoint.is_dialer() {
Either::A(apply_outbound(output,
NoiseConfig::ik_dialer(client_dh, server_id_public, server_dh_public)))
NoiseConfig::ik_dialer(client_dh, server_id_public, server_dh_public),
upgrade::Version::V1))
} else {
Either::B(apply_inbound(output, NoiseConfig::xx(client_dh)))
}

View File

@ -217,7 +217,7 @@ mod tests {
let client = MemoryTransport.dial(listener_addr).unwrap()
.and_then(|c| {
upgrade::apply_outbound(c, Ping::default())
upgrade::apply_outbound(c, Ping::default(), upgrade::Version::V1)
.map_err(|e| panic!(e))
});

View File

@ -25,10 +25,9 @@ use libp2p_core::{
PeerId,
Negotiated,
identity,
muxing::StreamMuxer,
transport::{Transport, boxed::Boxed},
either::EitherError,
upgrade::UpgradeError
upgrade::{self, UpgradeError}
};
use libp2p_ping::*;
use libp2p_yamux::{self as yamux, Yamux};
@ -36,7 +35,7 @@ use libp2p_secio::{SecioConfig, SecioOutput, SecioError};
use libp2p_swarm::Swarm;
use libp2p_tcp::{TcpConfig, TcpTransStream};
use futures::{future, prelude::*};
use std::{fmt, io, time::Duration, sync::mpsc::sync_channel};
use std::{io, time::Duration, sync::mpsc::sync_channel};
use tokio::runtime::Runtime;
#[test]
@ -114,7 +113,7 @@ fn mk_transport() -> (
let peer_id = id_keys.public().into_peer_id();
let transport = TcpConfig::new()
.nodelay(true)
.upgrade()
.upgrade(upgrade::Version::V1)
.authenticate(SecioConfig::new(id_keys))
.multiplex(yamux::Config::default())
.boxed();

View File

@ -42,7 +42,7 @@ const SHA_256: &str = "SHA256";
const SHA_512: &str = "SHA512";
pub(crate) const DEFAULT_AGREEMENTS_PROPOSITION: &str = "P-256,P-384";
pub(crate) const DEFAULT_CIPHERS_PROPOSITION: &str = "AES-128,AES-256,TwofishCTR";
pub(crate) const DEFAULT_CIPHERS_PROPOSITION: &str = "NULL"; // "AES-128,AES-256,TwofishCTR";
pub(crate) const DEFAULT_DIGESTS_PROPOSITION: &str = "SHA256,SHA512";
/// Return a proposition string from the given sequence of `KeyAgreement` values.

View File

@ -31,7 +31,7 @@
//! # fn main() {
//! use futures::Future;
//! use libp2p_secio::{SecioConfig, SecioOutput};
//! use libp2p_core::{PeerId, Multiaddr, identity};
//! use libp2p_core::{PeerId, Multiaddr, identity, upgrade};
//! use libp2p_core::transport::Transport;
//! use libp2p_mplex::MplexConfig;
//! use libp2p_tcp::TcpConfig;
@ -41,7 +41,7 @@
//!
//! // Create a `Transport`.
//! let transport = TcpConfig::new()
//! .upgrade()
//! .upgrade(upgrade::Version::V1)
//! .authenticate(SecioConfig::new(local_keys.clone()))
//! .multiplex(MplexConfig::default());
//!

View File

@ -244,7 +244,7 @@ pub fn build_tcp_ws_secio_mplex_yamux(keypair: identity::Keypair)
-> impl Transport<Output = (PeerId, impl core::muxing::StreamMuxer<OutboundSubstream = impl Send, Substream = impl Send, Error = impl Into<io::Error>> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone
{
CommonTransport::new()
.upgrade()
.upgrade(core::upgrade::Version::V1)
.authenticate(secio::SecioConfig::new(keypair))
.multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new()))
.map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer)))

View File

@ -48,7 +48,7 @@ use futures::prelude::*;
use libp2p_core::{
ConnectedPoint,
PeerId,
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError},
upgrade::{self, InboundUpgrade, OutboundUpgrade, UpgradeError},
};
use std::{cmp::Ordering, error, fmt, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite};
@ -244,6 +244,7 @@ pub trait ProtocolsHandler {
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct SubstreamProtocol<TUpgrade> {
upgrade: TUpgrade,
upgrade_protocol: upgrade::Version,
timeout: Duration,
}
@ -255,10 +256,18 @@ impl<TUpgrade> SubstreamProtocol<TUpgrade> {
pub fn new(upgrade: TUpgrade) -> SubstreamProtocol<TUpgrade> {
SubstreamProtocol {
upgrade,
upgrade_protocol: upgrade::Version::V1,
timeout: Duration::from_secs(10),
}
}
/// Sets the multistream-select protocol (version) to use for negotiating
/// protocols upgrades on outbound substreams.
pub fn with_upgrade_protocol(mut self, version: upgrade::Version) -> Self {
self.upgrade_protocol = version;
self
}
/// Maps a function over the protocol upgrade.
pub fn map_upgrade<U, F>(self, f: F) -> SubstreamProtocol<U>
where
@ -266,6 +275,7 @@ impl<TUpgrade> SubstreamProtocol<TUpgrade> {
{
SubstreamProtocol {
upgrade: f(self.upgrade),
upgrade_protocol: self.upgrade_protocol,
timeout: self.timeout,
}
}
@ -287,8 +297,8 @@ impl<TUpgrade> SubstreamProtocol<TUpgrade> {
}
/// Converts the substream protocol configuration into the contained upgrade.
pub fn into_upgrade(self) -> TUpgrade {
self.upgrade
pub fn into_upgrade(self) -> (upgrade::Version, TUpgrade) {
(self.upgrade_protocol, self.upgrade)
}
}
@ -460,7 +470,7 @@ where T: ProtocolsHandler
}
fn inbound_protocol(&self) -> <Self::Handler as ProtocolsHandler>::InboundProtocol {
self.listen_protocol().into_upgrade()
self.listen_protocol().into_upgrade().1
}
}

View File

@ -111,7 +111,7 @@ where
)>,
/// For each outbound substream request, how to upgrade it. The first element of the tuple
/// is the unique identifier (see `unique_dial_upgrade_id`).
queued_dial_upgrades: Vec<(u64, TProtoHandler::OutboundProtocol)>,
queued_dial_upgrades: Vec<(u64, (upgrade::Version, TProtoHandler::OutboundProtocol))>,
/// Unique identifier assigned to each queued dial upgrade.
unique_dial_upgrade_id: u64,
/// The currently planned connection & handler shutdown.
@ -197,7 +197,7 @@ where
NodeHandlerEndpoint::Listener => {
let protocol = self.handler.listen_protocol();
let timeout = protocol.timeout().clone();
let upgrade = upgrade::apply_inbound(substream, protocol.into_upgrade());
let upgrade = upgrade::apply_inbound(substream, protocol.into_upgrade().1);
let with_timeout = Timeout::new(upgrade, timeout);
self.negotiating_in.push(with_timeout);
}
@ -214,8 +214,8 @@ where
}
};
let (_, proto_upgrade) = self.queued_dial_upgrades.remove(pos);
let upgrade = upgrade::apply_outbound(substream, proto_upgrade);
let (_, (version, upgrade)) = self.queued_dial_upgrades.remove(pos);
let upgrade = upgrade::apply_outbound(substream, upgrade, version);
let with_timeout = Timeout::new(upgrade, timeout);
self.negotiating_out.push((user_data, with_timeout));
}

View File

@ -126,8 +126,8 @@ where
let proto1 = self.proto1.listen_protocol();
let proto2 = self.proto2.listen_protocol();
let timeout = std::cmp::max(proto1.timeout(), proto2.timeout()).clone();
SubstreamProtocol::new(SelectUpgrade::new(proto1.into_upgrade(), proto2.into_upgrade()))
.with_timeout(timeout)
let choice = SelectUpgrade::new(proto1.into_upgrade().1, proto2.into_upgrade().1);
SubstreamProtocol::new(choice).with_timeout(timeout)
}
fn inject_fully_negotiated_outbound(&mut self, protocol: <Self::OutboundProtocol as OutboundUpgrade<TSubstream>>::Output, endpoint: Self::OutboundOpenInfo) {