diff --git a/core/src/either.rs b/core/src/either.rs index e4d7fea0..9e448de0 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{muxing::{Shutdown, StreamMuxer}, Multiaddr}; +use crate::{muxing::{Shutdown, StreamMuxer}, Multiaddr, ProtocolName}; use futures::prelude::*; use std::{fmt, io::{Error as IoError, Read, Write}}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -342,3 +342,14 @@ where } } +#[derive(Debug, Clone)] +pub enum EitherName { A(A), B(B) } + +impl ProtocolName for EitherName { + fn protocol_name(&self) -> &[u8] { + match self { + EitherName::A(a) => a.protocol_name(), + EitherName::B(b) => b.protocol_name() + } + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index a5e668ec..0d7225fc 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -111,7 +111,7 @@ pub use self::protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent}; pub use self::public_key::PublicKey; pub use self::swarm::Swarm; pub use self::transport::Transport; -pub use self::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, UpgradeError}; +pub use self::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, UpgradeError, ProtocolName}; #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub enum Endpoint { diff --git a/core/src/swarm.rs b/core/src/swarm.rs index 28837f89..47629d97 100644 --- a/core/src/swarm.rs +++ b/core/src/swarm.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - Transport, Multiaddr, PublicKey, PeerId, InboundUpgrade, OutboundUpgrade, UpgradeInfo, + Transport, Multiaddr, PublicKey, PeerId, InboundUpgrade, OutboundUpgrade, UpgradeInfo, ProtocolName, muxing::StreamMuxer, nodes::{ handled_node::NodeHandler, @@ -104,13 +104,15 @@ where TBehaviour: NetworkBehaviour, ::OutEvent: Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary ::InboundProtocol: InboundUpgrade> + Send + 'static, - <::InboundProtocol as UpgradeInfo>::NamesIter: Send + 'static, - <::InboundProtocol as UpgradeInfo>::UpgradeId: Send + 'static, + <::InboundProtocol as UpgradeInfo>::Info: Send + 'static, + <::InboundProtocol as UpgradeInfo>::InfoIter: Send + 'static, + <<::InboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static, <::InboundProtocol as InboundUpgrade>>::Error: fmt::Debug + Send + 'static, <::InboundProtocol as InboundUpgrade>>::Future: Send + 'static, ::OutboundProtocol: OutboundUpgrade> + Send + 'static, - <::OutboundProtocol as UpgradeInfo>::NamesIter: Send + 'static, - <::OutboundProtocol as UpgradeInfo>::UpgradeId: Send + 'static, + <::OutboundProtocol as UpgradeInfo>::Info: Send + 'static, + <::OutboundProtocol as UpgradeInfo>::InfoIter: Send + 'static, + <<::OutboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static, <::OutboundProtocol as OutboundUpgrade>>::Future: Send + 'static, <::OutboundProtocol as OutboundUpgrade>>::Error: fmt::Debug + Send + 'static, as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary @@ -122,8 +124,9 @@ where TBehaviour: NetworkBehaviour, let supported_protocols = behaviour .new_handler() .listen_protocol() - .protocol_names() - .map(|(name, _)| name.to_vec()) + .protocol_info() + .into_iter() + .map(|info| info.protocol_name().to_vec()) .collect(); let local_peer_id = local_public_key.clone().into_peer_id(); @@ -222,13 +225,15 @@ where TBehaviour: NetworkBehaviour, ::InboundProtocol: InboundUpgrade> + Send + 'static, <::InboundProtocol as InboundUpgrade>>::Future: Send + 'static, <::InboundProtocol as InboundUpgrade>>::Error: fmt::Debug + Send + 'static, - <::InboundProtocol as UpgradeInfo>::NamesIter: Send + 'static, - <::InboundProtocol as UpgradeInfo>::UpgradeId: Send + 'static, + <::InboundProtocol as UpgradeInfo>::Info: Send + 'static, + <::InboundProtocol as UpgradeInfo>::InfoIter: Send + 'static, + <<::InboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static, ::OutboundProtocol: OutboundUpgrade> + Send + 'static, <::OutboundProtocol as OutboundUpgrade>>::Future: Send + 'static, <::OutboundProtocol as OutboundUpgrade>>::Error: fmt::Debug + Send + 'static, - <::OutboundProtocol as UpgradeInfo>::NamesIter: Send + 'static, - <::OutboundProtocol as UpgradeInfo>::UpgradeId: Send + 'static, + <::OutboundProtocol as UpgradeInfo>::Info: Send + 'static, + <::OutboundProtocol as UpgradeInfo>::InfoIter: Send + 'static, + <<::OutboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static, as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary TTopology: Topology, { @@ -392,7 +397,7 @@ impl<'a, TTopology> PollParameters<'a, TTopology> { /// Returns the public key of the local node. #[inline] - pub fn local_public_key(&self) -> &PublicKey { + pub fn local_public_key(&self) -> &PublicKey { self.local_public_key } diff --git a/core/src/upgrade/apply.rs b/core/src/upgrade/apply.rs index c734e624..1f820cca 100644 --- a/core/src/upgrade/apply.rs +++ b/core/src/upgrade/apply.rs @@ -18,9 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use bytes::Bytes; use crate::nodes::ConnectedPoint; -use crate::upgrade::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, UpgradeError}; +use crate::upgrade::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, UpgradeError, ProtocolName}; use futures::{future::Either, prelude::*}; use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture}; use std::mem; @@ -46,7 +45,8 @@ where C: AsyncRead + AsyncWrite, U: InboundUpgrade, { - let future = multistream_select::listener_select_proto(conn, UpgradeIntoProtocolsIterWrap(up)); + let iter = UpgradeInfoIterWrap(up); + let future = multistream_select::listener_select_proto(conn, iter); InboundUpgradeApply { inner: InboundUpgradeApplyState::Init { future } } @@ -58,7 +58,7 @@ where C: AsyncRead + AsyncWrite, U: OutboundUpgrade { - let iter = ProtocolNames(up.protocol_names()); + let iter = up.protocol_info().into_iter().map(NameWrap as fn(_) -> NameWrap<_>); let future = multistream_select::dialer_select_proto(conn, iter); OutboundUpgradeApply { inner: OutboundUpgradeApplyState::Init { future, upgrade: up } @@ -80,7 +80,7 @@ where U: InboundUpgrade { Init { - future: ListenerSelectFuture, U::UpgradeId>, + future: ListenerSelectFuture, NameWrap>, }, Upgrade { future: U::Future @@ -100,7 +100,7 @@ where loop { match mem::replace(&mut self.inner, InboundUpgradeApplyState::Undefined) { InboundUpgradeApplyState::Init { mut future } => { - let (upgrade_id, connection, upgrade) = match future.poll()? { + let (info, connection, upgrade) = match future.poll()? { Async::Ready(x) => x, Async::NotReady => { self.inner = InboundUpgradeApplyState::Init { future }; @@ -108,7 +108,7 @@ where } }; self.inner = InboundUpgradeApplyState::Upgrade { - future: upgrade.0.upgrade_inbound(connection, upgrade_id) + future: upgrade.0.upgrade_inbound(connection, info.0) }; } InboundUpgradeApplyState::Upgrade { mut future } => { @@ -149,7 +149,7 @@ where U: OutboundUpgrade { Init { - future: DialerSelectFuture, U::UpgradeId>, + future: DialerSelectFuture::IntoIter>>, upgrade: U }, Upgrade { @@ -170,7 +170,7 @@ where loop { match mem::replace(&mut self.inner, OutboundUpgradeApplyState::Undefined) { OutboundUpgradeApplyState::Init { mut future, upgrade } => { - let (upgrade_id, connection) = match future.poll()? { + let (info, connection) = match future.poll()? { Async::Ready(x) => x, Async::NotReady => { self.inner = OutboundUpgradeApplyState::Init { future, upgrade }; @@ -178,7 +178,7 @@ where } }; self.inner = OutboundUpgradeApplyState::Upgrade { - future: upgrade.upgrade_outbound(connection, upgrade_id) + future: upgrade.upgrade_outbound(connection, info.0) }; } OutboundUpgradeApplyState::Upgrade { mut future } => { @@ -205,37 +205,29 @@ where } /// Wraps around a `UpgradeInfo` and satisfies the requirement of `listener_select_proto`. -struct UpgradeIntoProtocolsIterWrap(U); +struct UpgradeInfoIterWrap(U); -impl<'a, U> IntoIterator for &'a UpgradeIntoProtocolsIterWrap -where U: UpgradeInfo -{ - type Item = (Bytes, fn(&Bytes, &Bytes) -> bool, U::UpgradeId); - type IntoIter = ProtocolNames; - - #[inline] - fn into_iter(self) -> Self::IntoIter { - ProtocolNames(self.0.protocol_names()) - } -} - -/// Iterator adapter which adds equality matching predicates to items. -/// Used in `NegotiationFuture`. -#[derive(Clone)] -pub struct ProtocolNames(I); - -impl Iterator for ProtocolNames +impl<'a, U> IntoIterator for &'a UpgradeInfoIterWrap where - I: Iterator + U: UpgradeInfo { - type Item = (Bytes, fn(&Bytes, &Bytes) -> bool, Id); + type Item = NameWrap; + type IntoIter = NameWrapIter<::IntoIter>; - fn next(&mut self) -> Option { - let f = ::eq as fn(&Bytes, &Bytes) -> bool; - self.0.next().map(|(b, id)| (b, f, id)) - } - - fn size_hint(&self) -> (usize, Option) { - self.0.size_hint() + fn into_iter(self) -> Self::IntoIter { + self.0.protocol_info().into_iter().map(NameWrap) } } + +type NameWrapIter = + std::iter::Map::Item) -> NameWrap<::Item>>; + +/// Wrapper type to expose an `AsRef<[u8]>` impl for all types implementing `ProtocolName`. +struct NameWrap(N); + +impl AsRef<[u8]> for NameWrap { + fn as_ref(&self) -> &[u8] { + self.0.protocol_name() + } +} + diff --git a/core/src/upgrade/denied.rs b/core/src/upgrade/denied.rs index 2a71e94d..d1de858e 100644 --- a/core/src/upgrade/denied.rs +++ b/core/src/upgrade/denied.rs @@ -18,11 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use bytes::Bytes; use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use futures::future; use std::iter; -use void::{unreachable, Void}; +use void::Void; /// Dummy implementation of `UpgradeInfo`/`InboundUpgrade`/`OutboundUpgrade` that doesn't support /// any protocol. @@ -30,10 +29,10 @@ use void::{unreachable, Void}; pub struct DeniedUpgrade; impl UpgradeInfo for DeniedUpgrade { - type UpgradeId = Void; - type NamesIter = iter::Empty<(Bytes, Self::UpgradeId)>; + type Info = &'static [u8]; + type InfoIter = iter::Empty; - fn protocol_names(&self) -> Self::NamesIter { + fn protocol_info(&self) -> Self::InfoIter { iter::empty() } } @@ -43,8 +42,8 @@ impl InboundUpgrade for DeniedUpgrade { type Error = Void; type Future = future::Empty; - fn upgrade_inbound(self, _: C, id: Self::UpgradeId) -> Self::Future { - unreachable(id) + fn upgrade_inbound(self, _: C, _: Self::Info) -> Self::Future { + future::empty() } } @@ -53,8 +52,8 @@ impl OutboundUpgrade for DeniedUpgrade { type Error = Void; type Future = future::Empty; - fn upgrade_outbound(self, _: C, id: Self::UpgradeId) -> Self::Future { - unreachable(id) + fn upgrade_outbound(self, _: C, _: Self::Info) -> Self::Future { + future::empty() } } diff --git a/core/src/upgrade/either.rs b/core/src/upgrade/either.rs index dcf830b3..28db987c 100644 --- a/core/src/upgrade/either.rs +++ b/core/src/upgrade/either.rs @@ -18,10 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use bytes::Bytes; -use futures::future::Either; use crate::{ - either::{EitherOutput, EitherError, EitherFuture2}, + either::{EitherOutput, EitherError, EitherFuture2, EitherName}, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo} }; @@ -34,13 +32,16 @@ where A: UpgradeInfo, B: UpgradeInfo { - type UpgradeId = Either; - type NamesIter = EitherIter; + type Info = EitherName; + type InfoIter = EitherIter< + ::IntoIter, + ::IntoIter + >; - fn protocol_names(&self) -> Self::NamesIter { + fn protocol_info(&self) -> Self::InfoIter { match self { - EitherUpgrade::A(a) => EitherIter::A(a.protocol_names()), - EitherUpgrade::B(b) => EitherIter::B(b.protocol_names()) + EitherUpgrade::A(a) => EitherIter::A(a.protocol_info().into_iter()), + EitherUpgrade::B(b) => EitherIter::B(b.protocol_info().into_iter()) } } } @@ -54,10 +55,10 @@ where type Error = EitherError; type Future = EitherFuture2; - fn upgrade_inbound(self, sock: C, id: Self::UpgradeId) -> Self::Future { - match (self, id) { - (EitherUpgrade::A(a), Either::A(id)) => EitherFuture2::A(a.upgrade_inbound(sock, id)), - (EitherUpgrade::B(b), Either::B(id)) => EitherFuture2::B(b.upgrade_inbound(sock, id)), + fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { + match (self, info) { + (EitherUpgrade::A(a), EitherName::A(info)) => EitherFuture2::A(a.upgrade_inbound(sock, info)), + (EitherUpgrade::B(b), EitherName::B(info)) => EitherFuture2::B(b.upgrade_inbound(sock, info)), _ => panic!("Invalid invocation of EitherUpgrade::upgrade_inbound") } } @@ -72,10 +73,10 @@ where type Error = EitherError; type Future = EitherFuture2; - fn upgrade_outbound(self, sock: C, id: Self::UpgradeId) -> Self::Future { - match (self, id) { - (EitherUpgrade::A(a), Either::A(id)) => EitherFuture2::A(a.upgrade_outbound(sock, id)), - (EitherUpgrade::B(b), Either::B(id)) => EitherFuture2::B(b.upgrade_outbound(sock, id)), + fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + match (self, info) { + (EitherUpgrade::A(a), EitherName::A(info)) => EitherFuture2::A(a.upgrade_outbound(sock, info)), + (EitherUpgrade::B(b), EitherName::B(info)) => EitherFuture2::B(b.upgrade_outbound(sock, info)), _ => panic!("Invalid invocation of EitherUpgrade::upgrade_outbound") } } @@ -85,17 +86,17 @@ where #[derive(Debug, Clone)] pub enum EitherIter { A(A), B(B) } -impl Iterator for EitherIter +impl Iterator for EitherIter where - A: Iterator, - B: Iterator, + A: Iterator, + B: Iterator { - type Item = (Bytes, Either); + type Item = EitherName; fn next(&mut self) -> Option { match self { - EitherIter::A(a) => a.next().map(|(name, id)| (name, Either::A(id))), - EitherIter::B(b) => b.next().map(|(name, id)| (name, Either::B(id))) + EitherIter::A(a) => a.next().map(EitherName::A), + EitherIter::B(b) => b.next().map(EitherName::B) } } diff --git a/core/src/upgrade/map.rs b/core/src/upgrade/map.rs index d1cf0004..feb4cd12 100644 --- a/core/src/upgrade/map.rs +++ b/core/src/upgrade/map.rs @@ -35,11 +35,11 @@ impl UpgradeInfo for MapInboundUpgrade where U: UpgradeInfo { - type UpgradeId = U::UpgradeId; - type NamesIter = U::NamesIter; + type Info = U::Info; + type InfoIter = U::InfoIter; - fn protocol_names(&self) -> Self::NamesIter { - self.upgrade.protocol_names() + fn protocol_info(&self) -> Self::InfoIter { + self.upgrade.protocol_info() } } @@ -52,9 +52,9 @@ where type Error = U::Error; type Future = MapFuture; - fn upgrade_inbound(self, sock: C, id: Self::UpgradeId) -> Self::Future { + fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { MapFuture { - inner: self.upgrade.upgrade_inbound(sock, id), + inner: self.upgrade.upgrade_inbound(sock, info), map: Some(self.fun) } } @@ -68,8 +68,8 @@ where type Error = U::Error; type Future = U::Future; - fn upgrade_outbound(self, sock: C, id: Self::UpgradeId) -> Self::Future { - self.upgrade.upgrade_outbound(sock, id) + fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + self.upgrade.upgrade_outbound(sock, info) } } @@ -87,11 +87,11 @@ impl UpgradeInfo for MapOutboundUpgrade where U: UpgradeInfo { - type UpgradeId = U::UpgradeId; - type NamesIter = U::NamesIter; + type Info = U::Info; + type InfoIter = U::InfoIter; - fn protocol_names(&self) -> Self::NamesIter { - self.upgrade.protocol_names() + fn protocol_info(&self) -> Self::InfoIter { + self.upgrade.protocol_info() } } @@ -103,8 +103,8 @@ where type Error = U::Error; type Future = U::Future; - fn upgrade_inbound(self, sock: C, id: Self::UpgradeId) -> Self::Future { - self.upgrade.upgrade_inbound(sock, id) + fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { + self.upgrade.upgrade_inbound(sock, info) } } @@ -117,9 +117,9 @@ where type Error = U::Error; type Future = MapFuture; - fn upgrade_outbound(self, sock: C, id: Self::UpgradeId) -> Self::Future { + fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { MapFuture { - inner: self.upgrade.upgrade_outbound(sock, id), + inner: self.upgrade.upgrade_outbound(sock, info), map: Some(self.fun) } } @@ -139,11 +139,11 @@ impl UpgradeInfo for MapInboundUpgradeErr where U: UpgradeInfo { - type UpgradeId = U::UpgradeId; - type NamesIter = U::NamesIter; + type Info = U::Info; + type InfoIter = U::InfoIter; - fn protocol_names(&self) -> Self::NamesIter { - self.upgrade.protocol_names() + fn protocol_info(&self) -> Self::InfoIter { + self.upgrade.protocol_info() } } @@ -156,9 +156,9 @@ where type Error = T; type Future = MapErrFuture; - fn upgrade_inbound(self, sock: C, id: Self::UpgradeId) -> Self::Future { + fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { MapErrFuture { - fut: self.upgrade.upgrade_inbound(sock, id), + fut: self.upgrade.upgrade_inbound(sock, info), fun: Some(self.fun) } } @@ -172,8 +172,8 @@ where type Error = U::Error; type Future = U::Future; - fn upgrade_outbound(self, sock: C, id: Self::UpgradeId) -> Self::Future { - self.upgrade.upgrade_outbound(sock, id) + fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + self.upgrade.upgrade_outbound(sock, info) } } @@ -191,11 +191,11 @@ impl UpgradeInfo for MapOutboundUpgradeErr where U: UpgradeInfo { - type UpgradeId = U::UpgradeId; - type NamesIter = U::NamesIter; + type Info = U::Info; + type InfoIter = U::InfoIter; - fn protocol_names(&self) -> Self::NamesIter { - self.upgrade.protocol_names() + fn protocol_info(&self) -> Self::InfoIter { + self.upgrade.protocol_info() } } @@ -208,9 +208,9 @@ where type Error = T; type Future = MapErrFuture; - fn upgrade_outbound(self, sock: C, id: Self::UpgradeId) -> Self::Future { + fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { MapErrFuture { - fut: self.upgrade.upgrade_outbound(sock, id), + fut: self.upgrade.upgrade_outbound(sock, info), fun: Some(self.fun) } } @@ -224,8 +224,8 @@ where type Error = U::Error; type Future = U::Future; - fn upgrade_inbound(self, sock: C, id: Self::UpgradeId) -> Self::Future { - self.upgrade.upgrade_inbound(sock, id) + fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { + self.upgrade.upgrade_inbound(sock, info) } } diff --git a/core/src/upgrade/mod.rs b/core/src/upgrade/mod.rs index 5858d867..b8478e94 100644 --- a/core/src/upgrade/mod.rs +++ b/core/src/upgrade/mod.rs @@ -36,7 +36,7 @@ //! //! An upgrade is performed in two steps: //! -//! - A protocol negotiation step. The `UpgradeInfo::protocol_names` method is called to determine +//! - A protocol negotiation step. The `UpgradeInfo::protocol_info` method is called to determine //! which protocols are supported by the trait implementation. The `multistream-select` protocol //! is used in order to agree on which protocol to use amongst the ones supported. //! @@ -64,7 +64,6 @@ mod error; mod map; mod select; -use bytes::Bytes; use futures::future::Future; pub use self::{ @@ -76,18 +75,28 @@ pub use self::{ select::SelectUpgrade }; +/// Types serving as protocol names. +pub trait ProtocolName { + /// The protocol name as bytes. + fn protocol_name(&self) -> &[u8]; +} + +impl> ProtocolName for T { + fn protocol_name(&self) -> &[u8] { + self.as_ref() + } +} + /// Common trait for upgrades that can be applied on inbound substreams, outbound substreams, /// or both. pub trait UpgradeInfo { /// Opaque type representing a negotiable protocol. - type UpgradeId; - /// Iterator returned by `protocol_names`. - type NamesIter: Iterator; + type Info: ProtocolName; + /// Iterator returned by `protocol_info`. + type InfoIter: IntoIterator; /// Returns the list of protocols that are supported. Used during the negotiation process. - /// - /// Each item returned by the iterator is a pair of a protocol name and an opaque identifier. - fn protocol_names(&self) -> Self::NamesIter; + fn protocol_info(&self) -> Self::InfoIter; } /// Possible upgrade on an inbound connection or substream. @@ -102,8 +111,8 @@ pub trait InboundUpgrade: UpgradeInfo { /// After we have determined that the remote supports one of the protocols we support, this /// method is called to start the handshake. /// - /// The `id` is the identifier of the protocol, as produced by `protocol_names()`. - fn upgrade_inbound(self, socket: C, id: Self::UpgradeId) -> Self::Future; + /// The `info` is the identifier of the protocol, as produced by `protocol_info`. + fn upgrade_inbound(self, socket: C, info: Self::Info) -> Self::Future; } /// Extension trait for `InboundUpgrade`. Automatically implemented on all types that implement @@ -142,8 +151,8 @@ pub trait OutboundUpgrade: UpgradeInfo { /// After we have determined that the remote supports one of the protocols we support, this /// method is called to start the handshake. /// - /// The `id` is the identifier of the protocol, as produced by `protocol_names()`. - fn upgrade_outbound(self, socket: C, id: Self::UpgradeId) -> Self::Future; + /// The `info` is the identifier of the protocol, as produced by `protocol_info`. + fn upgrade_outbound(self, socket: C, info: Self::Info) -> Self::Future; } /// Extention trait for `OutboundUpgrade`. Automatically implemented on all types that implement diff --git a/core/src/upgrade/select.rs b/core/src/upgrade/select.rs index 4beff531..8fa4c5b8 100644 --- a/core/src/upgrade/select.rs +++ b/core/src/upgrade/select.rs @@ -18,10 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use bytes::Bytes; -use futures::future::Either; use crate::{ - either::{EitherOutput, EitherError, EitherFuture2}, + either::{EitherOutput, EitherError, EitherFuture2, EitherName}, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo} }; @@ -46,11 +44,14 @@ where A: UpgradeInfo, B: UpgradeInfo { - type UpgradeId = Either; - type NamesIter = NamesIterChain; + type Info = EitherName; + type InfoIter = InfoIterChain< + ::IntoIter, + ::IntoIter + >; - fn protocol_names(&self) -> Self::NamesIter { - NamesIterChain(self.0.protocol_names(), self.1.protocol_names()) + fn protocol_info(&self) -> Self::InfoIter { + InfoIterChain(self.0.protocol_info().into_iter(), self.1.protocol_info().into_iter()) } } @@ -63,10 +64,10 @@ where type Error = EitherError; type Future = EitherFuture2; - fn upgrade_inbound(self, sock: C, id: Self::UpgradeId) -> Self::Future { - match id { - Either::A(id) => EitherFuture2::A(self.0.upgrade_inbound(sock, id)), - Either::B(id) => EitherFuture2::B(self.1.upgrade_inbound(sock, id)) + fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future { + match info { + EitherName::A(info) => EitherFuture2::A(self.0.upgrade_inbound(sock, info)), + EitherName::B(info) => EitherFuture2::B(self.1.upgrade_inbound(sock, info)) } } } @@ -80,31 +81,31 @@ where type Error = EitherError; type Future = EitherFuture2; - fn upgrade_outbound(self, sock: C, id: Self::UpgradeId) -> Self::Future { - match id { - Either::A(id) => EitherFuture2::A(self.0.upgrade_outbound(sock, id)), - Either::B(id) => EitherFuture2::B(self.1.upgrade_outbound(sock, id)) + fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future { + match info { + EitherName::A(info) => EitherFuture2::A(self.0.upgrade_outbound(sock, info)), + EitherName::B(info) => EitherFuture2::B(self.1.upgrade_outbound(sock, info)) } } } /// Iterator that combines the protocol names of twp upgrades. #[derive(Debug, Clone)] -pub struct NamesIterChain(A, B); +pub struct InfoIterChain(A, B); -impl Iterator for NamesIterChain +impl Iterator for InfoIterChain where - A: Iterator, - B: Iterator, + A: Iterator, + B: Iterator { - type Item = (Bytes, Either); + type Item = EitherName; fn next(&mut self) -> Option { - if let Some((name, id)) = self.0.next() { - return Some((name, Either::A(id))) + if let Some(info) = self.0.next() { + return Some(EitherName::A(info)) } - if let Some((name, id)) = self.1.next() { - return Some((name, Either::B(id))) + if let Some(info) = self.1.next() { + return Some(EitherName::B(info)) } None } diff --git a/misc/multistream-select/src/dialer_select.rs b/misc/multistream-select/src/dialer_select.rs index 5e3d27df..b114a8cb 100644 --- a/misc/multistream-select/src/dialer_select.rs +++ b/misc/multistream-select/src/dialer_select.rs @@ -31,8 +31,7 @@ use crate::ProtocolChoiceError; /// Future, returned by `dialer_select_proto`, which selects a protocol and dialer /// either sequentially of by considering all protocols in parallel. -pub type DialerSelectFuture = - Either, P>, DialerSelectPar>; +pub type DialerSelectFuture = Either, DialerSelectPar>; /// Helps selecting a protocol amongst the ones supported. /// @@ -46,48 +45,30 @@ pub type DialerSelectFuture = /// success, the function returns the identifier (of type `P`), plus the socket which now uses that /// chosen protocol. #[inline] -pub fn dialer_select_proto(inner: R, protocols: I) -> DialerSelectFuture +pub fn dialer_select_proto(inner: R, protocols: I) -> DialerSelectFuture where R: AsyncRead + AsyncWrite, - I: Iterator, - M: FnMut(&Bytes, &Bytes) -> bool, + I: IntoIterator, + I::Item: AsRef<[u8]> { + let iter = protocols.into_iter(); // We choose between the "serial" and "parallel" strategies based on the number of protocols. - if protocols.size_hint().1.map(|n| n <= 3).unwrap_or(false) { - Either::A(dialer_select_proto_serial(inner, IgnoreMatchFn(protocols))) + if iter.size_hint().1.map(|n| n <= 3).unwrap_or(false) { + Either::A(dialer_select_proto_serial(inner, iter)) } else { - Either::B(dialer_select_proto_parallel(inner, protocols)) + Either::B(dialer_select_proto_parallel(inner, iter)) } } - -/// Iterator, which ignores match predicates of the iterator it wraps. -pub struct IgnoreMatchFn(I); - -impl Iterator for IgnoreMatchFn -where - I: Iterator -{ - type Item = (Bytes, P); - - fn next(&mut self) -> Option { - self.0.next().map(|(b, _, p)| (b, p)) - } - - fn size_hint(&self) -> (usize, Option) { - self.0.size_hint() - } -} - - /// Helps selecting a protocol amongst the ones supported. /// /// Same as `dialer_select_proto`. Tries protocols one by one. The iterator doesn't need to produce /// match functions, because it's not needed. -pub fn dialer_select_proto_serial(inner: R, protocols: I,) -> DialerSelectSeq +pub fn dialer_select_proto_serial(inner: R, protocols: I,) -> DialerSelectSeq where R: AsyncRead + AsyncWrite, - I: Iterator, + I: Iterator, + I::Item: AsRef<[u8]> { DialerSelectSeq { inner: DialerSelectSeqState::AwaitDialer { dialer_fut: Dialer::new(inner), protocols } @@ -97,11 +78,11 @@ where /// Future, returned by `dialer_select_proto_serial` which selects a protocol /// and dialer sequentially. -pub struct DialerSelectSeq { - inner: DialerSelectSeqState +pub struct DialerSelectSeq { + inner: DialerSelectSeqState } -enum DialerSelectSeqState { +enum DialerSelectSeqState { AwaitDialer { dialer_fut: DialerFuture, protocols: I @@ -112,25 +93,24 @@ enum DialerSelectSeqState { }, SendProtocol { sender: sink::Send>, - proto_name: Bytes, - proto_value: P, + proto_name: I::Item, protocols: I }, AwaitProtocol { stream: StreamFuture>, - proto_name: Bytes, - proto_value: P, + proto_name: I::Item, protocols: I }, Undefined } -impl Future for DialerSelectSeq +impl Future for DialerSelectSeq where - I: Iterator, + I: Iterator, + I::Item: AsRef<[u8]>, R: AsyncRead + AsyncWrite, { - type Item = (P, R); + type Item = (I::Item, R); type Error = ProtocolChoiceError; fn poll(&mut self) -> Poll { @@ -147,28 +127,26 @@ where self.inner = DialerSelectSeqState::NextProtocol { dialer, protocols } } DialerSelectSeqState::NextProtocol { dialer, mut protocols } => { - let (proto_name, proto_value) = + let proto_name = protocols.next().ok_or(ProtocolChoiceError::NoProtocolFound)?; let req = DialerToListenerMessage::ProtocolRequest { - name: proto_name.clone() + name: Bytes::from(proto_name.as_ref()) }; trace!("sending {:?}", req); let sender = dialer.send(req); self.inner = DialerSelectSeqState::SendProtocol { sender, proto_name, - proto_value, protocols } } - DialerSelectSeqState::SendProtocol { mut sender, proto_name, proto_value, protocols } => { + DialerSelectSeqState::SendProtocol { mut sender, proto_name, protocols } => { let dialer = match sender.poll()? { Async::Ready(d) => d, Async::NotReady => { self.inner = DialerSelectSeqState::SendProtocol { sender, proto_name, - proto_value, protocols }; return Ok(Async::NotReady) @@ -178,18 +156,16 @@ where self.inner = DialerSelectSeqState::AwaitProtocol { stream, proto_name, - proto_value, protocols }; } - DialerSelectSeqState::AwaitProtocol { mut stream, proto_name, proto_value, protocols } => { + DialerSelectSeqState::AwaitProtocol { mut stream, proto_name, protocols } => { let (m, r) = match stream.poll() { Ok(Async::Ready(x)) => x, Ok(Async::NotReady) => { self.inner = DialerSelectSeqState::AwaitProtocol { stream, proto_name, - proto_value, protocols }; return Ok(Async::NotReady) @@ -198,8 +174,10 @@ where }; trace!("received {:?}", m); match m.ok_or(ProtocolChoiceError::UnexpectedMessage)? { - ListenerToDialerMessage::ProtocolAck { ref name } if name == &proto_name => { - return Ok(Async::Ready((proto_value, r.into_inner()))) + ListenerToDialerMessage::ProtocolAck { ref name } + if name.as_ref() == proto_name.as_ref() => + { + return Ok(Async::Ready((proto_name, r.into_inner()))) }, ListenerToDialerMessage::NotAvailable => { self.inner = DialerSelectSeqState::NextProtocol { dialer: r, protocols } @@ -219,11 +197,11 @@ where /// /// Same as `dialer_select_proto`. Queries the list of supported protocols from the remote, then /// chooses the most appropriate one. -pub fn dialer_select_proto_parallel(inner: R, protocols: I) -> DialerSelectPar +pub fn dialer_select_proto_parallel(inner: R, protocols: I) -> DialerSelectPar where - R: AsyncRead + AsyncWrite, - I: Iterator, - M: FnMut(&Bytes, &Bytes) -> bool, + I: Iterator, + I::Item: AsRef<[u8]>, + R: AsyncRead + AsyncWrite { DialerSelectPar { inner: DialerSelectParState::AwaitDialer { dialer_fut: Dialer::new(inner), protocols } @@ -234,11 +212,11 @@ where /// Future, returned by `dialer_select_proto_parallel`, which selects a protocol and dialer in /// parellel, by first requesting the liste of protocols supported by the remote endpoint and /// then selecting the most appropriate one by applying a match predicate to the result. -pub struct DialerSelectPar { - inner: DialerSelectParState +pub struct DialerSelectPar { + inner: DialerSelectParState } -enum DialerSelectParState { +enum DialerSelectParState { AwaitDialer { dialer_fut: DialerFuture, protocols: I @@ -253,24 +231,22 @@ enum DialerSelectParState { }, SendProtocol { sender: sink::Send>, - proto_name: Bytes, - proto_val: P + proto_name: I::Item }, AwaitProtocol { stream: StreamFuture>, - proto_name: Bytes, - proto_val: P + proto_name: I::Item }, Undefined } -impl Future for DialerSelectPar +impl Future for DialerSelectPar where - I: Iterator, - M: FnMut(&Bytes, &Bytes) -> bool, + I: Iterator, + I::Item: AsRef<[u8]>, R: AsyncRead + AsyncWrite, { - type Item = (P, R); + type Item = (I::Item, R); type Error = ProtocolChoiceError; fn poll(&mut self) -> Poll { @@ -314,10 +290,10 @@ where _ => return Err(ProtocolChoiceError::UnexpectedMessage), }; let mut found = None; - for (local_name, mut match_fn, ident) in protocols { + for local_name in protocols { for remote_name in &list { - if match_fn(remote_name, &local_name) { - found = Some((remote_name.clone(), ident)); + if remote_name.as_ref() == local_name.as_ref() { + found = Some(local_name); break; } } @@ -325,21 +301,20 @@ where break; } } - let (proto_name, proto_val) = found.ok_or(ProtocolChoiceError::NoProtocolFound)?; - trace!("sending {:?}", proto_name); + let proto_name = found.ok_or(ProtocolChoiceError::NoProtocolFound)?; + trace!("sending {:?}", proto_name.as_ref()); let sender = d.send(DialerToListenerMessage::ProtocolRequest { - name: proto_name.clone(), + name: Bytes::from(proto_name.as_ref()) }); - self.inner = DialerSelectParState::SendProtocol { sender, proto_name, proto_val }; + self.inner = DialerSelectParState::SendProtocol { sender, proto_name }; } - DialerSelectParState::SendProtocol { mut sender, proto_name, proto_val } => { + DialerSelectParState::SendProtocol { mut sender, proto_name } => { let dialer = match sender.poll()? { Async::Ready(d) => d, Async::NotReady => { self.inner = DialerSelectParState::SendProtocol { sender, - proto_name, - proto_val + proto_name }; return Ok(Async::NotReady) } @@ -347,18 +322,16 @@ where let stream = dialer.into_future(); self.inner = DialerSelectParState::AwaitProtocol { stream, - proto_name, - proto_val + proto_name }; } - DialerSelectParState::AwaitProtocol { mut stream, proto_name, proto_val } => { + DialerSelectParState::AwaitProtocol { mut stream, proto_name } => { let (m, r) = match stream.poll() { Ok(Async::Ready(x)) => x, Ok(Async::NotReady) => { self.inner = DialerSelectParState::AwaitProtocol { stream, - proto_name, - proto_val + proto_name }; return Ok(Async::NotReady) } @@ -366,8 +339,10 @@ where }; trace!("received {:?}", m); match m { - Some(ListenerToDialerMessage::ProtocolAck { ref name }) if name == &proto_name => { - return Ok(Async::Ready((proto_val, r.into_inner()))) + Some(ListenerToDialerMessage::ProtocolAck { ref name }) + if name.as_ref() == proto_name.as_ref() => + { + return Ok(Async::Ready((proto_name, r.into_inner()))) } _ => return Err(ProtocolChoiceError::UnexpectedMessage) } diff --git a/misc/multistream-select/src/lib.rs b/misc/multistream-select/src/lib.rs index cb8d88a8..b8a50096 100644 --- a/misc/multistream-select/src/lib.rs +++ b/misc/multistream-select/src/lib.rs @@ -55,17 +55,12 @@ //! let client = TcpStream::connect(&"127.0.0.1:10333".parse().unwrap()) //! .from_err() //! .and_then(move |connec| { -//! let protos = vec![ -//! (Bytes::from("/echo/1.0.0"), ::eq, MyProto::Echo), -//! (Bytes::from("/hello/2.5.0"), ::eq, MyProto::Hello), -//! ] -//! .into_iter(); +//! let protos = vec![b"/echo/1.0.0", b"/echo/2.5.0"]; //! dialer_select_proto(connec, protos).map(|r| r.0) //! }); //! //! let mut rt = Runtime::new().unwrap(); -//! let negotiated_protocol: MyProto = rt.block_on(client) -//! .expect("failed to find a protocol"); +//! let negotiated_protocol = rt.block_on(client).expect("failed to find a protocol"); //! println!("negotiated: {:?}", negotiated_protocol); //! # } //! ``` diff --git a/misc/multistream-select/src/listener_select.rs b/misc/multistream-select/src/listener_select.rs index c9c40d36..6b5863a7 100644 --- a/misc/multistream-select/src/listener_select.rs +++ b/misc/multistream-select/src/listener_select.rs @@ -43,23 +43,32 @@ use crate::ProtocolChoiceError; /// /// On success, returns the socket and the identifier of the chosen protocol (of type `P`). The /// socket now uses this protocol. -pub fn listener_select_proto(inner: R, protocols: I) -> ListenerSelectFuture +pub fn listener_select_proto(inner: R, protocols: I) -> ListenerSelectFuture where R: AsyncRead + AsyncWrite, - for<'r> &'r I: IntoIterator, - M: FnMut(&Bytes, &Bytes) -> bool, + for<'r> &'r I: IntoIterator, + X: AsRef<[u8]> { ListenerSelectFuture { - inner: ListenerSelectState::AwaitListener { listener_fut: Listener::new(inner), protocols } + inner: ListenerSelectState::AwaitListener { + listener_fut: Listener::new(inner), + protocols: protocols + } } } /// Future, returned by `listener_select_proto` which selects a protocol among the ones supported. -pub struct ListenerSelectFuture { - inner: ListenerSelectState +pub struct ListenerSelectFuture +where + for<'a> &'a I: IntoIterator +{ + inner: ListenerSelectState } -enum ListenerSelectState { +enum ListenerSelectState +where + for<'a> &'a I: IntoIterator +{ AwaitListener { listener_fut: ListenerFuture, protocols: I @@ -71,18 +80,18 @@ enum ListenerSelectState { Outgoing { sender: sink::Send>, protocols: I, - outcome: Option

