core/muxing: Replace Into<io::Error> bound on StreamMuxer with std::error::Error (#2710)

* core/muxing: Remove `Into<io::Error>` bound from `StreamMuxer::Error`

This allows us to preserve the type information of a muxer's concrete
error as long as possible. For `StreamMuxerBox`, we leverage `io::Error`'s
capability of wrapping any error that implements `Into<Box<dyn Error>>`.

* Use `?` in `Connection::poll`

* Use `?` in `muxing::boxed::Wrap`

* Use `futures::ready!` in `muxing::boxed::Wrap`

* Fill PR number into changelog

* Put `Error + Send + Sync` bounds directly on `StreamMuxer::Error`

* Move `Send + Sync` bounds to higher layers

* Use `map_inbound_stream` helper

* Update changelog to match new implementation
This commit is contained in:
Thomas Eizinger
2022-06-24 08:26:49 +02:00
committed by GitHub
parent eb490c08e9
commit 0f40e513cc
8 changed files with 49 additions and 45 deletions

View File

@ -3,9 +3,11 @@
- Introduce `StreamMuxerEvent::map_inbound_stream`. See [PR 2691]. - Introduce `StreamMuxerEvent::map_inbound_stream`. See [PR 2691].
- Remove `{read,write,flush,shutdown,destroy}_substream` functions from `StreamMuxer` trait - Remove `{read,write,flush,shutdown,destroy}_substream` functions from `StreamMuxer` trait
in favor of forcing `StreamMuxer::Substream` to implement `AsyncRead + AsyncWrite`. See [PR 2707]. in favor of forcing `StreamMuxer::Substream` to implement `AsyncRead + AsyncWrite`. See [PR 2707].
- Replace `Into<std::io::Error>` bound on `StreamMuxer::Error` with `std::error::Error`. See [PR 2710].
[PR 2691]: https://github.com/libp2p/rust-libp2p/pull/2691 [PR 2691]: https://github.com/libp2p/rust-libp2p/pull/2691
[PR 2707]: https://github.com/libp2p/rust-libp2p/pull/2707 [PR 2707]: https://github.com/libp2p/rust-libp2p/pull/2707
[PR 2710]: https://github.com/libp2p/rust-libp2p/pull/2710
# 0.33.0 # 0.33.0

View File

@ -203,7 +203,7 @@ where
{ {
type Substream = EitherOutput<A::Substream, B::Substream>; type Substream = EitherOutput<A::Substream, B::Substream>;
type OutboundSubstream = EitherOutbound<A, B>; type OutboundSubstream = EitherOutbound<A, B>;
type Error = io::Error; type Error = EitherError<A::Error, B::Error>;
fn poll_event( fn poll_event(
&self, &self,
@ -212,11 +212,11 @@ where
match self { match self {
EitherOutput::First(inner) => inner EitherOutput::First(inner) => inner
.poll_event(cx) .poll_event(cx)
.map_err(|e| e.into()) .map_err(EitherError::A)
.map_ok(|event| event.map_inbound_stream(EitherOutput::First)), .map_ok(|event| event.map_inbound_stream(EitherOutput::First)),
EitherOutput::Second(inner) => inner EitherOutput::Second(inner) => inner
.poll_event(cx) .poll_event(cx)
.map_err(|e| e.into()) .map_err(EitherError::B)
.map_ok(|event| event.map_inbound_stream(EitherOutput::Second)), .map_ok(|event| event.map_inbound_stream(EitherOutput::Second)),
} }
} }
@ -237,11 +237,11 @@ where
(EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner (EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner
.poll_outbound(cx, substream) .poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::First)) .map(|p| p.map(EitherOutput::First))
.map_err(|e| e.into()), .map_err(EitherError::A),
(EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner (EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner
.poll_outbound(cx, substream) .poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::Second)) .map(|p| p.map(EitherOutput::Second))
.map_err(|e| e.into()), .map_err(EitherError::B),
_ => panic!("Wrong API usage"), _ => panic!("Wrong API usage"),
} }
} }
@ -261,8 +261,8 @@ where
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self { match self {
EitherOutput::First(inner) => inner.poll_close(cx).map_err(|e| e.into()), EitherOutput::First(inner) => inner.poll_close(cx).map_err(EitherError::A),
EitherOutput::Second(inner) => inner.poll_close(cx).map_err(|e| e.into()), EitherOutput::Second(inner) => inner.poll_close(cx).map_err(EitherError::B),
} }
} }
} }

View File

