From 3f968cbf92d5939238b798d9ab1d5266e3edea20 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 14 Jan 2020 12:03:10 +0100 Subject: [PATCH] Remove some Unpin requirements on Futures (#1384) * Remove lots of Unpin requirements * Make Transport::and_then accept pinned futures * Finish the PR * Work on secio * Fix BandwidthTransport * Adjust ListenersStrema * Fix nodes/tasks * Fix nodes * Various more fixes * Fix yamux * Fix Swarm * Fix WebSockets * Fix rw-stream-sink --- Cargo.toml | 1 + core/src/either.rs | 8 +-- core/src/nodes/collection.rs | 2 +- core/src/nodes/listeners.rs | 53 ++++++++--------- core/src/nodes/network.rs | 25 ++++----- core/src/nodes/tasks/manager.rs | 2 +- core/src/nodes/tasks/task.rs | 7 ++- core/src/transport/and_then.rs | 56 +++++++++--------- core/src/transport/upgrade.rs | 59 ++++++++----------- core/src/upgrade/apply.rs | 10 ++-- core/src/upgrade/mod.rs | 4 +- misc/rw-stream-sink/Cargo.toml | 1 + misc/rw-stream-sink/src/lib.rs | 29 ++++++---- muxers/yamux/src/lib.rs | 10 +++- protocols/secio/Cargo.toml | 1 + protocols/secio/src/codec/decode.rs | 50 ++++++++++------- protocols/secio/src/codec/encode.rs | 35 +++++++----- protocols/secio/src/codec/len_prefix.rs | 7 ++- src/bandwidth.rs | 75 +++++++++++++++---------- src/simple.rs | 4 +- swarm/src/lib.rs | 22 ++++---- transports/websocket/src/framed.rs | 2 +- transports/websocket/src/lib.rs | 2 +- 23 files changed, 249 insertions(+), 216 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1e1917d9..b8813dfc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ libp2p-uds = { version = "0.14.0-alpha.1", path = "transports/uds" } libp2p-wasm-ext = { version = "0.7.0-alpha.1", path = "transports/wasm-ext" } libp2p-yamux = { version = "0.14.0-alpha.1", path = "muxers/yamux" } parking_lot = "0.10.0" +pin-project = "0.4.6" smallvec = "1.0" wasm-timer = "0.2.4" diff --git a/core/src/either.rs b/core/src/either.rs index 8e084155..a3c24ade 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -155,8 +155,8 @@ where impl Sink for EitherOutput where - A: Sink + Unpin, - B: Sink + Unpin, + A: Sink, + B: Sink, { type Error = EitherError; @@ -414,8 +414,8 @@ pub enum EitherFuture2 { A(#[pin] A), B(#[pin] B) } impl Future for EitherFuture2 where - AFut: TryFuture + Unpin, - BFut: TryFuture + Unpin, + AFut: TryFuture, + BFut: TryFuture, { type Output = Result, EitherError>; diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs index 596ad3b1..f1f003d3 100644 --- a/core/src/nodes/collection.rs +++ b/core/src/nodes/collection.rs @@ -322,7 +322,7 @@ where pub fn add_reach_attempt(&mut self, future: TFut, handler: THandler) -> ReachAttemptId where - TFut: Future> + Unpin + Send + 'static, + TFut: Future> + Send + 'static, THandler: IntoNodeHandler + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, ::OutboundOpenInfo: Send + 'static, diff --git a/core/src/nodes/listeners.rs b/core/src/nodes/listeners.rs index 13054fea..22c9a9ee 100644 --- a/core/src/nodes/listeners.rs +++ b/core/src/nodes/listeners.rs @@ -60,7 +60,7 @@ use std::{collections::VecDeque, fmt, pin::Pin}; /// ListenersEvent::AddressExpired { listener_id, listen_addr } => { /// println!("Listener {:?} is no longer listening at address {}", listener_id, listen_addr); /// }, -/// ListenersEvent::Closed { listener_id, .. } => { +/// ListenersEvent::Closed { listener_id } => { /// println!("Listener {:?} has been closed", listener_id); /// }, /// ListenersEvent::Error { listener_id, error } => { @@ -84,7 +84,9 @@ where /// Transport used to spawn listeners. transport: TTrans, /// All the active listeners. - listeners: VecDeque>, + /// The `Listener` struct contains a stream that we want to be pinned. Since the `VecDeque` + /// can be resized, the only way is to use a `Pin>`. + listeners: VecDeque>>>, /// The next listener ID to assign. next_id: ListenerId } @@ -97,6 +99,7 @@ where pub struct ListenerId(u64); /// A single active listener. +#[pin_project::pin_project] #[derive(Debug)] struct Listener where @@ -105,6 +108,7 @@ where /// The ID of this listener. id: ListenerId, /// The object that actually listens. + #[pin] listener: TTrans::Listener, /// Addresses it is listening on. addresses: SmallVec<[Multiaddr; 4]> @@ -144,8 +148,6 @@ where Closed { /// The ID of the listener that closed. listener_id: ListenerId, - /// The listener that closed. - listener: TTrans::Listener, }, /// A listener errored. /// @@ -190,22 +192,25 @@ where TTrans: Clone, { let listener = self.transport.clone().listen_on(addr)?; - self.listeners.push_back(Listener { + self.listeners.push_back(Box::pin(Listener { id: self.next_id, listener, addresses: SmallVec::new() - }); + })); let id = self.next_id; self.next_id = ListenerId(self.next_id.0 + 1); Ok(id) } /// Remove the listener matching the given `ListenerId`. - pub fn remove_listener(&mut self, id: ListenerId) -> Option { + /// + /// Return `Ok(())` if a listener with this ID was in the list. + pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> { if let Some(i) = self.listeners.iter().position(|l| l.id == id) { - self.listeners.remove(i).map(|l| l.listener) + self.listeners.remove(i); + Ok(()) } else { - None + Err(()) } } @@ -220,21 +225,19 @@ where } /// Provides an API similar to `Stream`, except that it cannot end. - pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> - where - TTrans::Listener: Unpin, - { + pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { // We remove each element from `listeners` one by one and add them back. let mut remaining = self.listeners.len(); while let Some(mut listener) = self.listeners.pop_back() { - match TryStream::try_poll_next(Pin::new(&mut listener.listener), cx) { + let mut listener_project = listener.as_mut().project(); + match TryStream::try_poll_next(listener_project.listener.as_mut(), cx) { Poll::Pending => { self.listeners.push_front(listener); remaining -= 1; if remaining == 0 { break } } Poll::Ready(Some(Ok(ListenerEvent::Upgrade { upgrade, local_addr, remote_addr }))) => { - let id = listener.id; + let id = *listener_project.id; self.listeners.push_front(listener); return Poll::Ready(ListenersEvent::Incoming { listener_id: id, @@ -244,13 +247,13 @@ where }) } Poll::Ready(Some(Ok(ListenerEvent::NewAddress(a)))) => { - if listener.addresses.contains(&a) { + if listener_project.addresses.contains(&a) { debug!("Transport has reported address {} multiple times", a) } - if !listener.addresses.contains(&a) { - listener.addresses.push(a.clone()); + if !listener_project.addresses.contains(&a) { + listener_project.addresses.push(a.clone()); } - let id = listener.id; + let id = *listener_project.id; self.listeners.push_front(listener); return Poll::Ready(ListenersEvent::NewAddress { listener_id: id, @@ -258,8 +261,8 @@ where }) } Poll::Ready(Some(Ok(ListenerEvent::AddressExpired(a)))) => { - listener.addresses.retain(|x| x != &a); - let id = listener.id; + listener_project.addresses.retain(|x| x != &a); + let id = *listener_project.id; self.listeners.push_front(listener); return Poll::Ready(ListenersEvent::AddressExpired { listener_id: id, @@ -268,13 +271,12 @@ where } Poll::Ready(None) => { return Poll::Ready(ListenersEvent::Closed { - listener_id: listener.id, - listener: listener.listener + listener_id: *listener_project.id, }) } Poll::Ready(Some(Err(err))) => { return Poll::Ready(ListenersEvent::Error { - listener_id: listener.id, + listener_id: *listener_project.id, error: err }) } @@ -289,7 +291,6 @@ where impl Stream for ListenersStream where TTrans: Transport, - TTrans::Listener: Unpin, { type Item = ListenersEvent; @@ -338,7 +339,7 @@ where .field("listener_id", listener_id) .field("local_addr", local_addr) .finish(), - ListenersEvent::Closed { listener_id, .. } => f + ListenersEvent::Closed { listener_id } => f .debug_struct("ListenersEvent::Closed") .field("listener_id", listener_id) .finish(), diff --git a/core/src/nodes/network.rs b/core/src/nodes/network.rs index 3975b021..db8630fe 100644 --- a/core/src/nodes/network.rs +++ b/core/src/nodes/network.rs @@ -171,8 +171,6 @@ where ListenerClosed { /// The listener ID that closed. listener_id: ListenerId, - /// The listener which closed. - listener: TTrans::Listener, }, /// One of the listeners errored. @@ -309,7 +307,7 @@ where .field("listen_addr", listen_addr) .finish() } - NetworkEvent::ListenerClosed { listener_id, .. } => { + NetworkEvent::ListenerClosed { listener_id } => { f.debug_struct("ListenerClosed") .field("listener_id", listener_id) .finish() @@ -580,7 +578,7 @@ impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr, TConnInfo, where TTrans: Transport, TTrans::Error: Send + 'static, - TTrans::ListenerUpgrade: Unpin + Send + 'static, + TTrans::ListenerUpgrade: Send + 'static, THandler: IntoNodeHandler<(TConnInfo, ConnectedPoint)> + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary @@ -735,7 +733,9 @@ where } /// Remove a previously added listener. - pub fn remove_listener(&mut self, id: ListenerId) -> Option { + /// + /// Returns `Ok(())` if a listener with this ID was in the list. + pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> { self.listeners.remove_listener(id) } @@ -788,7 +788,7 @@ where where TTrans: Transport, TTrans::Error: Send + 'static, - TTrans::Dial: Unpin + Send + 'static, + TTrans::Dial: Send + 'static, TMuxer: Send + Sync + 'static, TMuxer::OutboundSubstream: Send, TInEvent: Send + 'static, @@ -936,7 +936,7 @@ where fn start_dial_out(&mut self, peer_id: TPeerId, handler: THandler, first: Multiaddr, rest: Vec) where TTrans: Transport, - TTrans::Dial: Unpin + Send + 'static, + TTrans::Dial: Send + 'static, TTrans::Error: Send + 'static, TMuxer: Send + Sync + 'static, TMuxer::OutboundSubstream: Send, @@ -982,8 +982,7 @@ where where TTrans: Transport, TTrans::Error: Send + 'static, - TTrans::Dial: Unpin + Send + 'static, - TTrans::Listener: Unpin, + TTrans::Dial: Send + 'static, TTrans::ListenerUpgrade: Send + 'static, TMuxer: Send + Sync + 'static, TMuxer::OutboundSubstream: Send, @@ -1021,8 +1020,8 @@ where Poll::Ready(ListenersEvent::AddressExpired { listener_id, listen_addr }) => { return Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr }) } - Poll::Ready(ListenersEvent::Closed { listener_id, listener }) => { - return Poll::Ready(NetworkEvent::ListenerClosed { listener_id, listener }) + Poll::Ready(ListenersEvent::Closed { listener_id }) => { + return Poll::Ready(NetworkEvent::ListenerClosed { listener_id }) } Poll::Ready(ListenersEvent::Error { listener_id, error }) => { return Poll::Ready(NetworkEvent::ListenerError { listener_id, error }) @@ -1462,7 +1461,7 @@ impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, where TTrans: Transport + Clone, TTrans::Error: Send + 'static, - TTrans::Dial: Unpin + Send + 'static, + TTrans::Dial: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send, TMuxer::Substream: Send, @@ -1759,7 +1758,7 @@ impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr, TConnInfo, where TTrans: Transport + Clone, TTrans::Error: Send + 'static, - TTrans::Dial: Unpin + Send + 'static, + TTrans::Dial: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send, TMuxer::Substream: Send, diff --git a/core/src/nodes/tasks/manager.rs b/core/src/nodes/tasks/manager.rs index dbfe485a..aa94e267 100644 --- a/core/src/nodes/tasks/manager.rs +++ b/core/src/nodes/tasks/manager.rs @@ -157,7 +157,7 @@ impl Manager { /// processing the node's events. pub fn add_reach_attempt(&mut self, future: F, user_data: T, handler: H) -> TaskId where - F: Future> + Unpin + Send + 'static, + F: Future> + Send + 'static, H: IntoNodeHandler + Send + 'static, H::Handler: NodeHandler, InEvent = I, OutEvent = O, Error = HE> + Send + 'static, E: error::Error + Send + 'static, diff --git a/core/src/nodes/tasks/task.rs b/core/src/nodes/tasks/task.rs index 992a59bf..24c4a280 100644 --- a/core/src/nodes/tasks/task.rs +++ b/core/src/nodes/tasks/task.rs @@ -92,7 +92,7 @@ where id: i, sender: s, receiver: r.fuse(), - state: State::Future { future: f, handler: h, events_buffer: Vec::new() }, + state: State::Future { future: Box::pin(f), handler: h, events_buffer: Vec::new() }, taken_over: SmallVec::new() } } @@ -124,7 +124,8 @@ where /// Future to resolve to connect to the node. Future { /// The future that will attempt to reach the node. - future: F, + // TODO: don't pin this Future; this requires deeper changes though + future: Pin>, /// The handler that will be used to build the `HandledNode`. handler: H, /// While we are dialing the future, we need to buffer the events received on @@ -163,7 +164,7 @@ where impl Future for Task where M: StreamMuxer, - F: Future> + Unpin, + F: Future>, H: IntoNodeHandler, H::Handler: NodeHandler, InEvent = I, OutEvent = O> { diff --git a/core/src/transport/and_then.rs b/core/src/transport/and_then.rs index a2e7ed61..7f8a52b0 100644 --- a/core/src/transport/and_then.rs +++ b/core/src/transport/and_then.rs @@ -25,7 +25,7 @@ use crate::{ }; use futures::{future::Either, prelude::*}; use multiaddr::Multiaddr; -use std::{error, pin::Pin, task::Context, task::Poll}; +use std::{error, marker::PhantomPinned, pin::Pin, task::Context, task::Poll}; /// See the `Transport::and_then` method. #[derive(Debug, Clone)] @@ -40,11 +40,8 @@ impl AndThen { impl Transport for AndThen where T: Transport, - T::Dial: Unpin, - T::Listener: Unpin, - T::ListenerUpgrade: Unpin, C: FnOnce(T::Output, ConnectedPoint) -> F + Clone, - F: TryFuture + Unpin, + F: TryFuture, F::Error: error::Error, { type Output = O; @@ -66,8 +63,9 @@ where fn dial(self, addr: Multiaddr) -> Result> { let dialed_fut = self.transport.dial(addr.clone()).map_err(|err| err.map(EitherError::A))?; let future = AndThenFuture { - inner: Either::Left(dialed_fut), - args: Some((self.fun, ConnectedPoint::Dialer { address: addr })) + inner: Either::Left(Box::pin(dialed_fut)), + args: Some((self.fun, ConnectedPoint::Dialer { address: addr })), + marker: PhantomPinned, }; Ok(future) } @@ -76,18 +74,17 @@ where /// Custom `Stream` to avoid boxing. /// /// Applies a function to every stream item. +#[pin_project::pin_project] #[derive(Debug, Clone)] pub struct AndThenStream { + #[pin] stream: TListener, fun: TMap } -impl Unpin for AndThenStream { -} - impl Stream for AndThenStream where - TListener: TryStream, Error = TTransErr> + Unpin, + TListener: TryStream, Error = TTransErr>, TListUpgr: TryFuture, TMap: FnOnce(TTransOut, ConnectedPoint) -> TMapOut + Clone, TMapOut: TryFuture @@ -97,8 +94,9 @@ where EitherError >; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match TryStream::try_poll_next(Pin::new(&mut self.stream), cx) { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + match TryStream::try_poll_next(this.stream, cx) { Poll::Ready(Some(Ok(event))) => { let event = match event { ListenerEvent::Upgrade { upgrade, local_addr, remote_addr } => { @@ -108,8 +106,9 @@ where }; ListenerEvent::Upgrade { upgrade: AndThenFuture { - inner: Either::Left(upgrade), - args: Some((self.fun.clone(), point)) + inner: Either::Left(Box::pin(upgrade)), + args: Some((this.fun.clone(), point)), + marker: PhantomPinned, }, local_addr, remote_addr @@ -132,26 +131,24 @@ where /// Applies a function to the result of the inner future. #[derive(Debug)] pub struct AndThenFuture { - inner: Either, - args: Option<(TMap, ConnectedPoint)> -} - -impl Unpin for AndThenFuture { + inner: Either>, Pin>>, + args: Option<(TMap, ConnectedPoint)>, + marker: PhantomPinned, } impl Future for AndThenFuture where - TFut: TryFuture + Unpin, + TFut: TryFuture, TMap: FnOnce(TFut::Ok, ConnectedPoint) -> TMapOut, - TMapOut: TryFuture + Unpin + TMapOut: TryFuture, { type Output = Result>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { loop { - let future = match (*self).inner { - Either::Left(ref mut future) => { - let item = match TryFuture::try_poll(Pin::new(future), cx) { + let future = match &mut self.inner { + Either::Left(future) => { + let item = match TryFuture::try_poll(future.as_mut(), cx) { Poll::Ready(Ok(v)) => v, Poll::Ready(Err(err)) => return Poll::Ready(Err(EitherError::A(err))), Poll::Pending => return Poll::Pending, @@ -159,8 +156,8 @@ where let (f, a) = self.args.take().expect("AndThenFuture has already finished."); f(item, a) } - Either::Right(ref mut future) => { - return match TryFuture::try_poll(Pin::new(future), cx) { + Either::Right(future) => { + return match TryFuture::try_poll(future.as_mut(), cx) { Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)), Poll::Ready(Err(err)) => return Poll::Ready(Err(EitherError::B(err))), Poll::Pending => Poll::Pending, @@ -168,7 +165,10 @@ where } }; - (*self).inner = Either::Right(future); + self.inner = Either::Right(Box::pin(future)); } } } + +impl Unpin for AndThenFuture { +} diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 335c2220..64a182a6 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -101,9 +101,6 @@ where AndThen Authenticate + Clone> > where T: Transport, - T::Dial: Unpin, - T::Listener: Unpin, - T::ListenerUpgrade: Unpin, I: ConnectionInfo, C: AsyncRead + AsyncWrite + Unpin, D: AsyncRead + AsyncWrite + Unpin, @@ -133,9 +130,6 @@ where pub fn apply(self, upgrade: U) -> Builder> where T: Transport, - T::Dial: Unpin, - T::Listener: Unpin, - T::ListenerUpgrade: Unpin, C: AsyncRead + AsyncWrite + Unpin, D: AsyncRead + AsyncWrite + Unpin, I: ConnectionInfo, @@ -161,9 +155,6 @@ where -> AndThen Multiplex + Clone> where T: Transport, - T::Dial: Unpin, - T::Listener: Unpin, - T::ListenerUpgrade: Unpin, C: AsyncRead + AsyncWrite + Unpin, M: StreamMuxer, I: ConnectionInfo, @@ -183,11 +174,13 @@ where /// in the context of negotiating a secure channel. /// /// Configured through [`Builder::authenticate`]. +#[pin_project::pin_project] pub struct Authenticate where C: AsyncRead + AsyncWrite + Unpin, U: InboundUpgrade> + OutboundUpgrade> { + #[pin] inner: EitherUpgrade } @@ -201,8 +194,9 @@ where { type Output = as Future>::Output; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - Future::poll(Pin::new(&mut self.inner), cx) + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.project(); + Future::poll(this.inner, cx) } } @@ -210,12 +204,14 @@ where /// top of an authenticated transport. /// /// Configured through [`Builder::multiplex`]. +#[pin_project::pin_project] pub struct Multiplex where C: AsyncRead + AsyncWrite + Unpin, U: InboundUpgrade> + OutboundUpgrade>, { info: Option, + #[pin] upgrade: EitherUpgrade, } @@ -227,23 +223,17 @@ where { type Output = Result<(I, M), UpgradeError>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let m = match ready!(Future::poll(Pin::new(&mut self.upgrade), cx)) { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.project(); + let m = match ready!(Future::poll(this.upgrade, cx)) { Ok(m) => m, Err(err) => return Poll::Ready(Err(err)), }; - let i = self.info.take().expect("Multiplex future polled after completion."); + let i = this.info.take().expect("Multiplex future polled after completion."); Poll::Ready(Ok((i, m))) } } -impl Unpin for Multiplex -where - C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade> + OutboundUpgrade>, -{ -} - /// An inbound or outbound upgrade. type EitherUpgrade = future::Either, OutboundUpgradeApply>; @@ -262,9 +252,6 @@ impl Upgrade { impl Transport for Upgrade where T: Transport, - T::Dial: Unpin, - T::Listener: Unpin, - T::ListenerUpgrade: Unpin, T::Error: 'static, C: AsyncRead + AsyncWrite + Unpin, U: InboundUpgrade, Output = D, Error = E>, @@ -281,7 +268,7 @@ where let future = self.inner.dial(addr.clone()) .map_err(|err| err.map(TransportUpgradeError::Transport))?; Ok(DialUpgradeFuture { - future, + future: Box::pin(future), upgrade: future::Either::Left(Some(self.upgrade)) }) } @@ -290,7 +277,7 @@ where let stream = self.inner.listen_on(addr) .map_err(|err| err.map(TransportUpgradeError::Transport))?; Ok(ListenerStream { - stream, + stream: Box::pin(stream), upgrade: self.upgrade }) } @@ -337,13 +324,13 @@ where U: OutboundUpgrade>, C: AsyncRead + AsyncWrite + Unpin, { - future: F, + future: Pin>, upgrade: future::Either, (Option, OutboundUpgradeApply)> } impl Future for DialUpgradeFuture where - F: TryFuture + Unpin, + F: TryFuture, C: AsyncRead + AsyncWrite + Unpin, U: OutboundUpgrade, Output = D>, U::Error: Error @@ -358,7 +345,7 @@ where loop { this.upgrade = match this.upgrade { future::Either::Left(ref mut up) => { - let (i, c) = match ready!(TryFuture::try_poll(Pin::new(&mut this.future), cx).map_err(TransportUpgradeError::Transport)) { + let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx).map_err(TransportUpgradeError::Transport)) { Ok(v) => v, Err(err) => return Poll::Ready(Err(err)), }; @@ -387,13 +374,13 @@ where /// The [`Transport::Listener`] stream of an [`Upgrade`]d transport. pub struct ListenerStream { - stream: S, + stream: Pin>, upgrade: U } impl Stream for ListenerStream where - S: TryStream> + Unpin, + S: TryStream>, F: TryFuture, C: AsyncRead + AsyncWrite + Unpin, U: InboundUpgrade, Output = D> + Clone @@ -401,11 +388,11 @@ where type Item = Result>, TransportUpgradeError>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match ready!(TryStream::try_poll_next(Pin::new(&mut self.stream), cx)) { + match ready!(TryStream::try_poll_next(self.stream.as_mut(), cx)) { Some(Ok(event)) => { let event = event.map(move |future| { ListenerUpgradeFuture { - future, + future: Box::pin(future), upgrade: future::Either::Left(Some(self.upgrade.clone())) } }); @@ -428,13 +415,13 @@ where C: AsyncRead + AsyncWrite + Unpin, U: InboundUpgrade> { - future: F, + future: Pin>, upgrade: future::Either, (Option, InboundUpgradeApply)> } impl Future for ListenerUpgradeFuture where - F: TryFuture + Unpin, + F: TryFuture, C: AsyncRead + AsyncWrite + Unpin, U: InboundUpgrade, Output = D>, U::Error: Error @@ -449,7 +436,7 @@ where loop { this.upgrade = match this.upgrade { future::Either::Left(ref mut up) => { - let (i, c) = match ready!(TryFuture::try_poll(Pin::new(&mut this.future), cx).map_err(TransportUpgradeError::Transport)) { + let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx).map_err(TransportUpgradeError::Transport)) { Ok(v) => v, Err(err) => return Poll::Ready(Err(err)) }; diff --git a/core/src/upgrade/apply.rs b/core/src/upgrade/apply.rs index 6756003b..21976683 100644 --- a/core/src/upgrade/apply.rs +++ b/core/src/upgrade/apply.rs @@ -86,7 +86,7 @@ where upgrade: U, }, Upgrade { - future: U::Future + future: Pin> }, Undefined } @@ -102,7 +102,6 @@ impl Future for InboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, U: InboundUpgrade>, - U::Future: Unpin, { type Output = Result>; @@ -118,7 +117,7 @@ where } }; self.inner = InboundUpgradeApplyState::Upgrade { - future: upgrade.upgrade_inbound(Compat01As03::new(io), info.0) + future: Box::pin(upgrade.upgrade_inbound(Compat01As03::new(io), info.0)) }; } InboundUpgradeApplyState::Upgrade { mut future } => { @@ -163,7 +162,7 @@ where upgrade: U }, Upgrade { - future: U::Future + future: Pin> }, Undefined } @@ -179,7 +178,6 @@ impl Future for OutboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, U: OutboundUpgrade>, - U::Future: Unpin, { type Output = Result>; @@ -195,7 +193,7 @@ where } }; self.inner = OutboundUpgradeApplyState::Upgrade { - future: upgrade.upgrade_outbound(Compat01As03::new(connection), info.0) + future: Box::pin(upgrade.upgrade_outbound(Compat01As03::new(connection), info.0)) }; } OutboundUpgradeApplyState::Upgrade { mut future } => { diff --git a/core/src/upgrade/mod.rs b/core/src/upgrade/mod.rs index e0f97170..cc870693 100644 --- a/core/src/upgrade/mod.rs +++ b/core/src/upgrade/mod.rs @@ -144,7 +144,7 @@ pub trait InboundUpgrade: UpgradeInfo { /// Possible error during the handshake. type Error; /// Future that performs the handshake with the remote. - type Future: Future> + Unpin; + type Future: Future>; /// After we have determined that the remote supports one of the protocols we support, this /// method is called to start the handshake. @@ -184,7 +184,7 @@ pub trait OutboundUpgrade: UpgradeInfo { /// Possible error during the handshake. type Error; /// Future that performs the handshake with the remote. - type Future: Future> + Unpin; + type Future: Future>; /// After we have determined that the remote supports one of the protocols we support, this /// method is called to start the handshake. diff --git a/misc/rw-stream-sink/Cargo.toml b/misc/rw-stream-sink/Cargo.toml index bcfabc35..76464be5 100644 --- a/misc/rw-stream-sink/Cargo.toml +++ b/misc/rw-stream-sink/Cargo.toml @@ -11,6 +11,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" +pin-project = "0.4.6" static_assertions = "1" [dev-dependencies] diff --git a/misc/rw-stream-sink/src/lib.rs b/misc/rw-stream-sink/src/lib.rs index 80f919f2..69b30205 100644 --- a/misc/rw-stream-sink/src/lib.rs +++ b/misc/rw-stream-sink/src/lib.rs @@ -32,7 +32,9 @@ static_assertions::const_assert!(std::mem::size_of::() <= std::mem::size_ /// Wraps a [`Stream`] and [`Sink`] whose items are buffers. /// Implements [`AsyncRead`] and [`AsyncWrite`]. +#[pin_project::pin_project] pub struct RwStreamSink { + #[pin] inner: S, current_item: Option::Ok>> } @@ -49,15 +51,17 @@ where S: TryStream + Unpin, ::Ok: AsRef<[u8]> { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { + fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { + let mut this = self.project(); + // Grab the item to copy from. let item_to_copy = loop { - if let Some(ref mut i) = self.current_item { + if let Some(ref mut i) = this.current_item { if i.position() < i.get_ref().as_ref().len() as u64 { break i } } - self.current_item = Some(match ready!(self.inner.try_poll_next_unpin(cx)) { + *this.current_item = Some(match ready!(this.inner.as_mut().try_poll_next(cx)) { Some(Ok(i)) => std::io::Cursor::new(i), Some(Err(e)) => return Poll::Ready(Err(e)), None => return Poll::Ready(Ok(0)) // EOF @@ -74,26 +78,27 @@ where S: TryStream + Sink<::Ok, Error = io::Error> + Unpin, ::Ok: for<'r> From<&'r [u8]> { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { - ready!(Pin::new(&mut self.inner).poll_ready(cx)?); + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + let mut this = self.project(); + ready!(this.inner.as_mut().poll_ready(cx)?); let n = buf.len(); - if let Err(e) = Pin::new(&mut self.inner).start_send(buf.into()) { + if let Err(e) = this.inner.start_send(buf.into()) { return Poll::Ready(Err(e)) } Poll::Ready(Ok(n)) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut self.inner).poll_flush(cx) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + this.inner.poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut self.inner).poll_close(cx) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + this.inner.poll_close(cx) } } -impl Unpin for RwStreamSink {} - #[cfg(test)] mod tests { use async_std::task; diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 07325331..bc8db663 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -288,7 +288,7 @@ impl fmt::Debug for LocalIncoming { } } -impl Stream for Incoming { +impl Stream for Incoming { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> std::task::Poll> { @@ -300,7 +300,10 @@ impl Stream for Incoming { } } -impl Stream for LocalIncoming { +impl Unpin for Incoming { +} + +impl Stream for LocalIncoming { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> std::task::Poll> { @@ -311,3 +314,6 @@ impl Stream for LocalIncoming { self.stream.size_hint() } } + +impl Unpin for LocalIncoming { +} \ No newline at end of file diff --git a/protocols/secio/Cargo.toml b/protocols/secio/Cargo.toml index 82222431..ada91d28 100644 --- a/protocols/secio/Cargo.toml +++ b/protocols/secio/Cargo.toml @@ -18,6 +18,7 @@ hmac = "0.7.0" lazy_static = "1.2.0" libp2p-core = { version = "0.14.0-alpha.1", path = "../../core" } log = "0.4.6" +pin-project = "0.4.6" protobuf = "=2.8.1" # note: see https://github.com/libp2p/rust-libp2p/issues/1363 quicksink = "0.1" rand = "0.7" diff --git a/protocols/secio/src/codec/decode.rs b/protocols/secio/src/codec/decode.rs index 14edb8ef..04bbad56 100644 --- a/protocols/secio/src/codec/decode.rs +++ b/protocols/secio/src/codec/decode.rs @@ -35,9 +35,11 @@ use std::{cmp::min, pin::Pin, task::Context, task::Poll}; /// frames isn't handled by this module. /// /// Also implements `Sink` for convenience. +#[pin_project::pin_project] pub struct DecoderMiddleware { cipher_state: StreamCipher, hmac: Hmac, + #[pin] raw_stream: S, nonce: Vec } @@ -59,29 +61,31 @@ impl DecoderMiddleware { impl Stream for DecoderMiddleware where - S: TryStream> + Unpin, + S: TryStream>, S::Error: Into, { type Item = Result, SecioError>; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let frame = match TryStream::try_poll_next(Pin::new(&mut self.raw_stream), cx) { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + + let frame = match TryStream::try_poll_next(this.raw_stream, cx) { Poll::Ready(Some(Ok(t))) => t, Poll::Ready(None) => return Poll::Ready(None), Poll::Pending => return Poll::Pending, Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))), }; - if frame.len() < self.hmac.num_bytes() { + if frame.len() < this.hmac.num_bytes() { debug!("frame too short when decoding secio frame"); return Poll::Ready(Some(Err(SecioError::FrameTooShort))); } - let content_length = frame.len() - self.hmac.num_bytes(); + let content_length = frame.len() - this.hmac.num_bytes(); { let (crypted_data, expected_hash) = frame.split_at(content_length); - debug_assert_eq!(expected_hash.len(), self.hmac.num_bytes()); + debug_assert_eq!(expected_hash.len(), this.hmac.num_bytes()); - if self.hmac.verify(crypted_data, expected_hash).is_err() { + if this.hmac.verify(crypted_data, expected_hash).is_err() { debug!("hmac mismatch when decoding secio frame"); return Poll::Ready(Some(Err(SecioError::HmacNotMatching))); } @@ -89,14 +93,14 @@ where let mut data_buf = frame; data_buf.truncate(content_length); - self.cipher_state.decrypt(&mut data_buf); + this.cipher_state.decrypt(&mut data_buf); - if !self.nonce.is_empty() { - let n = min(data_buf.len(), self.nonce.len()); - if data_buf[.. n] != self.nonce[.. n] { + if !this.nonce.is_empty() { + let n = min(data_buf.len(), this.nonce.len()); + if data_buf[.. n] != this.nonce[.. n] { return Poll::Ready(Some(Err(SecioError::NonceVerificationFailed))) } - self.nonce.drain(.. n); + this.nonce.drain(.. n); data_buf.drain(.. n); } @@ -106,23 +110,27 @@ where impl Sink for DecoderMiddleware where - S: Sink + Unpin, + S: Sink, { type Error = S::Error; - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Sink::poll_ready(Pin::new(&mut self.raw_stream), cx) + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + Sink::poll_ready(this.raw_stream, cx) } - fn start_send(mut self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - Sink::start_send(Pin::new(&mut self.raw_stream), item) + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + let this = self.project(); + Sink::start_send(this.raw_stream, item) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Sink::poll_flush(Pin::new(&mut self.raw_stream), cx) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + Sink::poll_flush(this.raw_stream, cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Sink::poll_close(Pin::new(&mut self.raw_stream), cx) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + Sink::poll_close(this.raw_stream, cx) } } diff --git a/protocols/secio/src/codec/encode.rs b/protocols/secio/src/codec/encode.rs index a0f0c04c..88611bc1 100644 --- a/protocols/secio/src/codec/encode.rs +++ b/protocols/secio/src/codec/encode.rs @@ -31,9 +31,11 @@ use std::{pin::Pin, task::Context, task::Poll}; /// prefix is not covered by this module. /// /// Also implements `Stream` for convenience. +#[pin_project::pin_project] pub struct EncoderMiddleware { cipher_state: StreamCipher, hmac: Hmac, + #[pin] raw_sink: S, } @@ -49,38 +51,43 @@ impl EncoderMiddleware { impl Sink> for EncoderMiddleware where - S: Sink> + Unpin, + S: Sink>, { type Error = S::Error; - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Sink::poll_ready(Pin::new(&mut self.raw_sink), cx) + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + Sink::poll_ready(this.raw_sink, cx) } - fn start_send(mut self: Pin<&mut Self>, mut data_buf: Vec) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, mut data_buf: Vec) -> Result<(), Self::Error> { + let this = self.project(); // TODO if SinkError gets refactor to SecioError, then use try_apply_keystream - self.cipher_state.encrypt(&mut data_buf[..]); - let signature = self.hmac.sign(&data_buf[..]); + this.cipher_state.encrypt(&mut data_buf[..]); + let signature = this.hmac.sign(&data_buf[..]); data_buf.extend_from_slice(signature.as_ref()); - Sink::start_send(Pin::new(&mut self.raw_sink), data_buf) + Sink::start_send(this.raw_sink, data_buf) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Sink::poll_flush(Pin::new(&mut self.raw_sink), cx) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + Sink::poll_flush(this.raw_sink, cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Sink::poll_close(Pin::new(&mut self.raw_sink), cx) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + Sink::poll_close(this.raw_sink, cx) } } impl Stream for EncoderMiddleware where - S: Stream + Unpin, + S: Stream, { type Item = S::Item; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Stream::poll_next(Pin::new(&mut self.raw_sink), cx) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + Stream::poll_next(this.raw_sink, cx) } } diff --git a/protocols/secio/src/codec/len_prefix.rs b/protocols/secio/src/codec/len_prefix.rs index 376d15c2..8b70083b 100644 --- a/protocols/secio/src/codec/len_prefix.rs +++ b/protocols/secio/src/codec/len_prefix.rs @@ -91,7 +91,7 @@ where impl Stream for LenPrefixCodec where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static + T: AsyncRead + AsyncWrite + Send + 'static { type Item = io::Result>; @@ -102,7 +102,7 @@ where impl Sink> for LenPrefixCodec where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static + T: AsyncRead + AsyncWrite + Send + 'static { type Error = io::Error; @@ -122,3 +122,6 @@ where Pin::new(&mut self.sink).poll_close(cx) } } + +impl Unpin for LenPrefixCodec { +} diff --git a/src/bandwidth.rs b/src/bandwidth.rs index 8e7b882b..c5497dde 100644 --- a/src/bandwidth.rs +++ b/src/bandwidth.rs @@ -57,10 +57,7 @@ impl BandwidthLogging { impl Transport for BandwidthLogging where - TInner: Transport + Unpin, - TInner::Dial: Unpin, - TInner::Listener: Unpin, - TInner::ListenerUpgrade: Unpin + TInner: Transport, { type Output = BandwidthConnecLogging; type Error = TInner::Error; @@ -85,27 +82,32 @@ where /// Wraps around a `Stream` that produces connections. Wraps each connection around a bandwidth /// counter. +#[pin_project::pin_project] pub struct BandwidthListener { + #[pin] inner: TInner, sinks: Arc, } impl Stream for BandwidthListener where - TInner: TryStream> + Unpin + TInner: TryStream> { type Item = Result>, TInner::Error>; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + let event = - if let Some(event) = ready!(self.inner.try_poll_next_unpin(cx)?) { + if let Some(event) = ready!(this.inner.try_poll_next(cx)?) { event } else { return Poll::Ready(None) }; - let event = event.map(|inner| { - BandwidthFuture { inner, sinks: self.sinks.clone() } + let event = event.map({ + let sinks = this.sinks.clone(); + |inner| BandwidthFuture { inner, sinks } }); Poll::Ready(Some(Ok(event))) @@ -114,17 +116,20 @@ where /// Wraps around a `Future` that produces a connection. Wraps the connection around a bandwidth /// counter. +#[pin_project::pin_project] pub struct BandwidthFuture { + #[pin] inner: TInner, sinks: Arc, } -impl Future for BandwidthFuture { +impl Future for BandwidthFuture { type Output = Result, TInner::Error>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let inner = ready!(self.inner.try_poll_unpin(cx)?); - let logged = BandwidthConnecLogging { inner, sinks: self.sinks.clone() }; + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.project(); + let inner = ready!(this.inner.try_poll(cx)?); + let logged = BandwidthConnecLogging { inner, sinks: this.sinks.clone() }; Poll::Ready(Ok(logged)) } } @@ -148,44 +153,52 @@ impl BandwidthSinks { } /// Wraps around an `AsyncRead + AsyncWrite` and logs the bandwidth that goes through it. +#[pin_project::pin_project] pub struct BandwidthConnecLogging { + #[pin] inner: TInner, sinks: Arc, } -impl AsyncRead for BandwidthConnecLogging { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { - let num_bytes = ready!(Pin::new(&mut self.inner).poll_read(cx, buf))?; - self.sinks.download.lock().inject(num_bytes); +impl AsyncRead for BandwidthConnecLogging { + fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { + let this = self.project(); + let num_bytes = ready!(this.inner.poll_read(cx, buf))?; + this.sinks.download.lock().inject(num_bytes); Poll::Ready(Ok(num_bytes)) } - fn poll_read_vectored(mut self: Pin<&mut Self>, cx: &mut Context, bufs: &mut [IoSliceMut]) -> Poll> { - let num_bytes = ready!(Pin::new(&mut self.inner).poll_read_vectored(cx, bufs))?; - self.sinks.download.lock().inject(num_bytes); + fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context, bufs: &mut [IoSliceMut]) -> Poll> { + let this = self.project(); + let num_bytes = ready!(this.inner.poll_read_vectored(cx, bufs))?; + this.sinks.download.lock().inject(num_bytes); Poll::Ready(Ok(num_bytes)) } } -impl AsyncWrite for BandwidthConnecLogging { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { - let num_bytes = ready!(Pin::new(&mut self.inner).poll_write(cx, buf))?; - self.sinks.upload.lock().inject(num_bytes); +impl AsyncWrite for BandwidthConnecLogging { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + let this = self.project(); + let num_bytes = ready!(this.inner.poll_write(cx, buf))?; + this.sinks.upload.lock().inject(num_bytes); Poll::Ready(Ok(num_bytes)) } - fn poll_write_vectored(mut self: Pin<&mut Self>, cx: &mut Context, bufs: &[IoSlice]) -> Poll> { - let num_bytes = ready!(Pin::new(&mut self.inner).poll_write_vectored(cx, bufs))?; - self.sinks.upload.lock().inject(num_bytes); + fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context, bufs: &[IoSlice]) -> Poll> { + let this = self.project(); + let num_bytes = ready!(this.inner.poll_write_vectored(cx, bufs))?; + this.sinks.upload.lock().inject(num_bytes); Poll::Ready(Ok(num_bytes)) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut self.inner).poll_flush(cx) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + this.inner.poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut self.inner).poll_close(cx) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.project(); + this.inner.poll_close(cx) } } diff --git a/src/simple.rs b/src/simple.rs index 4604346b..fb4d3b73 100644 --- a/src/simple.rs +++ b/src/simple.rs @@ -67,7 +67,7 @@ impl InboundUpgrade for SimpleProtocol where C: AsyncRead + AsyncWrite, F: Fn(C) -> O, - O: Future> + Unpin + O: Future>, { type Output = A; type Error = E; @@ -83,7 +83,7 @@ impl OutboundUpgrade for SimpleProtocol where C: AsyncRead + AsyncWrite, F: Fn(C) -> O, - O: Future> + Unpin + O: Future>, { type Output = A; type Error = E; diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 7a9c4e0c..6dbf7b18 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -207,9 +207,9 @@ where TBehaviour: NetworkBehaviour, ::Substream: Send + 'static, TTransport: Transport + Clone, TTransport::Error: Send + 'static, - TTransport::Listener: Unpin + Send + 'static, - TTransport::ListenerUpgrade: Unpin + Send + 'static, - TTransport::Dial: Unpin + Send + 'static, + TTransport::Listener: Send + 'static, + TTransport::ListenerUpgrade: Send + 'static, + TTransport::Dial: Send + 'static, THandlerErr: error::Error, THandler: IntoProtocolsHandler + Send + 'static, ::Handler: ProtocolsHandler, Error = THandlerErr> + Send + 'static, @@ -251,7 +251,9 @@ where TBehaviour: NetworkBehaviour, } /// Remove some listener. - pub fn remove_listener(me: &mut Self, id: ListenerId) -> Option { + /// + /// Returns `Ok(())` if there was a listener with this ID. + pub fn remove_listener(me: &mut Self, id: ListenerId) -> Result<(), ()> { me.network.remove_listener(id) } @@ -502,9 +504,9 @@ where TBehaviour: NetworkBehaviour, ::Substream: Send + 'static, TTransport: Transport + Clone, TTransport::Error: Send + 'static, - TTransport::Listener: Unpin + Send + 'static, - TTransport::ListenerUpgrade: Unpin + Send + 'static, - TTransport::Dial: Unpin + Send + 'static, + TTransport::Listener: Send + 'static, + TTransport::ListenerUpgrade: Send + 'static, + TTransport::Dial: Send + 'static, THandlerErr: error::Error, THandler: IntoProtocolsHandler + Send + 'static, ::Handler: ProtocolsHandler, Error = THandlerErr> + Send + 'static, @@ -584,9 +586,9 @@ where TBehaviour: NetworkBehaviour, ::Substream: Send + 'static, TTransport: Transport + Clone, TTransport::Error: Send + 'static, - TTransport::Listener: Unpin + Send + 'static, - TTransport::ListenerUpgrade: Unpin + Send + 'static, - TTransport::Dial: Unpin + Send + 'static, + TTransport::Listener: Send + 'static, + TTransport::ListenerUpgrade: Send + 'static, + TTransport::Dial: Send + 'static, ::ProtocolsHandler: Send + 'static, <::ProtocolsHandler as IntoProtocolsHandler>::Handler: ProtocolsHandler> + Send + 'static, <<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent: Send + 'static, diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index 2ccdebe1..c3966da4 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -103,7 +103,7 @@ where T: Transport + Send + Clone + 'static, T::Error: Send + 'static, T::Dial: Send + 'static, - T::Listener: Send + Unpin + 'static, + T::Listener: Send + 'static, T::ListenerUpgrade: Send + 'static, T::Output: AsyncRead + AsyncWrite + Unpin + Send + 'static { diff --git a/transports/websocket/src/lib.rs b/transports/websocket/src/lib.rs index ca96a0fe..856be7e5 100644 --- a/transports/websocket/src/lib.rs +++ b/transports/websocket/src/lib.rs @@ -97,7 +97,7 @@ where T: Transport + Send + Clone + 'static, T::Error: Send + 'static, T::Dial: Send + 'static, - T::Listener: Send + Unpin + 'static, + T::Listener: Send + 'static, T::ListenerUpgrade: Send + 'static, T::Output: AsyncRead + AsyncWrite + Unpin + Send + 'static {