feat(relay): hide internals of Connection

Relayed connections to other peers are created from streams to the relay itself. Internally, such a connection has different states. These however are not relevant to the user and should be encapsulated to allow for more backwards-compatible changes. The only interface exposed is `AsyncRead` and `AsyncWrite`.

Resolves: #3255.

Pull-Request: #3829.
This commit is contained in:
Thomas Coratger 2023-05-02 13:49:07 +02:00 committed by GitHub
parent 30d2c75206
commit c728824440
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 59 additions and 36 deletions

View File

@ -3,7 +3,11 @@
- Raise MSRV to 1.65. - Raise MSRV to 1.65.
See [PR 3715]. See [PR 3715].
- Hide internals of `Connection` and expose only `AsyncRead` and `AsyncWrite`.
See [PR 3829].
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3829]: https://github.com/libp2p/rust-libp2p/pull/3829
## 0.15.2 ## 0.15.2

View File

@ -45,7 +45,6 @@ use libp2p_swarm::{
}; };
use std::collections::{hash_map, HashMap, VecDeque}; use std::collections::{hash_map, HashMap, VecDeque};
use std::io::{Error, ErrorKind, IoSlice}; use std::io::{Error, ErrorKind, IoSlice};
use std::ops::DerefMut;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use transport::Transport; use transport::Transport;
@ -387,32 +386,43 @@ impl NetworkBehaviour for Behaviour {
} }
} }
/// A [`NegotiatedSubstream`] acting as a [`Connection`]. /// Represents a connection to another peer via a relay.
pub enum Connection { ///
/// Internally, this uses a stream to the relay.
pub struct Connection {
state: ConnectionState,
}
enum ConnectionState {
InboundAccepting { InboundAccepting {
accept: BoxFuture<'static, Result<Connection, Error>>, accept: BoxFuture<'static, Result<ConnectionState, Error>>,
}, },
Operational { Operational {
read_buffer: Bytes, read_buffer: Bytes,
substream: NegotiatedSubstream, substream: NegotiatedSubstream,
/// "Drop notifier" pattern to signal to the transport that the connection has been dropped.
///
/// This is flagged as "dead-code" by the compiler because we never read from it here.
/// However, it is actual use is to trigger the `Canceled` error in the `Transport` when this `Sender` is dropped.
#[allow(dead_code)]
drop_notifier: oneshot::Sender<void::Void>, drop_notifier: oneshot::Sender<void::Void>,
}, },
} }
impl Unpin for Connection {} impl Unpin for ConnectionState {}
impl Connection { impl ConnectionState {
pub(crate) fn new_inbound( pub(crate) fn new_inbound(
circuit: inbound_stop::Circuit, circuit: inbound_stop::Circuit,
drop_notifier: oneshot::Sender<void::Void>, drop_notifier: oneshot::Sender<void::Void>,
) -> Self { ) -> Self {
Connection::InboundAccepting { ConnectionState::InboundAccepting {
accept: async { accept: async {
let (substream, read_buffer) = circuit let (substream, read_buffer) = circuit
.accept() .accept()
.await .await
.map_err(|e| Error::new(ErrorKind::Other, e))?; .map_err(|e| Error::new(ErrorKind::Other, e))?;
Ok(Connection::Operational { Ok(ConnectionState::Operational {
read_buffer, read_buffer,
substream, substream,
drop_notifier, drop_notifier,
@ -427,7 +437,7 @@ impl Connection {
read_buffer: Bytes, read_buffer: Bytes,
drop_notifier: oneshot::Sender<void::Void>, drop_notifier: oneshot::Sender<void::Void>,
) -> Self { ) -> Self {
Connection::Operational { ConnectionState::Operational {
substream, substream,
read_buffer, read_buffer,
drop_notifier, drop_notifier,
@ -442,11 +452,13 @@ impl AsyncWrite for Connection {
buf: &[u8], buf: &[u8],
) -> Poll<Result<usize, Error>> { ) -> Poll<Result<usize, Error>> {
loop { loop {
match self.deref_mut() { match &mut self.state {
Connection::InboundAccepting { accept } => { ConnectionState::InboundAccepting { accept } => {
*self = ready!(accept.poll_unpin(cx))?; *self = Connection {
state: ready!(accept.poll_unpin(cx))?,
};
} }
Connection::Operational { substream, .. } => { ConnectionState::Operational { substream, .. } => {
return Pin::new(substream).poll_write(cx, buf); return Pin::new(substream).poll_write(cx, buf);
} }
} }
@ -454,11 +466,13 @@ impl AsyncWrite for Connection {
} }
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
loop { loop {
match self.deref_mut() { match &mut self.state {
Connection::InboundAccepting { accept } => { ConnectionState::InboundAccepting { accept } => {
*self = ready!(accept.poll_unpin(cx))?; *self = Connection {
state: ready!(accept.poll_unpin(cx))?,
};
} }
Connection::Operational { substream, .. } => { ConnectionState::Operational { substream, .. } => {
return Pin::new(substream).poll_flush(cx); return Pin::new(substream).poll_flush(cx);
} }
} }
@ -466,11 +480,13 @@ impl AsyncWrite for Connection {
} }
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> { fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
loop { loop {
match self.deref_mut() { match &mut self.state {
Connection::InboundAccepting { accept } => { ConnectionState::InboundAccepting { accept } => {
*self = ready!(accept.poll_unpin(cx))?; *self = Connection {
state: ready!(accept.poll_unpin(cx))?,
};
} }
Connection::Operational { substream, .. } => { ConnectionState::Operational { substream, .. } => {
return Pin::new(substream).poll_close(cx); return Pin::new(substream).poll_close(cx);
} }
} }
@ -483,11 +499,13 @@ impl AsyncWrite for Connection {
bufs: &[IoSlice], bufs: &[IoSlice],
) -> Poll<Result<usize, Error>> { ) -> Poll<Result<usize, Error>> {
loop { loop {
match self.deref_mut() { match &mut self.state {
Connection::InboundAccepting { accept } => { ConnectionState::InboundAccepting { accept } => {
*self = ready!(accept.poll_unpin(cx))?; *self = Connection {
state: ready!(accept.poll_unpin(cx))?,
};
} }
Connection::Operational { substream, .. } => { ConnectionState::Operational { substream, .. } => {
return Pin::new(substream).poll_write_vectored(cx, bufs); return Pin::new(substream).poll_write_vectored(cx, bufs);
} }
} }
@ -502,11 +520,13 @@ impl AsyncRead for Connection {
buf: &mut [u8], buf: &mut [u8],
) -> Poll<Result<usize, Error>> { ) -> Poll<Result<usize, Error>> {
loop { loop {
match self.deref_mut() { match &mut self.state {
Connection::InboundAccepting { accept } => { ConnectionState::InboundAccepting { accept } => {
*self = ready!(accept.poll_unpin(cx))?; *self = Connection {
state: ready!(accept.poll_unpin(cx))?,
};
} }
Connection::Operational { ConnectionState::Operational {
read_buffer, read_buffer,
substream, substream,
.. ..

View File

@ -189,10 +189,11 @@ impl Handler {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.alive_lend_out_substreams.push(rx); self.alive_lend_out_substreams.push(rx);
let connection = super::Connection::new_inbound(inbound_circuit, tx); let connection = super::ConnectionState::new_inbound(inbound_circuit, tx);
pending_msgs.push_back(transport::ToListenerMsg::IncomingRelayedConnection { pending_msgs.push_back(transport::ToListenerMsg::IncomingRelayedConnection {
stream: connection, // stream: connection,
stream: super::Connection { state: connection },
src_peer_id, src_peer_id,
relay_peer_id: self.remote_peer_id, relay_peer_id: self.remote_peer_id,
relay_addr: self.remote_addr.clone(), relay_addr: self.remote_addr.clone(),
@ -271,11 +272,9 @@ impl Handler {
OutboundOpenInfo::Connect { send_back }, OutboundOpenInfo::Connect { send_back },
) => { ) => {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
match send_back.send(Ok(super::Connection::new_outbound( match send_back.send(Ok(super::Connection {
substream, state: super::ConnectionState::new_outbound(substream, read_buffer, tx),
read_buffer, })) {
tx,
))) {
Ok(()) => { Ok(()) => {
self.alive_lend_out_substreams.push(rx); self.alive_lend_out_substreams.push(rx);
self.queued_events.push_back(ConnectionHandlerEvent::Custom( self.queued_events.push_back(ConnectionHandlerEvent::Custom(