+ outcome: Option }, Undefined } -impl Future for ListenerSelectFuture +impl Future for ListenerSelectFuture where - for<'r> &'r I: IntoIterator, - M: FnMut(&Bytes, &Bytes) -> bool, + for<'a> &'a I: IntoIterator, R: AsyncRead + AsyncWrite, + X: AsRef<[u8]> { - type Item = (P, R, I); + type Item = (X, R, I); type Error = ProtocolChoiceError; fn poll(&mut self) -> Poll { @@ -111,7 +120,7 @@ where match msg { Some(DialerToListenerMessage::ProtocolsListRequest) => { let msg = ListenerToDialerMessage::ProtocolsListResponse { - list: protocols.into_iter().map(|(p, _, _)| p).collect(), + list: protocols.into_iter().map(|x| Bytes::from(x.as_ref())).collect(), }; trace!("protocols list response: {:?}", msg); let sender = listener.send(msg); @@ -124,10 +133,10 @@ where Some(DialerToListenerMessage::ProtocolRequest { name }) => { let mut outcome = None; let mut send_back = ListenerToDialerMessage::NotAvailable; - for (supported, mut matches, value) in &protocols { - if matches(&name, &supported) { + for supported in &protocols { + if name.as_ref() == supported.as_ref() { send_back = ListenerToDialerMessage::ProtocolAck {name: name.clone()}; - outcome = Some(value); + outcome = Some(supported); break; } } diff --git a/misc/multistream-select/src/tests.rs b/misc/multistream-select/src/tests.rs index ae4a3371..05f8961f 100644 --- a/misc/multistream-select/src/tests.rs +++ b/misc/multistream-select/src/tests.rs @@ -95,27 +95,21 @@ fn select_proto_basic() { .map(|s| s.0.unwrap()) .map_err(|(e, _)| e.into()) .and_then(move |connec| { - let protos = vec![ - (Bytes::from("/proto1"), ::eq, 0), - (Bytes::from("/proto2"), ::eq, 1), - ]; + let protos = vec![b"/proto1", b"/proto2"]; listener_select_proto(connec, VecRefIntoIter(protos)).map(|r| r.0) }); let client = TcpStream::connect(&listener_addr) .from_err() .and_then(move |connec| { - let protos = vec![ - (Bytes::from("/proto3"), ::eq, 2), - (Bytes::from("/proto2"), ::eq, 3), - ].into_iter(); + let protos = vec![b"/proto3", b"/proto2"]; dialer_select_proto(connec, protos).map(|r| r.0) }); let mut rt = Runtime::new().unwrap(); let (dialer_chosen, listener_chosen) = rt.block_on(client.join(server)).unwrap(); - assert_eq!(dialer_chosen, 3); - assert_eq!(listener_chosen, 1); + assert_eq!(dialer_chosen, b"/proto2"); + assert_eq!(listener_chosen, b"/proto2"); } #[test] @@ -129,20 +123,14 @@ fn no_protocol_found() { .map(|s| s.0.unwrap()) .map_err(|(e, _)| e.into()) .and_then(move |connec| { - let protos = vec![ - (Bytes::from("/proto1"), ::eq, 1), - (Bytes::from("/proto2"), ::eq, 2), - ]; + let protos = vec![b"/proto1", b"/proto2"]; listener_select_proto(connec, VecRefIntoIter(protos)).map(|r| r.0) }); let client = TcpStream::connect(&listener_addr) .from_err() .and_then(move |connec| { - let protos = vec![ - (Bytes::from("/proto3"), ::eq, 3), - (Bytes::from("/proto4"), ::eq, 4), - ].into_iter(); + let protos = vec![b"/proto3", b"/proto4"]; dialer_select_proto(connec, protos).map(|r| r.0) }); let mut rt = Runtime::new().unwrap(); @@ -163,28 +151,22 @@ fn select_proto_parallel() { .map(|s| s.0.unwrap()) .map_err(|(e, _)| e.into()) .and_then(move |connec| { - let protos = vec![ - (Bytes::from("/proto1"), ::eq, 0), - (Bytes::from("/proto2"), ::eq, 1), - ]; + let protos = vec![b"/proto1", b"/proto2"]; listener_select_proto(connec, VecRefIntoIter(protos)).map(|r| r.0) }); let client = TcpStream::connect(&listener_addr) .from_err() .and_then(move |connec| { - let protos = vec![ - (Bytes::from("/proto3"), ::eq, 2), - (Bytes::from("/proto2"), ::eq, 3), - ].into_iter(); - dialer_select_proto_parallel(connec, protos).map(|r| r.0) + let protos = vec![b"/proto3", b"/proto2"]; + dialer_select_proto_parallel(connec, protos.into_iter()).map(|r| r.0) }); let mut rt = Runtime::new().unwrap(); let (dialer_chosen, listener_chosen) = rt.block_on(client.join(server)).unwrap(); - assert_eq!(dialer_chosen, 3); - assert_eq!(listener_chosen, 1); + assert_eq!(dialer_chosen, b"/proto2"); + assert_eq!(listener_chosen, b"/proto2"); } #[test] @@ -198,23 +180,20 @@ fn select_proto_serial() { .map(|s| s.0.unwrap()) .map_err(|(e, _)| e.into()) .and_then(move |connec| { - let protos = vec![ - (Bytes::from("/proto1"), ::eq, 0), - (Bytes::from("/proto2"), ::eq, 1), - ]; + let protos = vec![b"/proto1", b"/proto2"]; listener_select_proto(connec, VecRefIntoIter(protos)).map(|r| r.0) }); let client = TcpStream::connect(&listener_addr) .from_err() .and_then(move |connec| { - let protos = vec![(Bytes::from("/proto3"), 2), (Bytes::from("/proto2"), 3)].into_iter(); - dialer_select_proto_serial(connec, protos).map(|r| r.0) + let protos = vec![b"/proto3", b"/proto2"]; + dialer_select_proto_serial(connec, protos.into_iter()).map(|r| r.0) }); let mut rt = Runtime::new().unwrap(); let (dialer_chosen, listener_chosen) = rt.block_on(client.join(server)).unwrap(); - assert_eq!(dialer_chosen, 3); - assert_eq!(listener_chosen, 1); + assert_eq!(dialer_chosen, b"/proto2"); + assert_eq!(listener_chosen, b"/proto2"); } diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 39ed99ae..ef4a2808 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -148,12 +148,12 @@ pub enum MaxBufferBehaviour { } impl UpgradeInfo for MplexConfig { - type UpgradeId = (); - type NamesIter = iter::Once<(Bytes, Self::UpgradeId)>; + type Info = &'static [u8]; + type InfoIter = iter::Once; #[inline] - fn protocol_names(&self) -> Self::NamesIter { - iter::once((Bytes::from("/mplex/6.7.0"), ())) + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/mplex/6.7.0") } } @@ -165,7 +165,7 @@ where type Error = IoError; type Future = future::FutureResult; - fn upgrade_inbound(self, socket: C, _: Self::UpgradeId) -> Self::Future { + fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { future::ok(self.upgrade(socket, Endpoint::Listener)) } } @@ -178,7 +178,7 @@ where type Error = IoError; type Future = future::FutureResult; - fn upgrade_outbound(self, socket: C, _: Self::UpgradeId) -> Self::Future { + fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { future::ok(self.upgrade(socket, Endpoint::Dialer)) } } diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index 5b6a6dab..88e862db 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -9,7 +9,6 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -bytes = "0.4" futures = "0.1" libp2p-core = { version = "0.1.0", path = "../../core" } log = "0.4" diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index b57369d7..c8c502ea 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -21,7 +21,6 @@ //! Implements the Yamux multiplexing protocol for libp2p, see also the //! [specification](https://github.com/hashicorp/yamux/blob/master/spec.md). -extern crate bytes; extern crate futures; #[macro_use] extern crate log; @@ -29,7 +28,6 @@ extern crate libp2p_core; extern crate tokio_io; extern crate yamux; -use bytes::Bytes; use futures::{future::{self, FutureResult}, prelude::*}; use libp2p_core::{muxing::Shutdown, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}}; use std::{io, iter}; @@ -136,11 +134,11 @@ impl Default for Config { } impl UpgradeInfo for Config { - type UpgradeId = (); - type NamesIter = iter::Once<(Bytes, Self::UpgradeId)>; + type Info = &'static [u8]; + type InfoIter = iter::Once; - fn protocol_names(&self) -> Self::NamesIter { - iter::once((Bytes::from("/yamux/1.0.0"), ())) + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/yamux/1.0.0") } } @@ -152,7 +150,7 @@ where type Error = io::Error; type Future = FutureResult, io::Error>; - fn upgrade_inbound(self, i: C, _: Self::UpgradeId) -> Self::Future { + fn upgrade_inbound(self, i: C, _: Self::Info) -> Self::Future { future::ok(Yamux::new(i, self.0, yamux::Mode::Server)) } } @@ -165,7 +163,7 @@ where type Error = io::Error; type Future = FutureResult, io::Error>; - fn upgrade_outbound(self, i: C, _: Self::UpgradeId) -> Self::Future { + fn upgrade_outbound(self, i: C, _: Self::Info) -> Self::Future { future::ok(Yamux::new(i, self.0, yamux::Mode::Client)) } } diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index e418284b..16016d66 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::{BufMut, BytesMut}; use crate::rpc_proto; use futures::future; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, PeerId}; @@ -42,12 +42,12 @@ impl FloodsubConfig { } impl UpgradeInfo for FloodsubConfig { - type UpgradeId = (); - type NamesIter = iter::Once<(Bytes, Self::UpgradeId)>; + type Info = &'static [u8]; + type InfoIter = iter::Once; #[inline] - fn protocol_names(&self) -> Self::NamesIter { - iter::once(("/floodsub/1.0.0".into(), ())) + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/floodsub/1.0.0") } } @@ -60,7 +60,7 @@ where type Future = future::FutureResult; #[inline] - fn upgrade_inbound(self, socket: TSocket, _: Self::UpgradeId) -> Self::Future { + fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future { future::ok(Framed::new(socket, FloodsubCodec { length_prefix: Default::default() })) } } @@ -74,7 +74,7 @@ where type Future = future::FutureResult; #[inline] - fn upgrade_outbound(self, socket: TSocket, _: Self::UpgradeId) -> Self::Future { + fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future { future::ok(Framed::new(socket, FloodsubCodec { length_prefix: Default::default() })) } } diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 91828422..83b1b1f2 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use bytes::{Bytes, BytesMut}; +use bytes::BytesMut; use futures::{future::{self, FutureResult}, Async, AsyncSink, Future, Poll, Sink, Stream}; use libp2p_core::{ Multiaddr, PublicKey, @@ -134,12 +134,11 @@ pub struct IdentifyInfo { } impl UpgradeInfo for IdentifyProtocolConfig { - type UpgradeId = (); - type NamesIter = iter::Once<(Bytes, Self::UpgradeId)>; + type Info = &'static [u8]; + type InfoIter = iter::Once; - #[inline] - fn protocol_names(&self) -> Self::NamesIter { - iter::once((Bytes::from("/ipfs/id/1.0.0"), ())) + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/ipfs/id/1.0.0") } } @@ -151,7 +150,7 @@ where type Error = IoError; type Future = FutureResult; - fn upgrade_inbound(self, socket: C, _: ()) -> Self::Future { + fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { trace!("Upgrading inbound connection"); let socket = Framed::new(socket, codec::UviBytes::default()); let sender = IdentifySender { inner: socket }; @@ -167,7 +166,7 @@ where type Error = IoError; type Future = IdentifyOutboundFuture; - fn upgrade_outbound(self, socket: C, _: ()) -> Self::Future { + fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { IdentifyOutboundFuture { inner: Framed::new(socket, codec::UviBytes::::default()), } diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index 246c0d7d..84b45d5c 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -26,7 +26,7 @@ //! The `Stream` component is used to poll the underlying transport, and the `Sink` component is //! used to send messages. -use bytes::{Bytes, BytesMut}; +use bytes::BytesMut; use futures::{future, sink, stream, Sink, Stream}; use libp2p_core::{InboundUpgrade, Multiaddr, OutboundUpgrade, PeerId, UpgradeInfo}; use multihash::Multihash; @@ -136,12 +136,12 @@ impl Into for KadPeer { pub struct KademliaProtocolConfig; impl UpgradeInfo for KademliaProtocolConfig { - type NamesIter = iter::Once<(Bytes, ())>; - type UpgradeId = (); + type Info = &'static [u8]; + type InfoIter = iter::Once; #[inline] - fn protocol_names(&self) -> Self::NamesIter { - iter::once(("/ipfs/kad/1.0.0".into(), ())) + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/ipfs/kad/1.0.0") } } @@ -154,7 +154,7 @@ where type Error = IoError; #[inline] - fn upgrade_inbound(self, incoming: C, _: ()) -> Self::Future { + fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future { future::ok( Framed::new(incoming, codec::UviBytes::default()) .from_err::() @@ -180,7 +180,7 @@ where type Error = IoError; #[inline] - fn upgrade_outbound(self, incoming: C, _: ()) -> Self::Future { + fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future { future::ok( Framed::new(incoming, codec::UviBytes::default()) .from_err::() diff --git a/protocols/observed/src/lib.rs b/protocols/observed/src/lib.rs index 919e3ebf..5230b891 100644 --- a/protocols/observed/src/lib.rs +++ b/protocols/observed/src/lib.rs @@ -46,11 +46,11 @@ impl Observed { } impl UpgradeInfo for Observed { - type UpgradeId = (); - type NamesIter = iter::Once<(Bytes, Self::UpgradeId)>; + type Info = &'static [u8]; + type InfoIter = iter::Once; - fn protocol_names(&self) -> Self::NamesIter { - iter::once((Bytes::from("/paritytech/observed-address/0.1.0"), ())) + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/paritytech/observed-address/0.1.0") } } @@ -62,7 +62,7 @@ where type Error = io::Error; type Future = Box + Send>; - fn upgrade_inbound(self, conn: C, _: ()) -> Self::Future { + fn upgrade_inbound(self, conn: C, _: Self::Info) -> Self::Future { let io = FramedWrite::new(conn, UviBytes::default()); Box::new(future::ok(Sender { io })) } @@ -76,7 +76,7 @@ where type Error = io::Error; type Future = Box + Send>; - fn upgrade_outbound(self, conn: C, _: ()) -> Self::Future { + fn upgrade_outbound(self, conn: C, _: Self::Info) -> Self::Future { let io = FramedRead::new(conn, UviBytes::default()); let future = io.into_future() .map_err(|(e, _): (io::Error, FramedRead)| e) @@ -126,14 +126,14 @@ mod tests { .into_future() .map_err(|(e, _)| e.into()) .and_then(move |(conn, _)| { - Observed::new().upgrade_inbound(conn.unwrap(), ()) + Observed::new().upgrade_inbound(conn.unwrap(), b"/paritytech/observed-address/0.1.0") }) .and_then(move |sender| sender.send_address(observed_addr1)); let client = TcpStream::connect(&server_addr) .map_err(|e| e.into()) .and_then(|conn| { - Observed::new().upgrade_outbound(conn, ()) + Observed::new().upgrade_outbound(conn, b"/paritytech/observed-address/0.1.0") }) .map(move |addr| { eprintln!("{} {}", addr, observed_addr2); diff --git a/protocols/ping/src/protocol.rs b/protocols/ping/src/protocol.rs index 24ab1953..5eb44b3d 100644 --- a/protocols/ping/src/protocol.rs +++ b/protocols/ping/src/protocol.rs @@ -44,11 +44,11 @@ impl Default for Ping { } impl UpgradeInfo for Ping { - type UpgradeId = (); - type NamesIter = iter::Once<(Bytes, Self::UpgradeId)>; + type Info = &'static [u8]; + type InfoIter = iter::Once; - fn protocol_names(&self) -> Self::NamesIter { - iter::once(("/ipfs/ping/1.0.0".into(), ())) + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/ipfs/ping/1.0.0") } } @@ -61,7 +61,7 @@ where type Future = FutureResult; #[inline] - fn upgrade_inbound(self, socket: TSocket, _: Self::UpgradeId) -> Self::Future { + fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future { let listener = PingListener { inner: Framed::new(socket, Codec), state: PingListenerState::Listening, @@ -79,7 +79,7 @@ where type Future = FutureResult; #[inline] - fn upgrade_outbound(self, socket: TSocket, _: Self::UpgradeId) -> Self::Future { + fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future { let dialer = PingDialer { inner: Framed::new(socket, Codec), need_writer_flush: false, @@ -341,14 +341,14 @@ mod tests { .into_future() .map_err(|(e, _)| e.into()) .and_then(|(c, _)| { - Ping::<()>::default().upgrade_inbound(c.unwrap(), ()) + Ping::<()>::default().upgrade_inbound(c.unwrap(), b"/ipfs/ping/1.0.0") }) .flatten(); let client = TcpStream::connect(&listener_addr) .map_err(|e| e.into()) .and_then(|c| { - Ping::<()>::default().upgrade_outbound(c, ()) + Ping::<()>::default().upgrade_outbound(c, b"/ipfs/ping/1.0.0") }) .and_then(|mut pinger| { pinger.ping(()); @@ -371,14 +371,14 @@ mod tests { .into_future() .map_err(|(e, _)| e.into()) .and_then(|(c, _)| { - Ping::::default().upgrade_inbound(c.unwrap(), ()) + Ping::::default().upgrade_inbound(c.unwrap(), b"/ipfs/ping/1.0.0") }) .flatten(); let client = TcpStream::connect(&listener_addr) .map_err(|e| e.into()) .and_then(|c| { - Ping::::default().upgrade_outbound(c, ()) + Ping::::default().upgrade_outbound(c, b"/ipfs/ping/1.0.0") }) .and_then(|mut pinger| { for n in 0..20 { diff --git a/protocols/plaintext/Cargo.toml b/protocols/plaintext/Cargo.toml index 006ffc38..29d4a249 100644 --- a/protocols/plaintext/Cargo.toml +++ b/protocols/plaintext/Cargo.toml @@ -9,7 +9,6 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -bytes = "0.4" futures = "0.1" libp2p-core = { version = "0.1.0", path = "../../core" } void = "1" diff --git a/protocols/plaintext/src/lib.rs b/protocols/plaintext/src/lib.rs index d2a64370..7b203ee8 100644 --- a/protocols/plaintext/src/lib.rs +++ b/protocols/plaintext/src/lib.rs @@ -18,12 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -extern crate bytes; extern crate futures; extern crate libp2p_core; extern crate void; -use bytes::Bytes; use futures::future::{self, FutureResult}; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use std::iter; @@ -33,11 +31,11 @@ use void::Void; pub struct PlainTextConfig; impl UpgradeInfo for PlainTextConfig { - type UpgradeId = (); - type NamesIter = iter::Once<(Bytes, Self::UpgradeId)>; + type Info = &'static [u8]; + type InfoIter = iter::Once; - fn protocol_names(&self) -> Self::NamesIter { - iter::once((Bytes::from("/plaintext/1.0.0"), ())) + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/plaintext/1.0.0") } } @@ -46,7 +44,7 @@ impl InboundUpgrade for PlainTextConfig { type Error = Void; type Future = FutureResult; - fn upgrade_inbound(self, i: C, _: Self::UpgradeId) -> Self::Future { + fn upgrade_inbound(self, i: C, _: Self::Info) -> Self::Future { future::ok(i) } } @@ -56,7 +54,7 @@ impl OutboundUpgrade for PlainTextConfig { type Error = Void; type Future = FutureResult; - fn upgrade_outbound(self, i: C, _: Self::UpgradeId) -> Self::Future { + fn upgrade_outbound(self, i: C, _: Self::Info) -> Self::Future { future::ok(i) } } diff --git a/protocols/secio/src/lib.rs b/protocols/secio/src/lib.rs index 0da4913e..4a0a8ca3 100644 --- a/protocols/secio/src/lib.rs +++ b/protocols/secio/src/lib.rs @@ -116,7 +116,7 @@ pub use self::error::SecioError; #[cfg(feature = "secp256k1")] use asn1_der::{traits::FromDerEncoded, traits::FromDerObject, DerObject}; -use bytes::{Bytes, BytesMut}; +use bytes::BytesMut; use ed25519_dalek::Keypair as Ed25519KeyPair; use futures::stream::MapErr as StreamMapErr; use futures::{Future, Poll, Sink, StartSend, Stream}; @@ -193,7 +193,7 @@ impl SecioConfig { self } - fn handshake(self, socket: T, _: ()) -> impl Future, Error=SecioError> + fn handshake(self, socket: T) -> impl Future, Error=SecioError> where T: AsyncRead + AsyncWrite + Send + 'static { @@ -371,11 +371,11 @@ where } impl UpgradeInfo for SecioConfig { - type UpgradeId = (); - type NamesIter = iter::Once<(Bytes, Self::UpgradeId)>; + type Info = &'static [u8]; + type InfoIter = iter::Once; - fn protocol_names(&self) -> Self::NamesIter { - iter::once(("/secio/1.0.0".into(), ())) + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/secio/1.0.0") } } @@ -387,8 +387,8 @@ where type Error = SecioError; type Future = Box + Send>; - fn upgrade_inbound(self, socket: T, id: Self::UpgradeId) -> Self::Future { - Box::new(self.handshake(socket, id)) + fn upgrade_inbound(self, socket: T, _: Self::Info) -> Self::Future { + Box::new(self.handshake(socket)) } } @@ -400,8 +400,8 @@ where type Error = SecioError; type Future = Box + Send>; - fn upgrade_outbound(self, socket: T, id: Self::UpgradeId) -> Self::Future { - Box::new(self.handshake(socket, id)) + fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future { + Box::new(self.handshake(socket)) } } diff --git a/src/simple.rs b/src/simple.rs index 94332c93..f17ebbc6 100644 --- a/src/simple.rs +++ b/src/simple.rs @@ -27,7 +27,7 @@ use tokio_io::{AsyncRead, AsyncWrite}; /// Implementation of `ConnectionUpgrade`. Convenient to use with small protocols. #[derive(Debug)] pub struct SimpleProtocol { - name: Bytes, + info: Bytes, // Note: we put the closure `F` in an `Arc` because Rust closures aren't automatically clonable // yet. upgrade: Arc, @@ -36,12 +36,12 @@ pub struct SimpleProtocol { impl SimpleProtocol { /// Builds a `SimpleProtocol`. #[inline] - pub fn new(name: N, upgrade: F) -> SimpleProtocol + pub fn new(info: N, upgrade: F) -> SimpleProtocol where N: Into, { SimpleProtocol { - name: name.into(), + info: info.into(), upgrade: Arc::new(upgrade), } } @@ -51,19 +51,19 @@ impl Clone for SimpleProtocol { #[inline] fn clone(&self) -> Self { SimpleProtocol { - name: self.name.clone(), + info: self.info.clone(), upgrade: self.upgrade.clone(), } } } impl UpgradeInfo for SimpleProtocol { - type UpgradeId = (); - type NamesIter = iter::Once<(Bytes, Self::UpgradeId)>; + type Info = Bytes; + type InfoIter = iter::Once; #[inline] - fn protocol_names(&self) -> Self::NamesIter { - iter::once((self.name.clone(), ())) + fn protocol_info(&self) -> Self::InfoIter { + iter::once(self.info.clone()) } } @@ -78,7 +78,7 @@ where type Future = FromErr; #[inline] - fn upgrade_inbound(self, socket: C, _: Self::UpgradeId) -> Self::Future { + fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { let upgrade = &self.upgrade; upgrade(socket).into_future().from_err() } @@ -95,7 +95,7 @@ where type Future = FromErr; #[inline] - fn upgrade_outbound(self, socket: C, _: Self::UpgradeId) -> Self::Future { + fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { let upgrade = &self.upgrade; upgrade(socket).into_future().from_err() }