diff --git a/core/src/lib.rs b/core/src/lib.rs index e296c8d2..a2ac1b34 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -61,6 +61,7 @@ /// Multi-address re-export. pub use multiaddr; +pub use multistream_select::Negotiated; mod keys_proto; mod peer_id; diff --git a/core/src/protocols_handler/one_shot.rs b/core/src/protocols_handler/one_shot.rs index c0a50456..88601125 100644 --- a/core/src/protocols_handler/one_shot.rs +++ b/core/src/protocols_handler/one_shot.rs @@ -188,7 +188,7 @@ where self.pending_error = Some(error); } } - + #[inline] fn connection_keep_alive(&self) -> KeepAlive { self.keep_alive diff --git a/core/src/upgrade/denied.rs b/core/src/upgrade/denied.rs index d1de858e..9dec47ee 100644 --- a/core/src/upgrade/denied.rs +++ b/core/src/upgrade/denied.rs @@ -20,6 +20,7 @@ use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use futures::future; +use multistream_select::Negotiated; use std::iter; use void::Void; @@ -42,7 +43,7 @@ impl InboundUpgrade for DeniedUpgrade { type Error = Void; type Future = future::Empty; - fn upgrade_inbound(self, _: C, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, _: Negotiated, _: Self::Info) -> Self::Future { future::empty() } } @@ -52,7 +53,7 @@ impl OutboundUpgrade for DeniedUpgrade { type Error = Void; type Future = future::Empty; - fn upgrade_outbound(self, _: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, _: Negotiated, _: Self::Info) -> Self::Future { future::empty() } } diff --git a/core/src/upgrade/either.rs b/core/src/upgrade/either.rs index 28db987c..bf3d86b8 100644 --- a/core/src/upgrade/either.rs +++ b/core/src/upgrade/either.rs @@ -22,6 +22,7 @@ use crate::{ either::{EitherOutput, EitherError, EitherFuture2, EitherName}, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo} }; +use multistream_select::Negotiated; /// A type to represent two possible upgrade types (inbound or outbound). #[derive(Debug, Clone)] @@ -55,7 +56,7 @@ where type Error = EitherError; type Future = EitherFuture2; - fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_inbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { match (self, info) { (EitherUpgrade::A(a), EitherName::A(info)) => EitherFuture2::A(a.upgrade_inbound(sock, info)), (EitherUpgrade::B(b), EitherName::B(info)) => EitherFuture2::B(b.upgrade_inbound(sock, info)), @@ -73,7 +74,7 @@ where type Error = EitherError; type Future = EitherFuture2; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { match (self, info) { (EitherUpgrade::A(a), EitherName::A(info)) => EitherFuture2::A(a.upgrade_outbound(sock, info)), (EitherUpgrade::B(b), EitherName::B(info)) => EitherFuture2::B(b.upgrade_outbound(sock, info)), diff --git a/core/src/upgrade/map.rs b/core/src/upgrade/map.rs index d8415b5f..ee17b845 100644 --- a/core/src/upgrade/map.rs +++ b/core/src/upgrade/map.rs @@ -20,6 +20,7 @@ use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use futures::{prelude::*, try_ready}; +use multistream_select::Negotiated; /// Wraps around an upgrade and applies a closure to the output. #[derive(Debug, Clone)] @@ -52,7 +53,7 @@ where type Error = U::Error; type Future = MapFuture; - fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_inbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { MapFuture { inner: self.upgrade.upgrade_inbound(sock, info), map: Some(self.fun) @@ -68,7 +69,7 @@ where type Error = U::Error; type Future = U::Future; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { self.upgrade.upgrade_outbound(sock, info) } } @@ -103,7 +104,7 @@ where type Error = U::Error; type Future = U::Future; - fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_inbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { self.upgrade.upgrade_inbound(sock, info) } } @@ -117,7 +118,7 @@ where type Error = U::Error; type Future = MapFuture; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { MapFuture { inner: self.upgrade.upgrade_outbound(sock, info), map: Some(self.fun) @@ -156,7 +157,7 @@ where type Error = T; type Future = MapErrFuture; - fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_inbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { MapErrFuture { fut: self.upgrade.upgrade_inbound(sock, info), fun: Some(self.fun) @@ -172,7 +173,7 @@ where type Error = U::Error; type Future = U::Future; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { self.upgrade.upgrade_outbound(sock, info) } } @@ -208,7 +209,7 @@ where type Error = T; type Future = MapErrFuture; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { MapErrFuture { fut: self.upgrade.upgrade_outbound(sock, info), fun: Some(self.fun) @@ -224,7 +225,7 @@ where type Error = U::Error; type Future = U::Future; - fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_inbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { self.upgrade.upgrade_inbound(sock, info) } } diff --git a/core/src/upgrade/mod.rs b/core/src/upgrade/mod.rs index 50032bd3..e3087745 100644 --- a/core/src/upgrade/mod.rs +++ b/core/src/upgrade/mod.rs @@ -67,6 +67,7 @@ mod transfer; use futures::future::Future; +pub use multistream_select::Negotiated; pub use self::{ apply::{apply, apply_inbound, apply_outbound, InboundUpgradeApply, OutboundUpgradeApply}, denied::DeniedUpgrade, @@ -114,7 +115,7 @@ pub trait InboundUpgrade: UpgradeInfo { /// method is called to start the handshake. /// /// The `info` is the identifier of the protocol, as produced by `protocol_info`. - fn upgrade_inbound(self, socket: C, info: Self::Info) -> Self::Future; + fn upgrade_inbound(self, socket: Negotiated, info: Self::Info) -> Self::Future; } /// Extension trait for `InboundUpgrade`. Automatically implemented on all types that implement @@ -154,7 +155,7 @@ pub trait OutboundUpgrade: UpgradeInfo { /// method is called to start the handshake. /// /// The `info` is the identifier of the protocol, as produced by `protocol_info`. - fn upgrade_outbound(self, socket: C, info: Self::Info) -> Self::Future; + fn upgrade_outbound(self, socket: Negotiated, info: Self::Info) -> Self::Future; } /// Extention trait for `OutboundUpgrade`. Automatically implemented on all types that implement diff --git a/core/src/upgrade/select.rs b/core/src/upgrade/select.rs index 8fa4c5b8..61c3ec5e 100644 --- a/core/src/upgrade/select.rs +++ b/core/src/upgrade/select.rs @@ -22,6 +22,7 @@ use crate::{ either::{EitherOutput, EitherError, EitherFuture2, EitherName}, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo} }; +use multistream_select::Negotiated; /// Upgrade that combines two upgrades into one. Supports all the protocols supported by either /// sub-upgrade. @@ -64,7 +65,7 @@ where type Error = EitherError; type Future = EitherFuture2; - fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_inbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { match info { EitherName::A(info) => EitherFuture2::A(self.0.upgrade_inbound(sock, info)), EitherName::B(info) => EitherFuture2::B(self.1.upgrade_inbound(sock, info)) @@ -81,7 +82,7 @@ where type Error = EitherError; type Future = EitherFuture2; - fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + fn upgrade_outbound(self, sock: Negotiated, info: Self::Info) -> Self::Future { match info { EitherName::A(info) => EitherFuture2::A(self.0.upgrade_outbound(sock, info)), EitherName::B(info) => EitherFuture2::B(self.1.upgrade_outbound(sock, info)) diff --git a/misc/multistream-select/src/dialer_select.rs b/misc/multistream-select/src/dialer_select.rs index 8e591168..bbd40c1a 100644 --- a/misc/multistream-select/src/dialer_select.rs +++ b/misc/multistream-select/src/dialer_select.rs @@ -31,7 +31,7 @@ use crate::protocol::{ use log::trace; use std::mem; use tokio_io::{AsyncRead, AsyncWrite}; -use crate::ProtocolChoiceError; +use crate::{Negotiated, ProtocolChoiceError}; /// Future, returned by `dialer_select_proto`, which selects a protocol and dialer /// either sequentially of by considering all protocols in parallel. @@ -125,7 +125,7 @@ where I: Iterator, I::Item: AsRef<[u8]> + Clone { - type Item = (I::Item, R); + type Item = (I::Item, Negotiated); type Error = ProtocolChoiceError; fn poll(&mut self) -> Poll { @@ -207,7 +207,7 @@ where ListenerToDialerMessage::ProtocolAck { ref name } if name.as_ref() == proto_name.as_ref() => { - return Ok(Async::Ready((proto_name, r.into_inner()))) + return Ok(Async::Ready((proto_name, Negotiated(r.into_inner())))) } ListenerToDialerMessage::NotAvailable => { let proto_name = protocols.next() @@ -300,7 +300,7 @@ where I: Iterator, I::Item: AsRef<[u8]> + Clone { - type Item = (I::Item, R); + type Item = (I::Item, Negotiated); type Error = ProtocolChoiceError; fn poll(&mut self) -> Poll { @@ -423,7 +423,7 @@ where Some(ListenerToDialerMessage::ProtocolAck { ref name }) if name.as_ref() == proto_name.as_ref() => { - return Ok(Async::Ready((proto_name, dialer.into_inner()))) + return Ok(Async::Ready((proto_name, Negotiated(dialer.into_inner())))) } _ => return Err(ProtocolChoiceError::UnexpectedMessage) } diff --git a/misc/multistream-select/src/lib.rs b/misc/multistream-select/src/lib.rs index 984c4d17..7d50555d 100644 --- a/misc/multistream-select/src/lib.rs +++ b/misc/multistream-select/src/lib.rs @@ -74,6 +74,49 @@ mod tests; mod protocol; +use futures::prelude::*; +use std::io; + pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture}; pub use self::error::ProtocolChoiceError; pub use self::listener_select::{listener_select_proto, ListenerSelectFuture}; + +/// A stream after it has been negotiated. +pub struct Negotiated(pub(crate) TInner); + +impl io::Read for Negotiated +where + TInner: io::Read +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.0.read(buf) + } +} + +impl tokio_io::AsyncRead for Negotiated +where + TInner: tokio_io::AsyncRead +{ +} + +impl io::Write for Negotiated +where + TInner: io::Write +{ + fn write(&mut self, buf: &[u8]) -> io::Result { + self.0.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.0.flush() + } +} + +impl tokio_io::AsyncWrite for Negotiated +where + TInner: tokio_io::AsyncWrite +{ + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.0.shutdown() + } +} diff --git a/misc/multistream-select/src/listener_select.rs b/misc/multistream-select/src/listener_select.rs index 261155ae..59492dc0 100644 --- a/misc/multistream-select/src/listener_select.rs +++ b/misc/multistream-select/src/listener_select.rs @@ -31,7 +31,7 @@ use crate::protocol::{ use log::{debug, trace}; use std::mem; use tokio_io::{AsyncRead, AsyncWrite}; -use crate::ProtocolChoiceError; +use crate::{Negotiated, ProtocolChoiceError}; /// Helps selecting a protocol amongst the ones supported. /// @@ -99,7 +99,7 @@ where for<'a> &'a I: IntoIterator, X: AsRef<[u8]> + Clone { - type Item = (X, R, I); + type Item = (X, Negotiated, I); type Error = ProtocolChoiceError; fn poll(&mut self) -> Poll { @@ -171,7 +171,7 @@ where } }; if let Some(p) = outcome { - return Ok(Async::Ready((p, listener.into_inner(), protocols))) + return Ok(Async::Ready((p, Negotiated(listener.into_inner()), protocols))) } else { let stream = listener.into_future(); self.inner = ListenerSelectState::Incoming { stream, protocols } diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 37306f52..d5f93e64 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -27,7 +27,7 @@ use bytes::Bytes; use libp2p_core::{ Endpoint, StreamMuxer, - upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo} + upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated} }; use log::{debug, trace}; use parking_lot::Mutex; @@ -158,11 +158,11 @@ impl InboundUpgrade for MplexConfig where C: AsyncRead + AsyncWrite, { - type Output = Multiplex; + type Output = Multiplex>; type Error = IoError; type Future = future::FutureResult; - fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { future::ok(self.upgrade(socket)) } } @@ -171,11 +171,11 @@ impl OutboundUpgrade for MplexConfig where C: AsyncRead + AsyncWrite, { - type Output = Multiplex; + type Output = Multiplex>; type Error = IoError; type Future = future::FutureResult; - fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { future::ok(self.upgrade(socket)) } } diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 97c8ca16..fbca83d9 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -22,7 +22,7 @@ //! [specification](https://github.com/hashicorp/yamux/blob/master/spec.md). use futures::{future::{self, FutureResult}, prelude::*}; -use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated}; use log::debug; use std::{io, iter, sync::atomic}; use std::io::{Error as IoError}; @@ -155,11 +155,11 @@ impl InboundUpgrade for Config where C: AsyncRead + AsyncWrite + 'static, { - type Output = Yamux; + type Output = Yamux>; type Error = io::Error; - type Future = FutureResult, io::Error>; + type Future = FutureResult>, io::Error>; - fn upgrade_inbound(self, i: C, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, i: Negotiated, _: Self::Info) -> Self::Future { future::ok(Yamux::new(i, self.0, yamux::Mode::Server)) } } @@ -168,11 +168,11 @@ impl OutboundUpgrade for Config where C: AsyncRead + AsyncWrite + 'static, { - type Output = Yamux; + type Output = Yamux>; type Error = io::Error; - type Future = FutureResult, io::Error>; + type Future = FutureResult>, io::Error>; - fn upgrade_outbound(self, i: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, i: Negotiated, _: Self::Info) -> Self::Future { future::ok(Yamux::new(i, self.0, yamux::Mode::Client)) } } diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index 5c72794b..532a0f88 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -53,10 +53,10 @@ where { type Output = FloodsubRpc; type Error = FloodsubDecodeError; - type Future = upgrade::ReadOneThen, ()) -> Result>; + type Future = upgrade::ReadOneThen, (), fn(Vec, ()) -> Result>; #[inline] - fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: upgrade::Negotiated, _: Self::Info) -> Self::Future { upgrade::read_one_then(socket, 2048, (), |packet, ()| { let mut rpc: rpc_proto::RPC = protobuf::parse_from_bytes(&packet)?; @@ -168,10 +168,10 @@ where { type Output = (); type Error = io::Error; - type Future = upgrade::WriteOne; + type Future = upgrade::WriteOne>; #[inline] - fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: upgrade::Negotiated, _: Self::Info) -> Self::Future { let bytes = self.into_bytes(); upgrade::write_one(socket, bytes) } diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index fdf2ea90..ece74624 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -24,7 +24,7 @@ use crate::protocol::{IdentifyInfo, IdentifySender, IdentifySenderFuture}; use futures::prelude::*; use libp2p_core::protocols_handler::{ProtocolsHandler, ProtocolsHandlerSelect, ProtocolsHandlerUpgrErr}; use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; -use libp2p_core::{Multiaddr, PeerId, PublicKey, either::EitherOutput}; +use libp2p_core::{Multiaddr, PeerId, PublicKey, either::EitherOutput, upgrade::Negotiated}; use smallvec::SmallVec; use std::{collections::HashMap, collections::VecDeque, io}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -42,9 +42,9 @@ pub struct Identify { /// For each peer we're connected to, the observed address to send back to it. observed_addresses: HashMap, /// List of senders to answer, with the observed multiaddr. - to_answer: SmallVec<[(PeerId, IdentifySender, Multiaddr); 4]>, + to_answer: SmallVec<[(PeerId, IdentifySender>, Multiaddr); 4]>, /// List of futures that send back information back to remotes. - futures: SmallVec<[(PeerId, IdentifySenderFuture); 4]>, + futures: SmallVec<[(PeerId, IdentifySenderFuture>); 4]>, /// Events that need to be produced outside when polling.. events: VecDeque, IdentifyEvent>>, } diff --git a/protocols/identify/src/listen_handler.rs b/protocols/identify/src/listen_handler.rs index eebaa793..ca57b101 100644 --- a/protocols/identify/src/listen_handler.rs +++ b/protocols/identify/src/listen_handler.rs @@ -22,7 +22,7 @@ use crate::protocol::{IdentifySender, IdentifyProtocolConfig}; use futures::prelude::*; use libp2p_core::{ protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, - upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade} + upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade, Negotiated} }; use smallvec::SmallVec; use tokio_io::{AsyncRead, AsyncWrite}; @@ -34,7 +34,7 @@ pub struct IdentifyListenHandler { config: IdentifyProtocolConfig, /// List of senders to yield to the user. - pending_result: SmallVec<[IdentifySender; 4]>, + pending_result: SmallVec<[IdentifySender>; 4]>, } impl IdentifyListenHandler { @@ -53,7 +53,7 @@ where TSubstream: AsyncRead + AsyncWrite, { type InEvent = Void; - type OutEvent = IdentifySender; + type OutEvent = IdentifySender>; type Error = Void; type Substream = TSubstream; type InboundProtocol = IdentifyProtocolConfig; diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 22454a8c..27f84c4b 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -24,7 +24,7 @@ use futures::{future::{self, FutureResult}, Async, AsyncSink, Future, Poll, Sink use futures::try_ready; use libp2p_core::{ Multiaddr, PublicKey, - upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo} + upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated} }; use log::{debug, trace}; use protobuf::Message as ProtobufMessage; @@ -150,11 +150,11 @@ impl InboundUpgrade for IdentifyProtocolConfig where C: AsyncRead + AsyncWrite, { - type Output = IdentifySender; + type Output = IdentifySender>; type Error = IoError; type Future = FutureResult; - fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { trace!("Upgrading inbound connection"); let socket = Framed::new(socket, codec::UviBytes::default()); let sender = IdentifySender { inner: socket }; @@ -168,9 +168,9 @@ where { type Output = RemoteInfo; type Error = IoError; - type Future = IdentifyOutboundFuture; + type Future = IdentifyOutboundFuture>; - fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { IdentifyOutboundFuture { inner: Framed::new(socket, codec::UviBytes::::default()), shutdown: false, diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 00f66a53..1d765830 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -24,7 +24,7 @@ use crate::protocol::{ }; use futures::prelude::*; use libp2p_core::protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}; -use libp2p_core::{upgrade, either::EitherOutput, InboundUpgrade, OutboundUpgrade, PeerId}; +use libp2p_core::{upgrade, either::EitherOutput, InboundUpgrade, OutboundUpgrade, PeerId, upgrade::Negotiated}; use multihash::Multihash; use std::{error, fmt, io, time::Duration, time::Instant}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -49,7 +49,7 @@ where next_connec_unique_id: UniqueConnecId, /// List of active substreams with the state they are in. - substreams: Vec>, + substreams: Vec, TUserData>>, /// Until when to keep the connection alive. keep_alive: KeepAlive, diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index e401e3ef..5efc0fcb 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -29,7 +29,7 @@ use bytes::BytesMut; use crate::protobuf_structs; use futures::{future, sink, stream, Sink, Stream}; -use libp2p_core::{InboundUpgrade, Multiaddr, OutboundUpgrade, PeerId, UpgradeInfo}; +use libp2p_core::{InboundUpgrade, Multiaddr, OutboundUpgrade, PeerId, UpgradeInfo, upgrade::Negotiated}; use multihash::Multihash; use protobuf::{self, Message}; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; @@ -153,12 +153,12 @@ impl InboundUpgrade for KademliaProtocolConfig where C: AsyncRead + AsyncWrite, { - type Output = KadInStreamSink; + type Output = KadInStreamSink>; type Future = future::FutureResult; type Error = IoError; #[inline] - fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, incoming: Negotiated, _: Self::Info) -> Self::Future { let mut codec = codec::UviBytes::default(); codec.set_max_len(4096); @@ -182,12 +182,12 @@ impl OutboundUpgrade for KademliaProtocolConfig where C: AsyncRead + AsyncWrite, { - type Output = KadOutStreamSink; + type Output = KadOutStreamSink>; type Future = future::FutureResult; type Error = IoError; #[inline] - fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, incoming: Negotiated, _: Self::Info) -> Self::Future { let mut codec = codec::UviBytes::default(); codec.set_max_len(4096); diff --git a/protocols/noise/src/lib.rs b/protocols/noise/src/lib.rs index 2bc68584..c4e34b31 100644 --- a/protocols/noise/src/lib.rs +++ b/protocols/noise/src/lib.rs @@ -58,7 +58,7 @@ pub use io::NoiseOutput; pub use protocol::{Keypair, PublicKey, Protocol, ProtocolParams, IX, IK, XX}; pub use protocol::x25519::X25519; -use libp2p_core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade}; +use libp2p_core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::Negotiated}; use tokio_io::{AsyncRead, AsyncWrite}; use zeroize::Zeroize; @@ -127,11 +127,11 @@ where NoiseConfig: UpgradeInfo, C: Protocol + AsRef<[u8]> + Zeroize { - type Output = (PublicKey, NoiseOutput); + type Output = (PublicKey, NoiseOutput>); type Error = NoiseError; - type Future = rt1::NoiseInboundFuture; + type Future = rt1::NoiseInboundFuture, C>; - fn upgrade_inbound(self, socket: T, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { let session = self.params.into_builder() .local_private_key(self.keys.secret().as_ref()) .build_responder() @@ -146,11 +146,11 @@ where NoiseConfig: UpgradeInfo, C: Protocol + AsRef<[u8]> + Zeroize { - type Output = (PublicKey, NoiseOutput); + type Output = (PublicKey, NoiseOutput>); type Error = NoiseError; - type Future = rt1::NoiseOutboundFuture; + type Future = rt1::NoiseOutboundFuture, C>; - fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { let session = self.params.into_builder() .local_private_key(self.keys.secret().as_ref()) .build_initiator() @@ -167,11 +167,11 @@ where NoiseConfig: UpgradeInfo, C: Protocol + AsRef<[u8]> + Zeroize { - type Output = (PublicKey, NoiseOutput); + type Output = (PublicKey, NoiseOutput>); type Error = NoiseError; - type Future = rt15::NoiseInboundFuture; + type Future = rt15::NoiseInboundFuture, C>; - fn upgrade_inbound(self, socket: T, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { let session = self.params.into_builder() .local_private_key(self.keys.secret().as_ref()) .build_responder() @@ -186,11 +186,11 @@ where NoiseConfig: UpgradeInfo, C: Protocol + AsRef<[u8]> + Zeroize { - type Output = (PublicKey, NoiseOutput); + type Output = (PublicKey, NoiseOutput>); type Error = NoiseError; - type Future = rt15::NoiseOutboundFuture; + type Future = rt15::NoiseOutboundFuture, C>; - fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { let session = self.params.into_builder() .local_private_key(self.keys.secret().as_ref()) .build_initiator() @@ -207,11 +207,11 @@ where NoiseConfig: UpgradeInfo, C: Protocol + AsRef<[u8]> + Zeroize { - type Output = (PublicKey, NoiseOutput); + type Output = (PublicKey, NoiseOutput>); type Error = NoiseError; - type Future = rt1::NoiseInboundFuture; + type Future = rt1::NoiseInboundFuture, C>; - fn upgrade_inbound(self, socket: T, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { let session = self.params.into_builder() .local_private_key(self.keys.secret().as_ref()) .build_responder() @@ -226,11 +226,11 @@ where NoiseConfig>: UpgradeInfo, C: Protocol + AsRef<[u8]> + Zeroize { - type Output = (PublicKey, NoiseOutput); + type Output = (PublicKey, NoiseOutput>); type Error = NoiseError; - type Future = rt1::NoiseOutboundFuture; + type Future = rt1::NoiseOutboundFuture, C>; - fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { let session = self.params.into_builder() .local_private_key(self.keys.secret().as_ref()) .remote_public_key(self.remote.as_ref()) diff --git a/protocols/observed/src/lib.rs b/protocols/observed/src/lib.rs index 86818a2e..227e958e 100644 --- a/protocols/observed/src/lib.rs +++ b/protocols/observed/src/lib.rs @@ -23,7 +23,7 @@ use bytes::Bytes; use futures::{future, prelude::*}; -use libp2p_core::{Multiaddr, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}}; +use libp2p_core::{Multiaddr, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated}}; use std::{io, iter}; use tokio_codec::{FramedRead, FramedWrite}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -51,11 +51,11 @@ impl InboundUpgrade for Observed where C: AsyncRead + AsyncWrite + Send + 'static { - type Output = Sender; + type Output = Sender>; type Error = io::Error; type Future = Box + Send>; - fn upgrade_inbound(self, conn: C, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, conn: Negotiated, _: Self::Info) -> Self::Future { let io = FramedWrite::new(conn, UviBytes::default()); Box::new(future::ok(Sender { io })) } @@ -69,10 +69,10 @@ where type Error = io::Error; type Future = Box + Send>; - fn upgrade_outbound(self, conn: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, conn: Negotiated, _: Self::Info) -> Self::Future { let io = FramedRead::new(conn, UviBytes::default()); let future = io.into_future() - .map_err(|(e, _): (io::Error, FramedRead)| e) + .map_err(|(e, _): (io::Error, FramedRead, UviBytes>)| e) .and_then(move |(bytes, _)| { if let Some(b) = bytes { let ma = Multiaddr::from_bytes(b.to_vec()) @@ -100,7 +100,7 @@ impl Sender { #[cfg(test)] mod tests { - use libp2p_core::{Multiaddr, upgrade::{InboundUpgrade, OutboundUpgrade}}; + use libp2p_core::{Multiaddr, upgrade::{apply_inbound, apply_outbound}}; use tokio::runtime::current_thread; use tokio::net::{TcpListener, TcpStream}; use super::*; @@ -115,17 +115,19 @@ mod tests { let server = server.incoming() .into_future() - .map_err(|(e, _)| e.into()) + .map_err(|_| panic!()) .and_then(move |(conn, _)| { - Observed::new().upgrade_inbound(conn.unwrap(), b"/paritytech/observed-address/0.1.0") + apply_inbound(conn.unwrap(), Observed::new()) }) + .map_err(|_| panic!()) .and_then(move |sender| sender.send_address(observed_addr1)); let client = TcpStream::connect(&server_addr) - .map_err(|e| e.into()) + .map_err(|_| panic!()) .and_then(|conn| { - Observed::new().upgrade_outbound(conn, b"/paritytech/observed-address/0.1.0") + apply_outbound(conn, Observed::new()) }) + .map_err(|_| panic!()) .map(move |addr| { eprintln!("{} {}", addr, observed_addr2); assert_eq!(addr, observed_addr2) @@ -133,7 +135,7 @@ mod tests { current_thread::block_on_all(future::lazy(move || { current_thread::spawn(server.map_err(|e| panic!("server error: {}", e)).map(|_| ())); - client.map_err(|e| panic!("client error: {}", e)) + client })) .unwrap(); } diff --git a/protocols/ping/src/protocol.rs b/protocols/ping/src/protocol.rs index a3e7b8bf..bb820e51 100644 --- a/protocols/ping/src/protocol.rs +++ b/protocols/ping/src/protocol.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use futures::{prelude::*, future, try_ready}; -use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, upgrade::Negotiated}; use log::debug; use rand::{distributions::Standard, prelude::*, rngs::EntropyRng}; use std::{io, iter, time::Duration, time::Instant}; @@ -53,10 +53,10 @@ where { type Output = (); type Error = io::Error; - type Future = future::Map, tokio_io::io::WriteAll, fn((TSocket, [u8; 32])) -> tokio_io::io::WriteAll>, tokio_io::io::Flush, fn((TSocket, [u8; 32])) -> tokio_io::io::Flush>, tokio_io::io::Shutdown, fn(TSocket) -> tokio_io::io::Shutdown>, fn(TSocket) -> ()>; + type Future = future::Map, [u8; 32]>, tokio_io::io::WriteAll, [u8; 32]>, fn((Negotiated, [u8; 32])) -> tokio_io::io::WriteAll, [u8; 32]>>, tokio_io::io::Flush>, fn((Negotiated, [u8; 32])) -> tokio_io::io::Flush>>, tokio_io::io::Shutdown>, fn(Negotiated) -> tokio_io::io::Shutdown>>, fn(Negotiated) -> ()>; #[inline] - fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { tokio_io::io::read_exact(socket, [0; 32]) .and_then:: _, _>(|(socket, buffer)| tokio_io::io::write_all(socket, buffer)) .and_then:: _, _>(|(socket, _)| tokio_io::io::flush(socket)) @@ -71,10 +71,10 @@ where { type Output = Duration; type Error = io::Error; - type Future = PingDialer; + type Future = PingDialer>; #[inline] - fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { let payload: [u8; 32] = EntropyRng::default().sample(Standard); debug!("Preparing for ping with payload {:?}", payload); diff --git a/protocols/plaintext/src/lib.rs b/protocols/plaintext/src/lib.rs index c3ece927..c8c6aafb 100644 --- a/protocols/plaintext/src/lib.rs +++ b/protocols/plaintext/src/lib.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use futures::future::{self, FutureResult}; -use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, upgrade::Negotiated}; use std::iter; use void::Void; @@ -36,21 +36,21 @@ impl UpgradeInfo for PlainTextConfig { } impl InboundUpgrade for PlainTextConfig { - type Output = C; + type Output = Negotiated; type Error = Void; - type Future = FutureResult; + type Future = FutureResult, Self::Error>; - fn upgrade_inbound(self, i: C, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, i: Negotiated, _: Self::Info) -> Self::Future { future::ok(i) } } impl OutboundUpgrade for PlainTextConfig { - type Output = C; + type Output = Negotiated; type Error = Void; - type Future = FutureResult; + type Future = FutureResult, Self::Error>; - fn upgrade_outbound(self, i: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, i: Negotiated, _: Self::Info) -> Self::Future { future::ok(i) } } diff --git a/protocols/secio/src/lib.rs b/protocols/secio/src/lib.rs index 24c057a7..5bc6b161 100644 --- a/protocols/secio/src/lib.rs +++ b/protocols/secio/src/lib.rs @@ -84,7 +84,7 @@ pub use self::error::SecioError; use bytes::BytesMut; use futures::stream::MapErr as StreamMapErr; use futures::{Future, Poll, Sink, StartSend, Stream}; -use libp2p_core::{PublicKey, identity, upgrade::{UpgradeInfo, InboundUpgrade, OutboundUpgrade}}; +use libp2p_core::{PublicKey, identity, upgrade::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, Negotiated}}; use log::debug; use rw_stream_sink::RwStreamSink; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; @@ -195,11 +195,11 @@ impl InboundUpgrade for SecioConfig where T: AsyncRead + AsyncWrite + Send + 'static { - type Output = SecioOutput; + type Output = SecioOutput>; type Error = SecioError; type Future = Box + Send>; - fn upgrade_inbound(self, socket: T, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { Box::new(self.handshake(socket)) } } @@ -208,11 +208,11 @@ impl OutboundUpgrade for SecioConfig where T: AsyncRead + AsyncWrite + Send + 'static { - type Output = SecioOutput; + type Output = SecioOutput>; type Error = SecioError; type Future = Box + Send>; - fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { Box::new(self.handshake(socket)) } } diff --git a/src/simple.rs b/src/simple.rs index bf5b8dab..2395fb37 100644 --- a/src/simple.rs +++ b/src/simple.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use crate::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated}; use bytes::Bytes; use futures::{future::FromErr, prelude::*}; use std::{iter, io::Error as IoError, sync::Arc}; @@ -70,7 +70,7 @@ impl UpgradeInfo for SimpleProtocol { impl InboundUpgrade for SimpleProtocol where C: AsyncRead + AsyncWrite, - F: Fn(C) -> O, + F: Fn(Negotiated) -> O, O: IntoFuture { type Output = O::Item; @@ -78,7 +78,7 @@ where type Future = FromErr; #[inline] - fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { let upgrade = &self.upgrade; upgrade(socket).into_future().from_err() } @@ -87,7 +87,7 @@ where impl OutboundUpgrade for SimpleProtocol where C: AsyncRead + AsyncWrite, - F: Fn(C) -> O, + F: Fn(Negotiated) -> O, O: IntoFuture { type Output = O::Item; @@ -95,7 +95,7 @@ where type Future = FromErr; #[inline] - fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { let upgrade = &self.upgrade; upgrade(socket).into_future().from_err() }