Merge pull request #1350 from twittner/phantom

Bring back phantom types to yamux upgrade outputs.
This commit is contained in:
Pierre Krieger
2019-12-12 14:58:00 +01:00
committed by GitHub

View File

@ -21,7 +21,7 @@
//! Implements the Yamux multiplexing protocol for libp2p, see also the
//! [specification](https://github.com/hashicorp/yamux/blob/master/spec.md).
use futures::{future, prelude::*, ready};
use futures::{future, prelude::*, ready, stream::{BoxStream, LocalBoxStream}};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated};
use parking_lot::Mutex;
use std::{fmt, io, iter, pin::Pin, task::Context};
@ -49,17 +49,20 @@ struct Inner<S> {
#[derive(Debug)]
pub struct OpenSubstreamToken(());
impl Yamux<Incoming> {
/// Create a new Yamux connection.
pub fn new<C>(io: C, mut cfg: yamux::Config, mode: yamux::Mode) -> Self
where
impl<C> Yamux<Incoming<C>>
where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static
{
{
/// Create a new Yamux connection.
pub fn new(io: C, mut cfg: yamux::Config, mode: yamux::Mode) -> Self {
cfg.set_read_after_close(false);
let conn = yamux::Connection::new(io, cfg, mode);
let ctrl = conn.control();
let inner = Inner {
incoming: Incoming(Box::pin(yamux::into_stream(conn).err_into())),
incoming: Incoming {
stream: yamux::into_stream(conn).err_into().boxed(),
_marker: std::marker::PhantomData
},
control: ctrl,
acknowledged: false
};
@ -67,17 +70,20 @@ impl Yamux<Incoming> {
}
}
impl Yamux<LocalIncoming> {
/// Create a new Yamux connection (which is ![`Send`]).
pub fn local<C>(io: C, mut cfg: yamux::Config, mode: yamux::Mode) -> Self
where
impl<C> Yamux<LocalIncoming<C>>
where
C: AsyncRead + AsyncWrite + Unpin + 'static
{
{
/// Create a new Yamux connection (which is ![`Send`]).
pub fn local(io: C, mut cfg: yamux::Config, mode: yamux::Mode) -> Self {
cfg.set_read_after_close(false);
let conn = yamux::Connection::new(io, cfg, mode);
let ctrl = conn.control();
let inner = Inner {
incoming: LocalIncoming(Box::pin(yamux::into_stream(conn).err_into())),
incoming: LocalIncoming {
stream: yamux::into_stream(conn).err_into().boxed_local(),
_marker: std::marker::PhantomData
},
control: ctrl,
acknowledged: false
};
@ -199,9 +205,9 @@ impl<C> InboundUpgrade<C> for Config
where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static
{
type Output = Yamux<Incoming>;
type Output = Yamux<Incoming<Negotiated<C>>>;
type Error = io::Error;
type Future = future::Ready<Result<Yamux<Incoming>, Self::Error>>;
type Future = future::Ready<Result<Self::Output, Self::Error>>;
fn upgrade_inbound(self, io: Negotiated<C>, _: Self::Info) -> Self::Future {
future::ready(Ok(Yamux::new(io, self.0, yamux::Mode::Server)))
@ -212,9 +218,9 @@ impl<C> InboundUpgrade<C> for LocalConfig
where
C: AsyncRead + AsyncWrite + Unpin + 'static
{
type Output = Yamux<LocalIncoming>;
type Output = Yamux<LocalIncoming<Negotiated<C>>>;
type Error = io::Error;
type Future = future::Ready<Result<Yamux<LocalIncoming>, Self::Error>>;
type Future = future::Ready<Result<Self::Output, Self::Error>>;
fn upgrade_inbound(self, io: Negotiated<C>, _: Self::Info) -> Self::Future {
future::ready(Ok(Yamux::local(io, (self.0).0, yamux::Mode::Server)))
@ -225,9 +231,9 @@ impl<C> OutboundUpgrade<C> for Config
where
C: AsyncRead + AsyncWrite + Send + Unpin + 'static
{
type Output = Yamux<Incoming>;
type Output = Yamux<Incoming<Negotiated<C>>>;
type Error = io::Error;
type Future = future::Ready<Result<Yamux<Incoming>, Self::Error>>;
type Future = future::Ready<Result<Self::Output, Self::Error>>;
fn upgrade_outbound(self, io: Negotiated<C>, _: Self::Info) -> Self::Future {
future::ready(Ok(Yamux::new(io, self.0, yamux::Mode::Client)))
@ -238,9 +244,9 @@ impl<C> OutboundUpgrade<C> for LocalConfig
where
C: AsyncRead + AsyncWrite + Unpin + 'static
{
type Output = Yamux<LocalIncoming>;
type Output = Yamux<LocalIncoming<Negotiated<C>>>;
type Error = io::Error;
type Future = future::Ready<Result<Yamux<LocalIncoming>, Self::Error>>;
type Future = future::Ready<Result<Self::Output, Self::Error>>;
fn upgrade_outbound(self, io: Negotiated<C>, _: Self::Info) -> Self::Future {
future::ready(Ok(Yamux::local(io, (self.0).0, yamux::Mode::Client)))
@ -259,31 +265,49 @@ impl Into<io::Error> for YamuxError {
}
/// The [`futures::stream::Stream`] of incoming substreams.
pub struct Incoming(Pin<Box<dyn Stream<Item = Result<yamux::Stream, YamuxError>> + Send>>);
pub struct Incoming<T> {
stream: BoxStream<'static, Result<yamux::Stream, YamuxError>>,
_marker: std::marker::PhantomData<T>
}
impl<T> fmt::Debug for Incoming<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("Incoming")
}
}
/// The [`futures::stream::Stream`] of incoming substreams (`!Send`).
pub struct LocalIncoming(Pin<Box<dyn Stream<Item = Result<yamux::Stream, YamuxError>>>>);
pub struct LocalIncoming<T> {
stream: LocalBoxStream<'static, Result<yamux::Stream, YamuxError>>,
_marker: std::marker::PhantomData<T>
}
impl Stream for Incoming {
type Item = Result<yamux::Stream, YamuxError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> std::task::Poll<Option<Self::Item>> {
self.0.poll_next_unpin(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
impl<T> fmt::Debug for LocalIncoming<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("LocalIncoming")
}
}
impl Stream for LocalIncoming {
impl<T: Unpin> Stream for Incoming<T> {
type Item = Result<yamux::Stream, YamuxError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> std::task::Poll<Option<Self::Item>> {
self.0.poll_next_unpin(cx)
self.stream.as_mut().poll_next_unpin(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
self.stream.size_hint()
}
}
impl<T: Unpin> Stream for LocalIncoming<T> {
type Item = Result<yamux::Stream, YamuxError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> std::task::Poll<Option<Self::Item>> {
self.stream.as_mut().poll_next_unpin(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}