Make errors on listener non-fatal (#1427)

* Make errors on listener non-fatal

* Fix bad rename

* Some changes to trait bounds

* Fix noise tests

* Apply suggestions from code review

Co-Authored-By: Toralf Wittner <tw@dtex.org>

* Add reason for closure

* Fix intra-doc link

Co-authored-by: Toralf Wittner <tw@dtex.org>
This commit is contained in:
Pierre Krieger 2020-02-13 12:05:45 +01:00 committed by GitHub
parent e2ddab223d
commit bbed28b3ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 246 additions and 87 deletions

View File

@ -357,12 +357,12 @@ pub enum EitherListenStream<A, B> {
Second(#[pin] B),
}
impl<AStream, BStream, AInner, BInner> Stream for EitherListenStream<AStream, BStream>
impl<AStream, BStream, AInner, BInner, AError, BError> Stream for EitherListenStream<AStream, BStream>
where
AStream: TryStream<Ok = ListenerEvent<AInner>>,
BStream: TryStream<Ok = ListenerEvent<BInner>>,
AStream: TryStream<Ok = ListenerEvent<AInner, AError>, Error = AError>,
BStream: TryStream<Ok = ListenerEvent<BInner, BError>, Error = BError>,
{
type Item = Result<ListenerEvent<EitherFuture<AInner, BInner>>, EitherError<AStream::Error, BStream::Error>>;
type Item = Result<ListenerEvent<EitherFuture<AInner, BInner>, EitherError<AError, BError>>, EitherError<AError, BError>>;
#[project]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
@ -371,13 +371,13 @@ where
EitherListenStream::First(a) => match TryStream::try_poll_next(a, cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le.map(EitherFuture::First)))),
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le.map(EitherFuture::First).map_err(EitherError::A)))),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::A(err)))),
},
EitherListenStream::Second(a) => match TryStream::try_poll_next(a, cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le.map(EitherFuture::Second)))),
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le.map(EitherFuture::Second).map_err(EitherError::B)))),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::B(err)))),
},
}

View File

