Remove the NamesIter: Clone requirement (#663)

* Remove the NamesIter: Clone requirement

* Fix concerns
This commit is contained in:
Pierre Krieger
2018-11-22 18:15:35 +01:00
committed by GitHub
parent 177c6cf842
commit 1da97242da
7 changed files with 55 additions and 72 deletions

View File

@ -89,7 +89,7 @@ where TBehaviour: NetworkBehaviour,
<TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutEvent: Send + 'static, <TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutEvent: Send + 'static,
<TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary <TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol: InboundUpgrade<Substream<TMuxer>> + Send + 'static, <TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol: InboundUpgrade<Substream<TMuxer>> + Send + 'static,
<<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::NamesIter: Clone + Send + 'static, <<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::NamesIter: Send + 'static,
<<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::UpgradeId: Send + 'static, <<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::UpgradeId: Send + 'static,
<<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Substream<TMuxer>>>::Error: fmt::Debug + Send + 'static, <<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Substream<TMuxer>>>::Error: fmt::Debug + Send + 'static,
<<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Substream<TMuxer>>>::Future: Send + 'static, <<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Substream<TMuxer>>>::Future: Send + 'static,
@ -184,12 +184,12 @@ where TBehaviour: NetworkBehaviour,
<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol: InboundUpgrade<Substream<TMuxer>> + Send + 'static, <TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol: InboundUpgrade<Substream<TMuxer>> + Send + 'static,
<<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Substream<TMuxer>>>::Future: Send + 'static, <<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Substream<TMuxer>>>::Future: Send + 'static,
<<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Substream<TMuxer>>>::Error: fmt::Debug + Send + 'static, <<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Substream<TMuxer>>>::Error: fmt::Debug + Send + 'static,
<<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::NamesIter: Clone + Send + 'static, <<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::NamesIter: Send + 'static,
<<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::UpgradeId: Send + 'static, <<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::UpgradeId: Send + 'static,
<TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutboundProtocol: OutboundUpgrade<Substream<TMuxer>> + Send + 'static, <TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutboundProtocol: OutboundUpgrade<Substream<TMuxer>> + Send + 'static,
<<TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade<Substream<TMuxer>>>::Future: Send + 'static, <<TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade<Substream<TMuxer>>>::Future: Send + 'static,
<<TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade<Substream<TMuxer>>>::Error: fmt::Debug + Send + 'static, <<TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade<Substream<TMuxer>>>::Error: fmt::Debug + Send + 'static,
<<TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::NamesIter: Clone + Send + 'static, <<TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::NamesIter: Send + 'static,
<<TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::UpgradeId: Send + 'static, <<TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::UpgradeId: Send + 'static,
<NodeHandlerWrapper<TBehaviour::ProtocolsHandler> as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary <NodeHandlerWrapper<TBehaviour::ProtocolsHandler> as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
TTopology: Topology, TTopology: Topology,

View File

@ -26,7 +26,6 @@ use crate::{
InboundUpgrade, InboundUpgrade,
InboundUpgradeExt, InboundUpgradeExt,
OutboundUpgrade, OutboundUpgrade,
UpgradeInfo,
InboundUpgradeApply, InboundUpgradeApply,
OutboundUpgradeApply, OutboundUpgradeApply,
DeniedUpgrade, DeniedUpgrade,
@ -592,7 +591,6 @@ where
impl<TProtoHandler> NodeHandler for NodeHandlerWrapper<TProtoHandler> impl<TProtoHandler> NodeHandler for NodeHandlerWrapper<TProtoHandler>
where where
TProtoHandler: ProtocolsHandler, TProtoHandler: ProtocolsHandler,
<TProtoHandler::InboundProtocol as UpgradeInfo>::NamesIter: Clone,
<TProtoHandler::OutboundProtocol as OutboundUpgrade<<TProtoHandler as ProtocolsHandler>::Substream>>::Error: std::fmt::Debug <TProtoHandler::OutboundProtocol as OutboundUpgrade<<TProtoHandler as ProtocolsHandler>::Substream>>::Error: std::fmt::Debug
{ {
type InEvent = TProtoHandler::InEvent; type InEvent = TProtoHandler::InEvent;

View File

@ -44,7 +44,7 @@ where
D::Output: AsyncRead + AsyncWrite + Send + 'static, D::Output: AsyncRead + AsyncWrite + Send + 'static,
U: InboundUpgrade<D::Output, Output = O, Error = E>, U: InboundUpgrade<D::Output, Output = O, Error = E>,
U: OutboundUpgrade<D::Output, Output = O, Error = E> + Send + Clone + 'static, U: OutboundUpgrade<D::Output, Output = O, Error = E> + Send + Clone + 'static,
<U as UpgradeInfo>::NamesIter: Clone + Send, <U as UpgradeInfo>::NamesIter: Send,
<U as UpgradeInfo>::UpgradeId: Send, <U as UpgradeInfo>::UpgradeId: Send,
<U as InboundUpgrade<D::Output>>::Future: Send, <U as InboundUpgrade<D::Output>>::Future: Send,
<U as OutboundUpgrade<D::Output>>::Future: Send, <U as OutboundUpgrade<D::Output>>::Future: Send,

View File

@ -19,7 +19,8 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use bytes::Bytes; use bytes::Bytes;
use crate::{nodes::ConnectedPoint, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError}}; use crate::nodes::ConnectedPoint;
use crate::upgrade::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, UpgradeError};
use futures::{future::Either, prelude::*}; use futures::{future::Either, prelude::*};
use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture}; use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture};
use std::mem; use std::mem;
@ -31,7 +32,6 @@ pub fn apply<C, U>(conn: C, up: U, cp: ConnectedPoint)
where where
C: AsyncRead + AsyncWrite, C: AsyncRead + AsyncWrite,
U: InboundUpgrade<C> + OutboundUpgrade<C>, U: InboundUpgrade<C> + OutboundUpgrade<C>,
U::NamesIter: Clone
{ {
if cp.is_listener() { if cp.is_listener() {
Either::A(apply_inbound(conn, up)) Either::A(apply_inbound(conn, up))
@ -45,12 +45,10 @@ pub fn apply_inbound<C, U>(conn: C, up: U) -> InboundUpgradeApply<C, U>
where where
C: AsyncRead + AsyncWrite, C: AsyncRead + AsyncWrite,
U: InboundUpgrade<C>, U: InboundUpgrade<C>,
U::NamesIter: Clone
{ {
let iter = ProtocolNames(up.protocol_names()); let future = multistream_select::listener_select_proto(conn, UpgradeIntoProtocolsIterWrap(up));
let future = multistream_select::listener_select_proto(conn, iter);
InboundUpgradeApply { InboundUpgradeApply {
inner: InboundUpgradeApplyState::Init { future, upgrade: up } inner: InboundUpgradeApplyState::Init { future }
} }
} }
@ -82,8 +80,7 @@ where
U: InboundUpgrade<C> U: InboundUpgrade<C>
{ {
Init { Init {
future: ListenerSelectFuture<C, ProtocolNames<U::NamesIter>, U::UpgradeId>, future: ListenerSelectFuture<C, UpgradeIntoProtocolsIterWrap<U>, U::UpgradeId>,
upgrade: U
}, },
Upgrade { Upgrade {
future: U::Future future: U::Future
@ -95,7 +92,6 @@ impl<C, U> Future for InboundUpgradeApply<C, U>
where where
C: AsyncRead + AsyncWrite, C: AsyncRead + AsyncWrite,
U: InboundUpgrade<C>, U: InboundUpgrade<C>,
U::NamesIter: Clone
{ {
type Item = U::Output; type Item = U::Output;
type Error = UpgradeError<U::Error>; type Error = UpgradeError<U::Error>;
@ -103,16 +99,16 @@ where
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop { loop {
match mem::replace(&mut self.inner, InboundUpgradeApplyState::Undefined) { match mem::replace(&mut self.inner, InboundUpgradeApplyState::Undefined) {
InboundUpgradeApplyState::Init { mut future, upgrade } => { InboundUpgradeApplyState::Init { mut future } => {
let (upgrade_id, connection) = match future.poll()? { let (upgrade_id, connection, upgrade) = match future.poll()? {
Async::Ready(x) => x, Async::Ready(x) => x,
Async::NotReady => { Async::NotReady => {
self.inner = InboundUpgradeApplyState::Init { future, upgrade }; self.inner = InboundUpgradeApplyState::Init { future };
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
}; };
self.inner = InboundUpgradeApplyState::Upgrade { self.inner = InboundUpgradeApplyState::Upgrade {
future: upgrade.upgrade_inbound(connection, upgrade_id) future: upgrade.0.upgrade_inbound(connection, upgrade_id)
}; };
} }
InboundUpgradeApplyState::Upgrade { mut future } => { InboundUpgradeApplyState::Upgrade { mut future } => {
@ -208,6 +204,21 @@ where
} }
} }
/// Wraps around a `UpgradeInfo` and satisfies the requirement of `listener_select_proto`.
struct UpgradeIntoProtocolsIterWrap<U>(U);
impl<'a, U> IntoIterator for &'a UpgradeIntoProtocolsIterWrap<U>
where U: UpgradeInfo
{
type Item = (Bytes, fn(&Bytes, &Bytes) -> bool, U::UpgradeId);
type IntoIter = ProtocolNames<U::NamesIter>;
#[inline]
fn into_iter(self) -> Self::IntoIter {
ProtocolNames(self.0.protocol_names())
}
}
/// Iterator adapter which adds equality matching predicates to items. /// Iterator adapter which adds equality matching predicates to items.
/// Used in `NegotiationFuture`. /// Used in `NegotiationFuture`.
#[derive(Clone)] #[derive(Clone)]

View File

@ -76,45 +76,6 @@
//! # } //! # }
//! ``` //! ```
//! //!
//! For a listener:
//!
//! ```no_run
//! extern crate bytes;
//! extern crate futures;
//! extern crate multistream_select;
//! extern crate tokio;
//! extern crate tokio_tcp;
//!
//! # fn main() {
//! use bytes::Bytes;
//! use multistream_select::listener_select_proto;
//! use futures::{Future, Sink, Stream};
//! use tokio_tcp::TcpListener;
//! use tokio::runtime::current_thread::Runtime;
//!
//! #[derive(Debug, Copy, Clone)]
//! enum MyProto { Echo, Hello }
//!
//! let server = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap()
//! .incoming()
//! .from_err()
//! .and_then(move |connec| {
//! let protos = vec![
//! (Bytes::from("/echo/1.0.0"), <Bytes as PartialEq>::eq, MyProto::Echo),
//! (Bytes::from("/hello/2.5.0"), <Bytes as PartialEq>::eq, MyProto::Hello),
//! ]
//! .into_iter();
//! listener_select_proto(connec, protos)
//! })
//! .for_each(|(proto, _connec)| {
//! println!("new remote with {:?} negotiated", proto);
//! Ok(())
//! });
//!
//! let mut rt = Runtime::new().unwrap();
//! let _ = rt.block_on(server).expect("failed to run server");
//! # }
//! ```
extern crate bytes; extern crate bytes;
#[macro_use] #[macro_use]

View File

@ -45,7 +45,7 @@ use ProtocolChoiceError;
pub fn listener_select_proto<R, I, M, P>(inner: R, protocols: I) -> ListenerSelectFuture<R, I, P> pub fn listener_select_proto<R, I, M, P>(inner: R, protocols: I) -> ListenerSelectFuture<R, I, P>
where where
R: AsyncRead + AsyncWrite, R: AsyncRead + AsyncWrite,
I: Iterator<Item = (Bytes, M, P)> + Clone, for<'r> &'r I: IntoIterator<Item = (Bytes, M, P)>,
M: FnMut(&Bytes, &Bytes) -> bool, M: FnMut(&Bytes, &Bytes) -> bool,
{ {
ListenerSelectFuture { ListenerSelectFuture {
@ -77,11 +77,11 @@ enum ListenerSelectState<R: AsyncRead + AsyncWrite, I, P> {
impl<R, I, M, P> Future for ListenerSelectFuture<R, I, P> impl<R, I, M, P> Future for ListenerSelectFuture<R, I, P>
where where
I: Iterator<Item=(Bytes, M, P)> + Clone, for<'r> &'r I: IntoIterator<Item=(Bytes, M, P)>,
M: FnMut(&Bytes, &Bytes) -> bool, M: FnMut(&Bytes, &Bytes) -> bool,
R: AsyncRead + AsyncWrite, R: AsyncRead + AsyncWrite,
{ {
type Item = (P, R); type Item = (P, R, I);
type Error = ProtocolChoiceError; type Error = ProtocolChoiceError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -110,7 +110,7 @@ where
match msg { match msg {
Some(DialerToListenerMessage::ProtocolsListRequest) => { Some(DialerToListenerMessage::ProtocolsListRequest) => {
let msg = ListenerToDialerMessage::ProtocolsListResponse { let msg = ListenerToDialerMessage::ProtocolsListResponse {
list: protocols.clone().map(|(p, _, _)| p).collect(), list: protocols.into_iter().map(|(p, _, _)| p).collect(),
}; };
trace!("protocols list response: {:?}", msg); trace!("protocols list response: {:?}", msg);
let sender = listener.send(msg); let sender = listener.send(msg);
@ -123,7 +123,7 @@ where
Some(DialerToListenerMessage::ProtocolRequest { name }) => { Some(DialerToListenerMessage::ProtocolRequest { name }) => {
let mut outcome = None; let mut outcome = None;
let mut send_back = ListenerToDialerMessage::NotAvailable; let mut send_back = ListenerToDialerMessage::NotAvailable;
for (supported, mut matches, value) in protocols.clone() { for (supported, mut matches, value) in &protocols {
if matches(&name, &supported) { if matches(&name, &supported) {
send_back = ListenerToDialerMessage::ProtocolAck {name: name.clone()}; send_back = ListenerToDialerMessage::ProtocolAck {name: name.clone()};
outcome = Some(value); outcome = Some(value);
@ -149,7 +149,7 @@ where
} }
}; };
if let Some(p) = outcome { if let Some(p) = outcome {
return Ok(Async::Ready((p, listener.into_inner()))) return Ok(Async::Ready((p, listener.into_inner(), protocols)))
} else { } else {
let stream = listener.into_future(); let stream = listener.into_future();
self.inner = ListenerSelectState::Incoming { stream, protocols } self.inner = ListenerSelectState::Incoming { stream, protocols }

View File

@ -35,6 +35,19 @@ use protocol::{Dialer, DialerToListenerMessage, Listener, ListenerToDialerMessag
use ProtocolChoiceError; use ProtocolChoiceError;
use {dialer_select_proto, listener_select_proto}; use {dialer_select_proto, listener_select_proto};
/// Holds a `Vec` and satifies the iterator requirements of `listener_select_proto`.
struct VecRefIntoIter<T>(Vec<T>);
impl<'a, T> IntoIterator for &'a VecRefIntoIter<T>
where T: Clone
{
type Item = T;
type IntoIter = std::vec::IntoIter<T>;
fn into_iter(self) -> Self::IntoIter {
self.0.clone().into_iter()
}
}
#[test] #[test]
fn negotiate_with_self_succeeds() { fn negotiate_with_self_succeeds() {
let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
@ -88,8 +101,8 @@ fn select_proto_basic() {
let protos = vec![ let protos = vec![
(Bytes::from("/proto1"), <Bytes as PartialEq>::eq, 0), (Bytes::from("/proto1"), <Bytes as PartialEq>::eq, 0),
(Bytes::from("/proto2"), <Bytes as PartialEq>::eq, 1), (Bytes::from("/proto2"), <Bytes as PartialEq>::eq, 1),
].into_iter(); ];
listener_select_proto(connec, protos).map(|r| r.0) listener_select_proto(connec, VecRefIntoIter(protos)).map(|r| r.0)
}); });
let client = TcpStream::connect(&listener_addr) let client = TcpStream::connect(&listener_addr)
@ -122,8 +135,8 @@ fn no_protocol_found() {
let protos = vec![ let protos = vec![
(Bytes::from("/proto1"), <Bytes as PartialEq>::eq, 1), (Bytes::from("/proto1"), <Bytes as PartialEq>::eq, 1),
(Bytes::from("/proto2"), <Bytes as PartialEq>::eq, 2), (Bytes::from("/proto2"), <Bytes as PartialEq>::eq, 2),
].into_iter(); ];
listener_select_proto(connec, protos).map(|r| r.0) listener_select_proto(connec, VecRefIntoIter(protos)).map(|r| r.0)
}); });
let client = TcpStream::connect(&listener_addr) let client = TcpStream::connect(&listener_addr)
@ -156,8 +169,8 @@ fn select_proto_parallel() {
let protos = vec![ let protos = vec![
(Bytes::from("/proto1"), <Bytes as PartialEq>::eq, 0), (Bytes::from("/proto1"), <Bytes as PartialEq>::eq, 0),
(Bytes::from("/proto2"), <Bytes as PartialEq>::eq, 1), (Bytes::from("/proto2"), <Bytes as PartialEq>::eq, 1),
].into_iter(); ];
listener_select_proto(connec, protos).map(|r| r.0) listener_select_proto(connec, VecRefIntoIter(protos)).map(|r| r.0)
}); });
let client = TcpStream::connect(&listener_addr) let client = TcpStream::connect(&listener_addr)
@ -191,8 +204,8 @@ fn select_proto_serial() {
let protos = vec![ let protos = vec![
(Bytes::from("/proto1"), <Bytes as PartialEq>::eq, 0), (Bytes::from("/proto1"), <Bytes as PartialEq>::eq, 0),
(Bytes::from("/proto2"), <Bytes as PartialEq>::eq, 1), (Bytes::from("/proto2"), <Bytes as PartialEq>::eq, 1),
].into_iter(); ];
listener_select_proto(connec, protos).map(|r| r.0) listener_select_proto(connec, VecRefIntoIter(protos)).map(|r| r.0)
}); });
let client = TcpStream::connect(&listener_addr) let client = TcpStream::connect(&listener_addr)