@ -52,7 +52,6 @@
use futures::{task::Context, task::Poll, AsyncRead, AsyncWrite}; use futures::{task::Context, task::Poll, AsyncRead, AsyncWrite};
use multiaddr::Multiaddr; use multiaddr::Multiaddr;
use std::io;
pub use self::boxed::StreamMuxerBox; pub use self::boxed::StreamMuxerBox;
pub use self::boxed::SubstreamBox; pub use self::boxed::SubstreamBox;
@ -76,7 +75,7 @@ pub trait StreamMuxer {
type OutboundSubstream; type OutboundSubstream;
/// Error type of the muxer /// Error type of the muxer
type Error: Into<io::Error>; type Error: std::error::Error;
/// Polls for a connection-wide event. /// Polls for a connection-wide event.
/// ///

View File

@ -1,8 +1,9 @@
use crate::muxing::StreamMuxerEvent; use crate::muxing::StreamMuxerEvent;
use crate::StreamMuxer; use crate::StreamMuxer;
use fnv::FnvHashMap; use fnv::FnvHashMap;
use futures::{AsyncRead, AsyncWrite}; use futures::{ready, AsyncRead, AsyncWrite};
use parking_lot::Mutex; use parking_lot::Mutex;
use std::error::Error;
use std::fmt; use std::fmt;
use std::io; use std::io;
use std::io::{IoSlice, IoSliceMut}; use std::io::{IoSlice, IoSliceMut};
@ -38,6 +39,7 @@ impl<T> StreamMuxer for Wrap<T>
where where
T: StreamMuxer, T: StreamMuxer,
T::Substream: Send + Unpin + 'static, T::Substream: Send + Unpin + 'static,
T::Error: Send + Sync + 'static,
{ {
type Substream = SubstreamBox; type Substream = SubstreamBox;
type OutboundSubstream = usize; // TODO: use a newtype type OutboundSubstream = usize; // TODO: use a newtype
@ -48,18 +50,10 @@ where
&self, &self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> { ) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
let substream = match self.inner.poll_event(cx) { let event = ready!(self.inner.poll_event(cx).map_err(into_io_error)?)
Poll::Pending => return Poll::Pending, .map_inbound_stream(SubstreamBox::new);
Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a))) => {
return Poll::Ready(Ok(StreamMuxerEvent::AddressChange(a)))
}
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))) => s,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
};
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(SubstreamBox::new( Poll::Ready(Ok(event))
substream,
))))
} }
#[inline] #[inline]
@ -77,16 +71,12 @@ where
substream: &mut Self::OutboundSubstream, substream: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>> { ) -> Poll<Result<Self::Substream, Self::Error>> {
let mut list = self.outbound.lock(); let mut list = self.outbound.lock();
let substream = match self let stream = ready!(self
.inner .inner
.poll_outbound(cx, list.get_mut(substream).unwrap()) .poll_outbound(cx, list.get_mut(substream).unwrap())
{ .map_err(into_io_error)?);
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(s)) => s,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
};
Poll::Ready(Ok(SubstreamBox::new(substream))) Poll::Ready(Ok(SubstreamBox::new(stream)))
} }
#[inline] #[inline]
@ -98,10 +88,17 @@ where
#[inline] #[inline]
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_close(cx).map_err(|e| e.into()) self.inner.poll_close(cx).map_err(into_io_error)
} }
} }
fn into_io_error<E>(err: E) -> io::Error
where
E: Error + Send + Sync + 'static,
{
io::Error::new(io::ErrorKind::Other, err)
}
impl StreamMuxerBox { impl StreamMuxerBox {
/// Turns a stream muxer into a `StreamMuxerBox`. /// Turns a stream muxer into a `StreamMuxerBox`.
pub fn new<T>(muxer: T) -> StreamMuxerBox pub fn new<T>(muxer: T) -> StreamMuxerBox
@ -109,6 +106,7 @@ impl StreamMuxerBox {
T: StreamMuxer + Send + Sync + 'static, T: StreamMuxer + Send + Sync + 'static,
T::OutboundSubstream: Send, T::OutboundSubstream: Send,
T::Substream: Send + Unpin + 'static, T::Substream: Send + Unpin + 'static,
T::Error: Send + Sync + 'static,
{ {
let wrap = Wrap { let wrap = Wrap {
inner: muxer, inner: muxer,

View File

@ -302,6 +302,7 @@ impl<T> Multiplexed<T> {
M: StreamMuxer + Send + Sync + 'static, M: StreamMuxer + Send + Sync + 'static,
M::Substream: Send + Unpin + 'static, M::Substream: Send + Unpin + 'static,
M::OutboundSubstream: Send + 'static, M::OutboundSubstream: Send + 'static,
M::Error: Send + Sync + 'static,
{ {
boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m)))) boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m))))
} }

View File

