From a2ab7ff4a9883f579f3cff05a01429d55c41128c Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 22 Jan 2019 14:45:03 +0100 Subject: [PATCH] Simplify the floodsub handler (#868) --- core/src/protocols_handler/mod.rs | 2 + core/src/protocols_handler/one_shot.rs | 218 ++++++++++++++++++++++ protocols/floodsub/src/handler.rs | 246 ------------------------- protocols/floodsub/src/layer.rs | 39 +++- protocols/floodsub/src/lib.rs | 1 - protocols/floodsub/src/protocol.rs | 141 +++++++------- 6 files changed, 327 insertions(+), 320 deletions(-) create mode 100644 core/src/protocols_handler/one_shot.rs delete mode 100644 protocols/floodsub/src/handler.rs diff --git a/core/src/protocols_handler/mod.rs b/core/src/protocols_handler/mod.rs index 956e8a0a..d809a3bc 100644 --- a/core/src/protocols_handler/mod.rs +++ b/core/src/protocols_handler/mod.rs @@ -47,12 +47,14 @@ pub use self::dummy::DummyProtocolsHandler; pub use self::map_in::MapInEvent; pub use self::map_out::MapOutEvent; pub use self::node_handler::{NodeHandlerWrapper, NodeHandlerWrapperBuilder}; +pub use self::one_shot::OneShotHandler; pub use self::select::{IntoProtocolsHandlerSelect, ProtocolsHandlerSelect}; mod dummy; mod map_in; mod map_out; mod node_handler; +mod one_shot; mod select; /// Handler for a set of protocols for a specific connection with a remote. diff --git a/core/src/protocols_handler/one_shot.rs b/core/src/protocols_handler/one_shot.rs new file mode 100644 index 00000000..f161a7b5 --- /dev/null +++ b/core/src/protocols_handler/one_shot.rs @@ -0,0 +1,218 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}; +use crate::upgrade::{InboundUpgrade, OutboundUpgrade}; +use futures::prelude::*; +use smallvec::SmallVec; +use std::{error, marker::PhantomData}; +use tokio_io::{AsyncRead, AsyncWrite}; + +/// Implementation of `ProtocolsHandler` that opens a new substream for each individual message. +/// +/// This struct is meant to be a helper for other implementations to use. +// TODO: Debug +pub struct OneShotHandler +where TOutProto: OutboundUpgrade +{ + /// The upgrade for inbound substreams. + listen_protocol: TInProto, + /// If true, we should return as soon as possible. + shutting_down: bool, + /// If `Some`, something bad happened and we should shut down the handler with an error. + pending_error: Option>::Error>>, + /// Queue of events to produce in `poll()`. + events_out: SmallVec<[TOutEvent; 4]>, + /// Queue of outbound substreams to open. + dial_queue: SmallVec<[TOutProto; 4]>, + /// Current number of concurrent outbound substreams being opened. + dial_negotiated: u32, + /// Maximum number of concurrent outbound substreams being opened. Value is never modified. + max_dial_negotiated: u32, + /// Pin the `TSubstream` generic. + marker: PhantomData, +} + +impl + OneShotHandler +where TOutProto: OutboundUpgrade +{ + /// Creates a `OneShotHandler`. + #[inline] + pub fn new(listen_protocol: TInProto) -> Self { + OneShotHandler { + listen_protocol, + shutting_down: false, + pending_error: None, + events_out: SmallVec::new(), + dial_queue: SmallVec::new(), + dial_negotiated: 0, + max_dial_negotiated: 8, + marker: PhantomData, + } + } + + /// Returns the number of pending requests. + #[inline] + pub fn pending_requests(&self) -> u32 { + self.dial_negotiated + self.dial_queue.len() as u32 + } + + /// Returns a reference to the listen protocol configuration. + /// + /// > **Note**: If you modify the protocol, modifications will only applies to future inbound + /// > substreams, not the ones already being negotiated. + #[inline] + pub fn listen_protocol_ref(&self) -> &TInProto { + &self.listen_protocol + } + + /// Returns a mutable reference to the listen protocol configuration. + /// + /// > **Note**: If you modify the protocol, modifications will only applies to future inbound + /// > substreams, not the ones already being negotiated. + #[inline] + pub fn listen_protocol_mut(&mut self) -> &mut TInProto { + &mut self.listen_protocol + } + + /// Opens an outbound substream with `upgrade`. + #[inline] + pub fn send_request(&mut self, upgrade: TOutProto) { + self.dial_queue.push(upgrade); + } +} + +impl Default for + OneShotHandler +where TOutProto: OutboundUpgrade, + TInProto: Default +{ + #[inline] + fn default() -> Self { + OneShotHandler::new(Default::default()) + } +} + +impl ProtocolsHandler for + OneShotHandler +where + TSubstream: AsyncRead + AsyncWrite, + TInProto: InboundUpgrade + Clone, + TOutProto: OutboundUpgrade, + TInProto::Output: Into, + TOutProto::Output: Into, + TOutProto::Error: error::Error + 'static, +{ + type InEvent = TOutProto; + type OutEvent = TOutEvent; + type Error = ProtocolsHandlerUpgrErr<>::Error>; + type Substream = TSubstream; + type InboundProtocol = TInProto; + type OutboundProtocol = TOutProto; + type OutboundOpenInfo = (); + + #[inline] + fn listen_protocol(&self) -> Self::InboundProtocol { + self.listen_protocol.clone() + } + + #[inline] + fn inject_fully_negotiated_inbound( + &mut self, + out: >::Output + ) { + if self.shutting_down { + return; + } + + self.events_out.push(out.into()); + } + + #[inline] + fn inject_fully_negotiated_outbound( + &mut self, + out: >::Output, + _: Self::OutboundOpenInfo + ) { + self.dial_negotiated -= 1; + + if self.shutting_down { + return; + } + + self.events_out.push(out.into()); + } + + #[inline] + fn inject_event(&mut self, event: Self::InEvent) { + self.send_request(event); + } + + #[inline] + fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<>::Error>) { + if self.pending_error.is_none() { + self.pending_error = Some(error); + } + } + + #[inline] + fn inject_inbound_closed(&mut self) {} + + #[inline] + fn connection_keep_alive(&self) -> bool { + self.dial_negotiated != 0 || !self.dial_queue.is_empty() + } + + #[inline] + fn shutdown(&mut self) { + self.shutting_down = true; + } + + fn poll(&mut self) -> Poll, Self::Error> { + if let Some(err) = self.pending_error.take() { + return Err(err); + } + + if !self.events_out.is_empty() { + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(self.events_out.remove(0)))); + } else { + self.events_out.shrink_to_fit(); + } + + if self.shutting_down && self.dial_negotiated == 0 { + return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)); + } + + if !self.dial_queue.is_empty() { + if !self.shutting_down && self.dial_negotiated < self.max_dial_negotiated { + self.dial_negotiated += 1; + return Ok(Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + upgrade: self.dial_queue.remove(0), + info: (), + })); + } + } else { + self.dial_queue.shrink_to_fit(); + } + + Ok(Async::NotReady) + } +} diff --git a/protocols/floodsub/src/handler.rs b/protocols/floodsub/src/handler.rs deleted file mode 100644 index 8112e928..00000000 --- a/protocols/floodsub/src/handler.rs +++ /dev/null @@ -1,246 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use crate::protocol::{FloodsubCodec, FloodsubConfig, FloodsubRpc}; -use futures::prelude::*; -use libp2p_core::{ - ProtocolsHandler, ProtocolsHandlerEvent, - protocols_handler::ProtocolsHandlerUpgrErr, - upgrade::{InboundUpgrade, OutboundUpgrade} -}; -use smallvec::SmallVec; -use std::{fmt, io}; -use tokio_codec::Framed; -use tokio_io::{AsyncRead, AsyncWrite}; - -/// Protocol handler that handles communication with the remote for the floodsub protocol. -/// -/// The handler will automatically open a substream with the remote for each request we make. -/// -/// It also handles requests made by the remote. -pub struct FloodsubHandler -where - TSubstream: AsyncRead + AsyncWrite, -{ - /// Configuration for the floodsub protocol. - config: FloodsubConfig, - - /// If true, we are trying to shut down the existing floodsub substream and should refuse any - /// incoming connection. - shutting_down: bool, - - /// The active substreams. - // TODO: add a limit to the number of allowed substreams - substreams: Vec>, - - /// Queue of values that we want to send to the remote. - send_queue: SmallVec<[FloodsubRpc; 16]>, -} - -/// State of an active substream, opened either by us or by the remote. -enum SubstreamState -where - TSubstream: AsyncRead + AsyncWrite, -{ - /// Waiting for a message from the remote. - WaitingInput(Framed), - /// Waiting to send a message to the remote. - PendingSend(Framed, FloodsubRpc), - /// Waiting to flush the substream so that the data arrives to the remote. - PendingFlush(Framed), - /// The substream is being closed. - Closing(Framed), -} - -impl SubstreamState -where - TSubstream: AsyncRead + AsyncWrite, -{ - /// Consumes this state and produces the substream. - fn into_substream(self) -> Framed { - match self { - SubstreamState::WaitingInput(substream) => substream, - SubstreamState::PendingSend(substream, _) => substream, - SubstreamState::PendingFlush(substream) => substream, - SubstreamState::Closing(substream) => substream, - } - } -} - -impl FloodsubHandler -where - TSubstream: AsyncRead + AsyncWrite, -{ - /// Builds a new `FloodsubHandler`. - pub fn new() -> Self { - FloodsubHandler { - config: FloodsubConfig::new(), - shutting_down: false, - substreams: Vec::new(), - send_queue: SmallVec::new(), - } - } -} - -impl ProtocolsHandler for FloodsubHandler -where - TSubstream: AsyncRead + AsyncWrite, -{ - type InEvent = FloodsubRpc; - type OutEvent = FloodsubRpc; - type Error = io::Error; - type Substream = TSubstream; - type InboundProtocol = FloodsubConfig; - type OutboundProtocol = FloodsubConfig; - type OutboundOpenInfo = FloodsubRpc; - - #[inline] - fn listen_protocol(&self) -> Self::InboundProtocol { - self.config.clone() - } - - fn inject_fully_negotiated_inbound( - &mut self, - protocol: >::Output - ) { - if self.shutting_down { - return () - } - self.substreams.push(SubstreamState::WaitingInput(protocol)) - } - - fn inject_fully_negotiated_outbound( - &mut self, - protocol: >::Output, - message: Self::OutboundOpenInfo - ) { - if self.shutting_down { - return () - } - self.substreams.push(SubstreamState::PendingSend(protocol, message)) - } - - #[inline] - fn inject_event(&mut self, message: FloodsubRpc) { - self.send_queue.push(message); - } - - #[inline] - fn inject_inbound_closed(&mut self) {} - - #[inline] - fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<>::Error>) {} - - #[inline] - fn connection_keep_alive(&self) -> bool { - !self.substreams.is_empty() - } - - #[inline] - fn shutdown(&mut self) { - self.shutting_down = true; - for n in (0..self.substreams.len()).rev() { - let substream = self.substreams.swap_remove(n); - self.substreams.push(SubstreamState::Closing(substream.into_substream())); - } - } - - fn poll( - &mut self, - ) -> Poll< - ProtocolsHandlerEvent, - io::Error, - > { - if !self.send_queue.is_empty() { - let message = self.send_queue.remove(0); - return Ok(Async::Ready( - ProtocolsHandlerEvent::OutboundSubstreamRequest { - info: message, - upgrade: self.config.clone(), - }, - )); - } - - for n in (0..self.substreams.len()).rev() { - let mut substream = self.substreams.swap_remove(n); - loop { - substream = match substream { - SubstreamState::WaitingInput(mut substream) => match substream.poll() { - Ok(Async::Ready(Some(message))) => { - self.substreams - .push(SubstreamState::WaitingInput(substream)); - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(message))); - } - Ok(Async::Ready(None)) => SubstreamState::Closing(substream), - Ok(Async::NotReady) => { - self.substreams - .push(SubstreamState::WaitingInput(substream)); - return Ok(Async::NotReady); - } - Err(_) => SubstreamState::Closing(substream), - }, - SubstreamState::PendingSend(mut substream, message) => { - match substream.start_send(message)? { - AsyncSink::Ready => SubstreamState::PendingFlush(substream), - AsyncSink::NotReady(message) => { - self.substreams - .push(SubstreamState::PendingSend(substream, message)); - return Ok(Async::NotReady); - } - } - } - SubstreamState::PendingFlush(mut substream) => { - match substream.poll_complete()? { - Async::Ready(()) => SubstreamState::Closing(substream), - Async::NotReady => { - self.substreams - .push(SubstreamState::PendingFlush(substream)); - return Ok(Async::NotReady); - } - } - } - SubstreamState::Closing(mut substream) => match substream.close() { - Ok(Async::Ready(())) => break, - Ok(Async::NotReady) => { - self.substreams.push(SubstreamState::Closing(substream)); - return Ok(Async::NotReady); - } - Err(_) => return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)), - }, - } - } - } - - Ok(Async::NotReady) - } -} - -impl fmt::Debug for FloodsubHandler -where - TSubstream: AsyncRead + AsyncWrite, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - f.debug_struct("FloodsubHandler") - .field("shutting_down", &self.shutting_down) - .field("substreams", &self.substreams.len()) - .field("send_queue", &self.send_queue.len()) - .finish() - } -} diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 2e362ec3..c1ac0a18 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -18,13 +18,12 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::handler::FloodsubHandler; -use crate::protocol::{FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction}; +use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction}; use crate::topic::{Topic, TopicHash}; use cuckoofilter::CuckooFilter; use futures::prelude::*; use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; -use libp2p_core::{protocols_handler::ProtocolsHandler, PeerId}; +use libp2p_core::{protocols_handler::ProtocolsHandler, protocols_handler::OneShotHandler, PeerId}; use rand; use smallvec::SmallVec; use std::{collections::VecDeque, iter, marker::PhantomData}; @@ -176,11 +175,11 @@ impl NetworkBehaviour for Floodsub where TSubstream: AsyncRead + AsyncWrite, { - type ProtocolsHandler = FloodsubHandler; + type ProtocolsHandler = OneShotHandler; type OutEvent = FloodsubEvent; fn new_handler(&mut self) -> Self::ProtocolsHandler { - FloodsubHandler::new() + Default::default() } fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) { @@ -209,8 +208,14 @@ where fn inject_node_event( &mut self, propagation_source: PeerId, - event: FloodsubRpc, + event: InnerMessage, ) { + // We ignore successful sends event. + let event = match event { + InnerMessage::Rx(event) => event, + InnerMessage::Sent => return, + }; + // Update connected peers topics for subscription in event.subscriptions { let remote_peer_topics = self.connected_peers @@ -300,6 +305,28 @@ where } } +/// Transmission between the `OneShotHandler` and the `FloodsubHandler`. +pub enum InnerMessage { + /// We received an RPC from a remote. + Rx(FloodsubRpc), + /// We successfully sent an RPC request. + Sent, +} + +impl From for InnerMessage { + #[inline] + fn from(rpc: FloodsubRpc) -> InnerMessage { + InnerMessage::Rx(rpc) + } +} + +impl From<()> for InnerMessage { + #[inline] + fn from(_: ()) -> InnerMessage { + InnerMessage::Sent + } +} + /// Event that can happen on the floodsub behaviour. #[derive(Debug)] pub enum FloodsubEvent { diff --git a/protocols/floodsub/src/lib.rs b/protocols/floodsub/src/lib.rs index 07325ad2..3ac1c033 100644 --- a/protocols/floodsub/src/lib.rs +++ b/protocols/floodsub/src/lib.rs @@ -34,7 +34,6 @@ extern crate tokio_codec; extern crate tokio_io; extern crate unsigned_varint; -pub mod handler; pub mod protocol; mod layer; diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index 30c6592e..f9282931 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -18,19 +18,19 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use bytes::{BufMut, BytesMut}; use crate::rpc_proto; use crate::topic::TopicHash; -use futures::future; +use bytes::BytesMut; +use futures::{future, stream, Future, Stream}; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, PeerId}; use protobuf::Message as ProtobufMessage; use std::{io, iter}; -use tokio_codec::{Decoder, Encoder, Framed}; +use tokio_codec::{Decoder, FramedRead}; use tokio_io::{AsyncRead, AsyncWrite}; use unsigned_varint::codec; /// Implementation of `ConnectionUpgrade` for the floodsub protocol. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct FloodsubConfig {} impl FloodsubConfig { @@ -53,29 +53,20 @@ impl UpgradeInfo for FloodsubConfig { impl InboundUpgrade for FloodsubConfig where - TSocket: AsyncRead + AsyncWrite, + TSocket: AsyncRead, { - type Output = Framed; + type Output = FloodsubRpc; type Error = io::Error; - type Future = future::FutureResult; + type Future = future::MapErr>, Result)>, fn((Option, FramedRead)) -> Result)>>, fn((io::Error, FramedRead)) -> io::Error>; #[inline] fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future { - future::ok(Framed::new(socket, FloodsubCodec { length_prefix: Default::default() })) - } -} - -impl OutboundUpgrade for FloodsubConfig -where - TSocket: AsyncRead + AsyncWrite, -{ - type Output = Framed; - type Error = io::Error; - type Future = future::FutureResult; - - #[inline] - fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future { - future::ok(Framed::new(socket, FloodsubCodec { length_prefix: Default::default() })) + FramedRead::new(socket, FloodsubCodec { length_prefix: Default::default() }) + .into_future() + .and_then:: _, _>(|(val, socket)| { + val.ok_or_else(move || (io::ErrorKind::UnexpectedEof.into(), socket)) + }) + .map_err(|(err, _)| err) } } @@ -85,50 +76,6 @@ pub struct FloodsubCodec { length_prefix: codec::UviBytes, } -impl Encoder for FloodsubCodec { - type Item = FloodsubRpc; - type Error = io::Error; - - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - let mut proto = rpc_proto::RPC::new(); - - for message in item.messages.into_iter() { - let mut msg = rpc_proto::Message::new(); - msg.set_from(message.source.into_bytes()); - msg.set_data(message.data); - msg.set_seqno(message.sequence_number); - msg.set_topicIDs( - message - .topics - .into_iter() - .map(TopicHash::into_string) - .collect(), - ); - proto.mut_publish().push(msg); - } - - for topic in item.subscriptions.into_iter() { - let mut subscription = rpc_proto::RPC_SubOpts::new(); - subscription.set_subscribe(topic.action == FloodsubSubscriptionAction::Subscribe); - subscription.set_topicid(topic.topic.into_string()); - proto.mut_subscriptions().push(subscription); - } - - let msg_size = proto.compute_size(); - // Reserve enough space for the data and the length. The length has a maximum of 32 bits, - // which means that 5 bytes is enough for the variable-length integer. - dst.reserve(msg_size as usize + 5); - - proto - .write_length_delimited_to_writer(&mut dst.by_ref().writer()) - .expect( - "there is no situation in which the protobuf message can be invalid, and \ - writing to a BytesMut never fails as we reserved enough space beforehand", - ); - Ok(()) - } -} - impl Decoder for FloodsubCodec { type Item = FloodsubRpc; type Error = io::Error; @@ -152,7 +99,7 @@ impl Decoder for FloodsubCodec { topics: publish .take_topicIDs() .into_iter() - .map(|topic| TopicHash::from_raw(topic)) + .map(TopicHash::from_raw) .collect(), }); } @@ -184,6 +131,66 @@ pub struct FloodsubRpc { pub subscriptions: Vec, } +impl UpgradeInfo for FloodsubRpc { + type Info = &'static [u8]; + type InfoIter = iter::Once; + + #[inline] + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/floodsub/1.0.0") + } +} + +impl OutboundUpgrade for FloodsubRpc +where + TSocket: AsyncWrite, +{ + type Output = (); + type Error = io::Error; + type Future = future::Map>, tokio_io::io::Shutdown, fn((TSocket, Vec)) -> tokio_io::io::Shutdown>, fn(TSocket) -> ()>; + + #[inline] + fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future { + let bytes = self.into_length_delimited_bytes(); + tokio_io::io::write_all(socket, bytes) + .and_then:: _, _>(|(socket, _)| tokio_io::io::shutdown(socket)) + .map(|_| ()) + } +} + +impl FloodsubRpc { + /// Turns this `FloodsubRpc` into a message that can be sent to a substream. + fn into_length_delimited_bytes(self) -> Vec { + let mut proto = rpc_proto::RPC::new(); + + for message in self.messages { + let mut msg = rpc_proto::Message::new(); + msg.set_from(message.source.into_bytes()); + msg.set_data(message.data); + msg.set_seqno(message.sequence_number); + msg.set_topicIDs( + message + .topics + .into_iter() + .map(TopicHash::into_string) + .collect(), + ); + proto.mut_publish().push(msg); + } + + for topic in self.subscriptions { + let mut subscription = rpc_proto::RPC_SubOpts::new(); + subscription.set_subscribe(topic.action == FloodsubSubscriptionAction::Subscribe); + subscription.set_topicid(topic.topic.into_string()); + proto.mut_subscriptions().push(subscription); + } + + proto + .write_length_delimited_to_bytes() + .expect("there is no situation in which the protobuf message can be invalid") + } +} + /// A message received by the floodsub system. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct FloodsubMessage {