[multistream-select] Require remaining negotiation data to be flushed. (#1781)

* Require remaining negotiation data to be flushed.

There appears to still be an edge-case whereby the
`remaining` data to send w.r.t. protocol negotiation to send
is successfully written before a `poll_read` on a `Negotiated` stream,
but where the subsequent `poll_flush()` is pending.
Now `remaining` is empty and the next `poll_read()`
will go straight to reading from the underlying
I/O stream, despite the flush not having happened
yet, which can lead to a form of deadlock during
protocol negotiation.

Rather than complicating the existing code further in
order to accommodate for this case, it seems preferable
to simplify the code by giving up on this optimisation
that only affects the last negotiation protocol message
sent by the "listener". So we give up on the ability
to combine data sent by the "listener" immediately
after protocol negotiation together with the final
negotiation frame in the same transport-level frame/packet.

* Update changelog.

* Add missing comma.
This commit is contained in:
Roman Borschel
2020-10-01 12:29:51 +02:00
committed by GitHub
parent c19344dee7
commit 8cec457b5e
6 changed files with 68 additions and 190 deletions

View File

@ -1,5 +1,10 @@
# 0.8.3 [unreleased] # 0.8.3 [unreleased]
- Fix a potential deadlock during protocol negotiation due
to a missing flush, potentially resulting in sporadic protocol
upgrade timeouts.
[PR 1781](https://github.com/libp2p/rust-libp2p/pull/1781).
- Update dependencies. - Update dependencies.
# 0.8.2 [2020-06-22] # 0.8.2 [2020-06-22]

View File

@ -241,8 +241,7 @@ where
} }
Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => { Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => {
log::debug!("Dialer: Received confirmation for protocol: {}", p); log::debug!("Dialer: Received confirmation for protocol: {}", p);
let (io, remaining) = io.into_inner(); let io = Negotiated::completed(io.into_inner());
let io = Negotiated::completed(io, remaining);
return Poll::Ready(Ok((protocol, io))); return Poll::Ready(Ok((protocol, io)));
} }
Message::NotAvailable => { Message::NotAvailable => {

View File

@ -76,22 +76,18 @@ impl<R> LengthDelimited<R> {
} }
} }
/// Drops the [`LengthDelimited`] resource, yielding the underlying I/O stream /// Drops the [`LengthDelimited`] resource, yielding the underlying I/O stream.
/// together with the remaining write buffer containing the uvi-framed data
/// that has not yet been written to the underlying I/O stream.
///
/// The returned remaining write buffer may be prepended to follow-up
/// protocol data to send with a single `write`. Either way, if non-empty,
/// the write buffer _must_ eventually be written to the I/O stream
/// _before_ any follow-up data, in order to maintain a correct data stream.
/// ///
/// # Panic /// # Panic
/// ///
/// Will panic if called while there is data in the read buffer. The read buffer is /// Will panic if called while there is data in the read or write buffer.
/// guaranteed to be empty whenever `Stream::poll` yields a new `Bytes` frame. /// The read buffer is guaranteed to be empty whenever `Stream::poll` yields
pub fn into_inner(self) -> (R, BytesMut) { /// a new `Bytes` frame. The write buffer is guaranteed to be empty after
/// flushing.
pub fn into_inner(self) -> R {
assert!(self.read_buffer.is_empty()); assert!(self.read_buffer.is_empty());
(self.inner, self.write_buffer) assert!(self.write_buffer.is_empty());
self.inner
} }
/// Converts the [`LengthDelimited`] into a [`LengthDelimitedReader`], dropping the /// Converts the [`LengthDelimited`] into a [`LengthDelimitedReader`], dropping the
@ -303,7 +299,7 @@ impl<R> LengthDelimitedReader<R> {
/// yield a new `Message`. The write buffer is guaranteed to be empty whenever /// yield a new `Message`. The write buffer is guaranteed to be empty whenever
/// [`LengthDelimited::poll_write_buffer`] yields [`Poll::Ready`] or after /// [`LengthDelimited::poll_write_buffer`] yields [`Poll::Ready`] or after
/// the [`Sink`] has been completely flushed via [`Sink::poll_flush`]. /// the [`Sink`] has been completely flushed via [`Sink::poll_flush`].
pub fn into_inner(self) -> (R, BytesMut) { pub fn into_inner(self) -> R {
self.inner.into_inner() self.inner.into_inner()
} }
} }

View File