@ -137,40 +137,38 @@ where
) -> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>> { ) -> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>> {
loop { loop {
// Poll the handler for new events. // Poll the handler for new events.
match self.handler.poll(cx) { match self.handler.poll(cx)? {
Poll::Pending => {} Poll::Pending => {}
Poll::Ready(Ok(handler_wrapper::Event::OutboundSubstreamRequest(user_data))) => { Poll::Ready(handler_wrapper::Event::OutboundSubstreamRequest(user_data)) => {
self.muxing.open_substream(user_data); self.muxing.open_substream(user_data);
continue; continue;
} }
Poll::Ready(Ok(handler_wrapper::Event::Custom(event))) => { Poll::Ready(handler_wrapper::Event::Custom(event)) => {
return Poll::Ready(Ok(Event::Handler(event))); return Poll::Ready(Ok(Event::Handler(event)));
} }
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
} }
// Perform I/O on the connection through the muxer, informing the handler // Perform I/O on the connection through the muxer, informing the handler
// of new substreams. // of new substreams.
match self.muxing.poll(cx) { match self.muxing.poll(cx)? {
Poll::Pending => {} Poll::Pending => {}
Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })) => { Poll::Ready(SubstreamEvent::InboundSubstream { substream }) => {
self.handler self.handler
.inject_substream(substream, SubstreamEndpoint::Listener); .inject_substream(substream, SubstreamEndpoint::Listener);
continue; continue;
} }
Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { Poll::Ready(SubstreamEvent::OutboundSubstream {
user_data, user_data,
substream, substream,
})) => { }) => {
let endpoint = SubstreamEndpoint::Dialer(user_data); let endpoint = SubstreamEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint); self.handler.inject_substream(substream, endpoint);
continue; continue;
} }
Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => { Poll::Ready(SubstreamEvent::AddressChange(address)) => {
self.handler.inject_address_change(&address); self.handler.inject_address_change(&address);
return Poll::Ready(Ok(Event::AddressChange(address))); return Poll::Ready(Ok(Event::AddressChange(address)));
} }
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::IO(err))),
} }
return Poll::Pending; return Poll::Pending;

View File

@ -75,6 +75,12 @@ impl<THandlerErr> From<handler_wrapper::Error<THandlerErr>> for ConnectionError<
} }
} }
impl<THandlerErr> From<io::Error> for ConnectionError<THandlerErr> {
fn from(error: io::Error) -> Self {
ConnectionError::IO(error)
}
}
/// Errors that can occur in the context of a pending outgoing `Connection`. /// Errors that can occur in the context of a pending outgoing `Connection`.
/// ///
/// Note: Addresses for an outbound connection are dialed in parallel. Thus, compared to /// Note: Addresses for an outbound connection are dialed in parallel. Thus, compared to

View File

@ -23,7 +23,7 @@ use libp2p_core::multiaddr::Multiaddr;
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use smallvec::SmallVec; use smallvec::SmallVec;
use std::sync::Arc; use std::sync::Arc;
use std::{fmt, io::Error as IoError, pin::Pin, task::Context, task::Poll}; use std::{fmt, pin::Pin, task::Context, task::Poll};
/// Endpoint for a received substream. /// Endpoint for a received substream.
#[derive(Debug, Copy, Clone, PartialEq, Eq)] #[derive(Debug, Copy, Clone, PartialEq, Eq)]
@ -134,7 +134,7 @@ where
pub fn poll( pub fn poll(
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Result<SubstreamEvent<TMuxer, TUserData>, IoError>> { ) -> Poll<Result<SubstreamEvent<TMuxer, TUserData>, TMuxer::Error>> {
// Polling inbound substream. // Polling inbound substream.
match self.inner.poll_event(cx) { match self.inner.poll_event(cx) {
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) => { Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) => {
@ -143,7 +143,7 @@ where
Poll::Ready(Ok(StreamMuxerEvent::AddressChange(addr))) => { Poll::Ready(Ok(StreamMuxerEvent::AddressChange(addr))) => {
return Poll::Ready(Ok(SubstreamEvent::AddressChange(addr))) return Poll::Ready(Ok(SubstreamEvent::AddressChange(addr)))
} }
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => {} Poll::Pending => {}
} }
@ -164,7 +164,7 @@ where
} }
Poll::Ready(Err(err)) => { Poll::Ready(Err(err)) => {
self.inner.destroy_outbound(outbound); self.inner.destroy_outbound(outbound);
return Poll::Ready(Err(err.into())); return Poll::Ready(Err(err));
} }
} }
} }
@ -203,13 +203,13 @@ impl<TMuxer> Future for Close<TMuxer>
where where
TMuxer: StreamMuxer, TMuxer: StreamMuxer,
{ {
type Output = Result<(), IoError>; type Output = Result<(), TMuxer::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.muxer.poll_close(cx) { match self.muxer.poll_close(cx) {
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => Poll::Ready(Ok(())), Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())), Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
} }
} }
} }