core/src/transport: Poll Transport directly, remove Transport::Listener (#2652)

Remove the concept of individual `Transport::Listener` streams from `Transport`.
Instead the `Transport` is polled directly via `Transport::poll`. The
`Transport` is now responsible for driving its listeners.
This commit is contained in:
Elena Frank
2022-07-04 04:16:57 +02:00
committed by GitHub
parent b28cdb31f9
commit 62622a1bad
59 changed files with 2094 additions and 2044 deletions

View File

@ -23,12 +23,13 @@ use crate::v2::client::RelayedConnection;
use crate::v2::RequestId;
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::future::{ready, BoxFuture, Future, FutureExt, Ready};
use futures::future::{ready, BoxFuture, FutureExt, Ready};
use futures::ready;
use futures::sink::SinkExt;
use futures::stream::SelectAll;
use futures::stream::{Stream, StreamExt};
use libp2p_core::multiaddr::{Multiaddr, Protocol};
use libp2p_core::transport::{ListenerEvent, TransportError};
use libp2p_core::transport::{ListenerId, TransportError, TransportEvent};
use libp2p_core::{PeerId, Transport};
use std::collections::VecDeque;
use std::pin::Pin;
@ -85,9 +86,10 @@ use thiserror::Error;
/// .with(Protocol::P2pCircuit); // Signal to listen via remote relay node.
/// transport.listen_on(relay_addr).unwrap();
/// ```
#[derive(Clone)]
pub struct ClientTransport {
to_behaviour: mpsc::Sender<TransportToBehaviourMsg>,
pending_to_behaviour: VecDeque<TransportToBehaviourMsg>,
listeners: SelectAll<RelayListener>,
}
impl ClientTransport {
@ -112,22 +114,22 @@ impl ClientTransport {
/// ```
pub(crate) fn new() -> (Self, mpsc::Receiver<TransportToBehaviourMsg>) {
let (to_behaviour, from_transport) = mpsc::channel(0);
(ClientTransport { to_behaviour }, from_transport)
let transport = ClientTransport {
to_behaviour,
pending_to_behaviour: VecDeque::new(),
listeners: SelectAll::new(),
};
(transport, from_transport)
}
}
impl Transport for ClientTransport {
type Output = RelayedConnection;
type Error = RelayError;
type Listener = RelayListener;
type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
type Dial = RelayedDial;
fn listen_on(
&mut self,
addr: Multiaddr,
) -> Result<Self::Listener, TransportError<Self::Error>> {
fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
let (relay_peer_id, relay_addr) = match parse_relayed_multiaddr(addr)? {
RelayedMultiaddr {
relay_peer_id: None,
@ -147,25 +149,31 @@ impl Transport for ClientTransport {
};
let (to_listener, from_behaviour) = mpsc::channel(0);
let mut to_behaviour = self.to_behaviour.clone();
let msg_to_behaviour = Some(
async move {
to_behaviour
.send(TransportToBehaviourMsg::ListenReq {
relay_peer_id,
relay_addr,
to_listener,
})
.await
}
.boxed(),
);
self.pending_to_behaviour
.push_back(TransportToBehaviourMsg::ListenReq {
relay_peer_id,
relay_addr,
to_listener,
});
Ok(RelayListener {
queued_new_addresses: Default::default(),
let listener_id = ListenerId::new();
let listener = RelayListener {
listener_id,
queued_events: Default::default(),
from_behaviour,
msg_to_behaviour,
})
is_closed: false,
};
self.listeners.push(listener);
Ok(listener_id)
}
fn remove_listener(&mut self, id: ListenerId) -> bool {
if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) {
listener.close(Ok(()));
true
} else {
false
}
}
fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
@ -217,6 +225,35 @@ impl Transport for ClientTransport {
fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
None
}
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>>
where
Self: Sized,
{
loop {
if !self.pending_to_behaviour.is_empty() {
match self.to_behaviour.poll_ready(cx) {
Poll::Ready(Ok(())) => {
let msg = self
.pending_to_behaviour
.pop_front()
.expect("Called !is_empty().");
let _ = self.to_behaviour.start_send(msg);
continue;
}
Poll::Ready(Err(_)) => unreachable!("Receiver is never dropped."),
Poll::Pending => {}
}
}
match self.listeners.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => return Poll::Ready(event),
_ => return Poll::Pending,
}
}
}
}
#[derive(Default)]
@ -282,64 +319,87 @@ fn parse_relayed_multiaddr(
}
pub struct RelayListener {
queued_new_addresses: VecDeque<Multiaddr>,
listener_id: ListenerId,
/// Queue of events to report when polled.
queued_events: VecDeque<<Self as Stream>::Item>,
/// Channel for messages from the behaviour [`Handler`][super::handler::Handler].
from_behaviour: mpsc::Receiver<ToListenerMsg>,
msg_to_behaviour: Option<BoxFuture<'static, Result<(), mpsc::SendError>>>,
/// The listener can be closed either manually with [`Transport::remove_listener`] or if
/// the sender side of the `from_behaviour` channel is dropped.
is_closed: bool,
}
impl Unpin for RelayListener {}
impl RelayListener {
/// Close the listener.
///
/// This will create a [`TransportEvent::ListenerClosed`] event
/// and terminate the stream once all remaining events in queue have
/// been reported.
fn close(&mut self, reason: Result<(), RelayError>) {
self.queued_events
.push_back(TransportEvent::ListenerClosed {
listener_id: self.listener_id,
reason,
});
self.is_closed = true;
}
}
impl Stream for RelayListener {
type Item =
Result<ListenerEvent<Ready<Result<RelayedConnection, RelayError>>, RelayError>, RelayError>;
type Item = TransportEvent<<ClientTransport as Transport>::ListenerUpgrade, RelayError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let Some(msg) = &mut self.msg_to_behaviour {
match Future::poll(msg.as_mut(), cx) {
Poll::Ready(Ok(())) => self.msg_to_behaviour = None,
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Poll::Pending => {}
}
if let Some(event) = self.queued_events.pop_front() {
return Poll::Ready(Some(event));
}
if let Some(addr) = self.queued_new_addresses.pop_front() {
return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(addr))));
if self.is_closed {
// Terminate the stream if the listener closed and all remaining events have been reported.
return Poll::Ready(None);
}
let msg = match ready!(self.from_behaviour.poll_next_unpin(cx)) {
Some(msg) => msg,
None => {
// Sender of `from_behaviour` has been dropped, signaling listener to close.
return Poll::Ready(None);
self.close(Ok(()));
continue;
}
};
let result = match msg {
match msg {
ToListenerMsg::Reservation(Ok(Reservation { addrs })) => {
debug_assert!(
self.queued_new_addresses.is_empty(),
self.queued_events.is_empty(),
"Assert empty due to previous `pop_front` attempt."
);
// Returned as [`ListenerEvent::NewAddress`] in next iteration of loop.
self.queued_new_addresses = addrs.into();
continue;
self.queued_events = addrs
.into_iter()
.map(|listen_addr| TransportEvent::NewAddress {
listener_id: self.listener_id,
listen_addr,
})
.collect();
}
ToListenerMsg::IncomingRelayedConnection {
stream,
src_peer_id,
relay_addr,
relay_peer_id: _,
} => Ok(ListenerEvent::Upgrade {
upgrade: ready(Ok(stream)),
local_addr: relay_addr.with(Protocol::P2pCircuit),
remote_addr: Protocol::P2p(src_peer_id.into()).into(),
}),
ToListenerMsg::Reservation(Err(())) => Err(RelayError::Reservation),
};
} => {
let listener_id = self.listener_id;
return Poll::Ready(Some(result));
self.queued_events.push_back(TransportEvent::Incoming {
upgrade: ready(Ok(stream)),
listener_id,
local_addr: relay_addr.with(Protocol::P2pCircuit),
send_back_addr: Protocol::P2p(src_peer_id.into()).into(),
})
}
ToListenerMsg::Reservation(Err(())) => self.close(Err(RelayError::Reservation)),
};
}
}
}