@ -88,7 +88,10 @@ where
message: Message, message: Message,
protocol: Option<N> protocol: Option<N>
}, },
Flush { io: MessageIO<R> }, Flush {
io: MessageIO<R>,
protocol: Option<N>
},
Done Done
} }
@ -141,7 +144,7 @@ where
} }
*this.state = match version { *this.state = match version {
Version::V1 => State::Flush { io }, Version::V1 => State::Flush { io, protocol: None },
Version::V1Lazy => State::RecvMessage { io }, Version::V1Lazy => State::RecvMessage { io },
} }
} }
@ -204,28 +207,28 @@ where
return Poll::Ready(Err(From::from(err))); return Poll::Ready(Err(From::from(err)));
} }
// If a protocol has been selected, finish negotiation. *this.state = State::Flush { io, protocol };
// Otherwise flush the sink and expect to receive another
// message.
*this.state = match protocol {
Some(protocol) => {
log::debug!("Listener: sent confirmed protocol: {}",
String::from_utf8_lossy(protocol.as_ref()));
let (io, remaining) = io.into_inner();
let io = Negotiated::completed(io, remaining);
return Poll::Ready(Ok((protocol, io)));
}
None => State::Flush { io }
};
} }
State::Flush { mut io } => { State::Flush { mut io, protocol } => {
match Pin::new(&mut io).poll_flush(cx) { match Pin::new(&mut io).poll_flush(cx) {
Poll::Pending => { Poll::Pending => {
*this.state = State::Flush { io }; *this.state = State::Flush { io, protocol };
return Poll::Pending return Poll::Pending
}, },
Poll::Ready(Ok(())) => *this.state = State::RecvMessage { io }, Poll::Ready(Ok(())) => {
// If a protocol has been selected, finish negotiation.
// Otherwise expect to receive another message.
match protocol {
Some(protocol) => {
log::debug!("Listener: sent confirmed protocol: {}",
String::from_utf8_lossy(protocol.as_ref()));
let io = Negotiated::completed(io.into_inner());
return Poll::Ready(Ok((protocol, io)))
}
None => *this.state = State::RecvMessage { io }
}
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))), Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))),
} }
} }

View File

