Max Inden 76cb76ca7d
fix(multistream-select): don't wait for negotiation in poll_close
With `Version::V1Lazy` and negotiation of a single protocol, a stream initiator optimistically
sends application data right after proposing its protocol. More specifically an application can
write data via `AsyncWrite::poll_write` even though the remote has not yet confirmed the stream protocol.

This saves one round-trip.

``` mermaid
sequenceDiagram
A->>B: "/multistream/1.0.0"
A->>B: "/perf/1.0.0"
A->>B: <some-perf-protocol-data>
B->>A: "/multistream/1.0.0"
B->>A: "/perf/1.0.0"
B->>A: <some-perf-protocol-data>
```

When considering stream closing, i.e. `AsyncWrite::poll_close`, and using stream closing as an
operation in ones protocol, e.g. using stream closing to signal the end of a request, this becomes tricky.

The behavior without this commit was as following:

``` mermaid
sequenceDiagram
A->>B: "/multistream/1.0.0"
A->>B: "/perf/1.0.0"
A->>B: <some-perf-protocol-data>
Note left of A: Call `AsyncWrite::poll_close` which first waits for the<br/>optimistic multistream-select negotiation to finish, before closing the stream,<br/> i.e. setting the FIN bit.
B->>A: "/multistream/1.0.0"
B->>A: "/perf/1.0.0"
Note right of B: Waiting for A to close the stream (i.e. set the `FIN` bit)<br/>before sending the response.
A->>B: FIN
B->>A: <some-perf-protocol-data>
```

The above takes 2 round trips:

1. Send the optimistic multistream-select protocol proposals as well as the initiator protocol
payload and waits for the confirmation of the protocols.
2. Close the stream, i.e. sends the `FIN` bit and waits for the responder protocol payload.

This commit proposes that the stream initiator should not wait for the multistream-select protocol
confirmation when closing the stream, but close the stream within the first round-trip.

``` mermaid
sequenceDiagram
A->>B: "/multistream/1.0.0"
A->>B: "/perf/1.0.0"
A->>B: <some-perf-protocol-data>
A->>B: FIN
B->>A: "/multistream/1.0.0"
B->>A: "/perf/1.0.0"
B->>A: <some-perf-protocol-data>
```

This takes 1 round-trip.

The downside of this commit is, that the stream initiator will no longer notice a negotiation error
when closing the stream. They will only notice it when reading from the stream. E.g. say that B does
not support "/perf/1.0.0", A will only notice on `AsyncRead::poll_read`, not on
`AsyncWrite::poll_close`. This is problematic for protocols where A only sends data, but never
receives data, i.e. never calls `AsyncRead::poll_read`. Though one can argue that such protocol is
flawed in the first place. With a response-less protocol, as even if negotiation succceeds, A
doesn't know whether B received the protocol payload.

Pull-Request: #4019.
2023-06-06 21:00:20 +00:00

203 lines
7.4 KiB
Rust

// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Integration tests for protocol negotiation.
use futures::prelude::*;
use multistream_select::{dialer_select_proto, listener_select_proto, NegotiationError, Version};
use std::time::Duration;
#[test]
fn select_proto_basic() {
async fn run(version: Version) {
let (client_connection, server_connection) = futures_ringbuf::Endpoint::pair(100, 100);
let server = async_std::task::spawn(async move {
let protos = vec!["/proto1", "/proto2"];
let (proto, mut io) = listener_select_proto(server_connection, protos)
.await
.unwrap();
assert_eq!(proto, "/proto2");
let mut out = vec![0; 32];
let n = io.read(&mut out).await.unwrap();
out.truncate(n);
assert_eq!(out, b"ping");
io.write_all(b"pong").await.unwrap();
io.flush().await.unwrap();
});
let client = async_std::task::spawn(async move {
let protos = vec!["/proto3", "/proto2"];
let (proto, mut io) =
dialer_select_proto(client_connection, protos.into_iter(), version)
.await
.unwrap();
assert_eq!(proto, "/proto2");
io.write_all(b"ping").await.unwrap();
io.flush().await.unwrap();
let mut out = vec![0; 32];
let n = io.read(&mut out).await.unwrap();
out.truncate(n);
assert_eq!(out, b"pong");
});
server.await;
client.await;
}
async_std::task::block_on(run(Version::V1));
async_std::task::block_on(run(Version::V1Lazy));
}
/// Tests the expected behaviour of failed negotiations.
#[test]
fn negotiation_failed() {
let _ = env_logger::try_init();
async fn run(
Test {
version,
listen_protos,
dial_protos,
dial_payload,
}: Test,
) {
let (client_connection, server_connection) = futures_ringbuf::Endpoint::pair(100, 100);
let server = async_std::task::spawn(async move {
let io = match listener_select_proto(server_connection, listen_protos).await {
Ok((_, io)) => io,
Err(NegotiationError::Failed) => return,
Err(NegotiationError::ProtocolError(e)) => {
panic!("Unexpected protocol error {e}")
}
};
match io.complete().await {
Err(NegotiationError::Failed) => {}
_ => panic!(),
}
});
let client =
async_std::task::spawn(async move {
let mut io =
match dialer_select_proto(client_connection, dial_protos.into_iter(), version)
.await
{
Err(NegotiationError::Failed) => return,
Ok((_, io)) => io,
Err(_) => panic!(),
};
// The dialer may write a payload that is even sent before it
// got confirmation of the last proposed protocol, when `V1Lazy`
// is used.
io.write_all(&dial_payload).await.unwrap();
match io.complete().await {
Err(NegotiationError::Failed) => {}
_ => panic!(),
}
});
server.await;
client.await;
}
/// Parameters for a single test run.
#[derive(Clone)]
struct Test {
version: Version,
listen_protos: Vec<&'static str>,
dial_protos: Vec<&'static str>,
dial_payload: Vec<u8>,
}
// Disjunct combinations of listen and dial protocols to test.
//
// The choices here cover the main distinction between a single
// and multiple protocols.
let protos = vec![
(vec!["/proto1"], vec!["/proto2"]),
(vec!["/proto1", "/proto2"], vec!["/proto3", "/proto4"]),
];
// The payloads that the dialer sends after "successful" negotiation,
// which may be sent even before the dialer got protocol confirmation
// when `V1Lazy` is used.
//
// The choices here cover the specific situations that can arise with
// `V1Lazy` and which must nevertheless behave identically to `V1` w.r.t.
// the outcome of the negotiation.
let payloads = vec![
// No payload, in which case all versions should behave identically
// in any case, i.e. the baseline test.
vec![],
// With this payload and `V1Lazy`, the listener interprets the first
// `1` as a message length and encounters an invalid message (the
// second `1`). The listener is nevertheless expected to fail
// negotiation normally, just like with `V1`.
vec![1, 1],
// With this payload and `V1Lazy`, the listener interprets the first
// `42` as a message length and encounters unexpected EOF trying to
// read a message of that length. The listener is nevertheless expected
// to fail negotiation normally, just like with `V1`
vec![42, 1],
];
for (listen_protos, dial_protos) in protos {
for dial_payload in payloads.clone() {
for &version in &[Version::V1, Version::V1Lazy] {
async_std::task::block_on(run(Test {
version,
listen_protos: listen_protos.clone(),
dial_protos: dial_protos.clone(),
dial_payload: dial_payload.clone(),
}))
}
}
}
}
#[async_std::test]
async fn v1_lazy_do_not_wait_for_negotiation_on_poll_close() {
let (client_connection, _server_connection) = futures_ringbuf::Endpoint::pair(1024 * 1024, 1);
let client = async_std::task::spawn(async move {
// Single protocol to allow for lazy (or optimistic) protocol negotiation.
let protos = vec!["/proto1"];
let (proto, mut io) =
dialer_select_proto(client_connection, protos.into_iter(), Version::V1Lazy)
.await
.unwrap();
assert_eq!(proto, "/proto1");
// client can close the connection even though protocol negotiation is not yet done, i.e.
// `_server_connection` had been untouched.
io.close().await.unwrap();
});
async_std::future::timeout(Duration::from_secs(10), client)
.await
.unwrap();
}