mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-14 10:31:21 +00:00
[multistream] Make the lazy variant more interoperable. (#1855)
* Make the lazy variant interoperable. The remaining optimisation for `V1Lazy` for a listener in the negotiation, whereby the listener delays flushing of the multistream version header, is hereby removed. The remaining effect of `V1Lazy` is only on the side of the dialer, which delays flushing of its singular protocol proposal in order to send it together with the first application data (or an attempt is made to read from the negotiated stream, which similarly triggers a flush of the protocol proposal). This permits `V1Lazy` dialers to be interoperable with `V1` listeners. The remaining theoretical pitfall whereby application data gets misinterpreted as another protocol proposal by a listener remains, however unlikely. `V1` remains the default, but we may eventually risk just making this lazy dialer flush a part of the default `V1` implementation, removing the dedicated `V1Lazy` version identifier. * Update CHANGELOG * Separate versions from mere header lines. Every multistream-select version maps to a specific header line, but there may be different variants of the same multistream-select version using the same header line, i.e. the same wire protocol. * Cleanup * Update misc/multistream-select/CHANGELOG.md
This commit is contained in:
@ -1,5 +1,15 @@
|
||||
# 0.9.0 [unreleased]
|
||||
|
||||
- Make the `V1Lazy` upgrade strategy more interoperable with `V1`. Specifically,
|
||||
the listener now behaves identically with `V1` and `V1Lazy`. Furthermore, the
|
||||
multistream-select protocol header is now also identical, making `V1` and `V1Lazy`
|
||||
indistinguishable on the wire. The remaining central effect of `V1Lazy` is that the dialer,
|
||||
if it only supports a single protocol in a negotiation, optimistically settles on that
|
||||
protocol without immediately flushing the negotiation data (i.e. protocol proposal)
|
||||
and without waiting for the corresponding confirmation before it is able to start
|
||||
sending application data, expecting the used protocol to be confirmed with
|
||||
the response.
|
||||
|
||||
- Fix the encoding and decoding of `ls` responses to
|
||||
be spec-compliant and interoperable with other implementations.
|
||||
For a clean upgrade, `0.8.4` must already be deployed.
|
||||
|
@ -20,8 +20,8 @@
|
||||
|
||||
//! Protocol negotiation strategies for the peer acting as the dialer.
|
||||
|
||||
use crate::{Negotiated, NegotiationError};
|
||||
use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, Version};
|
||||
use crate::{Negotiated, NegotiationError, Version};
|
||||
use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, HeaderLine};
|
||||
|
||||
use futures::{future::Either, prelude::*};
|
||||
use std::{convert::TryFrom as _, iter, mem, pin::Pin, task::{Context, Poll}};
|
||||
@ -41,7 +41,7 @@ use std::{convert::TryFrom as _, iter, mem, pin::Pin, task::{Context, Poll}};
|
||||
/// thus an inaccurate size estimate may result in a suboptimal choice.
|
||||
///
|
||||
/// Within the scope of this library, a dialer always commits to a specific
|
||||
/// multistream-select protocol [`Version`], whereas a listener always supports
|
||||
/// multistream-select [`Version`], whereas a listener always supports
|
||||
/// all versions supported by this library. Frictionless multistream-select
|
||||
/// protocol upgrades may thus proceed by deployments with updated listeners,
|
||||
/// eventually followed by deployments of dialers choosing the newer protocol.
|
||||
@ -181,11 +181,15 @@ where
|
||||
},
|
||||
}
|
||||
|
||||
if let Err(err) = Pin::new(&mut io).start_send(Message::Header(*this.version)) {
|
||||
let h = HeaderLine::from(*this.version);
|
||||
if let Err(err) = Pin::new(&mut io).start_send(Message::Header(h)) {
|
||||
return Poll::Ready(Err(From::from(err)));
|
||||
}
|
||||
|
||||
let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?;
|
||||
|
||||
// The dialer always sends the header and the first protocol
|
||||
// proposal in one go for efficiency.
|
||||
*this.state = SeqState::SendProtocol { io, protocol };
|
||||
}
|
||||
|
||||
@ -209,9 +213,14 @@ where
|
||||
} else {
|
||||
match this.version {
|
||||
Version::V1 => *this.state = SeqState::FlushProtocol { io, protocol },
|
||||
// This is the only effect that `V1Lazy` has compared to `V1`:
|
||||
// Optimistically settling on the only protocol that
|
||||
// the dialer supports for this negotiation. Notably,
|
||||
// the dialer expects a regular `V1` response.
|
||||
Version::V1Lazy => {
|
||||
log::debug!("Dialer: Expecting proposed protocol: {}", p);
|
||||
let io = Negotiated::expecting(io.into_reader(), p, *this.version);
|
||||
let hl = HeaderLine::from(Version::V1Lazy);
|
||||
let io = Negotiated::expecting(io.into_reader(), p, Some(hl));
|
||||
return Poll::Ready(Ok((protocol, io)))
|
||||
}
|
||||
}
|
||||
@ -242,7 +251,7 @@ where
|
||||
};
|
||||
|
||||
match msg {
|
||||
Message::Header(v) if v == *this.version => {
|
||||
Message::Header(v) if v == HeaderLine::from(*this.version) => {
|
||||
*this.state = SeqState::AwaitProtocol { io, protocol };
|
||||
}
|
||||
Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => {
|
||||
@ -318,7 +327,8 @@ where
|
||||
},
|
||||
}
|
||||
|
||||
if let Err(err) = Pin::new(&mut io).start_send(Message::Header(*this.version)) {
|
||||
let msg = Message::Header(HeaderLine::from(*this.version));
|
||||
if let Err(err) = Pin::new(&mut io).start_send(msg) {
|
||||
return Poll::Ready(Err(From::from(err)));
|
||||
}
|
||||
|
||||
@ -366,7 +376,7 @@ where
|
||||
};
|
||||
|
||||
match &msg {
|
||||
Message::Header(v) if v == this.version => {
|
||||
Message::Header(h) if h == &HeaderLine::from(*this.version) => {
|
||||
*this.state = ParState::RecvProtocols { io }
|
||||
}
|
||||
Message::Protocols(supported) => {
|
||||
@ -395,9 +405,10 @@ where
|
||||
if let Err(err) = Pin::new(&mut io).start_send(Message::Protocol(p.clone())) {
|
||||
return Poll::Ready(Err(From::from(err)));
|
||||
}
|
||||
log::debug!("Dialer: Expecting proposed protocol: {}", p);
|
||||
|
||||
let io = Negotiated::expecting(io.into_reader(), p, *this.version);
|
||||
log::debug!("Dialer: Expecting proposed protocol: {}", p);
|
||||
let io = Negotiated::expecting(io.into_reader(), p, None);
|
||||
|
||||
return Poll::Ready(Ok((protocol, io)))
|
||||
}
|
||||
|
||||
|
@ -48,28 +48,23 @@
|
||||
//!
|
||||
//! ## [`Negotiated`](self::Negotiated)
|
||||
//!
|
||||
//! When a dialer or listener participating in a negotiation settles
|
||||
//! on a protocol to use, the [`DialerSelectFuture`] respectively
|
||||
//! [`ListenerSelectFuture`] yields a [`Negotiated`](self::Negotiated)
|
||||
//! I/O stream.
|
||||
//!
|
||||
//! Notably, when a `DialerSelectFuture` resolves to a `Negotiated`, it may not yet
|
||||
//! have written the last negotiation message to the underlying I/O stream and may
|
||||
//! still be expecting confirmation for that protocol, despite having settled on
|
||||
//! a protocol to use.
|
||||
//!
|
||||
//! Similarly, when a `ListenerSelectFuture` resolves to a `Negotiated`, it may not
|
||||
//! yet have sent the last negotiation message despite having settled on a protocol
|
||||
//! proposed by the dialer that it supports.
|
||||
//!
|
||||
//! This behaviour allows both the dialer and the listener to send data
|
||||
//! relating to the negotiated protocol together with the last negotiation
|
||||
//! message(s), which, in the case of the dialer only supporting a single
|
||||
//! protocol, results in 0-RTT negotiation. Note, however, that a dialer
|
||||
//! that performs multiple 0-RTT negotiations in sequence for different
|
||||
//! protocols layered on top of each other may trigger undesirable behaviour
|
||||
//! for a listener not supporting one of the intermediate protocols.
|
||||
//! See [`dialer_select_proto`](self::dialer_select_proto).
|
||||
//! A `Negotiated` represents an I/O stream that has settled on a protocol
|
||||
//! to use. By default, with [`Version::V1`], protocol negotiation is always
|
||||
//! at least one dedicated round-trip message exchange, before application
|
||||
//! data for the negotiated protocol can be sent by the dialer. There is
|
||||
//! a variant [`Version::V1Lazy`] that permits 0-RTT negotiation if the
|
||||
//! dialer only supports a single protocol. In that case, when a dialer
|
||||
//! settles on a protocol to use, the [`DialerSelectFuture`] yields a
|
||||
//! [`Negotiated`](self::Negotiated) I/O stream before the negotiation
|
||||
//! data has been flushed. It is then expecting confirmation for that protocol
|
||||
//! as the first messages read from the stream. This behaviour allows the dialer
|
||||
//! to immediately send data relating to the negotiated protocol together with the
|
||||
//! remaining negotiation message(s). Note, however, that a dialer that performs
|
||||
//! multiple 0-RTT negotiations in sequence for different protocols layered on
|
||||
//! top of each other may trigger undesirable behaviour for a listener not
|
||||
//! supporting one of the intermediate protocols. See
|
||||
//! [`dialer_select_proto`](self::dialer_select_proto) and the documentation
|
||||
//! of [`Version::V1Lazy`] for further details.
|
||||
//!
|
||||
//! ## Examples
|
||||
//!
|
||||
@ -100,6 +95,54 @@ mod protocol;
|
||||
mod tests;
|
||||
|
||||
pub use self::negotiated::{Negotiated, NegotiatedComplete, NegotiationError};
|
||||
pub use self::protocol::{ProtocolError, Version};
|
||||
pub use self::protocol::ProtocolError;
|
||||
pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture};
|
||||
pub use self::listener_select::{listener_select_proto, ListenerSelectFuture};
|
||||
|
||||
/// Supported multistream-select versions.
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub enum Version {
|
||||
/// Version 1 of the multistream-select protocol. See [1] and [2].
|
||||
///
|
||||
/// [1]: https://github.com/libp2p/specs/blob/master/connections/README.md#protocol-negotiation
|
||||
/// [2]: https://github.com/multiformats/multistream-select
|
||||
V1,
|
||||
/// A "lazy" variant of version 1 that is identical on the wire but whereby
|
||||
/// the dialer delays flushing protocol negotiation data in order to combine
|
||||
/// it with initial application data, thus performing 0-RTT negotiation.
|
||||
///
|
||||
/// This strategy is only applicable for the node with the role of "dialer"
|
||||
/// in the negotiation and only if the dialer supports just a single
|
||||
/// application protocol. In that case the dialer immedidately "settles"
|
||||
/// on that protocol, buffering the negotiation messages to be sent
|
||||
/// with the first round of application protocol data (or an attempt
|
||||
/// is made to read from the `Negotiated` I/O stream).
|
||||
///
|
||||
/// A listener will behave identically to `V1`. This ensures interoperability with `V1`.
|
||||
/// Notably, it will immediately send the multistream header as well as the protocol
|
||||
/// confirmation, resulting in multiple frames being sent on the underlying transport.
|
||||
/// Nevertheless, if the listener supports the protocol that the dialer optimistically
|
||||
/// settled on, it can be a 0-RTT negotiation.
|
||||
///
|
||||
/// > **Note**: `V1Lazy` is specific to `rust-libp2p`. The wire protocol is identical to `V1`
|
||||
/// > and generally interoperable with peers only supporting `V1`. Nevertheless, there is a
|
||||
/// > pitfall that is rarely encountered: When nesting multiple protocol negotiations, the
|
||||
/// > listener should either be known to support all of the dialer's optimistically chosen
|
||||
/// > protocols or there is must be no intermediate protocol without a payload and none of
|
||||
/// > the protocol payloads must have the potential for being mistaken for a multistream-select
|
||||
/// > protocol message. This avoids rare edge-cases whereby the listener may not recognize
|
||||
/// > upgrade boundaries and erroneously process a request despite not supporting one of
|
||||
/// > the intermediate protocols that the dialer committed to. See [1] and [2].
|
||||
///
|
||||
/// [1]: https://github.com/multiformats/go-multistream/issues/20
|
||||
/// [2]: https://github.com/libp2p/rust-libp2p/pull/1212
|
||||
V1Lazy,
|
||||
// Draft: https://github.com/libp2p/specs/pull/95
|
||||
// V2,
|
||||
}
|
||||
|
||||
impl Default for Version {
|
||||
fn default() -> Self {
|
||||
Version::V1
|
||||
}
|
||||
}
|
@ -22,7 +22,7 @@
|
||||
//! in a multistream-select protocol negotiation.
|
||||
|
||||
use crate::{Negotiated, NegotiationError};
|
||||
use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, Version};
|
||||
use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, HeaderLine};
|
||||
|
||||
use futures::prelude::*;
|
||||
use smallvec::SmallVec;
|
||||
@ -81,7 +81,7 @@ where
|
||||
N: AsRef<[u8]>
|
||||
{
|
||||
RecvHeader { io: MessageIO<R> },
|
||||
SendHeader { io: MessageIO<R>, version: Version },
|
||||
SendHeader { io: MessageIO<R> },
|
||||
RecvMessage { io: MessageIO<R> },
|
||||
SendMessage {
|
||||
io: MessageIO<R>,
|
||||
@ -111,8 +111,10 @@ where
|
||||
match mem::replace(this.state, State::Done) {
|
||||
State::RecvHeader { mut io } => {
|
||||
match io.poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(Ok(Message::Header(version)))) => {
|
||||
*this.state = State::SendHeader { io, version }
|
||||
Poll::Ready(Some(Ok(Message::Header(h)))) => {
|
||||
match h {
|
||||
HeaderLine::V1 => *this.state = State::SendHeader { io }
|
||||
}
|
||||
}
|
||||
Poll::Ready(Some(Ok(_))) => {
|
||||
return Poll::Ready(Err(ProtocolError::InvalidMessage.into()))
|
||||
@ -129,24 +131,22 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
State::SendHeader { mut io, version } => {
|
||||
State::SendHeader { mut io } => {
|
||||
match Pin::new(&mut io).poll_ready(cx) {
|
||||
Poll::Pending => {
|
||||
*this.state = State::SendHeader { io, version };
|
||||
*this.state = State::SendHeader { io };
|
||||
return Poll::Pending
|
||||
},
|
||||
Poll::Ready(Ok(())) => {},
|
||||
Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))),
|
||||
}
|
||||
|
||||
if let Err(err) = Pin::new(&mut io).start_send(Message::Header(version)) {
|
||||
let msg = Message::Header(HeaderLine::V1);
|
||||
if let Err(err) = Pin::new(&mut io).start_send(msg) {
|
||||
return Poll::Ready(Err(From::from(err)));
|
||||
}
|
||||
|
||||
*this.state = match version {
|
||||
Version::V1 => State::Flush { io, protocol: None },
|
||||
Version::V1Lazy => State::RecvMessage { io },
|
||||
}
|
||||
*this.state = State::Flush { io, protocol: None };
|
||||
}
|
||||
|
||||
State::RecvMessage { mut io } => {
|
||||
|
@ -18,7 +18,7 @@
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use crate::protocol::{Protocol, MessageReader, Message, Version, ProtocolError};
|
||||
use crate::protocol::{Protocol, MessageReader, Message, ProtocolError, HeaderLine};
|
||||
|
||||
use futures::{prelude::*, io::{IoSlice, IoSliceMut}, ready};
|
||||
use pin_project::pin_project;
|
||||
@ -80,8 +80,12 @@ impl<TInner> Negotiated<TInner> {
|
||||
|
||||
/// Creates a `Negotiated` in state [`State::Expecting`] that is still
|
||||
/// expecting confirmation of the given `protocol`.
|
||||
pub(crate) fn expecting(io: MessageReader<TInner>, protocol: Protocol, version: Version) -> Self {
|
||||
Negotiated { state: State::Expecting { io, protocol, version } }
|
||||
pub(crate) fn expecting(
|
||||
io: MessageReader<TInner>,
|
||||
protocol: Protocol,
|
||||
header: Option<HeaderLine>
|
||||
) -> Self {
|
||||
Negotiated { state: State::Expecting { io, protocol, header } }
|
||||
}
|
||||
|
||||
/// Polls the `Negotiated` for completion.
|
||||
@ -111,11 +115,11 @@ impl<TInner> Negotiated<TInner> {
|
||||
// Read outstanding protocol negotiation messages.
|
||||
loop {
|
||||
match mem::replace(&mut *this.state, State::Invalid) {
|
||||
State::Expecting { mut io, protocol, version } => {
|
||||
State::Expecting { mut io, header, protocol } => {
|
||||
let msg = match Pin::new(&mut io).poll_next(cx)? {
|
||||
Poll::Ready(Some(msg)) => msg,
|
||||
Poll::Pending => {
|
||||
*this.state = State::Expecting { io, protocol, version };
|
||||
*this.state = State::Expecting { io, header, protocol };
|
||||
return Poll::Pending
|
||||
},
|
||||
Poll::Ready(None) => {
|
||||
@ -124,9 +128,9 @@ impl<TInner> Negotiated<TInner> {
|
||||
}
|
||||
};
|
||||
|
||||
if let Message::Header(v) = &msg {
|
||||
if *v == version {
|
||||
*this.state = State::Expecting { io, protocol, version };
|
||||
if let Message::Header(h) = &msg {
|
||||
if Some(h) == header.as_ref() {
|
||||
*this.state = State::Expecting { io, protocol, header: None };
|
||||
continue
|
||||
}
|
||||
}
|
||||
@ -165,10 +169,11 @@ enum State<R> {
|
||||
/// The underlying I/O stream.
|
||||
#[pin]
|
||||
io: MessageReader<R>,
|
||||
/// The expected protocol (i.e. name and version).
|
||||
/// The expected negotiation header/preamble (i.e. multistream-select version),
|
||||
/// if one is still expected to be received.
|
||||
header: Option<HeaderLine>,
|
||||
/// The expected application protocol (i.e. name and version).
|
||||
protocol: Protocol,
|
||||
/// The expected multistream-select protocol version.
|
||||
version: Version
|
||||
},
|
||||
|
||||
/// In this state, a protocol has been agreed upon and I/O
|
||||
|
@ -25,6 +25,7 @@
|
||||
//! `Stream` and `Sink` implementations of `MessageIO` and
|
||||
//! `MessageReader`.
|
||||
|
||||
use crate::Version;
|
||||
use crate::length_delimited::{LengthDelimited, LengthDelimitedReader};
|
||||
|
||||
use bytes::{Bytes, BytesMut, BufMut};
|
||||
@ -37,71 +38,25 @@ const MAX_PROTOCOLS: usize = 1000;
|
||||
|
||||
/// The encoded form of a multistream-select 1.0.0 header message.
|
||||
const MSG_MULTISTREAM_1_0: &[u8] = b"/multistream/1.0.0\n";
|
||||
/// The encoded form of a multistream-select 1.0.0 header message.
|
||||
const MSG_MULTISTREAM_1_0_LAZY: &[u8] = b"/multistream-lazy/1\n";
|
||||
/// The encoded form of a multistream-select 'na' message.
|
||||
const MSG_PROTOCOL_NA: &[u8] = b"na\n";
|
||||
/// The encoded form of a multistream-select 'ls' message.
|
||||
const MSG_LS: &[u8] = b"ls\n";
|
||||
|
||||
/// Supported multistream-select protocol versions.
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub enum Version {
|
||||
/// Version 1 of the multistream-select protocol. See [1] and [2].
|
||||
/// The multistream-select header lines preceeding negotiation.
|
||||
///
|
||||
/// [1]: https://github.com/libp2p/specs/blob/master/connections/README.md#protocol-negotiation
|
||||
/// [2]: https://github.com/multiformats/multistream-select
|
||||
/// Every [`Version`] has a corresponding header line.
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
|
||||
pub enum HeaderLine {
|
||||
/// The `/multistream/1.0.0` header line.
|
||||
V1,
|
||||
/// A lazy variant of version 1 that is identical on the wire but delays
|
||||
/// sending of protocol negotiation data as much as possible.
|
||||
///
|
||||
/// Delaying the sending of protocol negotiation data can result in
|
||||
/// significantly fewer network roundtrips used for the negotiation,
|
||||
/// up to 0-RTT negotiation.
|
||||
///
|
||||
/// 0-RTT negotiation is achieved if the dialer supports only a single
|
||||
/// application protocol. In that case the dialer immedidately settles
|
||||
/// on that protocol, buffering the negotiation messages to be sent
|
||||
/// with the first round of application protocol data (or an attempt
|
||||
/// is made to read from the `Negotiated` I/O stream).
|
||||
///
|
||||
/// A listener receiving a `V1Lazy` header will similarly delay sending
|
||||
/// of the protocol confirmation. Though typically the listener will need
|
||||
/// to read the request data before sending its response, thus triggering
|
||||
/// sending of the protocol confirmation, which, in absence of additional
|
||||
/// buffering on lower layers will result in at least two response frames
|
||||
/// to be sent.
|
||||
///
|
||||
/// `V1Lazy` is specific to `rust-libp2p`: While the wire protocol
|
||||
/// is identical to `V1`, delayed sending of protocol negotiation frames
|
||||
/// is only safe under the following assumptions:
|
||||
///
|
||||
/// 1. The dialer is assumed to always send the first multistream-select
|
||||
/// protocol message immediately after the multistream header, without
|
||||
/// first waiting for confirmation of that header. Since the listener
|
||||
/// delays sending the protocol confirmation, a deadlock situation may
|
||||
/// otherwise occurs that is only resolved by a timeout. This assumption
|
||||
/// is trivially satisfied if both peers support and use `V1Lazy`.
|
||||
///
|
||||
/// 2. When nesting multiple protocol negotiations, the listener is either
|
||||
/// known to support all of the dialer's optimistically chosen protocols
|
||||
/// or there is no intermediate protocol without a payload and none of
|
||||
/// the protocol payloads has the potential for being mistaken for a
|
||||
/// multistream-select protocol message. This avoids rare edge-cases whereby
|
||||
/// the listener may not recognize upgrade boundaries and erroneously
|
||||
/// process a request despite not supporting one of the intermediate
|
||||
/// protocols that the dialer committed to. See [1] and [2].
|
||||
///
|
||||
/// [1]: https://github.com/multiformats/go-multistream/issues/20
|
||||
/// [2]: https://github.com/libp2p/rust-libp2p/pull/1212
|
||||
V1Lazy,
|
||||
// Draft: https://github.com/libp2p/specs/pull/95
|
||||
// V2,
|
||||
}
|
||||
|
||||
impl Default for Version {
|
||||
fn default() -> Self {
|
||||
Version::V1
|
||||
impl From<Version> for HeaderLine {
|
||||
fn from(v: Version) -> HeaderLine {
|
||||
match v {
|
||||
Version::V1 | Version::V1Lazy => HeaderLine::V1,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -148,7 +103,7 @@ impl fmt::Display for Protocol {
|
||||
pub enum Message {
|
||||
/// A header message identifies the multistream-select protocol
|
||||
/// that the sender wishes to speak.
|
||||
Header(Version),
|
||||
Header(HeaderLine),
|
||||
/// A protocol message identifies a protocol request or acknowledgement.
|
||||
Protocol(Protocol),
|
||||
/// A message through which a peer requests the complete list of
|
||||
@ -164,16 +119,11 @@ impl Message {
|
||||
/// Encodes a `Message` into its byte representation.
|
||||
pub fn encode(&self, dest: &mut BytesMut) -> Result<(), ProtocolError> {
|
||||
match self {
|
||||
Message::Header(Version::V1) => {
|
||||
Message::Header(HeaderLine::V1) => {
|
||||
dest.reserve(MSG_MULTISTREAM_1_0.len());
|
||||
dest.put(MSG_MULTISTREAM_1_0);
|
||||
Ok(())
|
||||
}
|
||||
Message::Header(Version::V1Lazy) => {
|
||||
dest.reserve(MSG_MULTISTREAM_1_0_LAZY.len());
|
||||
dest.put(MSG_MULTISTREAM_1_0_LAZY);
|
||||
Ok(())
|
||||
}
|
||||
Message::Protocol(p) => {
|
||||
let len = p.0.as_ref().len() + 1; // + 1 for \n
|
||||
dest.reserve(len);
|
||||
@ -209,12 +159,8 @@ impl Message {
|
||||
|
||||
/// Decodes a `Message` from its byte representation.
|
||||
pub fn decode(mut msg: Bytes) -> Result<Message, ProtocolError> {
|
||||
if msg == MSG_MULTISTREAM_1_0_LAZY {
|
||||
return Ok(Message::Header(Version::V1Lazy))
|
||||
}
|
||||
|
||||
if msg == MSG_MULTISTREAM_1_0 {
|
||||
return Ok(Message::Header(Version::V1))
|
||||
return Ok(Message::Header(HeaderLine::V1))
|
||||
}
|
||||
|
||||
if msg == MSG_PROTOCOL_NA {
|
||||
@ -506,7 +452,7 @@ mod tests {
|
||||
impl Arbitrary for Message {
|
||||
fn arbitrary<G: Gen>(g: &mut G) -> Message {
|
||||
match g.gen_range(0, 5) {
|
||||
0 => Message::Header(Version::V1),
|
||||
0 => Message::Header(HeaderLine::V1),
|
||||
1 => Message::NotAvailable,
|
||||
2 => Message::ListProtocols,
|
||||
3 => Message::Protocol(Protocol::arbitrary(g)),
|
||||
|
@ -702,8 +702,8 @@ fn get_record_many() {
|
||||
..
|
||||
})) => {
|
||||
assert_eq!(id, qid);
|
||||
assert_eq!(records.len(), num_results);
|
||||
assert_eq!(records.first().unwrap().record, record);
|
||||
assert!(records.len() >= num_results);
|
||||
assert!(records.into_iter().all(|r| r.record == record));
|
||||
return Poll::Ready(());
|
||||
}
|
||||
// Ignore any other event.
|
||||
|
Reference in New Issue
Block a user