@ -20,7 +20,6 @@
use crate::protocol::{Protocol, MessageReader, Message, Version, ProtocolError}; use crate::protocol::{Protocol, MessageReader, Message, Version, ProtocolError};
use bytes::{BytesMut, Buf};
use futures::{prelude::*, io::{IoSlice, IoSliceMut}, ready}; use futures::{prelude::*, io::{IoSlice, IoSliceMut}, ready};
use pin_project::pin_project; use pin_project::pin_project;
use std::{error::Error, fmt, io, mem, pin::Pin, task::{Context, Poll}}; use std::{error::Error, fmt, io, mem, pin::Pin, task::{Context, Poll}};
@ -74,10 +73,9 @@ where
} }
impl<TInner> Negotiated<TInner> { impl<TInner> Negotiated<TInner> {
/// Creates a `Negotiated` in state [`State::Completed`], possibly /// Creates a `Negotiated` in state [`State::Completed`].
/// with `remaining` data to be sent. pub(crate) fn completed(io: TInner) -> Self {
pub(crate) fn completed(io: TInner, remaining: BytesMut) -> Self { Negotiated { state: State::Completed { io } }
Negotiated { state: State::Completed { io, remaining } }
} }
/// Creates a `Negotiated` in state [`State::Expecting`] that is still /// Creates a `Negotiated` in state [`State::Expecting`] that is still
@ -107,10 +105,7 @@ impl<TInner> Negotiated<TInner> {
let mut this = self.project(); let mut this = self.project();
match this.state.as_mut().project() { match this.state.as_mut().project() {
StateProj::Completed { remaining, .. } => { StateProj::Completed { .. } => return Poll::Ready(Ok(())),
debug_assert!(remaining.is_empty());
return Poll::Ready(Ok(()))
}
_ => {} _ => {}
} }
@ -139,8 +134,7 @@ impl<TInner> Negotiated<TInner> {
if let Message::Protocol(p) = &msg { if let Message::Protocol(p) = &msg {
if p.as_ref() == protocol.as_ref() { if p.as_ref() == protocol.as_ref() {
log::debug!("Negotiated: Received confirmation for protocol: {}", p); log::debug!("Negotiated: Received confirmation for protocol: {}", p);
let (io, remaining) = io.into_inner(); *this.state = State::Completed { io: io.into_inner() };
*this.state = State::Completed { io, remaining };
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));
} }
} }
@ -165,7 +159,8 @@ impl<TInner> Negotiated<TInner> {
#[derive(Debug)] #[derive(Debug)]
enum State<R> { enum State<R> {
/// In this state, a `Negotiated` is still expecting to /// In this state, a `Negotiated` is still expecting to
/// receive confirmation of the protocol it as settled on. /// receive confirmation of the protocol it has optimistically
/// settled on.
Expecting { Expecting {
/// The underlying I/O stream. /// The underlying I/O stream.
#[pin] #[pin]
@ -176,11 +171,9 @@ enum State<R> {
version: Version version: Version
}, },
/// In this state, a protocol has been agreed upon and may /// In this state, a protocol has been agreed upon and I/O
/// only be pending the sending of the final acknowledgement, /// on the underlying stream can commence.
/// which is prepended to / combined with the next write for Completed { #[pin] io: R },
/// efficiency.
Completed { #[pin] io: R, remaining: BytesMut },
/// Temporary state while moving the `io` resource from /// Temporary state while moving the `io` resource from
/// `Expecting` to `Completed`. /// `Expecting` to `Completed`.
@ -196,12 +189,9 @@ where
{ {
loop { loop {
match self.as_mut().project().state.project() { match self.as_mut().project().state.project() {
StateProj::Completed { io, remaining } => { StateProj::Completed { io } => {
// If protocol negotiation is complete and there is no // If protocol negotiation is complete, commence with reading.
// remaining data to be flushed, commence with reading. return io.poll_read(cx, buf)
if remaining.is_empty() {
return io.poll_read(cx, buf)
}
}, },
_ => {} _ => {}
} }
@ -230,12 +220,9 @@ where
{ {
loop { loop {
match self.as_mut().project().state.project() { match self.as_mut().project().state.project() {
StateProj::Completed { io, remaining } => { StateProj::Completed { io } => {
// If protocol negotiation is complete and there is no // If protocol negotiation is complete, commence with reading.
// remaining data to be flushed, commence with reading. return io.poll_read_vectored(cx, bufs)
if remaining.is_empty() {
return io.poll_read_vectored(cx, bufs)
}
}, },
_ => {} _ => {}
} }
@ -257,16 +244,7 @@ where
{ {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> { fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
match self.project().state.project() { match self.project().state.project() {
StateProj::Completed { mut io, remaining } => { StateProj::Completed { io } => io.poll_write(cx, buf),
while !remaining.is_empty() {
let n = ready!(io.as_mut().poll_write(cx, &remaining)?);
if n == 0 {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()))
}
remaining.advance(n);
}
io.poll_write(cx, buf)
},
StateProj::Expecting { io, .. } => io.poll_write(cx, buf), StateProj::Expecting { io, .. } => io.poll_write(cx, buf),
StateProj::Invalid => panic!("Negotiated: Invalid state"), StateProj::Invalid => panic!("Negotiated: Invalid state"),
} }
@ -274,16 +252,7 @@ where
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
match self.project().state.project() { match self.project().state.project() {
StateProj::Completed { mut io, remaining } => { StateProj::Completed { io } => io.poll_flush(cx),
while !remaining.is_empty() {
let n = ready!(io.as_mut().poll_write(cx, &remaining)?);
if n == 0 {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()))
}
remaining.advance(n);
}
io.poll_flush(cx)
},
StateProj::Expecting { io, .. } => io.poll_flush(cx), StateProj::Expecting { io, .. } => io.poll_flush(cx),
StateProj::Invalid => panic!("Negotiated: Invalid state"), StateProj::Invalid => panic!("Negotiated: Invalid state"),
} }
@ -307,16 +276,7 @@ where
-> Poll<Result<usize, io::Error>> -> Poll<Result<usize, io::Error>>
{ {
match self.project().state.project() { match self.project().state.project() {
StateProj::Completed { mut io, remaining } => { StateProj::Completed { io } => io.poll_write_vectored(cx, bufs),
while !remaining.is_empty() {
let n = ready!(io.as_mut().poll_write(cx, &remaining)?);
if n == 0 {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()))
}
remaining.advance(n);
}
io.poll_write_vectored(cx, bufs)
},
StateProj::Expecting { io, .. } => io.poll_write_vectored(cx, bufs), StateProj::Expecting { io, .. } => io.poll_write_vectored(cx, bufs),
StateProj::Invalid => panic!("Negotiated: Invalid state"), StateProj::Invalid => panic!("Negotiated: Invalid state"),
} }
@ -373,76 +333,3 @@ impl fmt::Display for NegotiationError {
} }
} }
} }
#[cfg(test)]
mod tests {
use super::*;
use quickcheck::*;
use std::{io::Write, task::Poll};
/// An I/O resource with a fixed write capacity (total and per write op).
struct Capped { buf: Vec<u8>, step: usize }
impl AsyncRead for Capped {
fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<Result<usize, io::Error>> {
unreachable!()
}
}
impl AsyncWrite for Capped {
fn poll_write(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
if self.buf.len() + buf.len() > self.buf.capacity() {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()))
}
let len = usize::min(self.step, buf.len());
let n = Write::write(&mut self.buf, &buf[.. len]).unwrap();
Poll::Ready(Ok(n))
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
}
#[test]
fn write_remaining() {
fn prop(rem: Vec<u8>, new: Vec<u8>, free: u8, step: u8) -> TestResult {
let cap = rem.len() + free as usize;
let step = u8::min(free, step) as usize + 1;
let buf = Capped { buf: Vec::with_capacity(cap), step };
let rem = BytesMut::from(&rem[..]);
let mut io = Negotiated::completed(buf, rem.clone());
let mut written = 0;
loop {
// Write until `new` has been fully written or the capped buffer runs
// over capacity and yields WriteZero.
match future::poll_fn(|cx| Pin::new(&mut io).poll_write(cx, &new[written..])).now_or_never().unwrap() {
Ok(n) =>
if let State::Completed { remaining, .. } = &io.state {
assert!(remaining.is_empty());
written += n;
if written == new.len() {
return TestResult::passed()
}
} else {
return TestResult::failed()
}
Err(e) if e.kind() == io::ErrorKind::WriteZero => {
if let State::Completed { .. } = &io.state {
assert!(rem.len() + new.len() > cap);
return TestResult::passed()
} else {
return TestResult::failed()
}
}
Err(e) => panic!("Unexpected error: {:?}", e),
}
}
}
quickcheck(prop as fn(_,_,_,_) -> _)
}
}

View File

@ -289,23 +289,16 @@ impl<R> MessageIO<R> {
MessageReader { inner: self.inner.into_reader() } MessageReader { inner: self.inner.into_reader() }
} }
/// Drops the [`MessageIO`] resource, yielding the underlying I/O stream /// Drops the [`MessageIO`] resource, yielding the underlying I/O stream.
/// together with the remaining write buffer containing the protocol
/// negotiation frame data that has not yet been written to the I/O stream.
///
/// The returned remaining write buffer may be prepended to follow-up
/// protocol data to send with a single `write`. Either way, if non-empty,
/// the write buffer _must_ eventually be written to the I/O stream
/// _before_ any follow-up data, in order for protocol negotiation to
/// complete cleanly.
/// ///
/// # Panics /// # Panics
/// ///
/// Panics if the read buffer is not empty, meaning that an incoming /// Panics if the read buffer or write buffer is not empty, meaning that an incoming
/// protocol negotiation frame has been partially read. The read buffer /// protocol negotiation frame has been partially read or an outgoing frame
/// is guaranteed to be empty whenever `MessageIO::poll` returned /// has not yet been flushed. The read buffer is guaranteed to be empty whenever
/// a message. /// `MessageIO::poll` returned a message. The write buffer is guaranteed to be empty
pub fn into_inner(self) -> (R, BytesMut) { /// when the sink has been flushed.
pub fn into_inner(self) -> R {
self.inner.into_inner() self.inner.into_inner()
} }
} }
@ -365,19 +358,14 @@ impl<R> MessageReader<R> {
/// together with the remaining write buffer containing the protocol /// together with the remaining write buffer containing the protocol
/// negotiation frame data that has not yet been written to the I/O stream. /// negotiation frame data that has not yet been written to the I/O stream.
/// ///
/// The returned remaining write buffer may be prepended to follow-up
/// protocol data to send with a single `write`. Either way, if non-empty,
/// the write buffer _must_ eventually be written to the I/O stream
/// _before_ any follow-up data, in order for protocol negotiation to
/// complete cleanly.
///
/// # Panics /// # Panics
/// ///
/// Panics if the read buffer is not empty, meaning that an incoming /// Panics if the read buffer or write buffer is not empty, meaning that either
/// protocol negotiation frame has been partially read. The read buffer /// an incoming protocol negotiation frame has been partially read, or an
/// is guaranteed to be empty whenever `MessageReader::poll` returned /// outgoing frame has not yet been flushed. The read buffer is guaranteed to
/// a message. /// be empty whenever `MessageReader::poll` returned a message. The write
pub fn into_inner(self) -> (R, BytesMut) { /// buffer is guaranteed to be empty whenever the sink has been flushed.
pub fn into_inner(self) -> R {
self.inner.into_inner() self.inner.into_inner()
} }
} }