diff --git a/core/src/nodes/swarm.rs b/core/src/nodes/swarm.rs index d7d51a54..4320d1ad 100644 --- a/core/src/nodes/swarm.rs +++ b/core/src/nodes/swarm.rs @@ -89,7 +89,7 @@ where TBehaviour: NetworkBehaviour, ::OutEvent: Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary ::InboundProtocol: InboundUpgrade> + Send + 'static, - <::InboundProtocol as UpgradeInfo>::NamesIter: Clone + Send + 'static, + <::InboundProtocol as UpgradeInfo>::NamesIter: Send + 'static, <::InboundProtocol as UpgradeInfo>::UpgradeId: Send + 'static, <::InboundProtocol as InboundUpgrade>>::Error: fmt::Debug + Send + 'static, <::InboundProtocol as InboundUpgrade>>::Future: Send + 'static, @@ -184,12 +184,12 @@ where TBehaviour: NetworkBehaviour, ::InboundProtocol: InboundUpgrade> + Send + 'static, <::InboundProtocol as InboundUpgrade>>::Future: Send + 'static, <::InboundProtocol as InboundUpgrade>>::Error: fmt::Debug + Send + 'static, - <::InboundProtocol as UpgradeInfo>::NamesIter: Clone + Send + 'static, + <::InboundProtocol as UpgradeInfo>::NamesIter: Send + 'static, <::InboundProtocol as UpgradeInfo>::UpgradeId: Send + 'static, ::OutboundProtocol: OutboundUpgrade> + Send + 'static, <::OutboundProtocol as OutboundUpgrade>>::Future: Send + 'static, <::OutboundProtocol as OutboundUpgrade>>::Error: fmt::Debug + Send + 'static, - <::OutboundProtocol as UpgradeInfo>::NamesIter: Clone + Send + 'static, + <::OutboundProtocol as UpgradeInfo>::NamesIter: Send + 'static, <::OutboundProtocol as UpgradeInfo>::UpgradeId: Send + 'static, as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary TTopology: Topology, diff --git a/core/src/protocols_handler.rs b/core/src/protocols_handler.rs index 5e8d9128..304d3ed2 100644 --- a/core/src/protocols_handler.rs +++ b/core/src/protocols_handler.rs @@ -26,7 +26,6 @@ use crate::{ InboundUpgrade, InboundUpgradeExt, OutboundUpgrade, - UpgradeInfo, InboundUpgradeApply, OutboundUpgradeApply, DeniedUpgrade, @@ -592,7 +591,6 @@ where impl NodeHandler for NodeHandlerWrapper where TProtoHandler: ProtocolsHandler, - ::NamesIter: Clone, ::Substream>>::Error: std::fmt::Debug { type InEvent = TProtoHandler::InEvent; diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 77dc8956..79927036 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -44,7 +44,7 @@ where D::Output: AsyncRead + AsyncWrite + Send + 'static, U: InboundUpgrade, U: OutboundUpgrade + Send + Clone + 'static, - ::NamesIter: Clone + Send, + ::NamesIter: Send, ::UpgradeId: Send, >::Future: Send, >::Future: Send, diff --git a/core/src/upgrade/apply.rs b/core/src/upgrade/apply.rs index 1c886b88..c734e624 100644 --- a/core/src/upgrade/apply.rs +++ b/core/src/upgrade/apply.rs @@ -19,7 +19,8 @@ // DEALINGS IN THE SOFTWARE. use bytes::Bytes; -use crate::{nodes::ConnectedPoint, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError}}; +use crate::nodes::ConnectedPoint; +use crate::upgrade::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, UpgradeError}; use futures::{future::Either, prelude::*}; use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture}; use std::mem; @@ -31,7 +32,6 @@ pub fn apply(conn: C, up: U, cp: ConnectedPoint) where C: AsyncRead + AsyncWrite, U: InboundUpgrade + OutboundUpgrade, - U::NamesIter: Clone { if cp.is_listener() { Either::A(apply_inbound(conn, up)) @@ -45,12 +45,10 @@ pub fn apply_inbound(conn: C, up: U) -> InboundUpgradeApply where C: AsyncRead + AsyncWrite, U: InboundUpgrade, - U::NamesIter: Clone { - let iter = ProtocolNames(up.protocol_names()); - let future = multistream_select::listener_select_proto(conn, iter); + let future = multistream_select::listener_select_proto(conn, UpgradeIntoProtocolsIterWrap(up)); InboundUpgradeApply { - inner: InboundUpgradeApplyState::Init { future, upgrade: up } + inner: InboundUpgradeApplyState::Init { future } } } @@ -82,8 +80,7 @@ where U: InboundUpgrade { Init { - future: ListenerSelectFuture, U::UpgradeId>, - upgrade: U + future: ListenerSelectFuture, U::UpgradeId>, }, Upgrade { future: U::Future @@ -95,7 +92,6 @@ impl Future for InboundUpgradeApply where C: AsyncRead + AsyncWrite, U: InboundUpgrade, - U::NamesIter: Clone { type Item = U::Output; type Error = UpgradeError; @@ -103,16 +99,16 @@ where fn poll(&mut self) -> Poll { loop { match mem::replace(&mut self.inner, InboundUpgradeApplyState::Undefined) { - InboundUpgradeApplyState::Init { mut future, upgrade } => { - let (upgrade_id, connection) = match future.poll()? { + InboundUpgradeApplyState::Init { mut future } => { + let (upgrade_id, connection, upgrade) = match future.poll()? { Async::Ready(x) => x, Async::NotReady => { - self.inner = InboundUpgradeApplyState::Init { future, upgrade }; + self.inner = InboundUpgradeApplyState::Init { future }; return Ok(Async::NotReady) } }; self.inner = InboundUpgradeApplyState::Upgrade { - future: upgrade.upgrade_inbound(connection, upgrade_id) + future: upgrade.0.upgrade_inbound(connection, upgrade_id) }; } InboundUpgradeApplyState::Upgrade { mut future } => { @@ -208,6 +204,21 @@ where } } +/// Wraps around a `UpgradeInfo` and satisfies the requirement of `listener_select_proto`. +struct UpgradeIntoProtocolsIterWrap(U); + +impl<'a, U> IntoIterator for &'a UpgradeIntoProtocolsIterWrap +where U: UpgradeInfo +{ + type Item = (Bytes, fn(&Bytes, &Bytes) -> bool, U::UpgradeId); + type IntoIter = ProtocolNames; + + #[inline] + fn into_iter(self) -> Self::IntoIter { + ProtocolNames(self.0.protocol_names()) + } +} + /// Iterator adapter which adds equality matching predicates to items. /// Used in `NegotiationFuture`. #[derive(Clone)] diff --git a/misc/multistream-select/src/lib.rs b/misc/multistream-select/src/lib.rs index f68e6b42..df7bd62c 100644 --- a/misc/multistream-select/src/lib.rs +++ b/misc/multistream-select/src/lib.rs @@ -76,45 +76,6 @@ //! # } //! ``` //! -//! For a listener: -//! -//! ```no_run -//! extern crate bytes; -//! extern crate futures; -//! extern crate multistream_select; -//! extern crate tokio; -//! extern crate tokio_tcp; -//! -//! # fn main() { -//! use bytes::Bytes; -//! use multistream_select::listener_select_proto; -//! use futures::{Future, Sink, Stream}; -//! use tokio_tcp::TcpListener; -//! use tokio::runtime::current_thread::Runtime; -//! -//! #[derive(Debug, Copy, Clone)] -//! enum MyProto { Echo, Hello } -//! -//! let server = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap() -//! .incoming() -//! .from_err() -//! .and_then(move |connec| { -//! let protos = vec![ -//! (Bytes::from("/echo/1.0.0"), ::eq, MyProto::Echo), -//! (Bytes::from("/hello/2.5.0"), ::eq, MyProto::Hello), -//! ] -//! .into_iter(); -//! listener_select_proto(connec, protos) -//! }) -//! .for_each(|(proto, _connec)| { -//! println!("new remote with {:?} negotiated", proto); -//! Ok(()) -//! }); -//! -//! let mut rt = Runtime::new().unwrap(); -//! let _ = rt.block_on(server).expect("failed to run server"); -//! # } -//! ``` extern crate bytes; #[macro_use] diff --git a/misc/multistream-select/src/listener_select.rs b/misc/multistream-select/src/listener_select.rs index c47c44e2..4d0f7764 100644 --- a/misc/multistream-select/src/listener_select.rs +++ b/misc/multistream-select/src/listener_select.rs @@ -45,7 +45,7 @@ use ProtocolChoiceError; pub fn listener_select_proto(inner: R, protocols: I) -> ListenerSelectFuture where R: AsyncRead + AsyncWrite, - I: Iterator + Clone, + for<'r> &'r I: IntoIterator, M: FnMut(&Bytes, &Bytes) -> bool, { ListenerSelectFuture { @@ -77,11 +77,11 @@ enum ListenerSelectState { impl Future for ListenerSelectFuture where - I: Iterator + Clone, + for<'r> &'r I: IntoIterator, M: FnMut(&Bytes, &Bytes) -> bool, R: AsyncRead + AsyncWrite, { - type Item = (P, R); + type Item = (P, R, I); type Error = ProtocolChoiceError; fn poll(&mut self) -> Poll { @@ -110,7 +110,7 @@ where match msg { Some(DialerToListenerMessage::ProtocolsListRequest) => { let msg = ListenerToDialerMessage::ProtocolsListResponse { - list: protocols.clone().map(|(p, _, _)| p).collect(), + list: protocols.into_iter().map(|(p, _, _)| p).collect(), }; trace!("protocols list response: {:?}", msg); let sender = listener.send(msg); @@ -123,7 +123,7 @@ where Some(DialerToListenerMessage::ProtocolRequest { name }) => { let mut outcome = None; let mut send_back = ListenerToDialerMessage::NotAvailable; - for (supported, mut matches, value) in protocols.clone() { + for (supported, mut matches, value) in &protocols { if matches(&name, &supported) { send_back = ListenerToDialerMessage::ProtocolAck {name: name.clone()}; outcome = Some(value); @@ -149,7 +149,7 @@ where } }; if let Some(p) = outcome { - return Ok(Async::Ready((p, listener.into_inner()))) + return Ok(Async::Ready((p, listener.into_inner(), protocols))) } else { let stream = listener.into_future(); self.inner = ListenerSelectState::Incoming { stream, protocols } diff --git a/misc/multistream-select/src/tests.rs b/misc/multistream-select/src/tests.rs index 6120bea6..b3827ea7 100644 --- a/misc/multistream-select/src/tests.rs +++ b/misc/multistream-select/src/tests.rs @@ -35,6 +35,19 @@ use protocol::{Dialer, DialerToListenerMessage, Listener, ListenerToDialerMessag use ProtocolChoiceError; use {dialer_select_proto, listener_select_proto}; +/// Holds a `Vec` and satifies the iterator requirements of `listener_select_proto`. +struct VecRefIntoIter(Vec); + +impl<'a, T> IntoIterator for &'a VecRefIntoIter +where T: Clone +{ + type Item = T; + type IntoIter = std::vec::IntoIter; + fn into_iter(self) -> Self::IntoIter { + self.0.clone().into_iter() + } +} + #[test] fn negotiate_with_self_succeeds() { let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); @@ -88,8 +101,8 @@ fn select_proto_basic() { let protos = vec![ (Bytes::from("/proto1"), ::eq, 0), (Bytes::from("/proto2"), ::eq, 1), - ].into_iter(); - listener_select_proto(connec, protos).map(|r| r.0) + ]; + listener_select_proto(connec, VecRefIntoIter(protos)).map(|r| r.0) }); let client = TcpStream::connect(&listener_addr) @@ -122,8 +135,8 @@ fn no_protocol_found() { let protos = vec![ (Bytes::from("/proto1"), ::eq, 1), (Bytes::from("/proto2"), ::eq, 2), - ].into_iter(); - listener_select_proto(connec, protos).map(|r| r.0) + ]; + listener_select_proto(connec, VecRefIntoIter(protos)).map(|r| r.0) }); let client = TcpStream::connect(&listener_addr) @@ -156,8 +169,8 @@ fn select_proto_parallel() { let protos = vec![ (Bytes::from("/proto1"), ::eq, 0), (Bytes::from("/proto2"), ::eq, 1), - ].into_iter(); - listener_select_proto(connec, protos).map(|r| r.0) + ]; + listener_select_proto(connec, VecRefIntoIter(protos)).map(|r| r.0) }); let client = TcpStream::connect(&listener_addr) @@ -191,8 +204,8 @@ fn select_proto_serial() { let protos = vec![ (Bytes::from("/proto1"), ::eq, 0), (Bytes::from("/proto2"), ::eq, 1), - ].into_iter(); - listener_select_proto(connec, protos).map(|r| r.0) + ]; + listener_select_proto(connec, VecRefIntoIter(protos)).map(|r| r.0) }); let client = TcpStream::connect(&listener_addr)