Cleanups in libp2p-core in stable-futures branch

This commit is contained in:
Pierre Krieger 2019-12-10 11:46:30 +01:00
parent 7e9175716e
commit ad42b00981
No known key found for this signature in database
GPG Key ID: EE749C4F41D4EA47
10 changed files with 161 additions and 195 deletions

View File

@ -25,6 +25,7 @@ multiaddr = { package = "parity-multiaddr", version = "0.6.0", path = "../misc/m
multihash = { package = "parity-multihash", version = "0.2.0", path = "../misc/multihash" } multihash = { package = "parity-multihash", version = "0.2.0", path = "../misc/multihash" }
multistream-select = { version = "0.6.0", path = "../misc/multistream-select" } multistream-select = { version = "0.6.0", path = "../misc/multistream-select" }
parking_lot = "0.9.0" parking_lot = "0.9.0"
pin-project = "0.4.6"
protobuf = "2.8" protobuf = "2.8"
quick-error = "1.2" quick-error = "1.2"
rand = "0.7" rand = "0.7"

View File

@ -20,7 +20,8 @@
use crate::{muxing::StreamMuxer, ProtocolName, transport::ListenerEvent}; use crate::{muxing::StreamMuxer, ProtocolName, transport::ListenerEvent};
use futures::prelude::*; use futures::prelude::*;
use std::{fmt, io::{Error as IoError, Read, Write}, pin::Pin, task::Context, task::Poll}; use pin_project::{pin_project, project};
use std::{fmt, io::{Error as IoError}, pin::Pin, task::Context, task::Poll};
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
pub enum EitherError<A, B> { pub enum EitherError<A, B> {
@ -56,99 +57,75 @@ where
/// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to /// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to
/// either `First` or `Second`. /// either `First` or `Second`.
#[pin_project]
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
pub enum EitherOutput<A, B> { pub enum EitherOutput<A, B> {
First(A), First(#[pin] A),
Second(B), Second(#[pin] B),
} }
impl<A, B> AsyncRead for EitherOutput<A, B> impl<A, B> AsyncRead for EitherOutput<A, B>
where where
A: AsyncRead + Unpin, A: AsyncRead,
B: AsyncRead + Unpin, B: AsyncRead,
{ {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize, IoError>> { #[project]
match &mut *self { fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<Result<usize, IoError>> {
EitherOutput::First(a) => AsyncRead::poll_read(Pin::new(a), cx, buf), #[project]
EitherOutput::Second(b) => AsyncRead::poll_read(Pin::new(b), cx, buf), match self.project() {
} EitherOutput::First(a) => AsyncRead::poll_read(a, cx, buf),
} EitherOutput::Second(b) => AsyncRead::poll_read(b, cx, buf),
}
// TODO: remove?
impl<A, B> Read for EitherOutput<A, B>
where
A: Read,
B: Read,
{
fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
match self {
EitherOutput::First(a) => a.read(buf),
EitherOutput::Second(b) => b.read(buf),
} }
} }
} }
impl<A, B> AsyncWrite for EitherOutput<A, B> impl<A, B> AsyncWrite for EitherOutput<A, B>
where where
A: AsyncWrite + Unpin, A: AsyncWrite,
B: AsyncWrite + Unpin, B: AsyncWrite,
{ {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize, IoError>> { #[project]
match &mut *self { fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<Result<usize, IoError>> {
EitherOutput::First(a) => AsyncWrite::poll_write(Pin::new(a), cx, buf), #[project]
EitherOutput::Second(b) => AsyncWrite::poll_write(Pin::new(b), cx, buf), match self.project() {
EitherOutput::First(a) => AsyncWrite::poll_write(a, cx, buf),
EitherOutput::Second(b) => AsyncWrite::poll_write(b, cx, buf),
} }
} }
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> { #[project]
match &mut *self { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
EitherOutput::First(a) => AsyncWrite::poll_flush(Pin::new(a), cx), #[project]
EitherOutput::Second(b) => AsyncWrite::poll_flush(Pin::new(b), cx), match self.project() {
EitherOutput::First(a) => AsyncWrite::poll_flush(a, cx),
EitherOutput::Second(b) => AsyncWrite::poll_flush(b, cx),
} }
} }
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> { #[project]
match &mut *self { fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
EitherOutput::First(a) => AsyncWrite::poll_close(Pin::new(a), cx), #[project]
EitherOutput::Second(b) => AsyncWrite::poll_close(Pin::new(b), cx), match self.project() {
} EitherOutput::First(a) => AsyncWrite::poll_close(a, cx),
} EitherOutput::Second(b) => AsyncWrite::poll_close(b, cx),
}
// TODO: remove?
impl<A, B> Write for EitherOutput<A, B>
where
A: Write,
B: Write,
{
fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> {
match self {
EitherOutput::First(a) => a.write(buf),
EitherOutput::Second(b) => b.write(buf),
}
}
fn flush(&mut self) -> Result<(), IoError> {
match self {
EitherOutput::First(a) => a.flush(),
EitherOutput::Second(b) => b.flush(),
} }
} }
} }
impl<A, B, I> Stream for EitherOutput<A, B> impl<A, B, I> Stream for EitherOutput<A, B>
where where
A: TryStream<Ok = I> + Unpin, A: TryStream<Ok = I>,
B: TryStream<Ok = I> + Unpin, B: TryStream<Ok = I>,
{ {
type Item = Result<I, EitherError<A::Error, B::Error>>; type Item = Result<I, EitherError<A::Error, B::Error>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { #[project]
match &mut *self { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
EitherOutput::First(a) => TryStream::try_poll_next(Pin::new(a), cx) #[project]
match self.project() {
EitherOutput::First(a) => TryStream::try_poll_next(a, cx)
.map(|v| v.map(|r| r.map_err(EitherError::A))), .map(|v| v.map(|r| r.map_err(EitherError::A))),
EitherOutput::Second(b) => TryStream::try_poll_next(Pin::new(b), cx) EitherOutput::Second(b) => TryStream::try_poll_next(b, cx)
.map(|v| v.map(|r| r.map_err(EitherError::B))), .map(|v| v.map(|r| r.map_err(EitherError::B))),
} }
} }
@ -161,31 +138,39 @@ where
{ {
type Error = EitherError<A::Error, B::Error>; type Error = EitherError<A::Error, B::Error>;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { #[project]
match &mut *self { fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
EitherOutput::First(a) => Sink::poll_ready(Pin::new(a), cx).map_err(EitherError::A), #[project]
EitherOutput::Second(b) => Sink::poll_ready(Pin::new(b), cx).map_err(EitherError::B), match self.project() {
EitherOutput::First(a) => Sink::poll_ready(a, cx).map_err(EitherError::A),
EitherOutput::Second(b) => Sink::poll_ready(b, cx).map_err(EitherError::B),
} }
} }
fn start_send(mut self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { #[project]
match &mut *self { fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
EitherOutput::First(a) => Sink::start_send(Pin::new(a), item).map_err(EitherError::A), #[project]
EitherOutput::Second(b) => Sink::start_send(Pin::new(b), item).map_err(EitherError::B), match self.project() {
EitherOutput::First(a) => Sink::start_send(a, item).map_err(EitherError::A),
EitherOutput::Second(b) => Sink::start_send(b, item).map_err(EitherError::B),
} }
} }
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { #[project]
match &mut *self { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
EitherOutput::First(a) => Sink::poll_flush(Pin::new(a), cx).map_err(EitherError::A), #[project]
EitherOutput::Second(b) => Sink::poll_flush(Pin::new(b), cx).map_err(EitherError::B), match self.project() {
EitherOutput::First(a) => Sink::poll_flush(a, cx).map_err(EitherError::A),
EitherOutput::Second(b) => Sink::poll_flush(b, cx).map_err(EitherError::B),
} }
} }
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { #[project]
match &mut *self { fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
EitherOutput::First(a) => Sink::poll_close(Pin::new(a), cx).map_err(EitherError::A), #[project]
EitherOutput::Second(b) => Sink::poll_close(Pin::new(b), cx).map_err(EitherError::B), match self.project() {
EitherOutput::First(a) => Sink::poll_close(a, cx).map_err(EitherError::A),
EitherOutput::Second(b) => Sink::poll_close(b, cx).map_err(EitherError::B),
} }
} }
} }
@ -337,29 +322,32 @@ pub enum EitherOutbound<A: StreamMuxer, B: StreamMuxer> {
} }
/// Implements `Stream` and dispatches all method calls to either `First` or `Second`. /// Implements `Stream` and dispatches all method calls to either `First` or `Second`.
#[pin_project]
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"] #[must_use = "futures do nothing unless polled"]
pub enum EitherListenStream<A, B> { pub enum EitherListenStream<A, B> {
First(A), First(#[pin] A),
Second(B), Second(#[pin] B),
} }
impl<AStream, BStream, AInner, BInner> Stream for EitherListenStream<AStream, BStream> impl<AStream, BStream, AInner, BInner> Stream for EitherListenStream<AStream, BStream>
where where
AStream: TryStream<Ok = ListenerEvent<AInner>> + Unpin, AStream: TryStream<Ok = ListenerEvent<AInner>>,
BStream: TryStream<Ok = ListenerEvent<BInner>> + Unpin, BStream: TryStream<Ok = ListenerEvent<BInner>>,
{ {
type Item = Result<ListenerEvent<EitherFuture<AInner, BInner>>, EitherError<AStream::Error, BStream::Error>>; type Item = Result<ListenerEvent<EitherFuture<AInner, BInner>>, EitherError<AStream::Error, BStream::Error>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { #[project]
match &mut *self { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
EitherListenStream::First(a) => match TryStream::try_poll_next(Pin::new(a), cx) { #[project]
match self.project() {
EitherListenStream::First(a) => match TryStream::try_poll_next(a, cx) {
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None), Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le.map(EitherFuture::First)))), Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le.map(EitherFuture::First)))),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::A(err)))), Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::A(err)))),
}, },
EitherListenStream::Second(a) => match TryStream::try_poll_next(Pin::new(a), cx) { EitherListenStream::Second(a) => match TryStream::try_poll_next(a, cx) {
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None), Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le.map(EitherFuture::Second)))), Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le.map(EitherFuture::Second)))),
@ -370,33 +358,37 @@ where
} }
/// Implements `Future` and dispatches all method calls to either `First` or `Second`. /// Implements `Future` and dispatches all method calls to either `First` or `Second`.
#[pin_project]
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"] #[must_use = "futures do nothing unless polled"]
pub enum EitherFuture<A, B> { pub enum EitherFuture<A, B> {
First(A), First(#[pin] A),
Second(B), Second(#[pin] B),
} }
impl<AFuture, BFuture, AInner, BInner> Future for EitherFuture<AFuture, BFuture> impl<AFuture, BFuture, AInner, BInner> Future for EitherFuture<AFuture, BFuture>
where where
AFuture: TryFuture<Ok = AInner> + Unpin, AFuture: TryFuture<Ok = AInner>,
BFuture: TryFuture<Ok = BInner> + Unpin, BFuture: TryFuture<Ok = BInner>,
{ {
type Output = Result<EitherOutput<AInner, BInner>, EitherError<AFuture::Error, BFuture::Error>>; type Output = Result<EitherOutput<AInner, BInner>, EitherError<AFuture::Error, BFuture::Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { #[project]
match &mut *self { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
EitherFuture::First(a) => TryFuture::try_poll(Pin::new(a), cx) #[project]
match self.project() {
EitherFuture::First(a) => TryFuture::try_poll(a, cx)
.map_ok(EitherOutput::First).map_err(EitherError::A), .map_ok(EitherOutput::First).map_err(EitherError::A),
EitherFuture::Second(a) => TryFuture::try_poll(Pin::new(a), cx) EitherFuture::Second(a) => TryFuture::try_poll(a, cx)
.map_ok(EitherOutput::Second).map_err(EitherError::B), .map_ok(EitherOutput::Second).map_err(EitherError::B),
} }
} }
} }
#[pin_project]
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"] #[must_use = "futures do nothing unless polled"]
pub enum EitherFuture2<A, B> { A(A), B(B) } pub enum EitherFuture2<A, B> { A(#[pin] A), B(#[pin] B) }
impl<AFut, BFut, AItem, BItem, AError, BError> Future for EitherFuture2<AFut, BFut> impl<AFut, BFut, AItem, BItem, AError, BError> Future for EitherFuture2<AFut, BFut>
where where
@ -405,11 +397,13 @@ where
{ {
type Output = Result<EitherOutput<AItem, BItem>, EitherError<AError, BError>>; type Output = Result<EitherOutput<AItem, BItem>, EitherError<AError, BError>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { #[project]
match &mut *self { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
EitherFuture2::A(a) => TryFuture::try_poll(Pin::new(a), cx) #[project]
match self.project() {
EitherFuture2::A(a) => TryFuture::try_poll(a, cx)
.map_ok(EitherOutput::First).map_err(EitherError::A), .map_ok(EitherOutput::First).map_err(EitherError::A),
EitherFuture2::B(a) => TryFuture::try_poll(Pin::new(a), cx) EitherFuture2::B(a) => TryFuture::try_poll(a, cx)
.map_ok(EitherOutput::Second).map_err(EitherError::B), .map_ok(EitherOutput::Second).map_err(EitherError::B),
} }
} }

View File

@ -18,8 +18,6 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
#![cfg_attr(feature = "async-await", feature(async_await))]
//! Transports, upgrades, multiplexing and node handling of *libp2p*. //! Transports, upgrades, multiplexing and node handling of *libp2p*.
//! //!
//! The main concepts of libp2p-core are: //! The main concepts of libp2p-core are:

View File

@ -35,13 +35,7 @@ impl<A, B> OrTransport<A, B> {
impl<A, B> Transport for OrTransport<A, B> impl<A, B> Transport for OrTransport<A, B>
where where
B: Transport, B: Transport,
B::Dial: Unpin,
B::Listener: Unpin,
B::ListenerUpgrade: Unpin,
A: Transport, A: Transport,
A::Dial: Unpin,
A::Listener: Unpin,
A::ListenerUpgrade: Unpin,
{ {
type Output = EitherOutput<A::Output, B::Output>; type Output = EitherOutput<A::Output, B::Output>;
type Error = EitherError<A::Error, B::Error>; type Error = EitherError<A::Error, B::Error>;

View File

@ -39,9 +39,6 @@ impl<T, F> Map<T, F> {
impl<T, F, D> Transport for Map<T, F> impl<T, F, D> Transport for Map<T, F>
where where
T: Transport, T: Transport,
T::Dial: Unpin,
T::Listener: Unpin,
T::ListenerUpgrade: Unpin,
F: FnOnce(T::Output, ConnectedPoint) -> D + Clone F: FnOnce(T::Output, ConnectedPoint) -> D + Clone
{ {
type Output = D; type Output = D;
@ -65,22 +62,21 @@ where
/// Custom `Stream` implementation to avoid boxing. /// Custom `Stream` implementation to avoid boxing.
/// ///
/// Maps a function over every stream item. /// Maps a function over every stream item.
#[pin_project::pin_project]
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct MapStream<T, F> { stream: T, fun: F } pub struct MapStream<T, F> { #[pin] stream: T, fun: F }
impl<T, F> Unpin for MapStream<T, F> {
}
impl<T, F, A, B, X> Stream for MapStream<T, F> impl<T, F, A, B, X> Stream for MapStream<T, F>
where where
T: TryStream<Ok = ListenerEvent<X>> + Unpin, T: TryStream<Ok = ListenerEvent<X>>,
X: TryFuture<Ok = A>, X: TryFuture<Ok = A>,
F: FnOnce(A, ConnectedPoint) -> B + Clone F: FnOnce(A, ConnectedPoint) -> B + Clone
{ {
type Item = Result<ListenerEvent<MapFuture<X, F>>, T::Error>; type Item = Result<ListenerEvent<MapFuture<X, F>>, T::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match TryStream::try_poll_next(Pin::new(&mut self.stream), cx) { let this = self.project();
match TryStream::try_poll_next(this.stream, cx) {
Poll::Ready(Some(Ok(event))) => { Poll::Ready(Some(Ok(event))) => {
let event = match event { let event = match event {
ListenerEvent::Upgrade { upgrade, local_addr, remote_addr } => { ListenerEvent::Upgrade { upgrade, local_addr, remote_addr } => {
@ -91,7 +87,7 @@ where
ListenerEvent::Upgrade { ListenerEvent::Upgrade {
upgrade: MapFuture { upgrade: MapFuture {
inner: upgrade, inner: upgrade,
args: Some((self.fun.clone(), point)) args: Some((this.fun.clone(), point))
}, },
local_addr, local_addr,
remote_addr remote_addr
@ -112,30 +108,29 @@ where
/// Custom `Future` to avoid boxing. /// Custom `Future` to avoid boxing.
/// ///
/// Applies a function to the inner future's result. /// Applies a function to the inner future's result.
#[pin_project::pin_project]
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct MapFuture<T, F> { pub struct MapFuture<T, F> {
#[pin]
inner: T, inner: T,
args: Option<(F, ConnectedPoint)> args: Option<(F, ConnectedPoint)>
} }
impl<T, F> Unpin for MapFuture<T, F> {
}
impl<T, A, F, B> Future for MapFuture<T, F> impl<T, A, F, B> Future for MapFuture<T, F>
where where
T: TryFuture<Ok = A> + Unpin, T: TryFuture<Ok = A>,
F: FnOnce(A, ConnectedPoint) -> B F: FnOnce(A, ConnectedPoint) -> B
{ {
type Output = Result<B, T::Error>; type Output = Result<B, T::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let item = match TryFuture::try_poll(Pin::new(&mut self.inner), cx) { let this = self.project();
let item = match TryFuture::try_poll(this.inner, cx) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(v)) => v, Poll::Ready(Ok(v)) => v,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
}; };
let (f, a) = self.args.take().expect("MapFuture has already finished."); let (f, a) = this.args.take().expect("MapFuture has already finished.");
Poll::Ready(Ok(f(item, a))) Poll::Ready(Ok(f(item, a)))
} }
} }

View File

@ -40,9 +40,6 @@ impl<T, F> MapErr<T, F> {
impl<T, F, TErr> Transport for MapErr<T, F> impl<T, F, TErr> Transport for MapErr<T, F>
where where
T: Transport, T: Transport,
T::Dial: Unpin,
T::Listener: Unpin,
T::ListenerUpgrade: Unpin,
F: FnOnce(T::Error) -> TErr + Clone, F: FnOnce(T::Error) -> TErr + Clone,
TErr: error::Error, TErr: error::Error,
{ {
@ -70,67 +67,62 @@ where
} }
/// Listening stream for `MapErr`. /// Listening stream for `MapErr`.
#[pin_project::pin_project]
pub struct MapErrListener<T: Transport, F> { pub struct MapErrListener<T: Transport, F> {
#[pin]
inner: T::Listener, inner: T::Listener,
map: F, map: F,
} }
impl<T, F> Unpin for MapErrListener<T, F>
where T: Transport
{
}
impl<T, F, TErr> Stream for MapErrListener<T, F> impl<T, F, TErr> Stream for MapErrListener<T, F>
where where
T: Transport, T: Transport,
T::Listener: Unpin,
F: FnOnce(T::Error) -> TErr + Clone, F: FnOnce(T::Error) -> TErr + Clone,
TErr: error::Error, TErr: error::Error,
{ {
type Item = Result<ListenerEvent<MapErrListenerUpgrade<T, F>>, TErr>; type Item = Result<ListenerEvent<MapErrListenerUpgrade<T, F>>, TErr>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match TryStream::try_poll_next(Pin::new(&mut self.inner), cx) { let this = self.project();
match TryStream::try_poll_next(this.inner, cx) {
Poll::Ready(Some(Ok(event))) => { Poll::Ready(Some(Ok(event))) => {
let map = &*this.map;
let event = event.map(move |value| { let event = event.map(move |value| {
MapErrListenerUpgrade { MapErrListenerUpgrade {
inner: value, inner: value,
map: Some(self.map.clone()) map: Some(map.clone())
} }
}); });
Poll::Ready(Some(Ok(event))) Poll::Ready(Some(Ok(event)))
} }
Poll::Ready(None) => Poll::Ready(None), Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err((self.map.clone())(err)))), Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err((this.map.clone())(err)))),
} }
} }
} }
/// Listening upgrade future for `MapErr`. /// Listening upgrade future for `MapErr`.
#[pin_project::pin_project]
pub struct MapErrListenerUpgrade<T: Transport, F> { pub struct MapErrListenerUpgrade<T: Transport, F> {
#[pin]
inner: T::ListenerUpgrade, inner: T::ListenerUpgrade,
map: Option<F>, map: Option<F>,
} }
impl<T, F> Unpin for MapErrListenerUpgrade<T, F>
where T: Transport
{
}
impl<T, F, TErr> Future for MapErrListenerUpgrade<T, F> impl<T, F, TErr> Future for MapErrListenerUpgrade<T, F>
where T: Transport, where T: Transport,
T::ListenerUpgrade: Unpin,
F: FnOnce(T::Error) -> TErr, F: FnOnce(T::Error) -> TErr,
{ {
type Output = Result<T::Output, TErr>; type Output = Result<T::Output, TErr>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match Future::poll(Pin::new(&mut self.inner), cx) { let this = self.project();
match Future::poll(this.inner, cx) {
Poll::Ready(Ok(value)) => Poll::Ready(Ok(value)), Poll::Ready(Ok(value)) => Poll::Ready(Ok(value)),
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => { Poll::Ready(Err(err)) => {
let map = self.map.take().expect("poll() called again after error"); let map = this.map.take().expect("poll() called again after error");
Poll::Ready(Err(map(err))) Poll::Ready(Err(map(err)))
} }
} }
@ -138,30 +130,27 @@ where T: Transport,
} }
/// Dialing future for `MapErr`. /// Dialing future for `MapErr`.
#[pin_project::pin_project]
pub struct MapErrDial<T: Transport, F> { pub struct MapErrDial<T: Transport, F> {
#[pin]
inner: T::Dial, inner: T::Dial,
map: Option<F>, map: Option<F>,
} }
impl<T, F> Unpin for MapErrDial<T, F>
where T: Transport
{
}
impl<T, F, TErr> Future for MapErrDial<T, F> impl<T, F, TErr> Future for MapErrDial<T, F>
where where
T: Transport, T: Transport,
T::Dial: Unpin,
F: FnOnce(T::Error) -> TErr, F: FnOnce(T::Error) -> TErr,
{ {
type Output = Result<T::Output, TErr>; type Output = Result<T::Output, TErr>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match Future::poll(Pin::new(&mut self.inner), cx) { let this = self.project();
match Future::poll(this.inner, cx) {
Poll::Ready(Ok(value)) => Poll::Ready(Ok(value)), Poll::Ready(Ok(value)) => Poll::Ready(Ok(value)),
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => { Poll::Ready(Err(err)) => {
let map = self.map.take().expect("poll() called again after error"); let map = this.map.take().expect("poll() called again after error");
Poll::Ready(Err(map(err))) Poll::Ready(Err(map(err)))
} }
} }

View File

@ -74,9 +74,6 @@ impl<InnerTrans> Transport for TransportTimeout<InnerTrans>
where where
InnerTrans: Transport, InnerTrans: Transport,
InnerTrans::Error: 'static, InnerTrans::Error: 'static,
InnerTrans::Dial: Unpin,
InnerTrans::Listener: Unpin,
InnerTrans::ListenerUpgrade: Unpin,
{ {
type Output = InnerTrans::Output; type Output = InnerTrans::Output;
type Error = TransportTimeoutError<InnerTrans::Error>; type Error = TransportTimeoutError<InnerTrans::Error>;
@ -108,29 +105,34 @@ where
// TODO: can be removed and replaced with an `impl Stream` once impl Trait is fully stable // TODO: can be removed and replaced with an `impl Stream` once impl Trait is fully stable
// in Rust (https://github.com/rust-lang/rust/issues/34511) // in Rust (https://github.com/rust-lang/rust/issues/34511)
#[pin_project::pin_project]
pub struct TimeoutListener<InnerStream> { pub struct TimeoutListener<InnerStream> {
#[pin]
inner: InnerStream, inner: InnerStream,
timeout: Duration, timeout: Duration,
} }
impl<InnerStream, O> Stream for TimeoutListener<InnerStream> impl<InnerStream, O> Stream for TimeoutListener<InnerStream>
where where
InnerStream: TryStream<Ok = ListenerEvent<O>> + Unpin InnerStream: TryStream<Ok = ListenerEvent<O>>,
{ {
type Item = Result<ListenerEvent<Timeout<O>>, TransportTimeoutError<InnerStream::Error>>; type Item = Result<ListenerEvent<Timeout<O>>, TransportTimeoutError<InnerStream::Error>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let poll_out = match TryStream::try_poll_next(Pin::new(&mut self.inner), cx) { let this = self.project();
let poll_out = match TryStream::try_poll_next(this.inner, cx) {
Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(TransportTimeoutError::Other(err)))), Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(TransportTimeoutError::Other(err)))),
Poll::Ready(Some(Ok(v))) => v, Poll::Ready(Some(Ok(v))) => v,
Poll::Ready(None) => return Poll::Ready(None), Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
}; };
let timeout = *this.timeout;
let event = poll_out.map(move |inner_fut| { let event = poll_out.map(move |inner_fut| {
Timeout { Timeout {
inner: inner_fut, inner: inner_fut,
timer: Delay::new(self.timeout), timer: Delay::new(timeout),
} }
}); });
@ -142,31 +144,35 @@ where
/// `TransportTimeoutError<Err>`. /// `TransportTimeoutError<Err>`.
// TODO: can be replaced with `impl Future` once `impl Trait` are fully stable in Rust // TODO: can be replaced with `impl Future` once `impl Trait` are fully stable in Rust
// (https://github.com/rust-lang/rust/issues/34511) // (https://github.com/rust-lang/rust/issues/34511)
#[pin_project::pin_project]
#[must_use = "futures do nothing unless polled"] #[must_use = "futures do nothing unless polled"]
pub struct Timeout<InnerFut> { pub struct Timeout<InnerFut> {
#[pin]
inner: InnerFut, inner: InnerFut,
timer: Delay, timer: Delay,
} }
impl<InnerFut> Future for Timeout<InnerFut> impl<InnerFut> Future for Timeout<InnerFut>
where where
InnerFut: TryFuture + Unpin, InnerFut: TryFuture,
{ {
type Output = Result<InnerFut::Ok, TransportTimeoutError<InnerFut::Error>>; type Output = Result<InnerFut::Ok, TransportTimeoutError<InnerFut::Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
// It is debatable whether we should poll the inner future first or the timer first. // It is debatable whether we should poll the inner future first or the timer first.
// For example, if you start dialing with a timeout of 10 seconds, then after 15 seconds // For example, if you start dialing with a timeout of 10 seconds, then after 15 seconds
// the dialing succeeds on the wire, then after 20 seconds you poll, then depending on // the dialing succeeds on the wire, then after 20 seconds you poll, then depending on
// which gets polled first, the outcome will be success or failure. // which gets polled first, the outcome will be success or failure.
match TryFuture::try_poll(Pin::new(&mut self.inner), cx) { let mut this = self.project();
match TryFuture::try_poll(this.inner, cx) {
Poll::Pending => {}, Poll::Pending => {},
Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)), Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)),
Poll::Ready(Err(err)) => return Poll::Ready(Err(TransportTimeoutError::Other(err))), Poll::Ready(Err(err)) => return Poll::Ready(Err(TransportTimeoutError::Other(err))),
} }
match TryFuture::try_poll(Pin::new(&mut self.timer), cx) { match TryFuture::try_poll(Pin::new(&mut this.timer), cx) {
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => Poll::Ready(Err(TransportTimeoutError::Timeout)), Poll::Ready(Ok(())) => Poll::Ready(Err(TransportTimeoutError::Timeout)),
Poll::Ready(Err(err)) => Poll::Ready(Err(TransportTimeoutError::TimerError(err))), Poll::Ready(Err(err)) => Poll::Ready(Err(TransportTimeoutError::TimerError(err))),

View File

@ -48,7 +48,6 @@ where
impl<C, U, F, T> InboundUpgrade<C> for MapInboundUpgrade<U, F> impl<C, U, F, T> InboundUpgrade<C> for MapInboundUpgrade<U, F>
where where
U: InboundUpgrade<C>, U: InboundUpgrade<C>,
U::Future: Unpin,
F: FnOnce(U::Output) -> T F: FnOnce(U::Output) -> T
{ {
type Output = T; type Output = T;
@ -66,7 +65,6 @@ where
impl<C, U, F> OutboundUpgrade<C> for MapInboundUpgrade<U, F> impl<C, U, F> OutboundUpgrade<C> for MapInboundUpgrade<U, F>
where where
U: OutboundUpgrade<C>, U: OutboundUpgrade<C>,
U::Future: Unpin,
{ {
type Output = U::Output; type Output = U::Output;
type Error = U::Error; type Error = U::Error;
@ -102,7 +100,6 @@ where
impl<C, U, F> InboundUpgrade<C> for MapOutboundUpgrade<U, F> impl<C, U, F> InboundUpgrade<C> for MapOutboundUpgrade<U, F>
where where
U: InboundUpgrade<C>, U: InboundUpgrade<C>,
U::Future: Unpin,
{ {
type Output = U::Output; type Output = U::Output;
type Error = U::Error; type Error = U::Error;
@ -116,7 +113,6 @@ where
impl<C, U, F, T> OutboundUpgrade<C> for MapOutboundUpgrade<U, F> impl<C, U, F, T> OutboundUpgrade<C> for MapOutboundUpgrade<U, F>
where where
U: OutboundUpgrade<C>, U: OutboundUpgrade<C>,
U::Future: Unpin,
F: FnOnce(U::Output) -> T F: FnOnce(U::Output) -> T
{ {
type Output = T; type Output = T;
@ -156,7 +152,6 @@ where
impl<C, U, F, T> InboundUpgrade<C> for MapInboundUpgradeErr<U, F> impl<C, U, F, T> InboundUpgrade<C> for MapInboundUpgradeErr<U, F>
where where
U: InboundUpgrade<C>, U: InboundUpgrade<C>,
U::Future: Unpin,
F: FnOnce(U::Error) -> T F: FnOnce(U::Error) -> T
{ {
type Output = U::Output; type Output = U::Output;
@ -174,7 +169,6 @@ where
impl<C, U, F> OutboundUpgrade<C> for MapInboundUpgradeErr<U, F> impl<C, U, F> OutboundUpgrade<C> for MapInboundUpgradeErr<U, F>
where where
U: OutboundUpgrade<C>, U: OutboundUpgrade<C>,
U::Future: Unpin,
{ {
type Output = U::Output; type Output = U::Output;
type Error = U::Error; type Error = U::Error;
@ -210,7 +204,6 @@ where
impl<C, U, F, T> OutboundUpgrade<C> for MapOutboundUpgradeErr<U, F> impl<C, U, F, T> OutboundUpgrade<C> for MapOutboundUpgradeErr<U, F>
where where
U: OutboundUpgrade<C>, U: OutboundUpgrade<C>,
U::Future: Unpin,
F: FnOnce(U::Error) -> T F: FnOnce(U::Error) -> T
{ {
type Output = U::Output; type Output = U::Output;
@ -238,54 +231,54 @@ where
} }
} }
#[pin_project::pin_project]
pub struct MapFuture<TInnerFut, TMap> { pub struct MapFuture<TInnerFut, TMap> {
#[pin]
inner: TInnerFut, inner: TInnerFut,
map: Option<TMap>, map: Option<TMap>,
} }
impl<TInnerFut, TMap> Unpin for MapFuture<TInnerFut, TMap> {
}
impl<TInnerFut, TIn, TMap, TOut> Future for MapFuture<TInnerFut, TMap> impl<TInnerFut, TIn, TMap, TOut> Future for MapFuture<TInnerFut, TMap>
where where
TInnerFut: TryFuture<Ok = TIn> + Unpin, TInnerFut: TryFuture<Ok = TIn>,
TMap: FnOnce(TIn) -> TOut, TMap: FnOnce(TIn) -> TOut,
{ {
type Output = Result<TOut, TInnerFut::Error>; type Output = Result<TOut, TInnerFut::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let item = match TryFuture::try_poll(Pin::new(&mut self.inner), cx) { let this = self.project();
let item = match TryFuture::try_poll(this.inner, cx) {
Poll::Ready(Ok(v)) => v, Poll::Ready(Ok(v)) => v,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
}; };
let map = self.map.take().expect("Future has already finished"); let map = this.map.take().expect("Future has already finished");
Poll::Ready(Ok(map(item))) Poll::Ready(Ok(map(item)))
} }
} }
#[pin_project::pin_project]
pub struct MapErrFuture<T, F> { pub struct MapErrFuture<T, F> {
#[pin]
fut: T, fut: T,
fun: Option<F>, fun: Option<F>,
} }
impl<T, F> Unpin for MapErrFuture<T, F> {
}
impl<T, E, F, A> Future for MapErrFuture<T, F> impl<T, E, F, A> Future for MapErrFuture<T, F>
where where
T: TryFuture<Error = E> + Unpin, T: TryFuture<Error = E>,
F: FnOnce(E) -> A, F: FnOnce(E) -> A,
{ {
type Output = Result<T::Ok, A>; type Output = Result<T::Ok, A>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match TryFuture::try_poll(Pin::new(&mut self.fut), cx) { let this = self.project();
match TryFuture::try_poll(this.fut, cx) {
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
Poll::Ready(Ok(x)) => Poll::Ready(Ok(x)), Poll::Ready(Ok(x)) => Poll::Ready(Ok(x)),
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
let f = self.fun.take().expect("Future has not resolved yet"); let f = this.fun.take().expect("Future has not resolved yet");
Poll::Ready(Err(f(e))) Poll::Ready(Err(f(e)))
} }
} }

View File

@ -144,7 +144,6 @@ pub trait InboundUpgrade<C>: UpgradeInfo {
/// Possible error during the handshake. /// Possible error during the handshake.
type Error; type Error;
/// Future that performs the handshake with the remote. /// Future that performs the handshake with the remote.
// TODO: remove Unpin
type Future: Future<Output = Result<Self::Output, Self::Error>> + Unpin; type Future: Future<Output = Result<Self::Output, Self::Error>> + Unpin;
/// After we have determined that the remote supports one of the protocols we support, this /// After we have determined that the remote supports one of the protocols we support, this
@ -185,7 +184,6 @@ pub trait OutboundUpgrade<C>: UpgradeInfo {
/// Possible error during the handshake. /// Possible error during the handshake.
type Error; type Error;
/// Future that performs the handshake with the remote. /// Future that performs the handshake with the remote.
// TODO: remove Unpin
type Future: Future<Output = Result<Self::Output, Self::Error>> + Unpin; type Future: Future<Output = Result<Self::Output, Self::Error>> + Unpin;
/// After we have determined that the remote supports one of the protocols we support, this /// After we have determined that the remote supports one of the protocols we support, this

View File

@ -59,9 +59,7 @@ where
impl<C, A, B, TA, TB, EA, EB> InboundUpgrade<C> for SelectUpgrade<A, B> impl<C, A, B, TA, TB, EA, EB> InboundUpgrade<C> for SelectUpgrade<A, B>
where where
A: InboundUpgrade<C, Output = TA, Error = EA>, A: InboundUpgrade<C, Output = TA, Error = EA>,
<A as InboundUpgrade<C>>::Future: Unpin,
B: InboundUpgrade<C, Output = TB, Error = EB>, B: InboundUpgrade<C, Output = TB, Error = EB>,
<B as InboundUpgrade<C>>::Future: Unpin,
{ {
type Output = EitherOutput<TA, TB>; type Output = EitherOutput<TA, TB>;
type Error = EitherError<EA, EB>; type Error = EitherError<EA, EB>;