From c728824440fa055e6ee025ab87621e08eade89de Mon Sep 17 00:00:00 2001 From: Thomas Coratger <60488569+tcoratger@users.noreply.github.com> Date: Tue, 2 May 2023 13:49:07 +0200 Subject: [PATCH] feat(relay): hide internals of `Connection` Relayed connections to other peers are created from streams to the relay itself. Internally, such a connection has different states. These however are not relevant to the user and should be encapsulated to allow for more backwards-compatible changes. The only interface exposed is `AsyncRead` and `AsyncWrite`. Resolves: #3255. Pull-Request: #3829. --- protocols/relay/CHANGELOG.md | 4 ++ protocols/relay/src/priv_client.rs | 78 ++++++++++++++-------- protocols/relay/src/priv_client/handler.rs | 13 ++-- 3 files changed, 59 insertions(+), 36 deletions(-) diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index f718fc5a..4b3b9778 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -3,7 +3,11 @@ - Raise MSRV to 1.65. See [PR 3715]. +- Hide internals of `Connection` and expose only `AsyncRead` and `AsyncWrite`. + See [PR 3829]. + [PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715 +[PR 3829]: https://github.com/libp2p/rust-libp2p/pull/3829 ## 0.15.2 diff --git a/protocols/relay/src/priv_client.rs b/protocols/relay/src/priv_client.rs index 2a24607a..9f250adf 100644 --- a/protocols/relay/src/priv_client.rs +++ b/protocols/relay/src/priv_client.rs @@ -45,7 +45,6 @@ use libp2p_swarm::{ }; use std::collections::{hash_map, HashMap, VecDeque}; use std::io::{Error, ErrorKind, IoSlice}; -use std::ops::DerefMut; use std::pin::Pin; use std::task::{Context, Poll}; use transport::Transport; @@ -387,32 +386,43 @@ impl NetworkBehaviour for Behaviour { } } -/// A [`NegotiatedSubstream`] acting as a [`Connection`]. -pub enum Connection { +/// Represents a connection to another peer via a relay. +/// +/// Internally, this uses a stream to the relay. +pub struct Connection { + state: ConnectionState, +} + +enum ConnectionState { InboundAccepting { - accept: BoxFuture<'static, Result>, + accept: BoxFuture<'static, Result>, }, Operational { read_buffer: Bytes, substream: NegotiatedSubstream, + /// "Drop notifier" pattern to signal to the transport that the connection has been dropped. + /// + /// This is flagged as "dead-code" by the compiler because we never read from it here. + /// However, it is actual use is to trigger the `Canceled` error in the `Transport` when this `Sender` is dropped. + #[allow(dead_code)] drop_notifier: oneshot::Sender, }, } -impl Unpin for Connection {} +impl Unpin for ConnectionState {} -impl Connection { +impl ConnectionState { pub(crate) fn new_inbound( circuit: inbound_stop::Circuit, drop_notifier: oneshot::Sender, ) -> Self { - Connection::InboundAccepting { + ConnectionState::InboundAccepting { accept: async { let (substream, read_buffer) = circuit .accept() .await .map_err(|e| Error::new(ErrorKind::Other, e))?; - Ok(Connection::Operational { + Ok(ConnectionState::Operational { read_buffer, substream, drop_notifier, @@ -427,7 +437,7 @@ impl Connection { read_buffer: Bytes, drop_notifier: oneshot::Sender, ) -> Self { - Connection::Operational { + ConnectionState::Operational { substream, read_buffer, drop_notifier, @@ -442,11 +452,13 @@ impl AsyncWrite for Connection { buf: &[u8], ) -> Poll> { loop { - match self.deref_mut() { - Connection::InboundAccepting { accept } => { - *self = ready!(accept.poll_unpin(cx))?; + match &mut self.state { + ConnectionState::InboundAccepting { accept } => { + *self = Connection { + state: ready!(accept.poll_unpin(cx))?, + }; } - Connection::Operational { substream, .. } => { + ConnectionState::Operational { substream, .. } => { return Pin::new(substream).poll_write(cx, buf); } } @@ -454,11 +466,13 @@ impl AsyncWrite for Connection { } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { loop { - match self.deref_mut() { - Connection::InboundAccepting { accept } => { - *self = ready!(accept.poll_unpin(cx))?; + match &mut self.state { + ConnectionState::InboundAccepting { accept } => { + *self = Connection { + state: ready!(accept.poll_unpin(cx))?, + }; } - Connection::Operational { substream, .. } => { + ConnectionState::Operational { substream, .. } => { return Pin::new(substream).poll_flush(cx); } } @@ -466,11 +480,13 @@ impl AsyncWrite for Connection { } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { loop { - match self.deref_mut() { - Connection::InboundAccepting { accept } => { - *self = ready!(accept.poll_unpin(cx))?; + match &mut self.state { + ConnectionState::InboundAccepting { accept } => { + *self = Connection { + state: ready!(accept.poll_unpin(cx))?, + }; } - Connection::Operational { substream, .. } => { + ConnectionState::Operational { substream, .. } => { return Pin::new(substream).poll_close(cx); } } @@ -483,11 +499,13 @@ impl AsyncWrite for Connection { bufs: &[IoSlice], ) -> Poll> { loop { - match self.deref_mut() { - Connection::InboundAccepting { accept } => { - *self = ready!(accept.poll_unpin(cx))?; + match &mut self.state { + ConnectionState::InboundAccepting { accept } => { + *self = Connection { + state: ready!(accept.poll_unpin(cx))?, + }; } - Connection::Operational { substream, .. } => { + ConnectionState::Operational { substream, .. } => { return Pin::new(substream).poll_write_vectored(cx, bufs); } } @@ -502,11 +520,13 @@ impl AsyncRead for Connection { buf: &mut [u8], ) -> Poll> { loop { - match self.deref_mut() { - Connection::InboundAccepting { accept } => { - *self = ready!(accept.poll_unpin(cx))?; + match &mut self.state { + ConnectionState::InboundAccepting { accept } => { + *self = Connection { + state: ready!(accept.poll_unpin(cx))?, + }; } - Connection::Operational { + ConnectionState::Operational { read_buffer, substream, .. diff --git a/protocols/relay/src/priv_client/handler.rs b/protocols/relay/src/priv_client/handler.rs index 504fab59..290b72e9 100644 --- a/protocols/relay/src/priv_client/handler.rs +++ b/protocols/relay/src/priv_client/handler.rs @@ -189,10 +189,11 @@ impl Handler { let (tx, rx) = oneshot::channel(); self.alive_lend_out_substreams.push(rx); - let connection = super::Connection::new_inbound(inbound_circuit, tx); + let connection = super::ConnectionState::new_inbound(inbound_circuit, tx); pending_msgs.push_back(transport::ToListenerMsg::IncomingRelayedConnection { - stream: connection, + // stream: connection, + stream: super::Connection { state: connection }, src_peer_id, relay_peer_id: self.remote_peer_id, relay_addr: self.remote_addr.clone(), @@ -271,11 +272,9 @@ impl Handler { OutboundOpenInfo::Connect { send_back }, ) => { let (tx, rx) = oneshot::channel(); - match send_back.send(Ok(super::Connection::new_outbound( - substream, - read_buffer, - tx, - ))) { + match send_back.send(Ok(super::Connection { + state: super::ConnectionState::new_outbound(substream, read_buffer, tx), + })) { Ok(()) => { self.alive_lend_out_substreams.push(rx); self.queued_events.push_back(ConnectionHandlerEvent::Custom(