mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-14 18:41:22 +00:00
misc/multistream-select/: Remove parallel dialing optimization (#2934)
This is to avoid the usage of the now optional `ls` command, and stay compatible with go-multistream. Closes #2925
This commit is contained in:
committed by
GitHub
parent
749ff00a79
commit
c71115d055
@ -24,7 +24,7 @@ libsecp256k1 = { version = "0.7.0", optional = true }
|
|||||||
log = "0.4"
|
log = "0.4"
|
||||||
multiaddr = { version = "0.14.0" }
|
multiaddr = { version = "0.14.0" }
|
||||||
multihash = { version = "0.16", default-features = false, features = ["std", "multihash-impl", "identity", "sha2"] }
|
multihash = { version = "0.16", default-features = false, features = ["std", "multihash-impl", "identity", "sha2"] }
|
||||||
multistream-select = { version = "0.11", path = "../misc/multistream-select" }
|
multistream-select = { version = "0.12", path = "../misc/multistream-select" }
|
||||||
p256 = { version = "0.11.1", default-features = false, features = ["ecdsa"], optional = true }
|
p256 = { version = "0.11.1", default-features = false, features = ["ecdsa"], optional = true }
|
||||||
parking_lot = "0.12.0"
|
parking_lot = "0.12.0"
|
||||||
pin-project = "1.0.0"
|
pin-project = "1.0.0"
|
||||||
|
@ -1,3 +1,9 @@
|
|||||||
|
# 0.12.0 [unreleased]
|
||||||
|
|
||||||
|
- Remove parallel dialing optimization, to avoid requiring the use of the `ls` command. See [PR 2934].
|
||||||
|
|
||||||
|
[PR 2934]: https://github.com/libp2p/rust-libp2p/pull/2934
|
||||||
|
|
||||||
# 0.11.0 [2022-01-27]
|
# 0.11.0 [2022-01-27]
|
||||||
|
|
||||||
- Migrate to Rust edition 2021 (see [PR 2339]).
|
- Migrate to Rust edition 2021 (see [PR 2339]).
|
||||||
|
@ -3,7 +3,7 @@ name = "multistream-select"
|
|||||||
edition = "2021"
|
edition = "2021"
|
||||||
rust-version = "1.56.1"
|
rust-version = "1.56.1"
|
||||||
description = "Multistream-select negotiation protocol for libp2p"
|
description = "Multistream-select negotiation protocol for libp2p"
|
||||||
version = "0.11.0"
|
version = "0.12.0"
|
||||||
authors = ["Parity Technologies <admin@parity.io>"]
|
authors = ["Parity Technologies <admin@parity.io>"]
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
repository = "https://github.com/libp2p/rust-libp2p"
|
repository = "https://github.com/libp2p/rust-libp2p"
|
||||||
|
@ -23,7 +23,7 @@
|
|||||||
use crate::protocol::{HeaderLine, Message, MessageIO, Protocol, ProtocolError};
|
use crate::protocol::{HeaderLine, Message, MessageIO, Protocol, ProtocolError};
|
||||||
use crate::{Negotiated, NegotiationError, Version};
|
use crate::{Negotiated, NegotiationError, Version};
|
||||||
|
|
||||||
use futures::{future::Either, prelude::*};
|
use futures::prelude::*;
|
||||||
use std::{
|
use std::{
|
||||||
convert::TryFrom as _,
|
convert::TryFrom as _,
|
||||||
iter, mem,
|
iter, mem,
|
||||||
@ -39,12 +39,6 @@ use std::{
|
|||||||
/// returned `Future` resolves with the name of the negotiated protocol and
|
/// returned `Future` resolves with the name of the negotiated protocol and
|
||||||
/// a [`Negotiated`] I/O stream.
|
/// a [`Negotiated`] I/O stream.
|
||||||
///
|
///
|
||||||
/// The chosen message flow for protocol negotiation depends on the numbers of
|
|
||||||
/// supported protocols given. That is, this function delegates to serial or
|
|
||||||
/// parallel variant based on the number of protocols given. The number of
|
|
||||||
/// protocols is determined through the `size_hint` of the given iterator and
|
|
||||||
/// thus an inaccurate size estimate may result in a suboptimal choice.
|
|
||||||
///
|
|
||||||
/// Within the scope of this library, a dialer always commits to a specific
|
/// Within the scope of this library, a dialer always commits to a specific
|
||||||
/// multistream-select [`Version`], whereas a listener always supports
|
/// multistream-select [`Version`], whereas a listener always supports
|
||||||
/// all versions supported by this library. Frictionless multistream-select
|
/// all versions supported by this library. Frictionless multistream-select
|
||||||
@ -55,92 +49,32 @@ pub fn dialer_select_proto<R, I>(
|
|||||||
protocols: I,
|
protocols: I,
|
||||||
version: Version,
|
version: Version,
|
||||||
) -> DialerSelectFuture<R, I::IntoIter>
|
) -> DialerSelectFuture<R, I::IntoIter>
|
||||||
where
|
|
||||||
R: AsyncRead + AsyncWrite,
|
|
||||||
I: IntoIterator,
|
|
||||||
I::Item: AsRef<[u8]>,
|
|
||||||
{
|
|
||||||
let iter = protocols.into_iter();
|
|
||||||
// We choose between the "serial" and "parallel" strategies based on the number of protocols.
|
|
||||||
if iter.size_hint().1.map(|n| n <= 3).unwrap_or(false) {
|
|
||||||
Either::Left(dialer_select_proto_serial(inner, iter, version))
|
|
||||||
} else {
|
|
||||||
Either::Right(dialer_select_proto_parallel(inner, iter, version))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Future, returned by `dialer_select_proto`, which selects a protocol and dialer
|
|
||||||
/// either trying protocols in-order, or by requesting all protocols supported
|
|
||||||
/// by the remote upfront, from which the first protocol found in the dialer's
|
|
||||||
/// list of protocols is selected.
|
|
||||||
pub type DialerSelectFuture<R, I> = Either<DialerSelectSeq<R, I>, DialerSelectPar<R, I>>;
|
|
||||||
|
|
||||||
/// Returns a `Future` that negotiates a protocol on the given I/O stream.
|
|
||||||
///
|
|
||||||
/// Just like [`dialer_select_proto`] but always using an iterative message flow,
|
|
||||||
/// trying the given list of supported protocols one-by-one.
|
|
||||||
///
|
|
||||||
/// This strategy is preferable if the dialer only supports a few protocols.
|
|
||||||
pub(crate) fn dialer_select_proto_serial<R, I>(
|
|
||||||
inner: R,
|
|
||||||
protocols: I,
|
|
||||||
version: Version,
|
|
||||||
) -> DialerSelectSeq<R, I::IntoIter>
|
|
||||||
where
|
where
|
||||||
R: AsyncRead + AsyncWrite,
|
R: AsyncRead + AsyncWrite,
|
||||||
I: IntoIterator,
|
I: IntoIterator,
|
||||||
I::Item: AsRef<[u8]>,
|
I::Item: AsRef<[u8]>,
|
||||||
{
|
{
|
||||||
let protocols = protocols.into_iter().peekable();
|
let protocols = protocols.into_iter().peekable();
|
||||||
DialerSelectSeq {
|
DialerSelectFuture {
|
||||||
version,
|
version,
|
||||||
protocols,
|
protocols,
|
||||||
state: SeqState::SendHeader {
|
state: State::SendHeader {
|
||||||
io: MessageIO::new(inner),
|
io: MessageIO::new(inner),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a `Future` that negotiates a protocol on the given I/O stream.
|
/// A `Future` returned by [`dialer_select_proto`] which negotiates
|
||||||
///
|
|
||||||
/// Just like [`dialer_select_proto`] but always using a message flow that first
|
|
||||||
/// requests all supported protocols from the remote, selecting the first
|
|
||||||
/// protocol from the given list of supported protocols that is supported
|
|
||||||
/// by the remote.
|
|
||||||
///
|
|
||||||
/// This strategy may be beneficial if the dialer supports many protocols
|
|
||||||
/// and it is unclear whether the remote supports one of the first few.
|
|
||||||
pub(crate) fn dialer_select_proto_parallel<R, I>(
|
|
||||||
inner: R,
|
|
||||||
protocols: I,
|
|
||||||
version: Version,
|
|
||||||
) -> DialerSelectPar<R, I::IntoIter>
|
|
||||||
where
|
|
||||||
R: AsyncRead + AsyncWrite,
|
|
||||||
I: IntoIterator,
|
|
||||||
I::Item: AsRef<[u8]>,
|
|
||||||
{
|
|
||||||
let protocols = protocols.into_iter();
|
|
||||||
DialerSelectPar {
|
|
||||||
version,
|
|
||||||
protocols,
|
|
||||||
state: ParState::SendHeader {
|
|
||||||
io: MessageIO::new(inner),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A `Future` returned by [`dialer_select_proto_serial`] which negotiates
|
|
||||||
/// a protocol iteratively by considering one protocol after the other.
|
/// a protocol iteratively by considering one protocol after the other.
|
||||||
#[pin_project::pin_project]
|
#[pin_project::pin_project]
|
||||||
pub struct DialerSelectSeq<R, I: Iterator> {
|
pub struct DialerSelectFuture<R, I: Iterator> {
|
||||||
// TODO: It would be nice if eventually N = I::Item = Protocol.
|
// TODO: It would be nice if eventually N = I::Item = Protocol.
|
||||||
protocols: iter::Peekable<I>,
|
protocols: iter::Peekable<I>,
|
||||||
state: SeqState<R, I::Item>,
|
state: State<R, I::Item>,
|
||||||
version: Version,
|
version: Version,
|
||||||
}
|
}
|
||||||
|
|
||||||
enum SeqState<R, N> {
|
enum State<R, N> {
|
||||||
SendHeader { io: MessageIO<R> },
|
SendHeader { io: MessageIO<R> },
|
||||||
SendProtocol { io: MessageIO<R>, protocol: N },
|
SendProtocol { io: MessageIO<R>, protocol: N },
|
||||||
FlushProtocol { io: MessageIO<R>, protocol: N },
|
FlushProtocol { io: MessageIO<R>, protocol: N },
|
||||||
@ -148,7 +82,7 @@ enum SeqState<R, N> {
|
|||||||
Done,
|
Done,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R, I> Future for DialerSelectSeq<R, I>
|
impl<R, I> Future for DialerSelectFuture<R, I>
|
||||||
where
|
where
|
||||||
// The Unpin bound here is required because we produce a `Negotiated<R>` as the output.
|
// The Unpin bound here is required because we produce a `Negotiated<R>` as the output.
|
||||||
// It also makes the implementation considerably easier to write.
|
// It also makes the implementation considerably easier to write.
|
||||||
@ -162,12 +96,12 @@ where
|
|||||||
let this = self.project();
|
let this = self.project();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match mem::replace(this.state, SeqState::Done) {
|
match mem::replace(this.state, State::Done) {
|
||||||
SeqState::SendHeader { mut io } => {
|
State::SendHeader { mut io } => {
|
||||||
match Pin::new(&mut io).poll_ready(cx)? {
|
match Pin::new(&mut io).poll_ready(cx)? {
|
||||||
Poll::Ready(()) => {}
|
Poll::Ready(()) => {}
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
*this.state = SeqState::SendHeader { io };
|
*this.state = State::SendHeader { io };
|
||||||
return Poll::Pending;
|
return Poll::Pending;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -181,14 +115,14 @@ where
|
|||||||
|
|
||||||
// The dialer always sends the header and the first protocol
|
// The dialer always sends the header and the first protocol
|
||||||
// proposal in one go for efficiency.
|
// proposal in one go for efficiency.
|
||||||
*this.state = SeqState::SendProtocol { io, protocol };
|
*this.state = State::SendProtocol { io, protocol };
|
||||||
}
|
}
|
||||||
|
|
||||||
SeqState::SendProtocol { mut io, protocol } => {
|
State::SendProtocol { mut io, protocol } => {
|
||||||
match Pin::new(&mut io).poll_ready(cx)? {
|
match Pin::new(&mut io).poll_ready(cx)? {
|
||||||
Poll::Ready(()) => {}
|
Poll::Ready(()) => {}
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
*this.state = SeqState::SendProtocol { io, protocol };
|
*this.state = State::SendProtocol { io, protocol };
|
||||||
return Poll::Pending;
|
return Poll::Pending;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -200,10 +134,10 @@ where
|
|||||||
log::debug!("Dialer: Proposed protocol: {}", p);
|
log::debug!("Dialer: Proposed protocol: {}", p);
|
||||||
|
|
||||||
if this.protocols.peek().is_some() {
|
if this.protocols.peek().is_some() {
|
||||||
*this.state = SeqState::FlushProtocol { io, protocol }
|
*this.state = State::FlushProtocol { io, protocol }
|
||||||
} else {
|
} else {
|
||||||
match this.version {
|
match this.version {
|
||||||
Version::V1 => *this.state = SeqState::FlushProtocol { io, protocol },
|
Version::V1 => *this.state = State::FlushProtocol { io, protocol },
|
||||||
// This is the only effect that `V1Lazy` has compared to `V1`:
|
// This is the only effect that `V1Lazy` has compared to `V1`:
|
||||||
// Optimistically settling on the only protocol that
|
// Optimistically settling on the only protocol that
|
||||||
// the dialer supports for this negotiation. Notably,
|
// the dialer supports for this negotiation. Notably,
|
||||||
@ -218,21 +152,21 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SeqState::FlushProtocol { mut io, protocol } => {
|
State::FlushProtocol { mut io, protocol } => {
|
||||||
match Pin::new(&mut io).poll_flush(cx)? {
|
match Pin::new(&mut io).poll_flush(cx)? {
|
||||||
Poll::Ready(()) => *this.state = SeqState::AwaitProtocol { io, protocol },
|
Poll::Ready(()) => *this.state = State::AwaitProtocol { io, protocol },
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
*this.state = SeqState::FlushProtocol { io, protocol };
|
*this.state = State::FlushProtocol { io, protocol };
|
||||||
return Poll::Pending;
|
return Poll::Pending;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SeqState::AwaitProtocol { mut io, protocol } => {
|
State::AwaitProtocol { mut io, protocol } => {
|
||||||
let msg = match Pin::new(&mut io).poll_next(cx)? {
|
let msg = match Pin::new(&mut io).poll_next(cx)? {
|
||||||
Poll::Ready(Some(msg)) => msg,
|
Poll::Ready(Some(msg)) => msg,
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
*this.state = SeqState::AwaitProtocol { io, protocol };
|
*this.state = State::AwaitProtocol { io, protocol };
|
||||||
return Poll::Pending;
|
return Poll::Pending;
|
||||||
}
|
}
|
||||||
// Treat EOF error as [`NegotiationError::Failed`], not as
|
// Treat EOF error as [`NegotiationError::Failed`], not as
|
||||||
@ -243,7 +177,7 @@ where
|
|||||||
|
|
||||||
match msg {
|
match msg {
|
||||||
Message::Header(v) if v == HeaderLine::from(*this.version) => {
|
Message::Header(v) if v == HeaderLine::from(*this.version) => {
|
||||||
*this.state = SeqState::AwaitProtocol { io, protocol };
|
*this.state = State::AwaitProtocol { io, protocol };
|
||||||
}
|
}
|
||||||
Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => {
|
Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => {
|
||||||
log::debug!("Dialer: Received confirmation for protocol: {}", p);
|
log::debug!("Dialer: Received confirmation for protocol: {}", p);
|
||||||
@ -256,148 +190,13 @@ where
|
|||||||
String::from_utf8_lossy(protocol.as_ref())
|
String::from_utf8_lossy(protocol.as_ref())
|
||||||
);
|
);
|
||||||
let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?;
|
let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?;
|
||||||
*this.state = SeqState::SendProtocol { io, protocol }
|
*this.state = State::SendProtocol { io, protocol }
|
||||||
}
|
}
|
||||||
_ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())),
|
_ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SeqState::Done => panic!("SeqState::poll called after completion"),
|
State::Done => panic!("State::poll called after completion"),
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A `Future` returned by [`dialer_select_proto_parallel`] which negotiates
|
|
||||||
/// a protocol selectively by considering all supported protocols of the remote
|
|
||||||
/// "in parallel".
|
|
||||||
#[pin_project::pin_project]
|
|
||||||
pub struct DialerSelectPar<R, I: Iterator> {
|
|
||||||
protocols: I,
|
|
||||||
state: ParState<R, I::Item>,
|
|
||||||
version: Version,
|
|
||||||
}
|
|
||||||
|
|
||||||
enum ParState<R, N> {
|
|
||||||
SendHeader { io: MessageIO<R> },
|
|
||||||
SendProtocolsRequest { io: MessageIO<R> },
|
|
||||||
Flush { io: MessageIO<R> },
|
|
||||||
RecvProtocols { io: MessageIO<R> },
|
|
||||||
SendProtocol { io: MessageIO<R>, protocol: N },
|
|
||||||
Done,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<R, I> Future for DialerSelectPar<R, I>
|
|
||||||
where
|
|
||||||
// The Unpin bound here is required because we produce a `Negotiated<R>` as the output.
|
|
||||||
// It also makes the implementation considerably easier to write.
|
|
||||||
R: AsyncRead + AsyncWrite + Unpin,
|
|
||||||
I: Iterator,
|
|
||||||
I::Item: AsRef<[u8]>,
|
|
||||||
{
|
|
||||||
type Output = Result<(I::Item, Negotiated<R>), NegotiationError>;
|
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
||||||
let this = self.project();
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match mem::replace(this.state, ParState::Done) {
|
|
||||||
ParState::SendHeader { mut io } => {
|
|
||||||
match Pin::new(&mut io).poll_ready(cx)? {
|
|
||||||
Poll::Ready(()) => {}
|
|
||||||
Poll::Pending => {
|
|
||||||
*this.state = ParState::SendHeader { io };
|
|
||||||
return Poll::Pending;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let msg = Message::Header(HeaderLine::from(*this.version));
|
|
||||||
if let Err(err) = Pin::new(&mut io).start_send(msg) {
|
|
||||||
return Poll::Ready(Err(From::from(err)));
|
|
||||||
}
|
|
||||||
|
|
||||||
*this.state = ParState::SendProtocolsRequest { io };
|
|
||||||
}
|
|
||||||
|
|
||||||
ParState::SendProtocolsRequest { mut io } => {
|
|
||||||
match Pin::new(&mut io).poll_ready(cx)? {
|
|
||||||
Poll::Ready(()) => {}
|
|
||||||
Poll::Pending => {
|
|
||||||
*this.state = ParState::SendProtocolsRequest { io };
|
|
||||||
return Poll::Pending;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Err(err) = Pin::new(&mut io).start_send(Message::ListProtocols) {
|
|
||||||
return Poll::Ready(Err(From::from(err)));
|
|
||||||
}
|
|
||||||
|
|
||||||
log::debug!("Dialer: Requested supported protocols.");
|
|
||||||
*this.state = ParState::Flush { io }
|
|
||||||
}
|
|
||||||
|
|
||||||
ParState::Flush { mut io } => match Pin::new(&mut io).poll_flush(cx)? {
|
|
||||||
Poll::Ready(()) => *this.state = ParState::RecvProtocols { io },
|
|
||||||
Poll::Pending => {
|
|
||||||
*this.state = ParState::Flush { io };
|
|
||||||
return Poll::Pending;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|
|
||||||
ParState::RecvProtocols { mut io } => {
|
|
||||||
let msg = match Pin::new(&mut io).poll_next(cx)? {
|
|
||||||
Poll::Ready(Some(msg)) => msg,
|
|
||||||
Poll::Pending => {
|
|
||||||
*this.state = ParState::RecvProtocols { io };
|
|
||||||
return Poll::Pending;
|
|
||||||
}
|
|
||||||
// Treat EOF error as [`NegotiationError::Failed`], not as
|
|
||||||
// [`NegotiationError::ProtocolError`], allowing dropping or closing an I/O
|
|
||||||
// stream as a permissible way to "gracefully" fail a negotiation.
|
|
||||||
Poll::Ready(None) => return Poll::Ready(Err(NegotiationError::Failed)),
|
|
||||||
};
|
|
||||||
|
|
||||||
match &msg {
|
|
||||||
Message::Header(h) if h == &HeaderLine::from(*this.version) => {
|
|
||||||
*this.state = ParState::RecvProtocols { io }
|
|
||||||
}
|
|
||||||
Message::Protocols(supported) => {
|
|
||||||
let protocol = this
|
|
||||||
.protocols
|
|
||||||
.by_ref()
|
|
||||||
.find(|p| supported.iter().any(|s| s.as_ref() == p.as_ref()))
|
|
||||||
.ok_or(NegotiationError::Failed)?;
|
|
||||||
log::debug!(
|
|
||||||
"Dialer: Found supported protocol: {}",
|
|
||||||
String::from_utf8_lossy(protocol.as_ref())
|
|
||||||
);
|
|
||||||
*this.state = ParState::SendProtocol { io, protocol };
|
|
||||||
}
|
|
||||||
_ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ParState::SendProtocol { mut io, protocol } => {
|
|
||||||
match Pin::new(&mut io).poll_ready(cx)? {
|
|
||||||
Poll::Ready(()) => {}
|
|
||||||
Poll::Pending => {
|
|
||||||
*this.state = ParState::SendProtocol { io, protocol };
|
|
||||||
return Poll::Pending;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let p = Protocol::try_from(protocol.as_ref())?;
|
|
||||||
if let Err(err) = Pin::new(&mut io).start_send(Message::Protocol(p.clone())) {
|
|
||||||
return Poll::Ready(Err(From::from(err)));
|
|
||||||
}
|
|
||||||
|
|
||||||
log::debug!("Dialer: Expecting proposed protocol: {}", p);
|
|
||||||
let io = Negotiated::expecting(io.into_reader(), p, None);
|
|
||||||
|
|
||||||
return Poll::Ready(Ok((protocol, io)));
|
|
||||||
}
|
|
||||||
|
|
||||||
ParState::Done => panic!("ParState::poll called after completion"),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -92,7 +92,6 @@ mod length_delimited;
|
|||||||
mod listener_select;
|
mod listener_select;
|
||||||
mod negotiated;
|
mod negotiated;
|
||||||
mod protocol;
|
mod protocol;
|
||||||
mod tests;
|
|
||||||
|
|
||||||
pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture};
|
pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture};
|
||||||
pub use self::listener_select::{listener_select_proto, ListenerSelectFuture};
|
pub use self::listener_select::{listener_select_proto, ListenerSelectFuture};
|
||||||
|
@ -20,14 +20,9 @@
|
|||||||
|
|
||||||
//! Integration tests for protocol negotiation.
|
//! Integration tests for protocol negotiation.
|
||||||
|
|
||||||
#![cfg(test)]
|
|
||||||
|
|
||||||
use crate::dialer_select::{dialer_select_proto_parallel, dialer_select_proto_serial};
|
|
||||||
use crate::{dialer_select_proto, listener_select_proto};
|
|
||||||
use crate::{NegotiationError, Version};
|
|
||||||
|
|
||||||
use async_std::net::{TcpListener, TcpStream};
|
use async_std::net::{TcpListener, TcpStream};
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
|
use multistream_select::{dialer_select_proto, listener_select_proto, NegotiationError, Version};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn select_proto_basic() {
|
fn select_proto_basic() {
|
||||||
@ -181,67 +176,3 @@ fn negotiation_failed() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn select_proto_parallel() {
|
|
||||||
async fn run(version: Version) {
|
|
||||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
|
||||||
let listener_addr = listener.local_addr().unwrap();
|
|
||||||
|
|
||||||
let server = async_std::task::spawn(async move {
|
|
||||||
let connec = listener.accept().await.unwrap().0;
|
|
||||||
let protos = vec![b"/proto1", b"/proto2"];
|
|
||||||
let (proto, io) = listener_select_proto(connec, protos).await.unwrap();
|
|
||||||
assert_eq!(proto, b"/proto2");
|
|
||||||
io.complete().await.unwrap();
|
|
||||||
});
|
|
||||||
|
|
||||||
let client = async_std::task::spawn(async move {
|
|
||||||
let connec = TcpStream::connect(&listener_addr).await.unwrap();
|
|
||||||
let protos = vec![b"/proto3", b"/proto2"];
|
|
||||||
let (proto, io) = dialer_select_proto_parallel(connec, protos.into_iter(), version)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(proto, b"/proto2");
|
|
||||||
io.complete().await.unwrap();
|
|
||||||
});
|
|
||||||
|
|
||||||
server.await;
|
|
||||||
client.await;
|
|
||||||
}
|
|
||||||
|
|
||||||
async_std::task::block_on(run(Version::V1));
|
|
||||||
async_std::task::block_on(run(Version::V1Lazy));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn select_proto_serial() {
|
|
||||||
async fn run(version: Version) {
|
|
||||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
|
||||||
let listener_addr = listener.local_addr().unwrap();
|
|
||||||
|
|
||||||
let server = async_std::task::spawn(async move {
|
|
||||||
let connec = listener.accept().await.unwrap().0;
|
|
||||||
let protos = vec![b"/proto1", b"/proto2"];
|
|
||||||
let (proto, io) = listener_select_proto(connec, protos).await.unwrap();
|
|
||||||
assert_eq!(proto, b"/proto2");
|
|
||||||
io.complete().await.unwrap();
|
|
||||||
});
|
|
||||||
|
|
||||||
let client = async_std::task::spawn(async move {
|
|
||||||
let connec = TcpStream::connect(&listener_addr).await.unwrap();
|
|
||||||
let protos = vec![b"/proto3", b"/proto2"];
|
|
||||||
let (proto, io) = dialer_select_proto_serial(connec, protos.into_iter(), version)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(proto, b"/proto2");
|
|
||||||
io.complete().await.unwrap();
|
|
||||||
});
|
|
||||||
|
|
||||||
server.await;
|
|
||||||
client.await;
|
|
||||||
}
|
|
||||||
|
|
||||||
async_std::task::block_on(run(Version::V1));
|
|
||||||
async_std::task::block_on(run(Version::V1Lazy));
|
|
||||||
}
|
|
Reference in New Issue
Block a user