diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 03e59d47..335c2220 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -25,6 +25,7 @@ pub use crate::upgrade::Version; use crate::{ ConnectedPoint, ConnectionInfo, + Negotiated, transport::{ Transport, TransportError, @@ -106,8 +107,8 @@ where I: ConnectionInfo, C: AsyncRead + AsyncWrite + Unpin, D: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, - U: OutboundUpgrade + Clone, + U: InboundUpgrade, Output = (I, D), Error = E>, + U: OutboundUpgrade, Output = (I, D), Error = E> + Clone, E: Error + 'static, { let version = self.version; @@ -138,8 +139,8 @@ where C: AsyncRead + AsyncWrite + Unpin, D: AsyncRead + AsyncWrite + Unpin, I: ConnectionInfo, - U: InboundUpgrade, - U: OutboundUpgrade + Clone, + U: InboundUpgrade, Output = D, Error = E>, + U: OutboundUpgrade, Output = D, Error = E> + Clone, E: Error + 'static, { Builder::new(Upgrade::new(self.inner, upgrade), self.version) @@ -166,8 +167,8 @@ where C: AsyncRead + AsyncWrite + Unpin, M: StreamMuxer, I: ConnectionInfo, - U: InboundUpgrade, - U: OutboundUpgrade + Clone, + U: InboundUpgrade, Output = M, Error = E>, + U: OutboundUpgrade, Output = M, Error = E> + Clone, E: Error + 'static, { let version = self.version; @@ -185,7 +186,7 @@ where pub struct Authenticate where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade + OutboundUpgrade + U: InboundUpgrade> + OutboundUpgrade> { inner: EitherUpgrade } @@ -193,9 +194,9 @@ where impl Future for Authenticate where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade + OutboundUpgrade>::Output, - Error = >::Error + U: InboundUpgrade> + OutboundUpgrade, + Output = >>::Output, + Error = >>::Error > { type Output = as Future>::Output; @@ -212,7 +213,7 @@ where pub struct Multiplex where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade + OutboundUpgrade, + U: InboundUpgrade> + OutboundUpgrade>, { info: Option, upgrade: EitherUpgrade, @@ -221,8 +222,8 @@ where impl Future for Multiplex where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, - U: OutboundUpgrade + U: InboundUpgrade, Output = M, Error = E>, + U: OutboundUpgrade, Output = M, Error = E> { type Output = Result<(I, M), UpgradeError>; @@ -239,7 +240,7 @@ where impl Unpin for Multiplex where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade + OutboundUpgrade, + U: InboundUpgrade> + OutboundUpgrade>, { } @@ -266,8 +267,8 @@ where T::ListenerUpgrade: Unpin, T::Error: 'static, C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, - U: OutboundUpgrade + Clone, + U: InboundUpgrade, Output = D, Error = E>, + U: OutboundUpgrade, Output = D, Error = E> + Clone, E: Error + 'static { type Output = (I, D); @@ -333,7 +334,7 @@ where /// The [`Transport::Dial`] future of an [`Upgrade`]d transport. pub struct DialUpgradeFuture where - U: OutboundUpgrade, + U: OutboundUpgrade>, C: AsyncRead + AsyncWrite + Unpin, { future: F, @@ -344,7 +345,7 @@ impl Future for DialUpgradeFuture where F: TryFuture + Unpin, C: AsyncRead + AsyncWrite + Unpin, - U: OutboundUpgrade, + U: OutboundUpgrade, Output = D>, U::Error: Error { type Output = Result<(I, D), TransportUpgradeError>; @@ -379,7 +380,7 @@ where impl Unpin for DialUpgradeFuture where - U: OutboundUpgrade, + U: OutboundUpgrade>, C: AsyncRead + AsyncWrite + Unpin, { } @@ -395,7 +396,7 @@ where S: TryStream> + Unpin, F: TryFuture, C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade + Clone + U: InboundUpgrade, Output = D> + Clone { type Item = Result>, TransportUpgradeError>; @@ -425,7 +426,7 @@ impl Unpin for ListenerStream { pub struct ListenerUpgradeFuture where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade + U: InboundUpgrade> { future: F, upgrade: future::Either, (Option, InboundUpgradeApply)> @@ -435,7 +436,7 @@ impl Future for ListenerUpgradeFuture where F: TryFuture + Unpin, C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, + U: InboundUpgrade, Output = D>, U::Error: Error { type Output = Result<(I, D), TransportUpgradeError>; @@ -471,6 +472,6 @@ where impl Unpin for ListenerUpgradeFuture where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade + U: InboundUpgrade> { } diff --git a/core/src/upgrade/apply.rs b/core/src/upgrade/apply.rs index ae8abfa9..6756003b 100644 --- a/core/src/upgrade/apply.rs +++ b/core/src/upgrade/apply.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::ConnectedPoint; +use crate::{ConnectedPoint, Negotiated}; use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError, ProtocolName}; use futures::{future::Either, prelude::*, compat::Compat, compat::Compat01As03, compat::Future01CompatExt}; use log::debug; @@ -32,7 +32,7 @@ pub fn apply(conn: C, up: U, cp: ConnectedPoint, v: Version) -> Either, OutboundUpgradeApply> where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade + OutboundUpgrade, + U: InboundUpgrade> + OutboundUpgrade>, { if cp.is_listener() { Either::Left(apply_inbound(conn, up)) @@ -45,7 +45,7 @@ where pub fn apply_inbound(conn: C, up: U) -> InboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, + U: InboundUpgrade>, { let iter = up.protocol_info().into_iter().map(NameWrap as fn(_) -> NameWrap<_>); let future = multistream_select::listener_select_proto(Compat::new(conn), iter).compat(); @@ -58,7 +58,7 @@ where pub fn apply_outbound(conn: C, up: U, v: Version) -> OutboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: OutboundUpgrade + U: OutboundUpgrade> { let iter = up.protocol_info().into_iter().map(NameWrap as fn(_) -> NameWrap<_>); let future = multistream_select::dialer_select_proto(Compat::new(conn), iter, v).compat(); @@ -71,7 +71,7 @@ where pub struct InboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade + U: InboundUpgrade> { inner: InboundUpgradeApplyState } @@ -79,7 +79,7 @@ where enum InboundUpgradeApplyState where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, + U: InboundUpgrade>, { Init { future: Compat01As03, NameWrap>>, @@ -94,14 +94,14 @@ where impl Unpin for InboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, + U: InboundUpgrade>, { } impl Future for InboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, + U: InboundUpgrade>, U::Future: Unpin, { type Output = Result>; @@ -148,7 +148,7 @@ where pub struct OutboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: OutboundUpgrade + U: OutboundUpgrade> { inner: OutboundUpgradeApplyState } @@ -156,7 +156,7 @@ where enum OutboundUpgradeApplyState where C: AsyncRead + AsyncWrite + Unpin, - U: OutboundUpgrade + U: OutboundUpgrade> { Init { future: Compat01As03, NameWrapIter<::IntoIter>>>, @@ -171,14 +171,14 @@ where impl Unpin for OutboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: OutboundUpgrade, + U: OutboundUpgrade>, { } impl Future for OutboundUpgradeApply where C: AsyncRead + AsyncWrite + Unpin, - U: OutboundUpgrade, + U: OutboundUpgrade>, U::Future: Unpin, { type Output = Result>; diff --git a/core/src/upgrade/denied.rs b/core/src/upgrade/denied.rs index 276d8782..93438e0b 100644 --- a/core/src/upgrade/denied.rs +++ b/core/src/upgrade/denied.rs @@ -18,7 +18,6 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::Negotiated; use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use futures::future; use std::iter; @@ -43,7 +42,7 @@ impl InboundUpgrade for DeniedUpgrade { type Error = Void; type Future = future::Pending>; - fn upgrade_inbound(self, _: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, _: C, _: Self::Info) -> Self::Future { future::pending() } } @@ -53,7 +52,7 @@ impl OutboundUpgrade for DeniedUpgrade { type Error = Void; type Future = future::Pending>; - fn upgrade_outbound(self, _: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, _: C, _: Self::Info) -> Self::Future { future::pending() } } diff --git a/core/src/upgrade/either.rs b/core/src/upgrade/either.rs index 9e6d0742..28db987c 100644 --- a/core/src/upgrade/either.rs +++ b/core/src/upgrade/either.rs @@ -19,7 +19,6 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - Negotiated, either::{EitherOutput, EitherError, EitherFuture2, EitherName}, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo} }; @@ -56,7 +55,7 @@ where type Error = EitherError; type Future = EitherFuture2; - fn upgrade_inbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { + fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { match (self, info) { (EitherUpgrade::A(a), EitherName::A(info)) => EitherFuture2::A(a.upgrade_inbound(sock, info)), (EitherUpgrade::B(b), EitherName::B(info)) => EitherFuture2::B(b.upgrade_inbound(sock, info)), @@ -74,7 +73,7 @@ where type Error = EitherError; type Future = EitherFuture2; - fn upgrade_outbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { match (self, info) { (EitherUpgrade::A(a), EitherName::A(info)) => EitherFuture2::A(a.upgrade_outbound(sock, info)), (EitherUpgrade::B(b), EitherName::B(info)) => EitherFuture2::B(b.upgrade_outbound(sock, info)), diff --git a/core/src/upgrade/map.rs b/core/src/upgrade/map.rs index 50da58d9..d55971df 100644 --- a/core/src/upgrade/map.rs +++ b/core/src/upgrade/map.rs @@ -18,7 +18,6 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::Negotiated; use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use futures::prelude::*; use std::{pin::Pin, task::Context, task::Poll}; @@ -54,7 +53,7 @@ where type Error = U::Error; type Future = MapFuture; - fn upgrade_inbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { + fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { MapFuture { inner: self.upgrade.upgrade_inbound(sock, info), map: Some(self.fun) @@ -70,7 +69,7 @@ where type Error = U::Error; type Future = U::Future; - fn upgrade_outbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { self.upgrade.upgrade_outbound(sock, info) } } @@ -105,7 +104,7 @@ where type Error = U::Error; type Future = U::Future; - fn upgrade_inbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { + fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { self.upgrade.upgrade_inbound(sock, info) } } @@ -119,7 +118,7 @@ where type Error = U::Error; type Future = MapFuture; - fn upgrade_outbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { MapFuture { inner: self.upgrade.upgrade_outbound(sock, info), map: Some(self.fun) @@ -158,7 +157,7 @@ where type Error = T; type Future = MapErrFuture; - fn upgrade_inbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { + fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { MapErrFuture { fut: self.upgrade.upgrade_inbound(sock, info), fun: Some(self.fun) @@ -174,7 +173,7 @@ where type Error = U::Error; type Future = U::Future; - fn upgrade_outbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { self.upgrade.upgrade_outbound(sock, info) } } @@ -210,7 +209,7 @@ where type Error = T; type Future = MapErrFuture; - fn upgrade_outbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { MapErrFuture { fut: self.upgrade.upgrade_outbound(sock, info), fun: Some(self.fun) @@ -226,7 +225,7 @@ where type Error = U::Error; type Future = U::Future; - fn upgrade_inbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { + fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { self.upgrade.upgrade_inbound(sock, info) } } diff --git a/core/src/upgrade/mod.rs b/core/src/upgrade/mod.rs index b0babe7c..e0f97170 100644 --- a/core/src/upgrade/mod.rs +++ b/core/src/upgrade/mod.rs @@ -150,7 +150,7 @@ pub trait InboundUpgrade: UpgradeInfo { /// method is called to start the handshake. /// /// The `info` is the identifier of the protocol, as produced by `protocol_info`. - fn upgrade_inbound(self, socket: Negotiated, info: Self::Info) -> Self::Future; + fn upgrade_inbound(self, socket: C, info: Self::Info) -> Self::Future; } /// Extension trait for `InboundUpgrade`. Automatically implemented on all types that implement @@ -190,7 +190,7 @@ pub trait OutboundUpgrade: UpgradeInfo { /// method is called to start the handshake. /// /// The `info` is the identifier of the protocol, as produced by `protocol_info`. - fn upgrade_outbound(self, socket: Negotiated, info: Self::Info) -> Self::Future; + fn upgrade_outbound(self, socket: C, info: Self::Info) -> Self::Future; } /// Extention trait for `OutboundUpgrade`. Automatically implemented on all types that implement diff --git a/core/src/upgrade/optional.rs b/core/src/upgrade/optional.rs index 618f8579..02dc3c48 100644 --- a/core/src/upgrade/optional.rs +++ b/core/src/upgrade/optional.rs @@ -19,7 +19,6 @@ // DEALINGS IN THE SOFTWARE. use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use crate::Negotiated; /// Upgrade that can be disabled at runtime. /// @@ -60,7 +59,7 @@ where type Error = T::Error; type Future = T::Future; - fn upgrade_inbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { + fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { if let Some(inner) = self.0 { inner.upgrade_inbound(sock, info) } else { @@ -77,7 +76,7 @@ where type Error = T::Error; type Future = T::Future; - fn upgrade_outbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { if let Some(inner) = self.0 { inner.upgrade_outbound(sock, info) } else { diff --git a/core/src/upgrade/select.rs b/core/src/upgrade/select.rs index 35d82042..8fa4c5b8 100644 --- a/core/src/upgrade/select.rs +++ b/core/src/upgrade/select.rs @@ -19,7 +19,6 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - Negotiated, either::{EitherOutput, EitherError, EitherFuture2, EitherName}, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo} }; @@ -65,7 +64,7 @@ where type Error = EitherError; type Future = EitherFuture2; - fn upgrade_inbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { + fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { match info { EitherName::A(info) => EitherFuture2::A(self.0.upgrade_inbound(sock, info)), EitherName::B(info) => EitherFuture2::B(self.1.upgrade_inbound(sock, info)) @@ -82,7 +81,7 @@ where type Error = EitherError; type Future = EitherFuture2; - fn upgrade_outbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { match info { EitherName::A(info) => EitherFuture2::A(self.0.upgrade_outbound(sock, info)), EitherName::B(info) => EitherFuture2::B(self.1.upgrade_outbound(sock, info)) diff --git a/core/tests/transport_upgrade.rs b/core/tests/transport_upgrade.rs index 12e3e503..b4c732b5 100644 --- a/core/tests/transport_upgrade.rs +++ b/core/tests/transport_upgrade.rs @@ -23,7 +23,7 @@ mod util; use futures::prelude::*; use libp2p_core::identity; use libp2p_core::transport::{Transport, MemoryTransport}; -use libp2p_core::upgrade::{self, UpgradeInfo, Negotiated, InboundUpgrade, OutboundUpgrade}; +use libp2p_core::upgrade::{self, UpgradeInfo, InboundUpgrade, OutboundUpgrade}; use libp2p_mplex::MplexConfig; use libp2p_secio::SecioConfig; use multiaddr::{Multiaddr, Protocol}; @@ -46,11 +46,11 @@ impl InboundUpgrade for HelloUpgrade where C: AsyncRead + AsyncWrite + Send + Unpin + 'static { - type Output = Negotiated; + type Output = C; type Error = io::Error; type Future = Pin> + Send>>; - fn upgrade_inbound(self, mut socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, mut socket: C, _: Self::Info) -> Self::Future { Box::pin(async move { let mut buf = [0u8; 5]; socket.read_exact(&mut buf).await.unwrap(); @@ -64,11 +64,11 @@ impl OutboundUpgrade for HelloUpgrade where C: AsyncWrite + AsyncRead + Send + Unpin + 'static, { - type Output = Negotiated; + type Output = C; type Error = io::Error; type Future = Pin> + Send>>; - fn upgrade_outbound(self, mut socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, mut socket: C, _: Self::Info) -> Self::Future { Box::pin(async move { socket.write_all(b"hello").await.unwrap(); socket.flush().await.unwrap(); diff --git a/misc/core-derive/src/lib.rs b/misc/core-derive/src/lib.rs index 452cc094..fbceef63 100644 --- a/misc/core-derive/src/lib.rs +++ b/misc/core-derive/src/lib.rs @@ -90,8 +90,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { quote!{Self: #net_behv_event_proc<<#ty as #trait_to_impl>::OutEvent>}, quote!{<<#ty as #trait_to_impl>::ProtocolsHandler as #into_protocols_handler>::Handler: #protocols_handler}, // Note: this bound is required because of https://github.com/rust-lang/rust/issues/55697 - quote!{<<<#ty as #trait_to_impl>::ProtocolsHandler as #into_protocols_handler>::Handler as #protocols_handler>::InboundProtocol: ::libp2p::core::InboundUpgrade<#substream_generic>}, - quote!{<<<#ty as #trait_to_impl>::ProtocolsHandler as #into_protocols_handler>::Handler as #protocols_handler>::OutboundProtocol: ::libp2p::core::OutboundUpgrade<#substream_generic>}, + quote!{<<<#ty as #trait_to_impl>::ProtocolsHandler as #into_protocols_handler>::Handler as #protocols_handler>::InboundProtocol: ::libp2p::core::InboundUpgrade<::libp2p::core::Negotiated<#substream_generic>>}, + quote!{<<<#ty as #trait_to_impl>::ProtocolsHandler as #into_protocols_handler>::Handler as #protocols_handler>::OutboundProtocol: ::libp2p::core::OutboundUpgrade<::libp2p::core::Negotiated<#substream_generic>>}, ] }) .collect::>(); diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 30d00450..64c532f8 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -28,7 +28,7 @@ use bytes::Bytes; use libp2p_core::{ Endpoint, StreamMuxer, - upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated}, + upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, }; use log::{debug, trace}; use parking_lot::Mutex; @@ -159,11 +159,11 @@ impl InboundUpgrade for MplexConfig where C: AsyncRead + AsyncWrite + Unpin, { - type Output = Multiplex>; + type Output = Multiplex; type Error = IoError; type Future = future::Ready>; - fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { future::ready(Ok(self.upgrade(socket))) } } @@ -172,11 +172,11 @@ impl OutboundUpgrade for MplexConfig where C: AsyncRead + AsyncWrite + Unpin, { - type Output = Multiplex>; + type Output = Multiplex; type Error = IoError; type Future = future::Ready>; - fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { future::ready(Ok(self.upgrade(socket))) } } diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 507a1bea..07325331 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -22,7 +22,7 @@ //! [specification](https://github.com/hashicorp/yamux/blob/master/spec.md). use futures::{future, prelude::*, ready, stream::{BoxStream, LocalBoxStream}}; -use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated}; +use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use parking_lot::Mutex; use std::{fmt, io, iter, pin::Pin, task::Context}; use thiserror::Error; @@ -205,11 +205,11 @@ impl InboundUpgrade for Config where C: AsyncRead + AsyncWrite + Send + Unpin + 'static { - type Output = Yamux>>; + type Output = Yamux>; type Error = io::Error; type Future = future::Ready>; - fn upgrade_inbound(self, io: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future { future::ready(Ok(Yamux::new(io, self.0, yamux::Mode::Server))) } } @@ -218,11 +218,11 @@ impl InboundUpgrade for LocalConfig where C: AsyncRead + AsyncWrite + Unpin + 'static { - type Output = Yamux>>; + type Output = Yamux>; type Error = io::Error; type Future = future::Ready>; - fn upgrade_inbound(self, io: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future { future::ready(Ok(Yamux::local(io, (self.0).0, yamux::Mode::Server))) } } @@ -231,11 +231,11 @@ impl OutboundUpgrade for Config where C: AsyncRead + AsyncWrite + Send + Unpin + 'static { - type Output = Yamux>>; + type Output = Yamux>; type Error = io::Error; type Future = future::Ready>; - fn upgrade_outbound(self, io: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future { future::ready(Ok(Yamux::new(io, self.0, yamux::Mode::Client))) } } @@ -244,11 +244,11 @@ impl OutboundUpgrade for LocalConfig where C: AsyncRead + AsyncWrite + Unpin + 'static { - type Output = Yamux>>; + type Output = Yamux>; type Error = io::Error; type Future = future::Ready>; - fn upgrade_outbound(self, io: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future { future::ready(Ok(Yamux::local(io, (self.0).0, yamux::Mode::Client))) } } diff --git a/protocols/deflate/src/lib.rs b/protocols/deflate/src/lib.rs index 581900b4..32a82f24 100644 --- a/protocols/deflate/src/lib.rs +++ b/protocols/deflate/src/lib.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use futures::{prelude::*, ready}; -use libp2p_core::{Negotiated, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use std::{io, iter, pin::Pin, task::Context, task::Poll}; #[derive(Debug, Copy, Clone)] @@ -48,11 +48,11 @@ impl InboundUpgrade for DeflateConfig where C: AsyncRead + AsyncWrite, { - type Output = DeflateOutput>; + type Output = DeflateOutput; type Error = io::Error; type Future = future::Ready>; - fn upgrade_inbound(self, r: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, r: C, _: Self::Info) -> Self::Future { future::ok(DeflateOutput::new(r, self.compression)) } } @@ -61,11 +61,11 @@ impl OutboundUpgrade for DeflateConfig where C: AsyncRead + AsyncWrite, { - type Output = DeflateOutput>; + type Output = DeflateOutput; type Error = io::Error; type Future = future::Ready>; - fn upgrade_outbound(self, w: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, w: C, _: Self::Info) -> Self::Future { future::ok(DeflateOutput::new(w, self.compression)) } } diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index 6b36f407..6a9ac91b 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -55,7 +55,7 @@ where type Error = FloodsubDecodeError; type Future = Pin> + Send>>; - fn upgrade_inbound(self, mut socket: upgrade::Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future { Box::pin(async move { let packet = upgrade::read_one(&mut socket, 2048).await?; let mut rpc: rpc_proto::RPC = protobuf::parse_from_bytes(&packet)?; @@ -171,7 +171,7 @@ where type Future = Pin> + Send>>; #[inline] - fn upgrade_outbound(self, mut socket: upgrade::Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future { Box::pin(async move { let bytes = self.into_bytes(); upgrade::write_one(&mut socket, bytes).await?; diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index da764bcd..5afff5c4 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -108,14 +108,14 @@ where fn inject_fully_negotiated_inbound( &mut self, - protocol: >::Output + protocol: >>::Output ) { self.events.push(IdentifyHandlerEvent::Identify(protocol)) } fn inject_fully_negotiated_outbound( &mut self, - protocol: >::Output, + protocol: >>::Output, _info: Self::OutboundOpenInfo, ) { self.events.push(IdentifyHandlerEvent::Identified(protocol)); diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index f768d574..4edb4a82 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -23,7 +23,7 @@ use futures::prelude::*; use libp2p_core::{ Multiaddr, PublicKey, - upgrade::{self, InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated} + upgrade::{self, InboundUpgrade, OutboundUpgrade, UpgradeInfo} }; use log::{debug, trace}; use protobuf::Message as ProtobufMessage; @@ -125,11 +125,11 @@ impl InboundUpgrade for IdentifyProtocolConfig where C: AsyncRead + AsyncWrite + Unpin, { - type Output = ReplySubstream>; + type Output = ReplySubstream; type Error = io::Error; type Future = future::Ready>; - fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { trace!("Upgrading inbound connection"); future::ok(ReplySubstream { inner: socket }) } @@ -143,7 +143,7 @@ where type Error = upgrade::ReadOneError; type Future = Pin> + Send>>; - fn upgrade_outbound(self, mut socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, mut socket: C, _: Self::Info) -> Self::Future { Box::pin(async move { socket.close().await?; let msg = upgrade::read_one(&mut socket, 4096).await?; diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 87a5fabf..59bf26c7 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -59,7 +59,7 @@ where next_connec_unique_id: UniqueConnecId, /// List of active substreams with the state they are in. - substreams: Vec, TUserData>>, + substreams: Vec>, /// Until when to keep the connection alive. keep_alive: KeepAlive, @@ -75,29 +75,29 @@ where OutPendingOpen(KadRequestMsg, Option), /// Waiting to send a message to the remote. OutPendingSend( - KadOutStreamSink, + KadOutStreamSink>, KadRequestMsg, Option, ), /// Waiting to flush the substream so that the data arrives to the remote. - OutPendingFlush(KadOutStreamSink, Option), + OutPendingFlush(KadOutStreamSink>, Option), /// Waiting for an answer back from the remote. // TODO: add timeout - OutWaitingAnswer(KadOutStreamSink, TUserData), + OutWaitingAnswer(KadOutStreamSink>, TUserData), /// An error happened on the substream and we should report the error to the user. OutReportError(KademliaHandlerQueryErr, TUserData), /// The substream is being closed. - OutClosing(KadOutStreamSink), + OutClosing(KadOutStreamSink>), /// Waiting for a request from the remote. - InWaitingMessage(UniqueConnecId, KadInStreamSink), + InWaitingMessage(UniqueConnecId, KadInStreamSink>), /// Waiting for the user to send a `KademliaHandlerIn` event containing the response. - InWaitingUser(UniqueConnecId, KadInStreamSink), + InWaitingUser(UniqueConnecId, KadInStreamSink>), /// Waiting to send an answer back to the remote. - InPendingSend(UniqueConnecId, KadInStreamSink, KadResponseMsg), + InPendingSend(UniqueConnecId, KadInStreamSink>, KadResponseMsg), /// Waiting to flush an answer back to the remote. - InPendingFlush(UniqueConnecId, KadInStreamSink), + InPendingFlush(UniqueConnecId, KadInStreamSink>), /// The substream is being closed. - InClosing(KadInStreamSink), + InClosing(KadInStreamSink>), } impl SubstreamState @@ -450,7 +450,7 @@ where fn inject_fully_negotiated_outbound( &mut self, - protocol: >::Output, + protocol: >>::Output, (msg, user_data): Self::OutboundOpenInfo, ) { self.substreams @@ -459,7 +459,7 @@ where fn inject_fully_negotiated_inbound( &mut self, - protocol: >::Output, + protocol: >>::Output, ) { // If `self.allow_listening` is false, then we produced a `DeniedUpgrade` and `protocol` // is a `Void`. diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index 645c151d..b1e79224 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -37,7 +37,7 @@ use crate::record::{self, Record}; use futures::prelude::*; use futures_codec::Framed; use libp2p_core::{Multiaddr, PeerId}; -use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated}; +use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use protobuf::{self, Message}; use std::{borrow::Cow, convert::TryFrom, time::Duration}; use std::{io, iter}; @@ -175,11 +175,11 @@ impl InboundUpgrade for KademliaProtocolConfig where C: AsyncRead + AsyncWrite + Unpin, { - type Output = KadInStreamSink>; + type Output = KadInStreamSink; type Future = future::Ready>; type Error = io::Error; - fn upgrade_inbound(self, incoming: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future { let mut codec = UviBytes::default(); codec.set_max_len(4096); @@ -207,11 +207,11 @@ impl OutboundUpgrade for KademliaProtocolConfig where C: AsyncRead + AsyncWrite + Unpin, { - type Output = KadOutStreamSink>; + type Output = KadOutStreamSink; type Future = future::Ready>; type Error = io::Error; - fn upgrade_outbound(self, incoming: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future { let mut codec = UviBytes::default(); codec.set_max_len(4096); diff --git a/protocols/noise/src/lib.rs b/protocols/noise/src/lib.rs index 1a9b4134..e5a54d20 100644 --- a/protocols/noise/src/lib.rs +++ b/protocols/noise/src/lib.rs @@ -63,7 +63,7 @@ pub use protocol::{Keypair, AuthenticKeypair, KeypairIdentity, PublicKey, Secret pub use protocol::{Protocol, ProtocolParams, x25519::X25519, IX, IK, XX}; use futures::prelude::*; -use libp2p_core::{identity, PeerId, UpgradeInfo, InboundUpgrade, OutboundUpgrade, Negotiated}; +use libp2p_core::{identity, PeerId, UpgradeInfo, InboundUpgrade, OutboundUpgrade}; use std::pin::Pin; use zeroize::Zeroize; @@ -162,11 +162,11 @@ where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, C: Protocol + AsRef<[u8]> + Zeroize + Send + 'static, { - type Output = (RemoteIdentity, NoiseOutput>); + type Output = (RemoteIdentity, NoiseOutput); type Error = NoiseError; - type Future = Handshake, C>; + type Future = Handshake; - fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: T, _: Self::Info) -> Self::Future { let session = self.params.into_builder() .local_private_key(self.dh_keys.secret().as_ref()) .build_responder() @@ -183,11 +183,11 @@ where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, C: Protocol + AsRef<[u8]> + Zeroize + Send + 'static, { - type Output = (RemoteIdentity, NoiseOutput>); + type Output = (RemoteIdentity, NoiseOutput); type Error = NoiseError; - type Future = Handshake, C>; + type Future = Handshake; - fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future { let session = self.params.into_builder() .local_private_key(self.dh_keys.secret().as_ref()) .build_initiator() @@ -206,11 +206,11 @@ where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, C: Protocol + AsRef<[u8]> + Zeroize + Send + 'static, { - type Output = (RemoteIdentity, NoiseOutput>); + type Output = (RemoteIdentity, NoiseOutput); type Error = NoiseError; - type Future = Handshake, C>; + type Future = Handshake; - fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: T, _: Self::Info) -> Self::Future { let session = self.params.into_builder() .local_private_key(self.dh_keys.secret().as_ref()) .build_responder() @@ -227,11 +227,11 @@ where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, C: Protocol + AsRef<[u8]> + Zeroize + Send + 'static, { - type Output = (RemoteIdentity, NoiseOutput>); + type Output = (RemoteIdentity, NoiseOutput); type Error = NoiseError; - type Future = Handshake, C>; + type Future = Handshake; - fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future { let session = self.params.into_builder() .local_private_key(self.dh_keys.secret().as_ref()) .build_initiator() @@ -250,11 +250,11 @@ where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, C: Protocol + AsRef<[u8]> + Zeroize + Send + 'static, { - type Output = (RemoteIdentity, NoiseOutput>); + type Output = (RemoteIdentity, NoiseOutput); type Error = NoiseError; - type Future = Handshake, C>; + type Future = Handshake; - fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: T, _: Self::Info) -> Self::Future { let session = self.params.into_builder() .local_private_key(self.dh_keys.secret().as_ref()) .build_responder() @@ -271,11 +271,11 @@ where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, C: Protocol + AsRef<[u8]> + Zeroize + Send + 'static, { - type Output = (RemoteIdentity, NoiseOutput>); + type Output = (RemoteIdentity, NoiseOutput); type Error = NoiseError; - type Future = Handshake, C>; + type Future = Handshake; - fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future { let session = self.params.into_builder() .local_private_key(self.dh_keys.secret().as_ref()) .remote_public_key(self.remote.0.as_ref()) @@ -319,18 +319,18 @@ where impl InboundUpgrade for NoiseAuthenticated where NoiseConfig: UpgradeInfo + InboundUpgrade, NoiseOutput>), + Output = (RemoteIdentity, NoiseOutput), Error = NoiseError > + 'static, as InboundUpgrade>::Future: Send, T: AsyncRead + AsyncWrite + Send + 'static, C: Protocol + AsRef<[u8]> + Zeroize + Send + 'static, { - type Output = (PeerId, NoiseOutput>); + type Output = (PeerId, NoiseOutput); type Error = NoiseError; type Future = Pin> + Send>>; - fn upgrade_inbound(self, socket: Negotiated, info: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: T, info: Self::Info) -> Self::Future { Box::pin(self.config.upgrade_inbound(socket, info) .and_then(|(remote, io)| match remote { RemoteIdentity::IdentityKey(pk) => future::ok((pk.into_peer_id(), io)), @@ -342,18 +342,18 @@ where impl OutboundUpgrade for NoiseAuthenticated where NoiseConfig: UpgradeInfo + OutboundUpgrade, NoiseOutput>), + Output = (RemoteIdentity, NoiseOutput), Error = NoiseError > + 'static, as OutboundUpgrade>::Future: Send, T: AsyncRead + AsyncWrite + Send + 'static, C: Protocol + AsRef<[u8]> + Zeroize + Send + 'static, { - type Output = (PeerId, NoiseOutput>); + type Output = (PeerId, NoiseOutput); type Error = NoiseError; type Future = Pin> + Send>>; - fn upgrade_outbound(self, socket: Negotiated, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: T, info: Self::Info) -> Self::Future { Box::pin(self.config.upgrade_outbound(socket, info) .and_then(|(remote, io)| match remote { RemoteIdentity::IdentityKey(pk) => future::ok((pk.into_peer_id(), io)), diff --git a/protocols/ping/src/protocol.rs b/protocols/ping/src/protocol.rs index df729722..f7e9b90b 100644 --- a/protocols/ping/src/protocol.rs +++ b/protocols/ping/src/protocol.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use futures::{future::BoxFuture, prelude::*}; -use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated}; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use log::debug; use rand::{distributions, prelude::*}; use std::{io, iter, time::Duration}; @@ -61,7 +61,7 @@ where type Error = io::Error; type Future = BoxFuture<'static, Result<(), io::Error>>; - fn upgrade_inbound(self, mut socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future { async move { let mut payload = [0u8; 32]; socket.read_exact(&mut payload).await?; @@ -80,7 +80,7 @@ where type Error = io::Error; type Future = BoxFuture<'static, Result>; - fn upgrade_outbound(self, mut socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future { let payload: [u8; 32] = thread_rng().sample(distributions::Standard); debug!("Preparing ping payload {:?}", payload); async move { diff --git a/protocols/plaintext/src/lib.rs b/protocols/plaintext/src/lib.rs index e57139eb..23d447d4 100644 --- a/protocols/plaintext/src/lib.rs +++ b/protocols/plaintext/src/lib.rs @@ -31,7 +31,6 @@ use libp2p_core::{ InboundUpgrade, OutboundUpgrade, UpgradeInfo, - upgrade::Negotiated, PeerId, PublicKey, }; @@ -84,21 +83,21 @@ impl UpgradeInfo for PlainText1Config { } impl InboundUpgrade for PlainText1Config { - type Output = Negotiated; + type Output = C; type Error = Void; - type Future = Ready, Self::Error>>; + type Future = Ready>; - fn upgrade_inbound(self, i: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, i: C, _: Self::Info) -> Self::Future { future::ready(Ok(i)) } } impl OutboundUpgrade for PlainText1Config { - type Output = Negotiated; + type Output = C; type Error = Void; - type Future = Ready, Self::Error>>; + type Future = Ready>; - fn upgrade_outbound(self, i: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, i: C, _: Self::Info) -> Self::Future { future::ready(Ok(i)) } } @@ -123,11 +122,11 @@ impl InboundUpgrade for PlainText2Config where C: AsyncRead + AsyncWrite + Send + Unpin + 'static { - type Output = (PeerId, PlainTextOutput>); + type Output = (PeerId, PlainTextOutput); type Error = PlainTextError; type Future = BoxFuture<'static, Result>; - fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { Box::pin(self.handshake(socket)) } } @@ -136,11 +135,11 @@ impl OutboundUpgrade for PlainText2Config where C: AsyncRead + AsyncWrite + Send + Unpin + 'static { - type Output = (PeerId, PlainTextOutput>); + type Output = (PeerId, PlainTextOutput); type Error = PlainTextError; type Future = BoxFuture<'static, Result>; - fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { Box::pin(self.handshake(socket)) } } diff --git a/protocols/secio/src/lib.rs b/protocols/secio/src/lib.rs index af55a279..d6d640e9 100644 --- a/protocols/secio/src/lib.rs +++ b/protocols/secio/src/lib.rs @@ -59,7 +59,7 @@ pub use self::error::SecioError; use futures::stream::MapErr as StreamMapErr; use futures::prelude::*; -use libp2p_core::{PeerId, PublicKey, identity, upgrade::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, Negotiated}}; +use libp2p_core::{PeerId, PublicKey, identity, upgrade::{UpgradeInfo, InboundUpgrade, OutboundUpgrade}}; use log::debug; use rw_stream_sink::RwStreamSink; use std::{io, iter, pin::Pin, task::Context, task::Poll}; @@ -179,11 +179,11 @@ impl InboundUpgrade for SecioConfig where T: AsyncRead + AsyncWrite + Unpin + Send + 'static { - type Output = (PeerId, SecioOutput>); + type Output = (PeerId, SecioOutput); type Error = SecioError; type Future = Pin> + Send>>; - fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: T, _: Self::Info) -> Self::Future { Box::pin(self.handshake(socket)) } } @@ -192,11 +192,11 @@ impl OutboundUpgrade for SecioConfig where T: AsyncRead + AsyncWrite + Unpin + Send + 'static { - type Output = (PeerId, SecioOutput>); + type Output = (PeerId, SecioOutput); type Error = SecioError; type Future = Pin> + Send>>; - fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future { Box::pin(self.handshake(socket)) } } diff --git a/src/simple.rs b/src/simple.rs index b61f2e25..4604346b 100644 --- a/src/simple.rs +++ b/src/simple.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated}; +use crate::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use bytes::Bytes; use futures::prelude::*; use std::{iter, sync::Arc}; @@ -66,14 +66,14 @@ impl UpgradeInfo for SimpleProtocol { impl InboundUpgrade for SimpleProtocol where C: AsyncRead + AsyncWrite, - F: Fn(Negotiated) -> O, + F: Fn(C) -> O, O: Future> + Unpin { type Output = A; type Error = E; type Future = O; - fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { let upgrade = &self.upgrade; upgrade(socket) } @@ -82,14 +82,14 @@ where impl OutboundUpgrade for SimpleProtocol where C: AsyncRead + AsyncWrite, - F: Fn(Negotiated) -> O, + F: Fn(C) -> O, O: Future> + Unpin { type Output = A; type Error = E; type Future = O; - fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { let upgrade = &self.upgrade; upgrade(socket) } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 37ced1cf..7a9c4e0c 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -80,7 +80,7 @@ pub use protocols_handler::{ use protocols_handler::{NodeHandlerWrapperBuilder, NodeHandlerWrapper, NodeHandlerWrapperError}; use futures::prelude::*; use libp2p_core::{ - Transport, Multiaddr, PeerId, InboundUpgrade, OutboundUpgrade, UpgradeInfo, ProtocolName, + Transport, Multiaddr, Negotiated, PeerId, InboundUpgrade, OutboundUpgrade, UpgradeInfo, ProtocolName, muxing::StreamMuxer, nodes::{ ListenerId, @@ -217,18 +217,18 @@ where TBehaviour: NetworkBehaviour, <::Handler as ProtocolsHandler>::OutEvent: Send + 'static, <::Handler as ProtocolsHandler>::Error: Send + 'static, <::Handler as ProtocolsHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary - <::Handler as ProtocolsHandler>::InboundProtocol: InboundUpgrade> + Send + 'static, + <::Handler as ProtocolsHandler>::InboundProtocol: InboundUpgrade>> + Send + 'static, <<::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::Info: Send + 'static, <<::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter: Send + 'static, <<<::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static, - <<::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade>>::Error: Send + 'static, - <<::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade>>::Future: Send + 'static, - <::Handler as ProtocolsHandler>::OutboundProtocol: OutboundUpgrade> + Send + 'static, + <<::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade>>>::Error: Send + 'static, + <<::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade>>>::Future: Send + 'static, + <::Handler as ProtocolsHandler>::OutboundProtocol: OutboundUpgrade>> + Send + 'static, <<::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::Info: Send + 'static, <<::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter: Send + 'static, <<<::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static, - <<::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>::Future: Send + 'static, - <<::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>::Error: Send + 'static, + <<::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>>::Future: Send + 'static, + <<::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>>::Error: Send + 'static, ::Handler> as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary TConnInfo: ConnectionInfo + fmt::Debug + Clone + Send + 'static, { @@ -512,15 +512,15 @@ where TBehaviour: NetworkBehaviour, <::Handler as ProtocolsHandler>::OutEvent: Send + 'static, <::Handler as ProtocolsHandler>::Error: Send + 'static, <::Handler as ProtocolsHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary - <::Handler as ProtocolsHandler>::InboundProtocol: InboundUpgrade> + Send + 'static, - <<::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade>>::Future: Send + 'static, - <<::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade>>::Error: Send + 'static, + <::Handler as ProtocolsHandler>::InboundProtocol: InboundUpgrade>> + Send + 'static, + <<::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade>>>::Future: Send + 'static, + <<::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade>>>::Error: Send + 'static, <<::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::Info: Send + 'static, <<::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter: Send + 'static, <<<::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static, - <::Handler as ProtocolsHandler>::OutboundProtocol: OutboundUpgrade> + Send + 'static, - <<::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>::Future: Send + 'static, - <<::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>::Error: Send + 'static, + <::Handler as ProtocolsHandler>::OutboundProtocol: OutboundUpgrade>> + Send + 'static, + <<::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>>::Future: Send + 'static, + <<::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>>::Error: Send + 'static, <<::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::Info: Send + 'static, <<::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter: Send + 'static, <<<::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static, @@ -593,18 +593,18 @@ where TBehaviour: NetworkBehaviour, <<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent: Send + 'static, <<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::Error: Send + 'static, <<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary - <<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol: InboundUpgrade> + Send + 'static, + <<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol: InboundUpgrade>> + Send + 'static, <<<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::Info: Send + 'static, <<<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter: Send + 'static, <<<<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static, - <<<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade>>::Error: Send + 'static, - <<<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade>>::Future: Send + 'static, - <<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol: OutboundUpgrade> + Send + 'static, + <<<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade>>>::Error: Send + 'static, + <<<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade>>>::Future: Send + 'static, + <<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol: OutboundUpgrade>> + Send + 'static, <<<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::Info: Send + 'static, <<<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter: Send + 'static, <<<<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static, - <<<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>::Future: Send + 'static, - <<<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>::Error: Send + 'static, + <<<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>>::Future: Send + 'static, + <<<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade>>>::Error: Send + 'static, ::ProtocolsHandler as IntoProtocolsHandler>::Handler> as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary TConnInfo: ConnectionInfo + fmt::Debug + Clone + Send + 'static, { diff --git a/swarm/src/protocols_handler/dummy.rs b/swarm/src/protocols_handler/dummy.rs index f3c6052d..a1de1841 100644 --- a/swarm/src/protocols_handler/dummy.rs +++ b/swarm/src/protocols_handler/dummy.rs @@ -26,7 +26,7 @@ use crate::protocols_handler::{ ProtocolsHandlerUpgrErr }; use futures::prelude::*; -use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, DeniedUpgrade}; +use libp2p_core::{Negotiated, upgrade::{InboundUpgrade, OutboundUpgrade, DeniedUpgrade}}; use std::{marker::PhantomData, task::Context, task::Poll}; use void::Void; @@ -64,14 +64,14 @@ where #[inline] fn inject_fully_negotiated_inbound( &mut self, - _: >::Output + _: >>::Output ) { } #[inline] fn inject_fully_negotiated_outbound( &mut self, - _: >::Output, + _: >>::Output, _: Self::OutboundOpenInfo ) { } @@ -80,7 +80,7 @@ where fn inject_event(&mut self, _: Self::InEvent) {} #[inline] - fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<>::Error>) {} + fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<>>::Error>) {} #[inline] fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::No } diff --git a/swarm/src/protocols_handler/map_in.rs b/swarm/src/protocols_handler/map_in.rs index dedae4a9..c80e264f 100644 --- a/swarm/src/protocols_handler/map_in.rs +++ b/swarm/src/protocols_handler/map_in.rs @@ -25,7 +25,7 @@ use crate::protocols_handler::{ ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr }; -use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade}; +use libp2p_core::{Negotiated, upgrade::{InboundUpgrade, OutboundUpgrade}}; use std::{marker::PhantomData, task::Context, task::Poll}; /// Wrapper around a protocol handler that turns the input event into something else. @@ -68,7 +68,7 @@ where #[inline] fn inject_fully_negotiated_inbound( &mut self, - protocol: >::Output + protocol: >>::Output ) { self.inner.inject_fully_negotiated_inbound(protocol) } @@ -76,7 +76,7 @@ where #[inline] fn inject_fully_negotiated_outbound( &mut self, - protocol: >::Output, + protocol: >>::Output, info: Self::OutboundOpenInfo ) { self.inner.inject_fully_negotiated_outbound(protocol, info) @@ -90,7 +90,7 @@ where } #[inline] - fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<>::Error>) { + fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<>>::Error>) { self.inner.inject_dial_upgrade_error(info, error) } diff --git a/swarm/src/protocols_handler/map_out.rs b/swarm/src/protocols_handler/map_out.rs index 4bc04791..fd521d0d 100644 --- a/swarm/src/protocols_handler/map_out.rs +++ b/swarm/src/protocols_handler/map_out.rs @@ -25,7 +25,7 @@ use crate::protocols_handler::{ ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr }; -use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade}; +use libp2p_core::{Negotiated, upgrade::{InboundUpgrade, OutboundUpgrade}}; use std::task::{Context, Poll}; /// Wrapper around a protocol handler that turns the output event into something else. @@ -66,7 +66,7 @@ where #[inline] fn inject_fully_negotiated_inbound( &mut self, - protocol: >::Output + protocol: >>::Output ) { self.inner.inject_fully_negotiated_inbound(protocol) } @@ -74,7 +74,7 @@ where #[inline] fn inject_fully_negotiated_outbound( &mut self, - protocol: >::Output, + protocol: >>::Output, info: Self::OutboundOpenInfo ) { self.inner.inject_fully_negotiated_outbound(protocol, info) @@ -86,7 +86,7 @@ where } #[inline] - fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<>::Error>) { + fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<>>::Error>) { self.inner.inject_dial_upgrade_error(info, error) } diff --git a/swarm/src/protocols_handler/mod.rs b/swarm/src/protocols_handler/mod.rs index f1401c8e..ef339fa4 100644 --- a/swarm/src/protocols_handler/mod.rs +++ b/swarm/src/protocols_handler/mod.rs @@ -47,6 +47,7 @@ mod select; use futures::prelude::*; use libp2p_core::{ ConnectedPoint, + Negotiated, PeerId, upgrade::{self, InboundUpgrade, OutboundUpgrade, UpgradeError}, }; @@ -102,9 +103,9 @@ pub trait ProtocolsHandler { /// The type of substreams on which the protocol(s) are negotiated. type Substream: AsyncRead + AsyncWrite + Unpin; /// The inbound upgrade for the protocol(s) used by the handler. - type InboundProtocol: InboundUpgrade; + type InboundProtocol: InboundUpgrade>; /// The outbound upgrade for the protocol(s) used by the handler. - type OutboundProtocol: OutboundUpgrade; + type OutboundProtocol: OutboundUpgrade>; /// The type of additional information passed to an `OutboundSubstreamRequest`. type OutboundOpenInfo; @@ -120,7 +121,7 @@ pub trait ProtocolsHandler { /// Injects the output of a successful upgrade on a new inbound substream. fn inject_fully_negotiated_inbound( &mut self, - protocol: >::Output + protocol: >>::Output ); /// Injects the output of a successful upgrade on a new outbound substream. @@ -129,7 +130,7 @@ pub trait ProtocolsHandler { /// [`ProtocolsHandlerEvent::OutboundSubstreamRequest`]. fn inject_fully_negotiated_outbound( &mut self, - protocol: >::Output, + protocol: >>::Output, info: Self::OutboundOpenInfo ); @@ -141,7 +142,7 @@ pub trait ProtocolsHandler { &mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr< - >::Error + >>::Error > ); diff --git a/swarm/src/protocols_handler/one_shot.rs b/swarm/src/protocols_handler/one_shot.rs index 40da87d0..5375a734 100644 --- a/swarm/src/protocols_handler/one_shot.rs +++ b/swarm/src/protocols_handler/one_shot.rs @@ -26,7 +26,7 @@ use crate::protocols_handler::{ SubstreamProtocol }; use futures::prelude::*; -use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade}; +use libp2p_core::{Negotiated, upgrade::{InboundUpgrade, OutboundUpgrade}}; use smallvec::SmallVec; use std::{error, marker::PhantomData, task::Context, task::Poll, time::Duration}; use wasm_timer::Instant; @@ -37,13 +37,13 @@ use wasm_timer::Instant; // TODO: Debug pub struct OneShotHandler where - TOutProto: OutboundUpgrade, + TOutProto: OutboundUpgrade>, { /// The upgrade for inbound substreams. listen_protocol: SubstreamProtocol, /// If `Some`, something bad happened and we should shut down the handler with an error. pending_error: - Option>::Error>>, + Option>>::Error>>, /// Queue of events to produce in `poll()`. events_out: SmallVec<[TOutEvent; 4]>, /// Queue of outbound substreams to open. @@ -63,7 +63,7 @@ where impl OneShotHandler where - TOutProto: OutboundUpgrade, + TOutProto: OutboundUpgrade>, { /// Creates a `OneShotHandler`. #[inline] @@ -119,8 +119,8 @@ where impl Default for OneShotHandler where - TOutProto: OutboundUpgrade, - TInProto: InboundUpgrade + Default, + TOutProto: OutboundUpgrade>, + TInProto: InboundUpgrade> + Default, { #[inline] fn default() -> Self { @@ -132,8 +132,8 @@ impl ProtocolsHandler for OneShotHandler where TSubstream: AsyncRead + AsyncWrite + Unpin, - TInProto: InboundUpgrade, - TOutProto: OutboundUpgrade, + TInProto: InboundUpgrade>, + TOutProto: OutboundUpgrade>, TInProto::Output: Into, TOutProto::Output: Into, TOutProto::Error: error::Error + 'static, @@ -142,7 +142,7 @@ where type InEvent = TOutProto; type OutEvent = TOutEvent; type Error = ProtocolsHandlerUpgrErr< - >::Error, + >>::Error, >; type Substream = TSubstream; type InboundProtocol = TInProto; @@ -157,7 +157,7 @@ where #[inline] fn inject_fully_negotiated_inbound( &mut self, - out: >::Output, + out: >>::Output, ) { // If we're shutting down the connection for inactivity, reset the timeout. if !self.keep_alive.is_yes() { @@ -170,7 +170,7 @@ where #[inline] fn inject_fully_negotiated_outbound( &mut self, - out: >::Output, + out: >>::Output, _: Self::OutboundOpenInfo, ) { self.dial_negotiated -= 1; @@ -192,7 +192,7 @@ where &mut self, _: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr< - >::Error, + >>::Error, >, ) { if self.pending_error.is_none() { diff --git a/swarm/src/protocols_handler/select.rs b/swarm/src/protocols_handler/select.rs index f80ebfcf..b9ddc9e1 100644 --- a/swarm/src/protocols_handler/select.rs +++ b/swarm/src/protocols_handler/select.rs @@ -29,6 +29,7 @@ use crate::protocols_handler::{ use futures::prelude::*; use libp2p_core::{ ConnectedPoint, + Negotiated, PeerId, either::{EitherError, EitherOutput}, upgrade::{InboundUpgrade, OutboundUpgrade, EitherUpgrade, SelectUpgrade, UpgradeError} @@ -62,10 +63,10 @@ where TProto1::Handler: ProtocolsHandler, TProto2::Handler: ProtocolsHandler, TSubstream: AsyncRead + AsyncWrite + Unpin, - ::InboundProtocol: InboundUpgrade, - ::InboundProtocol: InboundUpgrade, - ::OutboundProtocol: OutboundUpgrade, - ::OutboundProtocol: OutboundUpgrade + ::InboundProtocol: InboundUpgrade>, + ::InboundProtocol: InboundUpgrade>, + ::OutboundProtocol: OutboundUpgrade>, + ::OutboundProtocol: OutboundUpgrade> { type Handler = ProtocolsHandlerSelect; @@ -107,10 +108,10 @@ where TProto1: ProtocolsHandler, TProto2: ProtocolsHandler, TSubstream: AsyncRead + AsyncWrite + Unpin, - TProto1::InboundProtocol: InboundUpgrade, - TProto2::InboundProtocol: InboundUpgrade, - TProto1::OutboundProtocol: OutboundUpgrade, - TProto2::OutboundProtocol: OutboundUpgrade + TProto1::InboundProtocol: InboundUpgrade>, + TProto2::InboundProtocol: InboundUpgrade>, + TProto1::OutboundProtocol: OutboundUpgrade>, + TProto2::OutboundProtocol: OutboundUpgrade> { type InEvent = EitherOutput; type OutEvent = EitherOutput; @@ -129,7 +130,7 @@ where SubstreamProtocol::new(choice).with_timeout(timeout) } - fn inject_fully_negotiated_outbound(&mut self, protocol: >::Output, endpoint: Self::OutboundOpenInfo) { + fn inject_fully_negotiated_outbound(&mut self, protocol: >>::Output, endpoint: Self::OutboundOpenInfo) { match (protocol, endpoint) { (EitherOutput::First(protocol), EitherOutput::First(info)) => self.proto1.inject_fully_negotiated_outbound(protocol, info), @@ -142,7 +143,7 @@ where } } - fn inject_fully_negotiated_inbound(&mut self, protocol: >::Output) { + fn inject_fully_negotiated_inbound(&mut self, protocol: >>::Output) { match protocol { EitherOutput::First(protocol) => self.proto1.inject_fully_negotiated_inbound(protocol), @@ -160,7 +161,7 @@ where } #[inline] - fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<>::Error>) { + fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<>>::Error>) { match (info, error) { (EitherOutput::First(info), ProtocolsHandlerUpgrErr::Timer) => { self.proto1.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timer) diff --git a/swarm/src/toggle.rs b/swarm/src/toggle.rs index c4e42e35..e62a20e0 100644 --- a/swarm/src/toggle.rs +++ b/swarm/src/toggle.rs @@ -31,6 +31,7 @@ use libp2p_core::{ ConnectedPoint, PeerId, Multiaddr, + Negotiated, either::EitherOutput, upgrade::{InboundUpgrade, OutboundUpgrade, DeniedUpgrade, EitherUpgrade} }; @@ -206,7 +207,7 @@ where fn inject_fully_negotiated_inbound( &mut self, - out: >::Output + out: >>::Output ) { let out = match out { EitherOutput::First(out) => out, @@ -219,7 +220,7 @@ where fn inject_fully_negotiated_outbound( &mut self, - out: >::Output, + out: >>::Output, info: Self::OutboundOpenInfo ) { self.inner.as_mut().expect("Can't receive an outbound substream if disabled; QED") @@ -231,7 +232,7 @@ where .inject_event(event) } - fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, err: ProtocolsHandlerUpgrErr<>::Error>) { + fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, err: ProtocolsHandlerUpgrErr<>>::Error>) { self.inner.as_mut().expect("Can't receive an outbound substream if disabled; QED") .inject_dial_upgrade_error(info, err) }