From 894c83170bd5f638de020c97dc46efe4d8f97803 Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Thu, 12 Dec 2019 14:18:45 +0100 Subject: [PATCH] Bring back phantom types to yamux upgrade outputs. --- muxers/yamux/src/lib.rs | 94 ++++++++++++++++++++++++++--------------- 1 file changed, 59 insertions(+), 35 deletions(-) diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index c4745fd4..507a1bea 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -21,7 +21,7 @@ //! Implements the Yamux multiplexing protocol for libp2p, see also the //! [specification](https://github.com/hashicorp/yamux/blob/master/spec.md). -use futures::{future, prelude::*, ready}; +use futures::{future, prelude::*, ready, stream::{BoxStream, LocalBoxStream}}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated}; use parking_lot::Mutex; use std::{fmt, io, iter, pin::Pin, task::Context}; @@ -49,17 +49,20 @@ struct Inner { #[derive(Debug)] pub struct OpenSubstreamToken(()); -impl Yamux { +impl Yamux> +where + C: AsyncRead + AsyncWrite + Send + Unpin + 'static +{ /// Create a new Yamux connection. - pub fn new(io: C, mut cfg: yamux::Config, mode: yamux::Mode) -> Self - where - C: AsyncRead + AsyncWrite + Send + Unpin + 'static - { + pub fn new(io: C, mut cfg: yamux::Config, mode: yamux::Mode) -> Self { cfg.set_read_after_close(false); let conn = yamux::Connection::new(io, cfg, mode); let ctrl = conn.control(); let inner = Inner { - incoming: Incoming(Box::pin(yamux::into_stream(conn).err_into())), + incoming: Incoming { + stream: yamux::into_stream(conn).err_into().boxed(), + _marker: std::marker::PhantomData + }, control: ctrl, acknowledged: false }; @@ -67,17 +70,20 @@ impl Yamux { } } -impl Yamux { +impl Yamux> +where + C: AsyncRead + AsyncWrite + Unpin + 'static +{ /// Create a new Yamux connection (which is ![`Send`]). - pub fn local(io: C, mut cfg: yamux::Config, mode: yamux::Mode) -> Self - where - C: AsyncRead + AsyncWrite + Unpin + 'static - { + pub fn local(io: C, mut cfg: yamux::Config, mode: yamux::Mode) -> Self { cfg.set_read_after_close(false); let conn = yamux::Connection::new(io, cfg, mode); let ctrl = conn.control(); let inner = Inner { - incoming: LocalIncoming(Box::pin(yamux::into_stream(conn).err_into())), + incoming: LocalIncoming { + stream: yamux::into_stream(conn).err_into().boxed_local(), + _marker: std::marker::PhantomData + }, control: ctrl, acknowledged: false }; @@ -199,9 +205,9 @@ impl InboundUpgrade for Config where C: AsyncRead + AsyncWrite + Send + Unpin + 'static { - type Output = Yamux; + type Output = Yamux>>; type Error = io::Error; - type Future = future::Ready, Self::Error>>; + type Future = future::Ready>; fn upgrade_inbound(self, io: Negotiated, _: Self::Info) -> Self::Future { future::ready(Ok(Yamux::new(io, self.0, yamux::Mode::Server))) @@ -212,9 +218,9 @@ impl InboundUpgrade for LocalConfig where C: AsyncRead + AsyncWrite + Unpin + 'static { - type Output = Yamux; + type Output = Yamux>>; type Error = io::Error; - type Future = future::Ready, Self::Error>>; + type Future = future::Ready>; fn upgrade_inbound(self, io: Negotiated, _: Self::Info) -> Self::Future { future::ready(Ok(Yamux::local(io, (self.0).0, yamux::Mode::Server))) @@ -225,9 +231,9 @@ impl OutboundUpgrade for Config where C: AsyncRead + AsyncWrite + Send + Unpin + 'static { - type Output = Yamux; + type Output = Yamux>>; type Error = io::Error; - type Future = future::Ready, Self::Error>>; + type Future = future::Ready>; fn upgrade_outbound(self, io: Negotiated, _: Self::Info) -> Self::Future { future::ready(Ok(Yamux::new(io, self.0, yamux::Mode::Client))) @@ -238,9 +244,9 @@ impl OutboundUpgrade for LocalConfig where C: AsyncRead + AsyncWrite + Unpin + 'static { - type Output = Yamux; + type Output = Yamux>>; type Error = io::Error; - type Future = future::Ready, Self::Error>>; + type Future = future::Ready>; fn upgrade_outbound(self, io: Negotiated, _: Self::Info) -> Self::Future { future::ready(Ok(Yamux::local(io, (self.0).0, yamux::Mode::Client))) @@ -259,31 +265,49 @@ impl Into for YamuxError { } /// The [`futures::stream::Stream`] of incoming substreams. -pub struct Incoming(Pin> + Send>>); +pub struct Incoming { + stream: BoxStream<'static, Result>, + _marker: std::marker::PhantomData +} + +impl fmt::Debug for Incoming { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("Incoming") + } +} /// The [`futures::stream::Stream`] of incoming substreams (`!Send`). -pub struct LocalIncoming(Pin>>>); +pub struct LocalIncoming { + stream: LocalBoxStream<'static, Result>, + _marker: std::marker::PhantomData +} -impl Stream for Incoming { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> std::task::Poll> { - self.0.poll_next_unpin(cx) - } - - fn size_hint(&self) -> (usize, Option) { - self.0.size_hint() +impl fmt::Debug for LocalIncoming { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("LocalIncoming") } } -impl Stream for LocalIncoming { +impl Stream for Incoming { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> std::task::Poll> { - self.0.poll_next_unpin(cx) + self.stream.as_mut().poll_next_unpin(cx) } fn size_hint(&self) -> (usize, Option) { - self.0.size_hint() + self.stream.size_hint() + } +} + +impl Stream for LocalIncoming { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> std::task::Poll> { + self.stream.as_mut().poll_next_unpin(cx) + } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() } }