mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-07-31 00:41:59 +00:00
[multistream-select] Listener conformity for failed negotiations. (#1871)
* [multistream-select] Listener conformity for failed negotiations. When `V1Lazy` is used and the listener does not support the optimistic (and singular) proposal of the dialer, it currently happens that dialer and listener get a different outcome of the negotiation. The dialer eventually detects the failed negotiation as soon as it tries to read from the stream, but the listener either encounters an invalid message or unexpected premature EOF, depending on the payload that the dialer sent prematurely after its protocol proposal. In these cases the listener must be lenient and fail the negotiation "normally", i.e. not with a protocol violation or an I/O error. * Update misc/multistream-select/src/tests.rs Co-authored-by: Max Inden <mail@max-inden.de> * Refine error handling. Only be lenient with garbage or sudden EOF when reading just after having sent a protocol rejection. * Update misc/multistream-select/src/listener_select.rs Co-authored-by: Max Inden <mail@max-inden.de> Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
@@ -57,7 +57,8 @@ where
|
||||
protocols: SmallVec::from_iter(protocols),
|
||||
state: State::RecvHeader {
|
||||
io: MessageIO::new(inner)
|
||||
}
|
||||
},
|
||||
last_sent_na: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,7 +73,14 @@ where
|
||||
// TODO: It would be nice if eventually N = Protocol, which has a
|
||||
// few more implications on the API.
|
||||
protocols: SmallVec<[(N, Protocol); 8]>,
|
||||
state: State<R, N>
|
||||
state: State<R, N>,
|
||||
/// Whether the last message sent was a protocol rejection (i.e. `na\n`).
|
||||
///
|
||||
/// If the listener reads garbage or EOF after such a rejection,
|
||||
/// the dialer is likely using `V1Lazy` and negotiation must be
|
||||
/// considered failed, but not with a protocol violation or I/O
|
||||
/// error.
|
||||
last_sent_na: bool,
|
||||
}
|
||||
|
||||
enum State<R, N>
|
||||
@@ -166,7 +174,30 @@ where
|
||||
*this.state = State::RecvMessage { io };
|
||||
return Poll::Pending;
|
||||
}
|
||||
Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(From::from(err))),
|
||||
Poll::Ready(Some(Err(err))) => {
|
||||
if *this.last_sent_na {
|
||||
// When we read garbage or EOF after having already rejected a
|
||||
// protocol, the dialer is most likely using `V1Lazy` and has
|
||||
// optimistically settled on this protocol, so this is really a
|
||||
// failed negotiation, not a protocol violation. In this case
|
||||
// the dialer also raises `NegotiationError::Failed` when finally
|
||||
// reading the `N/A` response.
|
||||
if let ProtocolError::InvalidMessage = &err {
|
||||
log::trace!("Listener: Negotiation failed with invalid \
|
||||
message after protocol rejection.");
|
||||
return Poll::Ready(Err(NegotiationError::Failed))
|
||||
}
|
||||
if let ProtocolError::IoError(e) = &err {
|
||||
if e.kind() == std::io::ErrorKind::UnexpectedEof {
|
||||
log::trace!("Listener: Negotiation failed with EOF \
|
||||
after protocol rejection.");
|
||||
return Poll::Ready(Err(NegotiationError::Failed))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Poll::Ready(Err(From::from(err)))
|
||||
}
|
||||
};
|
||||
|
||||
match msg {
|
||||
@@ -209,6 +240,12 @@ where
|
||||
Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))),
|
||||
}
|
||||
|
||||
if let Message::NotAvailable = &message {
|
||||
*this.last_sent_na = true;
|
||||
} else {
|
||||
*this.last_sent_na = false;
|
||||
}
|
||||
|
||||
if let Err(err) = Pin::new(&mut io).start_send(message) {
|
||||
return Poll::Ready(Err(From::from(err)));
|
||||
}
|
||||
|
@@ -18,7 +18,7 @@
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
//! Contains the unit tests of the library.
|
||||
//! Integration tests for protocol negotiation.
|
||||
|
||||
#![cfg(test)]
|
||||
|
||||
@@ -74,16 +74,23 @@ fn select_proto_basic() {
|
||||
async_std::task::block_on(run(Version::V1Lazy));
|
||||
}
|
||||
|
||||
/// Tests the expected behaviour of failed negotiations.
|
||||
#[test]
|
||||
fn no_protocol_found() {
|
||||
async fn run(version: Version) {
|
||||
fn negotiation_failed() {
|
||||
let _ = env_logger::try_init();
|
||||
|
||||
async fn run(Test {
|
||||
version,
|
||||
listen_protos,
|
||||
dial_protos,
|
||||
dial_payload
|
||||
}: Test) {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let listener_addr = listener.local_addr().unwrap();
|
||||
|
||||
let server = async_std::task::spawn(async move {
|
||||
let connec = listener.accept().await.unwrap().0;
|
||||
let protos = vec![b"/proto1", b"/proto2"];
|
||||
let io = match listener_select_proto(connec, protos).await {
|
||||
let io = match listener_select_proto(connec, listen_protos).await {
|
||||
Ok((_, io)) => io,
|
||||
Err(NegotiationError::Failed) => return,
|
||||
Err(NegotiationError::ProtocolError(e)) => panic!("Unexpected protocol error {}", e),
|
||||
@@ -96,12 +103,15 @@ fn no_protocol_found() {
|
||||
|
||||
let client = async_std::task::spawn(async move {
|
||||
let connec = TcpStream::connect(&listener_addr).await.unwrap();
|
||||
let protos = vec![b"/proto3", b"/proto4"];
|
||||
let io = match dialer_select_proto(connec, protos.into_iter(), version).await {
|
||||
let mut io = match dialer_select_proto(connec, dial_protos.into_iter(), version).await {
|
||||
Err(NegotiationError::Failed) => return,
|
||||
Ok((_, io)) => io,
|
||||
Err(_) => panic!()
|
||||
};
|
||||
// The dialer may write a payload that is even sent before it
|
||||
// got confirmation of the last proposed protocol, when `V1Lazy`
|
||||
// is used.
|
||||
io.write_all(&dial_payload).await.unwrap();
|
||||
match io.complete().await {
|
||||
Err(NegotiationError::Failed) => {},
|
||||
_ => panic!(),
|
||||
@@ -112,8 +122,59 @@ fn no_protocol_found() {
|
||||
client.await;
|
||||
}
|
||||
|
||||
async_std::task::block_on(run(Version::V1));
|
||||
async_std::task::block_on(run(Version::V1Lazy));
|
||||
/// Parameters for a single test run.
|
||||
#[derive(Clone)]
|
||||
struct Test {
|
||||
version: Version,
|
||||
listen_protos: Vec<&'static str>,
|
||||
dial_protos: Vec<&'static str>,
|
||||
dial_payload: Vec<u8>,
|
||||
}
|
||||
|
||||
// Disjunct combinations of listen and dial protocols to test.
|
||||
//
|
||||
// The choices here cover the main distinction between a single
|
||||
// and multiple protocols.
|
||||
let protos = vec!{
|
||||
(vec!["/proto1"], vec!["/proto2"]),
|
||||
(vec!["/proto1", "/proto2"], vec!["/proto3", "/proto4"]),
|
||||
};
|
||||
|
||||
// The payloads that the dialer sends after "successful" negotiation,
|
||||
// which may be sent even before the dialer got protocol confirmation
|
||||
// when `V1Lazy` is used.
|
||||
//
|
||||
// The choices here cover the specific situations that can arise with
|
||||
// `V1Lazy` and which must nevertheless behave identically to `V1` w.r.t.
|
||||
// the outcome of the negotiation.
|
||||
let payloads = vec!{
|
||||
// No payload, in which case all versions should behave identically
|
||||
// in any case, i.e. the baseline test.
|
||||
vec![],
|
||||
// With this payload and `V1Lazy`, the listener interprets the first
|
||||
// `1` as a message length and encounters an invalid message (the
|
||||
// second `1`). The listener is nevertheless expected to fail
|
||||
// negotiation normally, just like with `V1`.
|
||||
vec![1,1],
|
||||
// With this payload and `V1Lazy`, the listener interprets the first
|
||||
// `42` as a message length and encounters unexpected EOF trying to
|
||||
// read a message of that length. The listener is nevertheless expected
|
||||
// to fail negotiation normally, just like with `V1`
|
||||
vec![42,1],
|
||||
};
|
||||
|
||||
for (listen_protos, dial_protos) in protos {
|
||||
for dial_payload in payloads.clone() {
|
||||
for &version in &[Version::V1, Version::V1Lazy] {
|
||||
async_std::task::block_on(run(Test {
|
||||
version,
|
||||
listen_protos: listen_protos.clone(),
|
||||
dial_protos: dial_protos.clone(),
|
||||
dial_payload: dial_payload.clone(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
Reference in New Issue
Block a user