@ -60,7 +60,7 @@ use std::{collections::VecDeque, fmt, pin::Pin};
/// ListenersEvent::AddressExpired { listener_id, listen_addr } => {
/// println!("Listener {:?} is no longer listening at address {}", listener_id, listen_addr);
/// },
/// ListenersEvent::Closed { listener_id } => {
/// ListenersEvent::Closed { listener_id, .. } => {
/// println!("Listener {:?} has been closed", listener_id);
/// },
/// ListenersEvent::Error { listener_id, error } => {
@ -148,6 +148,9 @@ where
Closed {
/// The ID of the listener that closed.
listener_id: ListenerId,
/// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
/// if the stream produced an error.
reason: Result<(), TTrans::Error>,
},
/// A listener errored.
///
@ -157,7 +160,7 @@ where
/// The ID of the listener that errored.
listener_id: ListenerId,
/// The error value.
error: <TTrans::Listener as TryStream>::Error
error: TTrans::Error,
}
}
@ -269,15 +272,24 @@ where
listen_addr: a
})
}
Poll::Ready(Some(Ok(ListenerEvent::Error(error)))) => {
let id = *listener_project.id;
self.listeners.push_front(listener);
return Poll::Ready(ListenersEvent::Error {
listener_id: id,
error,
})
}
Poll::Ready(None) => {
return Poll::Ready(ListenersEvent::Closed {
listener_id: *listener_project.id,
reason: Ok(()),
})
}
Poll::Ready(Some(Err(err))) => {
return Poll::Ready(ListenersEvent::Error {
return Poll::Ready(ListenersEvent::Closed {
listener_id: *listener_project.id,
error: err
reason: Err(err),
})
}
}
@ -320,7 +332,7 @@ where
impl<TTrans> fmt::Debug for ListenersEvent<TTrans>
where
TTrans: Transport,
<TTrans::Listener as TryStream>::Error: fmt::Debug,
TTrans::Error: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match self {
@ -339,9 +351,10 @@ where
.field("listener_id", listener_id)
.field("local_addr", local_addr)
.finish(),
ListenersEvent::Closed { listener_id } => f
ListenersEvent::Closed { listener_id, reason } => f
.debug_struct("ListenersEvent::Closed")
.field("listener_id", listener_id)
.field("reason", reason)
.finish(),
ListenersEvent::Error { listener_id, error } => f
.debug_struct("ListenersEvent::Error")
@ -356,6 +369,7 @@ where
mod tests {
use super::*;
use crate::transport;
use futures::prelude::*;
#[test]
fn incoming_event() {
@ -388,4 +402,79 @@ mod tests {
}
});
}
#[test]
fn listener_event_error_isnt_fatal() {
// Tests that a listener continues to be polled even after producing
// a `ListenerEvent::Error`.
#[derive(Clone)]
struct DummyTrans;
impl transport::Transport for DummyTrans {
type Output = ();
type Error = std::io::Error;
type Listener = Pin<Box<dyn Stream<Item = Result<ListenerEvent<Self::ListenerUpgrade, std::io::Error>, std::io::Error>>>>;
type ListenerUpgrade = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
fn listen_on(self, _: Multiaddr) -> Result<Self::Listener, transport::TransportError<Self::Error>> {
Ok(Box::pin(stream::unfold((), |()| async move {
Some((Ok(ListenerEvent::Error(std::io::Error::from(std::io::ErrorKind::Other))), ()))
})))
}
fn dial(self, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}
}
async_std::task::block_on(async move {
let transport = DummyTrans;
let mut listeners = ListenersStream::new(transport);
listeners.listen_on("/memory/0".parse().unwrap()).unwrap();
for _ in 0..10 {
match listeners.next().await.unwrap() {
ListenersEvent::Error { .. } => {},
_ => panic!()
}
}
});
}
#[test]
fn listener_error_is_fatal() {
// Tests that a listener stops after producing an error on the stream itself.
#[derive(Clone)]
struct DummyTrans;
impl transport::Transport for DummyTrans {
type Output = ();
type Error = std::io::Error;
type Listener = Pin<Box<dyn Stream<Item = Result<ListenerEvent<Self::ListenerUpgrade, std::io::Error>, std::io::Error>>>>;
type ListenerUpgrade = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
fn listen_on(self, _: Multiaddr) -> Result<Self::Listener, transport::TransportError<Self::Error>> {
Ok(Box::pin(stream::unfold((), |()| async move {
Some((Err(std::io::Error::from(std::io::ErrorKind::Other)), ()))
})))
}
fn dial(self, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}
}
async_std::task::block_on(async move {
let transport = DummyTrans;
let mut listeners = ListenersStream::new(transport);
listeners.listen_on("/memory/0".parse().unwrap()).unwrap();
match listeners.next().await.unwrap() {
ListenersEvent::Closed { .. } => {},
_ => panic!()
}
});
}
}

View File

