From 2c1807b646562d2bd1e7a3da0c450d82aa94daae Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 16 Nov 2018 13:59:56 +0100 Subject: [PATCH] Remove Send/Sync/'static requirements in identify (#649) --- protocols/identify/src/lib.rs | 3 +- protocols/identify/src/listen_handler.rs | 2 +- protocols/identify/src/listen_layer.rs | 11 +- protocols/identify/src/periodic_id_handler.rs | 2 +- protocols/identify/src/periodic_id_layer.rs | 2 +- protocols/identify/src/protocol.rs | 127 ++++++++++++------ 6 files changed, 95 insertions(+), 52 deletions(-) diff --git a/protocols/identify/src/lib.rs b/protocols/identify/src/lib.rs index b8253842..52e898bc 100644 --- a/protocols/identify/src/lib.rs +++ b/protocols/identify/src/lib.rs @@ -67,6 +67,7 @@ extern crate bytes; extern crate fnv; +#[macro_use] extern crate futures; extern crate libp2p_peerstore; extern crate libp2p_core; @@ -87,7 +88,7 @@ pub use self::listen_layer::IdentifyListen; pub use self::periodic_id_handler::{PeriodicIdentification, PeriodicIdentificationEvent}; pub use self::periodic_id_layer::{PeriodicIdentifyBehaviour, PeriodicIdentifyBehaviourEvent}; pub use self::protocol::{IdentifyInfo, RemoteInfo}; -pub use self::protocol::{IdentifyProtocolConfig, IdentifySender}; +pub use self::protocol::{IdentifyProtocolConfig, IdentifySender, IdentifySenderFuture}; mod id_transport; mod listen_handler; diff --git a/protocols/identify/src/listen_handler.rs b/protocols/identify/src/listen_handler.rs index 1d9e6f02..a4a23396 100644 --- a/protocols/identify/src/listen_handler.rs +++ b/protocols/identify/src/listen_handler.rs @@ -55,7 +55,7 @@ impl IdentifyListenHandler { impl ProtocolsHandler for IdentifyListenHandler where - TSubstream: AsyncRead + AsyncWrite + Send + Sync + 'static, // TODO: remove useless bounds + TSubstream: AsyncRead + AsyncWrite, { type InEvent = Void; type OutEvent = IdentifySender; diff --git a/protocols/identify/src/listen_layer.rs b/protocols/identify/src/listen_layer.rs index 5942570e..7ae2cfec 100644 --- a/protocols/identify/src/listen_layer.rs +++ b/protocols/identify/src/listen_layer.rs @@ -22,10 +22,10 @@ use futures::prelude::*; use libp2p_core::nodes::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction}; use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId}; use smallvec::SmallVec; -use std::{collections::HashMap, io, marker::PhantomData}; +use std::collections::HashMap; use tokio_io::{AsyncRead, AsyncWrite}; use void::Void; -use {IdentifyListenHandler, IdentifyInfo}; +use {IdentifyListenHandler, IdentifyInfo, IdentifySenderFuture}; /// Network behaviour that automatically identifies nodes periodically, and returns information /// about them. @@ -35,9 +35,7 @@ pub struct IdentifyListen { /// For each peer we're connected to, the observed address to send back to it. observed_addresses: HashMap, /// List of futures that send back information back to remotes. - futures: SmallVec<[Box + Send>; 4]>, - /// Marker to pin the generics. - marker: PhantomData, + futures: SmallVec<[IdentifySenderFuture; 4]>, } impl IdentifyListen { @@ -47,7 +45,6 @@ impl IdentifyListen { send_back_info: info, observed_addresses: HashMap::new(), futures: SmallVec::new(), - marker: PhantomData, } } @@ -66,7 +63,7 @@ impl IdentifyListen { impl NetworkBehaviour for IdentifyListen where - TSubstream: AsyncRead + AsyncWrite + Send + Sync + 'static, + TSubstream: AsyncRead + AsyncWrite, { type ProtocolsHandler = IdentifyListenHandler; type OutEvent = Void; diff --git a/protocols/identify/src/periodic_id_handler.rs b/protocols/identify/src/periodic_id_handler.rs index 627e8f5a..c13aaa5b 100644 --- a/protocols/identify/src/periodic_id_handler.rs +++ b/protocols/identify/src/periodic_id_handler.rs @@ -77,7 +77,7 @@ impl PeriodicIdentification { impl ProtocolsHandler for PeriodicIdentification where - TSubstream: AsyncRead + AsyncWrite + Send + Sync + 'static, // TODO: remove useless bounds + TSubstream: AsyncRead + AsyncWrite, { type InEvent = Void; type OutEvent = PeriodicIdentificationEvent; diff --git a/protocols/identify/src/periodic_id_layer.rs b/protocols/identify/src/periodic_id_layer.rs index c4198b26..b6806476 100644 --- a/protocols/identify/src/periodic_id_layer.rs +++ b/protocols/identify/src/periodic_id_layer.rs @@ -46,7 +46,7 @@ impl PeriodicIdentifyBehaviour { impl NetworkBehaviour for PeriodicIdentifyBehaviour where - TSubstream: AsyncRead + AsyncWrite + Send + Sync + 'static, + TSubstream: AsyncRead + AsyncWrite, { type ProtocolsHandler = PeriodicIdentification; type OutEvent = PeriodicIdentifyBehaviourEvent; diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 7405f5e6..7d9f5176 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use bytes::{Bytes, BytesMut}; -use futures::{future::{self, FutureResult}, Future, Sink, Stream}; +use futures::{future::{self, FutureResult}, Async, AsyncSink, Future, Poll, Sink, Stream}; use libp2p_core::{ Multiaddr, PublicKey, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo} @@ -54,15 +54,10 @@ pub struct IdentifySender { inner: Framed>>, } -impl<'a, T> IdentifySender -where - T: AsyncWrite + Send + 'a, -{ +impl IdentifySender where T: AsyncWrite { /// Sends back information to the client. Returns a future that is signalled whenever the /// info have been sent. - pub fn send(self, info: IdentifyInfo, observed_addr: &Multiaddr) - -> Box + Send + 'a> - { + pub fn send(self, info: IdentifyInfo, observed_addr: &Multiaddr) -> IdentifySenderFuture { debug!("Sending identify info to client"); trace!("Sending: {:?}", info); @@ -83,8 +78,42 @@ where .write_to_bytes() .expect("writing protobuf failed; should never happen"); - let future = self.inner.send(bytes).map(|_| ()); - Box::new(future) as Box<_> + IdentifySenderFuture { + inner: self.inner, + item: Some(bytes), + } + } +} + +/// Future returned by `IdentifySender::send()`. Must be processed to the end in order to send +/// the information to the remote. +// Note: we don't use a `futures::sink::Sink` because it requires `T` to implement `Sink`, which +// means that we would require `T: AsyncWrite` in this struct definition. This requirement +// would then propagate everywhere. +#[must_use = "futures do nothing unless polled"] +pub struct IdentifySenderFuture { + /// The Sink where to send the data. + inner: Framed>>, + /// Bytes to send, or `None` if we've already sent them. + item: Option>, +} + +impl Future for IdentifySenderFuture +where T: AsyncWrite +{ + type Item = (); + type Error = IoError; + + fn poll(&mut self) -> Poll { + if let Some(item) = self.item.take() { + if let AsyncSink::NotReady(item) = self.inner.start_send(item)? { + self.item = Some(item); + return Ok(Async::NotReady); + } + } + + try_ready!(self.inner.poll_complete()); + Ok(Async::Ready(())) } } @@ -118,7 +147,7 @@ impl UpgradeInfo for IdentifyProtocolConfig { impl InboundUpgrade for IdentifyProtocolConfig where - C: AsyncRead + AsyncWrite + Send + 'static, + C: AsyncRead + AsyncWrite, { type Output = IdentifySender; type Error = IoError; @@ -134,41 +163,57 @@ where impl OutboundUpgrade for IdentifyProtocolConfig where - C: AsyncRead + AsyncWrite + Send + 'static, + C: AsyncRead + AsyncWrite, { type Output = RemoteInfo; type Error = IoError; - type Future = Box + Send>; + type Future = IdentifyOutboundFuture; fn upgrade_outbound(self, socket: C, _: ()) -> Self::Future { - let socket = Framed::new(socket, codec::UviBytes::::default()); - let future = socket - .into_future() - .map(|(msg, _)| msg) - .map_err(|(err, _)| err) - .and_then(|msg| { - debug!("Received identify message"); - if let Some(msg) = msg { - let (info, observed_addr) = match parse_proto_msg(msg) { - Ok(v) => v, - Err(err) => { - debug!("Failed to parse protobuf message; error = {:?}", err); - return Err(err.into()); - } - }; - trace!("Remote observes us as {:?}", observed_addr); - trace!("Information received: {:?}", info); - Ok(RemoteInfo { - info, - observed_addr: observed_addr.clone(), - _priv: () - }) - } else { - debug!("Identify protocol stream closed before receiving info"); - Err(IoErrorKind::InvalidData.into()) - } - }); - Box::new(future) as Box<_> + IdentifyOutboundFuture { + inner: Framed::new(socket, codec::UviBytes::::default()), + } + } +} + +/// Future returned by `OutboundUpgrade::upgrade_outbound`. +pub struct IdentifyOutboundFuture { + inner: Framed>, +} + +impl Future for IdentifyOutboundFuture +where T: AsyncRead +{ + type Item = RemoteInfo; + type Error = IoError; + + fn poll(&mut self) -> Poll { + let msg = match try_ready!(self.inner.poll()) { + Some(i) => i, + None => { + debug!("Identify protocol stream closed before receiving info"); + return Err(IoErrorKind::InvalidData.into()); + } + }; + + debug!("Received identify message"); + + let (info, observed_addr) = match parse_proto_msg(msg) { + Ok(v) => v, + Err(err) => { + debug!("Failed to parse protobuf message; error = {:?}", err); + return Err(err.into()); + } + }; + + trace!("Remote observes us as {:?}", observed_addr); + trace!("Information received: {:?}", info); + + Ok(Async::Ready(RemoteInfo { + info, + observed_addr: observed_addr.clone(), + _priv: () + })) } }