Remove some Unpin requirements on Futures (#1384)

* Remove lots of Unpin requirements

* Make Transport::and_then accept pinned futures

* Finish the PR

* Work on secio

* Fix BandwidthTransport

* Adjust ListenersStrema

* Fix nodes/tasks

* Fix nodes

* Various more fixes

* Fix yamux

* Fix Swarm

* Fix WebSockets

* Fix rw-stream-sink
This commit is contained in:
Pierre Krieger 2020-01-14 12:03:10 +01:00 committed by GitHub
parent 42a45e2630
commit 3f968cbf92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 249 additions and 216 deletions

View File

@ -33,6 +33,7 @@ libp2p-uds = { version = "0.14.0-alpha.1", path = "transports/uds" }
libp2p-wasm-ext = { version = "0.7.0-alpha.1", path = "transports/wasm-ext" }
libp2p-yamux = { version = "0.14.0-alpha.1", path = "muxers/yamux" }
parking_lot = "0.10.0"
pin-project = "0.4.6"
smallvec = "1.0"
wasm-timer = "0.2.4"

View File

@ -155,8 +155,8 @@ where
impl<A, B, I> Sink<I> for EitherOutput<A, B>
where
A: Sink<I> + Unpin,
B: Sink<I> + Unpin,
A: Sink<I>,
B: Sink<I>,
{
type Error = EitherError<A::Error, B::Error>;
@ -414,8 +414,8 @@ pub enum EitherFuture2<A, B> { A(#[pin] A), B(#[pin] B) }
impl<AFut, BFut, AItem, BItem, AError, BError> Future for EitherFuture2<AFut, BFut>
where
AFut: TryFuture<Ok = AItem, Error = AError> + Unpin,
BFut: TryFuture<Ok = BItem, Error = BError> + Unpin,
AFut: TryFuture<Ok = AItem, Error = AError>,
BFut: TryFuture<Ok = BItem, Error = BError>,
{
type Output = Result<EitherOutput<AItem, BItem>, EitherError<AError, BError>>;

View File

@ -322,7 +322,7 @@ where
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, handler: THandler)
-> ReachAttemptId
where
TFut: Future<Output = Result<(TConnInfo, TMuxer), TReachErr>> + Unpin + Send + 'static,
TFut: Future<Output = Result<(TConnInfo, TMuxer), TReachErr>> + Send + 'static,
THandler: IntoNodeHandler<TConnInfo> + Send + 'static,
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static,

View File

@ -60,7 +60,7 @@ use std::{collections::VecDeque, fmt, pin::Pin};
/// ListenersEvent::AddressExpired { listener_id, listen_addr } => {
/// println!("Listener {:?} is no longer listening at address {}", listener_id, listen_addr);
/// },
/// ListenersEvent::Closed { listener_id, .. } => {
/// ListenersEvent::Closed { listener_id } => {
/// println!("Listener {:?} has been closed", listener_id);
/// },
/// ListenersEvent::Error { listener_id, error } => {
@ -84,7 +84,9 @@ where
/// Transport used to spawn listeners.
transport: TTrans,
/// All the active listeners.
listeners: VecDeque<Listener<TTrans>>,
/// The `Listener` struct contains a stream that we want to be pinned. Since the `VecDeque`
/// can be resized, the only way is to use a `Pin<Box<>>`.
listeners: VecDeque<Pin<Box<Listener<TTrans>>>>,
/// The next listener ID to assign.
next_id: ListenerId
}
@ -97,6 +99,7 @@ where
pub struct ListenerId(u64);
/// A single active listener.
#[pin_project::pin_project]
#[derive(Debug)]
struct Listener<TTrans>
where
@ -105,6 +108,7 @@ where
/// The ID of this listener.
id: ListenerId,
/// The object that actually listens.
#[pin]
listener: TTrans::Listener,
/// Addresses it is listening on.
addresses: SmallVec<[Multiaddr; 4]>
@ -144,8 +148,6 @@ where
Closed {
/// The ID of the listener that closed.
listener_id: ListenerId,
/// The listener that closed.
listener: TTrans::Listener,
},
/// A listener errored.
///
@ -190,22 +192,25 @@ where
TTrans: Clone,
{
let listener = self.transport.clone().listen_on(addr)?;
self.listeners.push_back(Listener {
self.listeners.push_back(Box::pin(Listener {
id: self.next_id,
listener,
addresses: SmallVec::new()
});
}));
let id = self.next_id;
self.next_id = ListenerId(self.next_id.0 + 1);
Ok(id)
}
/// Remove the listener matching the given `ListenerId`.
pub fn remove_listener(&mut self, id: ListenerId) -> Option<TTrans::Listener> {
///
/// Return `Ok(())` if a listener with this ID was in the list.
pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> {
if let Some(i) = self.listeners.iter().position(|l| l.id == id) {
self.listeners.remove(i).map(|l| l.listener)
self.listeners.remove(i);
Ok(())
} else {
None
Err(())
}
}
@ -220,21 +225,19 @@ where
}
/// Provides an API similar to `Stream`, except that it cannot end.
pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<ListenersEvent<TTrans>>
where
TTrans::Listener: Unpin,
{
pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<ListenersEvent<TTrans>> {
// We remove each element from `listeners` one by one and add them back.
let mut remaining = self.listeners.len();
while let Some(mut listener) = self.listeners.pop_back() {
match TryStream::try_poll_next(Pin::new(&mut listener.listener), cx) {
let mut listener_project = listener.as_mut().project();
match TryStream::try_poll_next(listener_project.listener.as_mut(), cx) {
Poll::Pending => {
self.listeners.push_front(listener);
remaining -= 1;
if remaining == 0 { break }
}
Poll::Ready(Some(Ok(ListenerEvent::Upgrade { upgrade, local_addr, remote_addr }))) => {
let id = listener.id;
let id = *listener_project.id;
self.listeners.push_front(listener);
return Poll::Ready(ListenersEvent::Incoming {
listener_id: id,
@ -244,13 +247,13 @@ where
})
}
Poll::Ready(Some(Ok(ListenerEvent::NewAddress(a)))) => {
if listener.addresses.contains(&a) {
if listener_project.addresses.contains(&a) {
debug!("Transport has reported address {} multiple times", a)
}
if !listener.addresses.contains(&a) {
listener.addresses.push(a.clone());
if !listener_project.addresses.contains(&a) {
listener_project.addresses.push(a.clone());
}
let id = listener.id;
let id = *listener_project.id;
self.listeners.push_front(listener);
return Poll::Ready(ListenersEvent::NewAddress {
listener_id: id,
@ -258,8 +261,8 @@ where
})
}
Poll::Ready(Some(Ok(ListenerEvent::AddressExpired(a)))) => {
listener.addresses.retain(|x| x != &a);
let id = listener.id;
listener_project.addresses.retain(|x| x != &a);
let id = *listener_project.id;
self.listeners.push_front(listener);
return Poll::Ready(ListenersEvent::AddressExpired {
listener_id: id,
@ -268,13 +271,12 @@ where
}
Poll::Ready(None) => {
return Poll::Ready(ListenersEvent::Closed {
listener_id: listener.id,
listener: listener.listener
listener_id: *listener_project.id,
})
}
Poll::Ready(Some(Err(err))) => {
return Poll::Ready(ListenersEvent::Error {
listener_id: listener.id,
listener_id: *listener_project.id,
error: err
})
}
@ -289,7 +291,6 @@ where
impl<TTrans> Stream for ListenersStream<TTrans>
where
TTrans: Transport,
TTrans::Listener: Unpin,
{
type Item = ListenersEvent<TTrans>;
@ -338,7 +339,7 @@ where
.field("listener_id", listener_id)
.field("local_addr", local_addr)
.finish(),
ListenersEvent::Closed { listener_id, .. } => f
ListenersEvent::Closed { listener_id } => f
.debug_struct("ListenersEvent::Closed")
.field("listener_id", listener_id)
.finish(),

View File

@ -171,8 +171,6 @@ where
ListenerClosed {
/// The listener ID that closed.
listener_id: ListenerId,
/// The listener which closed.
listener: TTrans::Listener,
},
/// One of the listeners errored.
@ -309,7 +307,7 @@ where
.field("listen_addr", listen_addr)
.finish()
}
NetworkEvent::ListenerClosed { listener_id, .. } => {
NetworkEvent::ListenerClosed { listener_id } => {
f.debug_struct("ListenerClosed")
.field("listener_id", listener_id)
.finish()
@ -580,7 +578,7 @@ impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr, TConnInfo,
where
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
TTrans::Error: Send + 'static,
TTrans::ListenerUpgrade: Unpin + Send + 'static,
TTrans::ListenerUpgrade: Send + 'static,
THandler: IntoNodeHandler<(TConnInfo, ConnectedPoint)> + Send + 'static,
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
@ -735,7 +733,9 @@ where
}
/// Remove a previously added listener.
pub fn remove_listener(&mut self, id: ListenerId) -> Option<TTrans::Listener> {
///
/// Returns `Ok(())` if a listener with this ID was in the list.
pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> {
self.listeners.remove_listener(id)
}
@ -788,7 +788,7 @@ where
where
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
TTrans::Error: Send + 'static,
TTrans::Dial: Unpin + Send + 'static,
TTrans::Dial: Send + 'static,
TMuxer: Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TInEvent: Send + 'static,
@ -936,7 +936,7 @@ where
fn start_dial_out(&mut self, peer_id: TPeerId, handler: THandler, first: Multiaddr, rest: Vec<Multiaddr>)
where
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
TTrans::Dial: Unpin + Send + 'static,
TTrans::Dial: Send + 'static,
TTrans::Error: Send + 'static,
TMuxer: Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
@ -982,8 +982,7 @@ where
where
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
TTrans::Error: Send + 'static,
TTrans::Dial: Unpin + Send + 'static,
TTrans::Listener: Unpin,
TTrans::Dial: Send + 'static,
TTrans::ListenerUpgrade: Send + 'static,
TMuxer: Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
@ -1021,8 +1020,8 @@ where
Poll::Ready(ListenersEvent::AddressExpired { listener_id, listen_addr }) => {
return Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr })
}
Poll::Ready(ListenersEvent::Closed { listener_id, listener }) => {
return Poll::Ready(NetworkEvent::ListenerClosed { listener_id, listener })
Poll::Ready(ListenersEvent::Closed { listener_id }) => {
return Poll::Ready(NetworkEvent::ListenerClosed { listener_id })
}
Poll::Ready(ListenersEvent::Error { listener_id, error }) => {
return Poll::Ready(NetworkEvent::ListenerError { listener_id, error })
@ -1462,7 +1461,7 @@ impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo,
where
TTrans: Transport<Output = (TConnInfo, TMuxer)> + Clone,
TTrans::Error: Send + 'static,
TTrans::Dial: Unpin + Send + 'static,
TTrans::Dial: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TMuxer::Substream: Send,
@ -1759,7 +1758,7 @@ impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr, TConnInfo,
where
TTrans: Transport<Output = (TConnInfo, TMuxer)> + Clone,
TTrans::Error: Send + 'static,
TTrans::Dial: Unpin + Send + 'static,
TTrans::Dial: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TMuxer::Substream: Send,

View File

@ -157,7 +157,7 @@ impl<I, O, H, E, HE, T, C> Manager<I, O, H, E, HE, T, C> {
/// processing the node's events.
pub fn add_reach_attempt<F, M>(&mut self, future: F, user_data: T, handler: H) -> TaskId
where
F: Future<Output = Result<(C, M), E>> + Unpin + Send + 'static,
F: Future<Output = Result<(C, M), E>> + Send + 'static,
H: IntoNodeHandler<C> + Send + 'static,
H::Handler: NodeHandler<Substream = Substream<M>, InEvent = I, OutEvent = O, Error = HE> + Send + 'static,
E: error::Error + Send + 'static,

View File

@ -92,7 +92,7 @@ where
id: i,
sender: s,
receiver: r.fuse(),
state: State::Future { future: f, handler: h, events_buffer: Vec::new() },
state: State::Future { future: Box::pin(f), handler: h, events_buffer: Vec::new() },
taken_over: SmallVec::new()
}
}
@ -124,7 +124,8 @@ where
/// Future to resolve to connect to the node.
Future {
/// The future that will attempt to reach the node.
future: F,
// TODO: don't pin this Future; this requires deeper changes though
future: Pin<Box<F>>,
/// The handler that will be used to build the `HandledNode`.
handler: H,
/// While we are dialing the future, we need to buffer the events received on
@ -163,7 +164,7 @@ where
impl<F, M, H, I, O, E, C> Future for Task<F, M, H, I, O, E, C>
where
M: StreamMuxer,
F: Future<Output = Result<(C, M), E>> + Unpin,
F: Future<Output = Result<(C, M), E>>,
H: IntoNodeHandler<C>,
H::Handler: NodeHandler<Substream = Substream<M>, InEvent = I, OutEvent = O>
{

View File

@ -25,7 +25,7 @@ use crate::{
};
use futures::{future::Either, prelude::*};
use multiaddr::Multiaddr;
use std::{error, pin::Pin, task::Context, task::Poll};
use std::{error, marker::PhantomPinned, pin::Pin, task::Context, task::Poll};
/// See the `Transport::and_then` method.
#[derive(Debug, Clone)]
@ -40,11 +40,8 @@ impl<T, C> AndThen<T, C> {
impl<T, C, F, O> Transport for AndThen<T, C>
where
T: Transport,
T::Dial: Unpin,
T::Listener: Unpin,
T::ListenerUpgrade: Unpin,
C: FnOnce(T::Output, ConnectedPoint) -> F + Clone,
F: TryFuture<Ok = O> + Unpin,
F: TryFuture<Ok = O>,
F::Error: error::Error,
{
type Output = O;
@ -66,8 +63,9 @@ where
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let dialed_fut = self.transport.dial(addr.clone()).map_err(|err| err.map(EitherError::A))?;
let future = AndThenFuture {
inner: Either::Left(dialed_fut),
args: Some((self.fun, ConnectedPoint::Dialer { address: addr }))
inner: Either::Left(Box::pin(dialed_fut)),
args: Some((self.fun, ConnectedPoint::Dialer { address: addr })),
marker: PhantomPinned,
};
Ok(future)
}
@ -76,18 +74,17 @@ where
/// Custom `Stream` to avoid boxing.
///
/// Applies a function to every stream item.
#[pin_project::pin_project]
#[derive(Debug, Clone)]
pub struct AndThenStream<TListener, TMap> {
#[pin]
stream: TListener,
fun: TMap
}
impl<TListener, TMap> Unpin for AndThenStream<TListener, TMap> {
}
impl<TListener, TMap, TTransOut, TMapOut, TListUpgr, TTransErr> Stream for AndThenStream<TListener, TMap>
where
TListener: TryStream<Ok = ListenerEvent<TListUpgr>, Error = TTransErr> + Unpin,
TListener: TryStream<Ok = ListenerEvent<TListUpgr>, Error = TTransErr>,
TListUpgr: TryFuture<Ok = TTransOut, Error = TTransErr>,
TMap: FnOnce(TTransOut, ConnectedPoint) -> TMapOut + Clone,
TMapOut: TryFuture
@ -97,8 +94,9 @@ where
EitherError<TTransErr, TMapOut::Error>
>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match TryStream::try_poll_next(Pin::new(&mut self.stream), cx) {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
match TryStream::try_poll_next(this.stream, cx) {
Poll::Ready(Some(Ok(event))) => {
let event = match event {
ListenerEvent::Upgrade { upgrade, local_addr, remote_addr } => {
@ -108,8 +106,9 @@ where
};
ListenerEvent::Upgrade {
upgrade: AndThenFuture {
inner: Either::Left(upgrade),
args: Some((self.fun.clone(), point))
inner: Either::Left(Box::pin(upgrade)),
args: Some((this.fun.clone(), point)),
marker: PhantomPinned,
},
local_addr,
remote_addr
@ -132,26 +131,24 @@ where
/// Applies a function to the result of the inner future.
#[derive(Debug)]
pub struct AndThenFuture<TFut, TMap, TMapOut> {
inner: Either<TFut, TMapOut>,
args: Option<(TMap, ConnectedPoint)>
}
impl<TFut, TMap, TMapOut> Unpin for AndThenFuture<TFut, TMap, TMapOut> {
inner: Either<Pin<Box<TFut>>, Pin<Box<TMapOut>>>,
args: Option<(TMap, ConnectedPoint)>,
marker: PhantomPinned,
}
impl<TFut, TMap, TMapOut> Future for AndThenFuture<TFut, TMap, TMapOut>
where
TFut: TryFuture + Unpin,
TFut: TryFuture,
TMap: FnOnce(TFut::Ok, ConnectedPoint) -> TMapOut,
TMapOut: TryFuture + Unpin
TMapOut: TryFuture,
{
type Output = Result<TMapOut::Ok, EitherError<TFut::Error, TMapOut::Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
let future = match (*self).inner {
Either::Left(ref mut future) => {
let item = match TryFuture::try_poll(Pin::new(future), cx) {
let future = match &mut self.inner {
Either::Left(future) => {
let item = match TryFuture::try_poll(future.as_mut(), cx) {
Poll::Ready(Ok(v)) => v,
Poll::Ready(Err(err)) => return Poll::Ready(Err(EitherError::A(err))),
Poll::Pending => return Poll::Pending,
@ -159,8 +156,8 @@ where
let (f, a) = self.args.take().expect("AndThenFuture has already finished.");
f(item, a)
}
Either::Right(ref mut future) => {
return match TryFuture::try_poll(Pin::new(future), cx) {
Either::Right(future) => {
return match TryFuture::try_poll(future.as_mut(), cx) {
Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
Poll::Ready(Err(err)) => return Poll::Ready(Err(EitherError::B(err))),
Poll::Pending => Poll::Pending,
@ -168,7 +165,10 @@ where
}
};
(*self).inner = Either::Right(future);
self.inner = Either::Right(Box::pin(future));
}
}
}
impl<TFut, TMap, TMapOut> Unpin for AndThenFuture<TFut, TMap, TMapOut> {
}

View File

@ -101,9 +101,6 @@ where
AndThen<T, impl FnOnce(C, ConnectedPoint) -> Authenticate<C, U> + Clone>
> where
T: Transport<Output = C>,
T::Dial: Unpin,
T::Listener: Unpin,
T::ListenerUpgrade: Unpin,
I: ConnectionInfo,
C: AsyncRead + AsyncWrite + Unpin,
D: AsyncRead + AsyncWrite + Unpin,
@ -133,9 +130,6 @@ where
pub fn apply<C, D, U, I, E>(self, upgrade: U) -> Builder<Upgrade<T, U>>
where
T: Transport<Output = (I, C)>,
T::Dial: Unpin,
T::Listener: Unpin,
T::ListenerUpgrade: Unpin,
C: AsyncRead + AsyncWrite + Unpin,
D: AsyncRead + AsyncWrite + Unpin,
I: ConnectionInfo,
@ -161,9 +155,6 @@ where
-> AndThen<T, impl FnOnce((I, C), ConnectedPoint) -> Multiplex<C, U, I> + Clone>
where
T: Transport<Output = (I, C)>,
T::Dial: Unpin,
T::Listener: Unpin,
T::ListenerUpgrade: Unpin,
C: AsyncRead + AsyncWrite + Unpin,
M: StreamMuxer,
I: ConnectionInfo,
@ -183,11 +174,13 @@ where
/// in the context of negotiating a secure channel.
///
/// Configured through [`Builder::authenticate`].
#[pin_project::pin_project]
pub struct Authenticate<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>
{
#[pin]
inner: EitherUpgrade<C, U>
}
@ -201,8 +194,9 @@ where
{
type Output = <EitherUpgrade<C, U> as Future>::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Future::poll(Pin::new(&mut self.inner), cx)
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.project();
Future::poll(this.inner, cx)
}
}
@ -210,12 +204,14 @@ where
/// top of an authenticated transport.
///
/// Configured through [`Builder::multiplex`].
#[pin_project::pin_project]
pub struct Multiplex<C, U, I>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>,
{
info: Option<I>,
#[pin]
upgrade: EitherUpgrade<C, U>,
}
@ -227,23 +223,17 @@ where
{
type Output = Result<(I, M), UpgradeError<E>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let m = match ready!(Future::poll(Pin::new(&mut self.upgrade), cx)) {
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.project();
let m = match ready!(Future::poll(this.upgrade, cx)) {
Ok(m) => m,
Err(err) => return Poll::Ready(Err(err)),
};
let i = self.info.take().expect("Multiplex future polled after completion.");
let i = this.info.take().expect("Multiplex future polled after completion.");
Poll::Ready(Ok((i, m)))
}
}
impl<C, U, I> Unpin for Multiplex<C, U, I>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>> + OutboundUpgrade<Negotiated<C>>,
{
}
/// An inbound or outbound upgrade.
type EitherUpgrade<C, U> = future::Either<InboundUpgradeApply<C, U>, OutboundUpgradeApply<C, U>>;
@ -262,9 +252,6 @@ impl<T, U> Upgrade<T, U> {
impl<T, C, D, U, I, E> Transport for Upgrade<T, U>
where
T: Transport<Output = (I, C)>,
T::Dial: Unpin,
T::Listener: Unpin,
T::ListenerUpgrade: Unpin,
T::Error: 'static,
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = D, Error = E>,
@ -281,7 +268,7 @@ where
let future = self.inner.dial(addr.clone())
.map_err(|err| err.map(TransportUpgradeError::Transport))?;
Ok(DialUpgradeFuture {
future,
future: Box::pin(future),
upgrade: future::Either::Left(Some(self.upgrade))
})
}
@ -290,7 +277,7 @@ where
let stream = self.inner.listen_on(addr)
.map_err(|err| err.map(TransportUpgradeError::Transport))?;
Ok(ListenerStream {
stream,
stream: Box::pin(stream),
upgrade: self.upgrade
})
}
@ -337,13 +324,13 @@ where
U: OutboundUpgrade<Negotiated<C>>,
C: AsyncRead + AsyncWrite + Unpin,
{
future: F,
future: Pin<Box<F>>,
upgrade: future::Either<Option<U>, (Option<I>, OutboundUpgradeApply<C, U>)>
}
impl<F, U, I, C, D> Future for DialUpgradeFuture<F, U, I, C>
where
F: TryFuture<Ok = (I, C)> + Unpin,
F: TryFuture<Ok = (I, C)>,
C: AsyncRead + AsyncWrite + Unpin,
U: OutboundUpgrade<Negotiated<C>, Output = D>,
U::Error: Error
@ -358,7 +345,7 @@ where
loop {
this.upgrade = match this.upgrade {
future::Either::Left(ref mut up) => {
let (i, c) = match ready!(TryFuture::try_poll(Pin::new(&mut this.future), cx).map_err(TransportUpgradeError::Transport)) {
let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx).map_err(TransportUpgradeError::Transport)) {
Ok(v) => v,
Err(err) => return Poll::Ready(Err(err)),
};
@ -387,13 +374,13 @@ where
/// The [`Transport::Listener`] stream of an [`Upgrade`]d transport.
pub struct ListenerStream<S, U> {
stream: S,
stream: Pin<Box<S>>,
upgrade: U
}
impl<S, U, F, I, C, D> Stream for ListenerStream<S, U>
where
S: TryStream<Ok = ListenerEvent<F>> + Unpin,
S: TryStream<Ok = ListenerEvent<F>>,
F: TryFuture<Ok = (I, C)>,
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = D> + Clone
@ -401,11 +388,11 @@ where
type Item = Result<ListenerEvent<ListenerUpgradeFuture<F, U, I, C>>, TransportUpgradeError<S::Error, U::Error>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match ready!(TryStream::try_poll_next(Pin::new(&mut self.stream), cx)) {
match ready!(TryStream::try_poll_next(self.stream.as_mut(), cx)) {
Some(Ok(event)) => {
let event = event.map(move |future| {
ListenerUpgradeFuture {
future,
future: Box::pin(future),
upgrade: future::Either::Left(Some(self.upgrade.clone()))
}
});
@ -428,13 +415,13 @@ where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>>
{
future: F,
future: Pin<Box<F>>,
upgrade: future::Either<Option<U>, (Option<I>, InboundUpgradeApply<C, U>)>
}
impl<F, U, I, C, D> Future for ListenerUpgradeFuture<F, U, I, C>
where
F: TryFuture<Ok = (I, C)> + Unpin,
F: TryFuture<Ok = (I, C)>,
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>, Output = D>,
U::Error: Error
@ -449,7 +436,7 @@ where
loop {
this.upgrade = match this.upgrade {
future::Either::Left(ref mut up) => {
let (i, c) = match ready!(TryFuture::try_poll(Pin::new(&mut this.future), cx).map_err(TransportUpgradeError::Transport)) {
let (i, c) = match ready!(TryFuture::try_poll(this.future.as_mut(), cx).map_err(TransportUpgradeError::Transport)) {
Ok(v) => v,
Err(err) => return Poll::Ready(Err(err))
};

View File

@ -86,7 +86,7 @@ where
upgrade: U,
},
Upgrade {
future: U::Future
future: Pin<Box<U::Future>>
},
Undefined
}
@ -102,7 +102,6 @@ impl<C, U> Future for InboundUpgradeApply<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: InboundUpgrade<Negotiated<C>>,
U::Future: Unpin,
{
type Output = Result<U::Output, UpgradeError<U::Error>>;
@ -118,7 +117,7 @@ where
}
};
self.inner = InboundUpgradeApplyState::Upgrade {
future: upgrade.upgrade_inbound(Compat01As03::new(io), info.0)
future: Box::pin(upgrade.upgrade_inbound(Compat01As03::new(io), info.0))
};
}
InboundUpgradeApplyState::Upgrade { mut future } => {
@ -163,7 +162,7 @@ where
upgrade: U
},
Upgrade {
future: U::Future
future: Pin<Box<U::Future>>
},
Undefined
}
@ -179,7 +178,6 @@ impl<C, U> Future for OutboundUpgradeApply<C, U>
where
C: AsyncRead + AsyncWrite + Unpin,
U: OutboundUpgrade<Negotiated<C>>,
U::Future: Unpin,
{
type Output = Result<U::Output, UpgradeError<U::Error>>;
@ -195,7 +193,7 @@ where
}
};
self.inner = OutboundUpgradeApplyState::Upgrade {
future: upgrade.upgrade_outbound(Compat01As03::new(connection), info.0)
future: Box::pin(upgrade.upgrade_outbound(Compat01As03::new(connection), info.0))
};
}
OutboundUpgradeApplyState::Upgrade { mut future } => {

View File

@ -144,7 +144,7 @@ pub trait InboundUpgrade<C>: UpgradeInfo {
/// Possible error during the handshake.
type Error;
/// Future that performs the handshake with the remote.
type Future: Future<Output = Result<Self::Output, Self::Error>> + Unpin;
type Future: Future<Output = Result<Self::Output, Self::Error>>;
/// After we have determined that the remote supports one of the protocols we support, this
/// method is called to start the handshake.
@ -184,7 +184,7 @@ pub trait OutboundUpgrade<C>: UpgradeInfo {
/// Possible error during the handshake.
type Error;
/// Future that performs the handshake with the remote.
type Future: Future<Output = Result<Self::Output, Self::Error>> + Unpin;
type Future: Future<Output = Result<Self::Output, Self::Error>>;
/// After we have determined that the remote supports one of the protocols we support, this
/// method is called to start the handshake.

View File

@ -11,6 +11,7 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
futures = "0.3.1"
pin-project = "0.4.6"
static_assertions = "1"
[dev-dependencies]

View File

@ -32,7 +32,9 @@ static_assertions::const_assert!(std::mem::size_of::<usize>() <= std::mem::size_
/// Wraps a [`Stream`] and [`Sink`] whose items are buffers.
/// Implements [`AsyncRead`] and [`AsyncWrite`].
#[pin_project::pin_project]
pub struct RwStreamSink<S: TryStream> {
#[pin]
inner: S,
current_item: Option<std::io::Cursor<<S as TryStream>::Ok>>
}
@ -49,15 +51,17 @@ where
S: TryStream<Error = io::Error> + Unpin,
<S as TryStream>::Ok: AsRef<[u8]>
{
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<io::Result<usize>> {
fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<io::Result<usize>> {
let mut this = self.project();
// Grab the item to copy from.
let item_to_copy = loop {
if let Some(ref mut i) = self.current_item {
if let Some(ref mut i) = this.current_item {
if i.position() < i.get_ref().as_ref().len() as u64 {
break i
}
}
self.current_item = Some(match ready!(self.inner.try_poll_next_unpin(cx)) {
*this.current_item = Some(match ready!(this.inner.as_mut().try_poll_next(cx)) {
Some(Ok(i)) => std::io::Cursor::new(i),
Some(Err(e)) => return Poll::Ready(Err(e)),
None => return Poll::Ready(Ok(0)) // EOF
@ -74,26 +78,27 @@ where
S: TryStream + Sink<<S as TryStream>::Ok, Error = io::Error> + Unpin,
<S as TryStream>::Ok: for<'r> From<&'r [u8]>
{
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
ready!(Pin::new(&mut self.inner).poll_ready(cx)?);
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
let mut this = self.project();
ready!(this.inner.as_mut().poll_ready(cx)?);
let n = buf.len();
if let Err(e) = Pin::new(&mut self.inner).start_send(buf.into()) {
if let Err(e) = this.inner.start_send(buf.into()) {
return Poll::Ready(Err(e))
}
Poll::Ready(Ok(n))
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_flush(cx)
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
let this = self.project();
this.inner.poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_close(cx)
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
let this = self.project();
this.inner.poll_close(cx)
}
}
impl<S: TryStream> Unpin for RwStreamSink<S> {}
#[cfg(test)]
mod tests {
use async_std::task;

View File

@ -288,7 +288,7 @@ impl<T> fmt::Debug for LocalIncoming<T> {
}
}
impl<T: Unpin> Stream for Incoming<T> {
impl<T> Stream for Incoming<T> {
type Item = Result<yamux::Stream, YamuxError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> std::task::Poll<Option<Self::Item>> {
@ -300,7 +300,10 @@ impl<T: Unpin> Stream for Incoming<T> {
}
}
impl<T: Unpin> Stream for LocalIncoming<T> {
impl<T> Unpin for Incoming<T> {
}
impl<T> Stream for LocalIncoming<T> {
type Item = Result<yamux::Stream, YamuxError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> std::task::Poll<Option<Self::Item>> {
@ -311,3 +314,6 @@ impl<T: Unpin> Stream for LocalIncoming<T> {
self.stream.size_hint()
}
}
impl<T> Unpin for LocalIncoming<T> {
}

View File

@ -18,6 +18,7 @@ hmac = "0.7.0"
lazy_static = "1.2.0"
libp2p-core = { version = "0.14.0-alpha.1", path = "../../core" }
log = "0.4.6"
pin-project = "0.4.6"
protobuf = "=2.8.1" # note: see https://github.com/libp2p/rust-libp2p/issues/1363
quicksink = "0.1"
rand = "0.7"

View File

@ -35,9 +35,11 @@ use std::{cmp::min, pin::Pin, task::Context, task::Poll};
/// frames isn't handled by this module.
///
/// Also implements `Sink` for convenience.
#[pin_project::pin_project]
pub struct DecoderMiddleware<S> {
cipher_state: StreamCipher,
hmac: Hmac,
#[pin]
raw_stream: S,
nonce: Vec<u8>
}
@ -59,29 +61,31 @@ impl<S> DecoderMiddleware<S> {
impl<S> Stream for DecoderMiddleware<S>
where
S: TryStream<Ok = Vec<u8>> + Unpin,
S: TryStream<Ok = Vec<u8>>,
S::Error: Into<SecioError>,
{
type Item = Result<Vec<u8>, SecioError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let frame = match TryStream::try_poll_next(Pin::new(&mut self.raw_stream), cx) {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
let frame = match TryStream::try_poll_next(this.raw_stream, cx) {
Poll::Ready(Some(Ok(t))) => t,
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
};
if frame.len() < self.hmac.num_bytes() {
if frame.len() < this.hmac.num_bytes() {
debug!("frame too short when decoding secio frame");
return Poll::Ready(Some(Err(SecioError::FrameTooShort)));
}
let content_length = frame.len() - self.hmac.num_bytes();
let content_length = frame.len() - this.hmac.num_bytes();
{
let (crypted_data, expected_hash) = frame.split_at(content_length);
debug_assert_eq!(expected_hash.len(), self.hmac.num_bytes());
debug_assert_eq!(expected_hash.len(), this.hmac.num_bytes());
if self.hmac.verify(crypted_data, expected_hash).is_err() {
if this.hmac.verify(crypted_data, expected_hash).is_err() {
debug!("hmac mismatch when decoding secio frame");
return Poll::Ready(Some(Err(SecioError::HmacNotMatching)));
}
@ -89,14 +93,14 @@ where
let mut data_buf = frame;
data_buf.truncate(content_length);
self.cipher_state.decrypt(&mut data_buf);
this.cipher_state.decrypt(&mut data_buf);
if !self.nonce.is_empty() {
let n = min(data_buf.len(), self.nonce.len());
if data_buf[.. n] != self.nonce[.. n] {
if !this.nonce.is_empty() {
let n = min(data_buf.len(), this.nonce.len());
if data_buf[.. n] != this.nonce[.. n] {
return Poll::Ready(Some(Err(SecioError::NonceVerificationFailed)))
}
self.nonce.drain(.. n);
this.nonce.drain(.. n);
data_buf.drain(.. n);
}
@ -106,23 +110,27 @@ where
impl<S, I> Sink<I> for DecoderMiddleware<S>
where
S: Sink<I> + Unpin,
S: Sink<I>,
{
type Error = S::Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Sink::poll_ready(Pin::new(&mut self.raw_stream), cx)
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let this = self.project();
Sink::poll_ready(this.raw_stream, cx)
}
fn start_send(mut self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
Sink::start_send(Pin::new(&mut self.raw_stream), item)
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
let this = self.project();
Sink::start_send(this.raw_stream, item)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Sink::poll_flush(Pin::new(&mut self.raw_stream), cx)
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let this = self.project();
Sink::poll_flush(this.raw_stream, cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Sink::poll_close(Pin::new(&mut self.raw_stream), cx)
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let this = self.project();
Sink::poll_close(this.raw_stream, cx)
}
}

View File

@ -31,9 +31,11 @@ use std::{pin::Pin, task::Context, task::Poll};
/// prefix is not covered by this module.
///
/// Also implements `Stream` for convenience.
#[pin_project::pin_project]
pub struct EncoderMiddleware<S> {
cipher_state: StreamCipher,
hmac: Hmac,
#[pin]
raw_sink: S,
}
@ -49,38 +51,43 @@ impl<S> EncoderMiddleware<S> {
impl<S> Sink<Vec<u8>> for EncoderMiddleware<S>
where
S: Sink<Vec<u8>> + Unpin,
S: Sink<Vec<u8>>,
{
type Error = S::Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Sink::poll_ready(Pin::new(&mut self.raw_sink), cx)
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let this = self.project();
Sink::poll_ready(this.raw_sink, cx)
}
fn start_send(mut self: Pin<&mut Self>, mut data_buf: Vec<u8>) -> Result<(), Self::Error> {
fn start_send(self: Pin<&mut Self>, mut data_buf: Vec<u8>) -> Result<(), Self::Error> {
let this = self.project();
// TODO if SinkError gets refactor to SecioError, then use try_apply_keystream
self.cipher_state.encrypt(&mut data_buf[..]);
let signature = self.hmac.sign(&data_buf[..]);
this.cipher_state.encrypt(&mut data_buf[..]);
let signature = this.hmac.sign(&data_buf[..]);
data_buf.extend_from_slice(signature.as_ref());
Sink::start_send(Pin::new(&mut self.raw_sink), data_buf)
Sink::start_send(this.raw_sink, data_buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Sink::poll_flush(Pin::new(&mut self.raw_sink), cx)
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let this = self.project();
Sink::poll_flush(this.raw_sink, cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
Sink::poll_close(Pin::new(&mut self.raw_sink), cx)
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let this = self.project();
Sink::poll_close(this.raw_sink, cx)
}
}
impl<S> Stream for EncoderMiddleware<S>
where
S: Stream + Unpin,
S: Stream,
{
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
Stream::poll_next(Pin::new(&mut self.raw_sink), cx)
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
Stream::poll_next(this.raw_sink, cx)
}
}

View File

@ -91,7 +91,7 @@ where
impl<T> Stream for LenPrefixCodec<T>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static
T: AsyncRead + AsyncWrite + Send + 'static
{
type Item = io::Result<Vec<u8>>;
@ -102,7 +102,7 @@ where
impl<T> Sink<Vec<u8>> for LenPrefixCodec<T>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static
T: AsyncRead + AsyncWrite + Send + 'static
{
type Error = io::Error;
@ -122,3 +122,6 @@ where
Pin::new(&mut self.sink).poll_close(cx)
}
}
impl<T> Unpin for LenPrefixCodec<T> {
}

View File

@ -57,10 +57,7 @@ impl<TInner> BandwidthLogging<TInner> {
impl<TInner> Transport for BandwidthLogging<TInner>
where
TInner: Transport + Unpin,
TInner::Dial: Unpin,
TInner::Listener: Unpin,
TInner::ListenerUpgrade: Unpin
TInner: Transport,
{
type Output = BandwidthConnecLogging<TInner::Output>;
type Error = TInner::Error;
@ -85,27 +82,32 @@ where
/// Wraps around a `Stream` that produces connections. Wraps each connection around a bandwidth
/// counter.
#[pin_project::pin_project]
pub struct BandwidthListener<TInner> {
#[pin]
inner: TInner,
sinks: Arc<BandwidthSinks>,
}
impl<TInner, TConn> Stream for BandwidthListener<TInner>
where
TInner: TryStream<Ok = ListenerEvent<TConn>> + Unpin
TInner: TryStream<Ok = ListenerEvent<TConn>>
{
type Item = Result<ListenerEvent<BandwidthFuture<TConn>>, TInner::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
let event =
if let Some(event) = ready!(self.inner.try_poll_next_unpin(cx)?) {
if let Some(event) = ready!(this.inner.try_poll_next(cx)?) {
event
} else {
return Poll::Ready(None)
};
let event = event.map(|inner| {
BandwidthFuture { inner, sinks: self.sinks.clone() }
let event = event.map({
let sinks = this.sinks.clone();
|inner| BandwidthFuture { inner, sinks }
});
Poll::Ready(Some(Ok(event)))
@ -114,17 +116,20 @@ where
/// Wraps around a `Future` that produces a connection. Wraps the connection around a bandwidth
/// counter.
#[pin_project::pin_project]
pub struct BandwidthFuture<TInner> {
#[pin]
inner: TInner,
sinks: Arc<BandwidthSinks>,
}
impl<TInner: TryFuture + Unpin> Future for BandwidthFuture<TInner> {
impl<TInner: TryFuture> Future for BandwidthFuture<TInner> {
type Output = Result<BandwidthConnecLogging<TInner::Ok>, TInner::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = ready!(self.inner.try_poll_unpin(cx)?);
let logged = BandwidthConnecLogging { inner, sinks: self.sinks.clone() };
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.project();
let inner = ready!(this.inner.try_poll(cx)?);
let logged = BandwidthConnecLogging { inner, sinks: this.sinks.clone() };
Poll::Ready(Ok(logged))
}
}
@ -148,44 +153,52 @@ impl BandwidthSinks {
}
/// Wraps around an `AsyncRead + AsyncWrite` and logs the bandwidth that goes through it.
#[pin_project::pin_project]
pub struct BandwidthConnecLogging<TInner> {
#[pin]
inner: TInner,
sinks: Arc<BandwidthSinks>,
}
impl<TInner: AsyncRead + Unpin> AsyncRead for BandwidthConnecLogging<TInner> {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<io::Result<usize>> {
let num_bytes = ready!(Pin::new(&mut self.inner).poll_read(cx, buf))?;
self.sinks.download.lock().inject(num_bytes);
impl<TInner: AsyncRead> AsyncRead for BandwidthConnecLogging<TInner> {
fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<io::Result<usize>> {
let this = self.project();
let num_bytes = ready!(this.inner.poll_read(cx, buf))?;
this.sinks.download.lock().inject(num_bytes);
Poll::Ready(Ok(num_bytes))
}
fn poll_read_vectored(mut self: Pin<&mut Self>, cx: &mut Context, bufs: &mut [IoSliceMut]) -> Poll<io::Result<usize>> {
let num_bytes = ready!(Pin::new(&mut self.inner).poll_read_vectored(cx, bufs))?;
self.sinks.download.lock().inject(num_bytes);
fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context, bufs: &mut [IoSliceMut]) -> Poll<io::Result<usize>> {
let this = self.project();
let num_bytes = ready!(this.inner.poll_read_vectored(cx, bufs))?;
this.sinks.download.lock().inject(num_bytes);
Poll::Ready(Ok(num_bytes))
}
}
impl<TInner: AsyncWrite + Unpin> AsyncWrite for BandwidthConnecLogging<TInner> {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
let num_bytes = ready!(Pin::new(&mut self.inner).poll_write(cx, buf))?;
self.sinks.upload.lock().inject(num_bytes);
impl<TInner: AsyncWrite> AsyncWrite for BandwidthConnecLogging<TInner> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
let this = self.project();
let num_bytes = ready!(this.inner.poll_write(cx, buf))?;
this.sinks.upload.lock().inject(num_bytes);
Poll::Ready(Ok(num_bytes))
}
fn poll_write_vectored(mut self: Pin<&mut Self>, cx: &mut Context, bufs: &[IoSlice]) -> Poll<io::Result<usize>> {
let num_bytes = ready!(Pin::new(&mut self.inner).poll_write_vectored(cx, bufs))?;
self.sinks.upload.lock().inject(num_bytes);
fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context, bufs: &[IoSlice]) -> Poll<io::Result<usize>> {
let this = self.project();
let num_bytes = ready!(this.inner.poll_write_vectored(cx, bufs))?;
this.sinks.upload.lock().inject(num_bytes);
Poll::Ready(Ok(num_bytes))
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_flush(cx)
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
let this = self.project();
this.inner.poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_close(cx)
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
let this = self.project();
this.inner.poll_close(cx)
}
}

View File

@ -67,7 +67,7 @@ impl<C, F, O, A, E> InboundUpgrade<C> for SimpleProtocol<F>
where
C: AsyncRead + AsyncWrite,
F: Fn(C) -> O,
O: Future<Output = Result<A, E>> + Unpin
O: Future<Output = Result<A, E>>,
{
type Output = A;
type Error = E;
@ -83,7 +83,7 @@ impl<C, F, O, A, E> OutboundUpgrade<C> for SimpleProtocol<F>
where
C: AsyncRead + AsyncWrite,
F: Fn(C) -> O,
O: Future<Output = Result<A, E>> + Unpin
O: Future<Output = Result<A, E>>,
{
type Output = A;
type Error = E;

View File

@ -207,9 +207,9 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
<TMuxer as StreamMuxer>::Substream: Send + 'static,
TTransport: Transport<Output = (TConnInfo, TMuxer)> + Clone,
TTransport::Error: Send + 'static,
TTransport::Listener: Unpin + Send + 'static,
TTransport::ListenerUpgrade: Unpin + Send + 'static,
TTransport::Dial: Unpin + Send + 'static,
TTransport::Listener: Send + 'static,
TTransport::ListenerUpgrade: Send + 'static,
TTransport::Dial: Send + 'static,
THandlerErr: error::Error,
THandler: IntoProtocolsHandler + Send + 'static,
<THandler as IntoProtocolsHandler>::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Substream = Substream<TMuxer>, Error = THandlerErr> + Send + 'static,
@ -251,7 +251,9 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
}
/// Remove some listener.
pub fn remove_listener(me: &mut Self, id: ListenerId) -> Option<TTransport::Listener> {
///
/// Returns `Ok(())` if there was a listener with this ID.
pub fn remove_listener(me: &mut Self, id: ListenerId) -> Result<(), ()> {
me.network.remove_listener(id)
}
@ -502,9 +504,9 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
<TMuxer as StreamMuxer>::Substream: Send + 'static,
TTransport: Transport<Output = (TConnInfo, TMuxer)> + Clone,
TTransport::Error: Send + 'static,
TTransport::Listener: Unpin + Send + 'static,
TTransport::ListenerUpgrade: Unpin + Send + 'static,
TTransport::Dial: Unpin + Send + 'static,
TTransport::Listener: Send + 'static,
TTransport::ListenerUpgrade: Send + 'static,
TTransport::Dial: Send + 'static,
THandlerErr: error::Error,
THandler: IntoProtocolsHandler + Send + 'static,
<THandler as IntoProtocolsHandler>::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Substream = Substream<TMuxer>, Error = THandlerErr> + Send + 'static,
@ -584,9 +586,9 @@ where TBehaviour: NetworkBehaviour,
<TMuxer as StreamMuxer>::Substream: Send + 'static,
TTransport: Transport<Output = (TConnInfo, TMuxer)> + Clone,
TTransport::Error: Send + 'static,
TTransport::Listener: Unpin + Send + 'static,
TTransport::ListenerUpgrade: Unpin + Send + 'static,
TTransport::Dial: Unpin + Send + 'static,
TTransport::Listener: Send + 'static,
TTransport::ListenerUpgrade: Send + 'static,
TTransport::Dial: Send + 'static,
<TBehaviour as NetworkBehaviour>::ProtocolsHandler: Send + 'static,
<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler: ProtocolsHandler<Substream = Substream<TMuxer>> + Send + 'static,
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent: Send + 'static,

View File

@ -103,7 +103,7 @@ where
T: Transport + Send + Clone + 'static,
T::Error: Send + 'static,
T::Dial: Send + 'static,
T::Listener: Send + Unpin + 'static,
T::Listener: Send + 'static,
T::ListenerUpgrade: Send + 'static,
T::Output: AsyncRead + AsyncWrite + Unpin + Send + 'static
{

View File

@ -97,7 +97,7 @@ where
T: Transport + Send + Clone + 'static,
T::Error: Send + 'static,
T::Dial: Send + 'static,
T::Listener: Send + Unpin + 'static,
T::Listener: Send + 'static,
T::ListenerUpgrade: Send + 'static,
T::Output: AsyncRead + AsyncWrite + Unpin + Send + 'static
{