@ -171,14 +171,17 @@ where
ListenerClosed {
/// The listener ID that closed.
listener_id: ListenerId,
/// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
/// if the stream produced an error.
reason: Result<(), TTrans::Error>,
},
/// One of the listeners errored.
/// One of the listeners reported a non-fatal errored.
ListenerError {
/// The listener that errored.
listener_id: ListenerId,
/// The listener error.
error: <TTrans::Listener as TryStream>::Error
error: TTrans::Error
},
/// One of the listeners is now listening on an additional address.
@ -307,9 +310,10 @@ where
.field("listen_addr", listen_addr)
.finish()
}
NetworkEvent::ListenerClosed { listener_id } => {
NetworkEvent::ListenerClosed { listener_id, reason } => {
f.debug_struct("ListenerClosed")
.field("listener_id", listener_id)
.field("reason", reason)
.finish()
}
NetworkEvent::ListenerError { listener_id, error } => {
@ -1020,8 +1024,8 @@ where
Poll::Ready(ListenersEvent::AddressExpired { listener_id, listen_addr }) => {
return Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr })
}
Poll::Ready(ListenersEvent::Closed { listener_id }) => {
return Poll::Ready(NetworkEvent::ListenerClosed { listener_id })
Poll::Ready(ListenersEvent::Closed { listener_id, reason }) => {
return Poll::Ready(NetworkEvent::ListenerClosed { listener_id, reason })
}
Poll::Ready(ListenersEvent::Error { listener_id, error }) => {
return Poll::Ready(NetworkEvent::ListenerError { listener_id, error })

View File

@ -91,7 +91,10 @@ pub trait Transport {
/// transport stack. The item must be a [`ListenerUpgrade`](Transport::ListenerUpgrade) future
/// that resolves to an [`Output`](Transport::Output) value once all protocol upgrades
/// have been applied.
type Listener: TryStream<Ok = ListenerEvent<Self::ListenerUpgrade>, Error = Self::Error>;
///
/// If this stream produces an error, it is considered fatal and the listener is killed. It
/// is possible to report non-fatal errors by producing a [`ListenerEvent::Error`].
type Listener: Stream<Item = Result<ListenerEvent<Self::ListenerUpgrade, Self::Error>, Self::Error>>;
/// A pending [`Output`](Transport::Output) for an inbound connection,
/// obtained from the [`Listener`](Transport::Listener) stream.
@ -110,6 +113,9 @@ pub trait Transport {
/// Listens on the given [`Multiaddr`], producing a stream of pending, inbound connections
/// and addresses this transport is listening on (cf. [`ListenerEvent`]).
///
/// Returning an error from the stream is considered fatal. The listener can also report
/// non-fatal errors by producing a [`ListenerEvent::Error`].
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>>
where
Self: Sized;
@ -226,33 +232,52 @@ pub trait Transport {
/// a `NewAddress` event and which have not been invalidated by
/// an `AddressExpired` event yet.
#[derive(Clone, Debug, PartialEq)]
pub enum ListenerEvent<T> {
pub enum ListenerEvent<TUpgr, TErr> {
/// The transport is listening on a new additional [`Multiaddr`].
NewAddress(Multiaddr),
/// An upgrade, consisting of the upgrade future, the listener address and the remote address.
Upgrade {
/// The upgrade.
upgrade: T,
upgrade: TUpgr,
/// The local address which produced this upgrade.
local_addr: Multiaddr,
/// The remote address which produced this upgrade.
remote_addr: Multiaddr
},
/// A [`Multiaddr`] is no longer used for listening.
AddressExpired(Multiaddr)
AddressExpired(Multiaddr),
/// A non-fatal error has happened on the listener.
///
/// This event should be generated in order to notify the user that something wrong has
/// happened. The listener, however, continues to run.
Error(TErr),
}
impl<T> ListenerEvent<T> {
impl<TUpgr, TErr> ListenerEvent<TUpgr, TErr> {
/// In case this [`ListenerEvent`] is an upgrade, apply the given function
/// to the upgrade and multiaddress and produce another listener event
/// based the the function's result.
pub fn map<U>(self, f: impl FnOnce(T) -> U) -> ListenerEvent<U> {
pub fn map<U>(self, f: impl FnOnce(TUpgr) -> U) -> ListenerEvent<U, TErr> {
match self {
ListenerEvent::Upgrade { upgrade, local_addr, remote_addr } => {
ListenerEvent::Upgrade { upgrade: f(upgrade), local_addr, remote_addr }
}
ListenerEvent::NewAddress(a) => ListenerEvent::NewAddress(a),
ListenerEvent::AddressExpired(a) => ListenerEvent::AddressExpired(a)
ListenerEvent::AddressExpired(a) => ListenerEvent::AddressExpired(a),
ListenerEvent::Error(e) => ListenerEvent::Error(e),
}
}
/// In case this [`ListenerEvent`] is an [`Error`](ListenerEvent::Error),
/// apply the given function to the error and produce another listener event based on the
/// function's result.
pub fn map_err<U>(self, f: impl FnOnce(TErr) -> U) -> ListenerEvent<TUpgr, U> {
match self {
ListenerEvent::Upgrade { upgrade, local_addr, remote_addr } =>
ListenerEvent::Upgrade { upgrade, local_addr, remote_addr },
ListenerEvent::NewAddress(a) => ListenerEvent::NewAddress(a),
ListenerEvent::AddressExpired(a) => ListenerEvent::AddressExpired(a),
ListenerEvent::Error(e) => ListenerEvent::Error(f(e)),
}
}
@ -269,7 +294,7 @@ impl<T> ListenerEvent<T> {
///
/// Returns `None` if the event is not actually an upgrade,
/// otherwise the upgrade and the remote address.
pub fn into_upgrade(self) -> Option<(T, Multiaddr)> {
pub fn into_upgrade(self) -> Option<(TUpgr, Multiaddr)> {
if let ListenerEvent::Upgrade { upgrade, remote_addr, .. } = self {
Some((upgrade, remote_addr))
} else {
@ -318,6 +343,27 @@ impl<T> ListenerEvent<T> {
None
}
}
/// Returns `true` if this is an `Error` listener event.
pub fn is_error(&self) -> bool {
if let ListenerEvent::Error(_) = self {
true
} else {
false
}
}
/// Try to turn this listener event into the `Error` part.
///
/// Returns `None` if the event is not actually a `Error`,
/// otherwise the error.
pub fn into_error(self) -> Option<TErr> {
if let ListenerEvent::Error(err) = self {
Some(err)
} else {
None
}
}
}
/// An error during [dialing][Transport::dial] or [listening][Transport::listen_on]

View File

@ -84,13 +84,13 @@ pub struct AndThenStream<TListener, TMap> {
impl<TListener, TMap, TTransOut, TMapOut, TListUpgr, TTransErr> Stream for AndThenStream<TListener, TMap>
where
TListener: TryStream<Ok = ListenerEvent<TListUpgr>, Error = TTransErr>,
TListener: TryStream<Ok = ListenerEvent<TListUpgr, TTransErr>, Error = TTransErr>,
TListUpgr: TryFuture<Ok = TTransOut, Error = TTransErr>,
TMap: FnOnce(TTransOut, ConnectedPoint) -> TMapOut + Clone,
TMapOut: TryFuture
{
type Item = Result<
ListenerEvent<AndThenFuture<TListUpgr, TMap, TMapOut>>,
ListenerEvent<AndThenFuture<TListUpgr, TMap, TMapOut>, EitherError<TTransErr, TMapOut::Error>>,
EitherError<TTransErr, TMapOut::Error>
>;
@ -115,8 +115,10 @@ where
}
}
ListenerEvent::NewAddress(a) => ListenerEvent::NewAddress(a),
ListenerEvent::AddressExpired(a) => ListenerEvent::AddressExpired(a)
ListenerEvent::AddressExpired(a) => ListenerEvent::AddressExpired(a),
ListenerEvent::Error(e) => ListenerEvent::Error(EitherError::A(e)),
};
Poll::Ready(Some(Ok(event)))
}
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::A(err)))),

View File

@ -38,7 +38,7 @@ where
}
pub type Dial<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;
pub type Listener<O, E> = Pin<Box<dyn Stream<Item = Result<ListenerEvent<ListenerUpgrade<O, E>>, E>> + Send>>;
pub type Listener<O, E> = Pin<Box<dyn Stream<Item = Result<ListenerEvent<ListenerUpgrade<O, E>, E>, E>> + Send>>;
pub type ListenerUpgrade<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;
trait Abstract<O, E> {

View File

@ -56,7 +56,7 @@ impl<TOut> Clone for DummyTransport<TOut> {
impl<TOut> Transport for DummyTransport<TOut> {
type Output = TOut;
type Error = io::Error;
type Listener = futures::stream::Pending<Result<ListenerEvent<Self::ListenerUpgrade>, io::Error>>;
type Listener = futures::stream::Pending<Result<ListenerEvent<Self::ListenerUpgrade, Self::Error>, Self::Error>>;
type ListenerUpgrade = futures::future::Pending<Result<Self::Output, io::Error>>;
type Dial = futures::future::Pending<Result<Self::Output, io::Error>>;

View File

@ -66,13 +66,13 @@ where
#[derive(Clone, Debug)]
pub struct MapStream<T, F> { #[pin] stream: T, fun: F }
impl<T, F, A, B, X> Stream for MapStream<T, F>
impl<T, F, A, B, X, E> Stream for MapStream<T, F>
where
T: TryStream<Ok = ListenerEvent<X>>,
T: TryStream<Ok = ListenerEvent<X, E>, Error = E>,
X: TryFuture<Ok = A>,
F: FnOnce(A, ConnectedPoint) -> B + Clone
{
type Item = Result<ListenerEvent<MapFuture<X, F>>, T::Error>;
type Item = Result<ListenerEvent<MapFuture<X, F>, E>, E>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
@ -94,7 +94,8 @@ where
}
}
ListenerEvent::NewAddress(a) => ListenerEvent::NewAddress(a),
ListenerEvent::AddressExpired(a) => ListenerEvent::AddressExpired(a)
ListenerEvent::AddressExpired(a) => ListenerEvent::AddressExpired(a),
ListenerEvent::Error(e) => ListenerEvent::Error(e),
};
Poll::Ready(Some(Ok(event)))
}

View File

@ -80,19 +80,21 @@ where
F: FnOnce(T::Error) -> TErr + Clone,
TErr: error::Error,
{
type Item = Result<ListenerEvent<MapErrListenerUpgrade<T, F>>, TErr>;
type Item = Result<ListenerEvent<MapErrListenerUpgrade<T, F>, TErr>, TErr>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
match TryStream::try_poll_next(this.inner, cx) {
Poll::Ready(Some(Ok(event))) => {
let map = &*this.map;
let event = event.map(move |value| {
MapErrListenerUpgrade {
inner: value,
map: Some(map.clone())
}
});
let event = event
.map(move |value| {
MapErrListenerUpgrade {
inner: value,
map: Some(map.clone())
}
})
.map_err(|err| (map.clone())(err));
Poll::Ready(Some(Ok(event)))
}
Poll::Ready(None) => Poll::Ready(None),

View File

@ -173,7 +173,7 @@ pub struct Listener {
}
impl Stream for Listener {
type Item = Result<ListenerEvent<Ready<Result<Channel<Vec<u8>>, MemoryTransportError>>>, MemoryTransportError>;
type Item = Result<ListenerEvent<Ready<Result<Channel<Vec<u8>>, MemoryTransportError>>, MemoryTransportError>, MemoryTransportError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if self.tell_listen_addr {

View File

@ -112,11 +112,11 @@ pub struct TimeoutListener<InnerStream> {
timeout: Duration,
}
impl<InnerStream, O> Stream for TimeoutListener<InnerStream>
impl<InnerStream, O, E> Stream for TimeoutListener<InnerStream>
where
InnerStream: TryStream<Ok = ListenerEvent<O>>,
InnerStream: TryStream<Ok = ListenerEvent<O, E>, Error = E>,
{
type Item = Result<ListenerEvent<Timeout<O>>, TransportTimeoutError<InnerStream::Error>>;
type Item = Result<ListenerEvent<Timeout<O>, TransportTimeoutError<E>>, TransportTimeoutError<E>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
@ -129,12 +129,14 @@ where
};
let timeout = *this.timeout;
let event = poll_out.map(move |inner_fut| {
Timeout {
inner: inner_fut,
timer: Delay::new(timeout),
}
});
let event = poll_out
.map(move |inner_fut| {
Timeout {
inner: inner_fut,
timer: Delay::new(timeout),
}
})
.map_err(TransportTimeoutError::Other);
Poll::Ready(Some(Ok(event)))
}

View File

@ -378,24 +378,26 @@ pub struct ListenerStream<S, U> {
upgrade: U
}
impl<S, U, F, I, C, D> Stream for ListenerStream<S, U>
impl<S, U, F, I, C, D, E> Stream for ListenerStream<S, U>
where
S: TryStream<Ok = ListenerEvent<F>>,
S: TryStream<Ok = ListenerEvent<F, E>, Error = E>,
F: TryFuture<Ok = (I, C)>,
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = D> + Clone
{
type Item = Result<ListenerEvent<ListenerUpgradeFuture<F, U, I, C>>, TransportUpgradeError<S::Error, U::Error>>;
type Item = Result<ListenerEvent<ListenerUpgradeFuture<F, U, I, C>, TransportUpgradeError<E, U::Error>>, TransportUpgradeError<E, U::Error>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match ready!(TryStream::try_poll_next(self.stream.as_mut(), cx)) {
Some(Ok(event)) => {
let event = event.map(move |future| {
ListenerUpgradeFuture {
future: Box::pin(future),
upgrade: future::Either::Left(Some(self.upgrade.clone()))
}
});
let event = event
.map(move |future| {
ListenerUpgradeFuture {
future: Box::pin(future),
upgrade: future::Either::Left(Some(self.upgrade.clone()))
}
})
.map_err(TransportUpgradeError::Transport);
Poll::Ready(Some(Ok(event)))
}
Some(Err(err)) => {

View File

@ -146,7 +146,7 @@ fn run<T, U>(server_transport: T, client_transport: U, message1: Vec<u8>)
where
T: Transport<Output = Output>,
T::Dial: Send + 'static,
T::Listener: Send + Unpin + futures::stream::TryStream + 'static,
T::Listener: Send + Unpin + 'static,
T::ListenerUpgrade: Send + 'static,
U: Transport<Output = Output>,
U::Dial: Send + 'static,

View File

@ -89,11 +89,11 @@ pub struct BandwidthListener<TInner> {
sinks: Arc<BandwidthSinks>,
}
impl<TInner, TConn> Stream for BandwidthListener<TInner>
impl<TInner, TConn, TErr> Stream for BandwidthListener<TInner>
where
TInner: TryStream<Ok = ListenerEvent<TConn>>
TInner: TryStream<Ok = ListenerEvent<TConn, TErr>, Error = TErr>
{
type Item = Result<ListenerEvent<BandwidthFuture<TConn>>, TInner::Error>;
type Item = Result<ListenerEvent<BandwidthFuture<TConn>, TErr>, TErr>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();

View File

@ -98,7 +98,7 @@ where
type Error = DnsErr<T::Error>;
type Listener = stream::MapErr<
stream::MapOk<T::Listener,
fn(ListenerEvent<T::ListenerUpgrade>) -> ListenerEvent<Self::ListenerUpgrade>>,
fn(ListenerEvent<T::ListenerUpgrade, T::Error>) -> ListenerEvent<Self::ListenerUpgrade, Self::Error>>,
fn(T::Error) -> Self::Error>;
type ListenerUpgrade = future::MapErr<T::ListenerUpgrade, fn(T::Error) -> Self::Error>;
type Dial = future::Either<
@ -109,9 +109,13 @@ where
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
let listener = self.inner.listen_on(addr).map_err(|err| err.map(DnsErr::Underlying))?;
let listener = listener
.map_ok::<_, fn(_) -> _>(|event| event.map(|upgr| {
upgr.map_err::<_, fn(_) -> _>(DnsErr::Underlying)
}))
.map_ok::<_, fn(_) -> _>(|event| {
event
.map(|upgr| {
upgr.map_err::<_, fn(_) -> _>(DnsErr::Underlying)
})
.map_err(DnsErr::Underlying)
})
.map_err::<_, fn(_) -> _>(DnsErr::Underlying);
Ok(listener)
}
@ -257,7 +261,7 @@ mod tests {
impl Transport for CustomTransport {
type Output = ();
type Error = std::io::Error;
type Listener = BoxStream<'static, Result<ListenerEvent<Self::ListenerUpgrade>, Self::Error>>;
type Listener = BoxStream<'static, Result<ListenerEvent<Self::ListenerUpgrade, Self::Error>, Self::Error>>;
type ListenerUpgrade = BoxFuture<'static, Result<Self::Output, Self::Error>>;
type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;

View File

@ -93,7 +93,7 @@ impl $tcp_config {
impl Transport for $tcp_config {
type Output = $tcp_trans_stream;
type Error = io::Error;
type Listener = Pin<Box<dyn Stream<Item = Result<ListenerEvent<Self::ListenerUpgrade>, io::Error>> + Send>>;
type Listener = Pin<Box<dyn Stream<Item = Result<ListenerEvent<Self::ListenerUpgrade, Self::Error>, Self::Error>> + Send>>;
type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
type Dial = Pin<Box<dyn Future<Output = Result<$tcp_trans_stream, io::Error>> + Send>>;
@ -106,7 +106,7 @@ impl Transport for $tcp_config {
};
async fn do_listen(cfg: $tcp_config, socket_addr: SocketAddr)
-> Result<impl Stream<Item = Result<ListenerEvent<Ready<Result<$tcp_trans_stream, io::Error>>>, io::Error>>, io::Error>
-> Result<impl Stream<Item = Result<ListenerEvent<Ready<Result<$tcp_trans_stream, io::Error>>, io::Error>, io::Error>>, io::Error>
{
let listener = <$tcp_listener>::bind(&socket_addr).await?;
let local_addr = listener.local_addr()?;
@ -205,7 +205,7 @@ pub struct $tcp_listen_stream {
impl $tcp_listen_stream {
/// Takes ownership of the listener, and returns the next incoming event and the listener.
async fn next(mut self) -> (Result<ListenerEvent<Ready<Result<$tcp_trans_stream, io::Error>>>, io::Error>, Self) {
async fn next(mut self) -> (Result<ListenerEvent<Ready<Result<$tcp_trans_stream, io::Error>>, io::Error>, io::Error>, Self) {
loop {
if let Some(event) = self.pending.pop_front() {
return (event, self);
@ -221,7 +221,7 @@ impl $tcp_listen_stream {
Err(e) => {
debug!("error accepting incoming connection: {}", e);
self.pause = Some(Delay::new(self.pause_duration));
return (Err(e), self);
return (Ok(ListenerEvent::Error(e)), self);
}
};
@ -237,7 +237,7 @@ impl $tcp_listen_stream {
Ok(sock_addr) => {
if let Addresses::Many(ref mut addrs) = self.addrs {
if let Err(err) = check_for_interface_changes(&sock_addr, self.port, addrs, &mut self.pending) {
return (Err(err), self);
return (Ok(ListenerEvent::Error(err)), self);
}
}
ip_to_multiaddr(sock_addr.ip(), sock_addr.port())
@ -416,7 +416,7 @@ enum Addresses {
Many(Vec<(IpAddr, IpNet, Multiaddr)>)
}
type Buffer<T> = VecDeque<Result<ListenerEvent<Ready<Result<T, io::Error>>>, io::Error>>;
type Buffer<T> = VecDeque<Result<ListenerEvent<Ready<Result<T, io::Error>>, io::Error>, io::Error>>;
// If we listen on all interfaces, find out to which interface the given
// socket address belongs. In case we think the address is new, check

View File

@ -63,7 +63,7 @@ impl $uds_config {
impl Transport for $uds_config {
type Output = $unix_stream;
type Error = io::Error;
type Listener = BoxStream<'static, Result<ListenerEvent<Self::ListenerUpgrade>, Self::Error>>;
type Listener = BoxStream<'static, Result<ListenerEvent<Self::ListenerUpgrade, Self::Error>, Self::Error>>;
type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;

View File

@ -230,7 +230,7 @@ pub struct Listen {
/// Promise that will yield the next `ListenEvent`.
next_event: Option<SendWrapper<JsFuture>>,
/// List of events that we are waiting to propagate.
pending_events: VecDeque<ListenerEvent<Ready<Result<Connection, JsErr>>>>,
pending_events: VecDeque<ListenerEvent<Ready<Result<Connection, JsErr>>, JsErr>>,
}
impl fmt::Debug for Listen {
@ -240,7 +240,7 @@ impl fmt::Debug for Listen {
}
impl Stream for Listen {
type Item = Result<ListenerEvent<Ready<Result<Connection, JsErr>>>, JsErr>;
type Item = Result<ListenerEvent<Ready<Result<Connection, JsErr>>, JsErr>, JsErr>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop {
@ -248,11 +248,14 @@ impl Stream for Listen {
return Poll::Ready(Some(Ok(ev)));
}
// Try to fill `self.next_event` if necessary and possible. If we fail, then
// `Ready(None)` is returned below.
if self.next_event.is_none() {
let ev = self.iterator.next()?;
if !ev.done() {
let promise: js_sys::Promise = ev.value().into();
self.next_event = Some(SendWrapper::new(promise.into()));
if let Ok(ev) = self.iterator.next() {
if !ev.done() {
let promise: js_sys::Promise = ev.value().into();
self.next_event = Some(SendWrapper::new(promise.into()));
}
}
}
@ -296,9 +299,10 @@ impl Stream for Listen {
.into_iter()
.flat_map(|e| e.to_vec().into_iter())
{
let addr = js_value_to_addr(&addr)?;
self.pending_events
.push_back(ListenerEvent::AddressExpired(addr));
match js_value_to_addr(&addr) {
Ok(addr) => self.pending_events.push_back(ListenerEvent::NewAddress(addr)),
Err(err) => self.pending_events.push_back(ListenerEvent::Error(err)),
}
}
}
}

View File

@ -109,7 +109,7 @@ where
{
type Output = Connection<T::Output>;
type Error = Error<T::Error>;
type Listener = BoxStream<'static, Result<ListenerEvent<Self::ListenerUpgrade>, Self::Error>>;
type Listener = BoxStream<'static, Result<ListenerEvent<Self::ListenerUpgrade, Self::Error>, Self::Error>>;
type ListenerUpgrade = BoxFuture<'static, Result<Self::Output, Self::Error>>;
type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;
@ -147,6 +147,9 @@ where
a = a.with(proto.clone());
ListenerEvent::AddressExpired(a)
}
ListenerEvent::Error(err) => {
ListenerEvent::Error(Error::Transport(err))
}
ListenerEvent::Upgrade { upgrade, mut local_addr, mut remote_addr } => {
local_addr = local_addr.with(proto.clone());
remote_addr = remote_addr.with(proto.clone());

View File

@ -117,7 +117,7 @@ where
}
/// Type alias corresponding to `framed::WsConfig::Listener`.
pub type InnerStream<T, E> = BoxStream<'static, Result<ListenerEvent<InnerFuture<T, E>>, Error<E>>>;
pub type InnerStream<T, E> = BoxStream<'static, Result<ListenerEvent<InnerFuture<T, E>, Error<E>>, Error<E>>>;
/// Type alias corresponding to `framed::WsConfig::Dial` and `framed::WsConfig::ListenerUpgrade`.
pub type InnerFuture<T, E> = BoxFuture<'static, Result<Connection<T>, Error<E>>>;