Simplify the floodsub handler (#868)

This commit is contained in:
Pierre Krieger 2019-01-22 14:45:03 +01:00 committed by GitHub
parent d59ec09a83
commit a2ab7ff4a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 327 additions and 320 deletions

View File

@ -47,12 +47,14 @@ pub use self::dummy::DummyProtocolsHandler;
pub use self::map_in::MapInEvent;
pub use self::map_out::MapOutEvent;
pub use self::node_handler::{NodeHandlerWrapper, NodeHandlerWrapperBuilder};
pub use self::one_shot::OneShotHandler;
pub use self::select::{IntoProtocolsHandlerSelect, ProtocolsHandlerSelect};
mod dummy;
mod map_in;
mod map_out;
mod node_handler;
mod one_shot;
mod select;
/// Handler for a set of protocols for a specific connection with a remote.

View File

@ -0,0 +1,218 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr};
use crate::upgrade::{InboundUpgrade, OutboundUpgrade};
use futures::prelude::*;
use smallvec::SmallVec;
use std::{error, marker::PhantomData};
use tokio_io::{AsyncRead, AsyncWrite};
/// Implementation of `ProtocolsHandler` that opens a new substream for each individual message.
///
/// This struct is meant to be a helper for other implementations to use.
// TODO: Debug
pub struct OneShotHandler<TSubstream, TInProto, TOutProto, TOutEvent>
where TOutProto: OutboundUpgrade<TSubstream>
{
/// The upgrade for inbound substreams.
listen_protocol: TInProto,
/// If true, we should return as soon as possible.
shutting_down: bool,
/// If `Some`, something bad happened and we should shut down the handler with an error.
pending_error: Option<ProtocolsHandlerUpgrErr<<TOutProto as OutboundUpgrade<TSubstream>>::Error>>,
/// Queue of events to produce in `poll()`.
events_out: SmallVec<[TOutEvent; 4]>,
/// Queue of outbound substreams to open.
dial_queue: SmallVec<[TOutProto; 4]>,
/// Current number of concurrent outbound substreams being opened.
dial_negotiated: u32,
/// Maximum number of concurrent outbound substreams being opened. Value is never modified.
max_dial_negotiated: u32,
/// Pin the `TSubstream` generic.
marker: PhantomData<TSubstream>,
}
impl<TSubstream, TInProto, TOutProto, TOutEvent>
OneShotHandler<TSubstream, TInProto, TOutProto, TOutEvent>
where TOutProto: OutboundUpgrade<TSubstream>
{
/// Creates a `OneShotHandler`.
#[inline]
pub fn new(listen_protocol: TInProto) -> Self {
OneShotHandler {
listen_protocol,
shutting_down: false,
pending_error: None,
events_out: SmallVec::new(),
dial_queue: SmallVec::new(),
dial_negotiated: 0,
max_dial_negotiated: 8,
marker: PhantomData,
}
}
/// Returns the number of pending requests.
#[inline]
pub fn pending_requests(&self) -> u32 {
self.dial_negotiated + self.dial_queue.len() as u32
}
/// Returns a reference to the listen protocol configuration.
///
/// > **Note**: If you modify the protocol, modifications will only applies to future inbound
/// > substreams, not the ones already being negotiated.
#[inline]
pub fn listen_protocol_ref(&self) -> &TInProto {
&self.listen_protocol
}
/// Returns a mutable reference to the listen protocol configuration.
///
/// > **Note**: If you modify the protocol, modifications will only applies to future inbound
/// > substreams, not the ones already being negotiated.
#[inline]
pub fn listen_protocol_mut(&mut self) -> &mut TInProto {
&mut self.listen_protocol
}
/// Opens an outbound substream with `upgrade`.
#[inline]
pub fn send_request(&mut self, upgrade: TOutProto) {
self.dial_queue.push(upgrade);
}
}
impl<TSubstream, TInProto, TOutProto, TOutEvent> Default for
OneShotHandler<TSubstream, TInProto, TOutProto, TOutEvent>
where TOutProto: OutboundUpgrade<TSubstream>,
TInProto: Default
{
#[inline]
fn default() -> Self {
OneShotHandler::new(Default::default())
}
}
impl<TSubstream, TInProto, TOutProto, TOutEvent> ProtocolsHandler for
OneShotHandler<TSubstream, TInProto, TOutProto, TOutEvent>
where
TSubstream: AsyncRead + AsyncWrite,
TInProto: InboundUpgrade<TSubstream> + Clone,
TOutProto: OutboundUpgrade<TSubstream>,
TInProto::Output: Into<TOutEvent>,
TOutProto::Output: Into<TOutEvent>,
TOutProto::Error: error::Error + 'static,
{
type InEvent = TOutProto;
type OutEvent = TOutEvent;
type Error = ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>;
type Substream = TSubstream;
type InboundProtocol = TInProto;
type OutboundProtocol = TOutProto;
type OutboundOpenInfo = ();
#[inline]
fn listen_protocol(&self) -> Self::InboundProtocol {
self.listen_protocol.clone()
}
#[inline]
fn inject_fully_negotiated_inbound(
&mut self,
out: <Self::InboundProtocol as InboundUpgrade<Self::Substream>>::Output
) {
if self.shutting_down {
return;
}
self.events_out.push(out.into());
}
#[inline]
fn inject_fully_negotiated_outbound(
&mut self,
out: <Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Output,
_: Self::OutboundOpenInfo
) {
self.dial_negotiated -= 1;
if self.shutting_down {
return;
}
self.events_out.push(out.into());
}
#[inline]
fn inject_event(&mut self, event: Self::InEvent) {
self.send_request(event);
}
#[inline]
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {
if self.pending_error.is_none() {
self.pending_error = Some(error);
}
}
#[inline]
fn inject_inbound_closed(&mut self) {}
#[inline]
fn connection_keep_alive(&self) -> bool {
self.dial_negotiated != 0 || !self.dial_queue.is_empty()
}
#[inline]
fn shutdown(&mut self) {
self.shutting_down = true;
}
fn poll(&mut self) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>, Self::Error> {
if let Some(err) = self.pending_error.take() {
return Err(err);
}
if !self.events_out.is_empty() {
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(self.events_out.remove(0))));
} else {
self.events_out.shrink_to_fit();
}
if self.shutting_down && self.dial_negotiated == 0 {
return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown));
}
if !self.dial_queue.is_empty() {
if !self.shutting_down && self.dial_negotiated < self.max_dial_negotiated {
self.dial_negotiated += 1;
return Ok(Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade: self.dial_queue.remove(0),
info: (),
}));
}
} else {
self.dial_queue.shrink_to_fit();
}
Ok(Async::NotReady)
}
}

View File

@ -1,246 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::protocol::{FloodsubCodec, FloodsubConfig, FloodsubRpc};
use futures::prelude::*;
use libp2p_core::{
ProtocolsHandler, ProtocolsHandlerEvent,
protocols_handler::ProtocolsHandlerUpgrErr,
upgrade::{InboundUpgrade, OutboundUpgrade}
};
use smallvec::SmallVec;
use std::{fmt, io};
use tokio_codec::Framed;
use tokio_io::{AsyncRead, AsyncWrite};
/// Protocol handler that handles communication with the remote for the floodsub protocol.
///
/// The handler will automatically open a substream with the remote for each request we make.
///
/// It also handles requests made by the remote.
pub struct FloodsubHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
/// Configuration for the floodsub protocol.
config: FloodsubConfig,
/// If true, we are trying to shut down the existing floodsub substream and should refuse any
/// incoming connection.
shutting_down: bool,
/// The active substreams.
// TODO: add a limit to the number of allowed substreams
substreams: Vec<SubstreamState<TSubstream>>,
/// Queue of values that we want to send to the remote.
send_queue: SmallVec<[FloodsubRpc; 16]>,
}
/// State of an active substream, opened either by us or by the remote.
enum SubstreamState<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
/// Waiting for a message from the remote.
WaitingInput(Framed<TSubstream, FloodsubCodec>),
/// Waiting to send a message to the remote.
PendingSend(Framed<TSubstream, FloodsubCodec>, FloodsubRpc),
/// Waiting to flush the substream so that the data arrives to the remote.
PendingFlush(Framed<TSubstream, FloodsubCodec>),
/// The substream is being closed.
Closing(Framed<TSubstream, FloodsubCodec>),
}
impl<TSubstream> SubstreamState<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
/// Consumes this state and produces the substream.
fn into_substream(self) -> Framed<TSubstream, FloodsubCodec> {
match self {
SubstreamState::WaitingInput(substream) => substream,
SubstreamState::PendingSend(substream, _) => substream,
SubstreamState::PendingFlush(substream) => substream,
SubstreamState::Closing(substream) => substream,
}
}
}
impl<TSubstream> FloodsubHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
/// Builds a new `FloodsubHandler`.
pub fn new() -> Self {
FloodsubHandler {
config: FloodsubConfig::new(),
shutting_down: false,
substreams: Vec::new(),
send_queue: SmallVec::new(),
}
}
}
impl<TSubstream> ProtocolsHandler for FloodsubHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
type InEvent = FloodsubRpc;
type OutEvent = FloodsubRpc;
type Error = io::Error;
type Substream = TSubstream;
type InboundProtocol = FloodsubConfig;
type OutboundProtocol = FloodsubConfig;
type OutboundOpenInfo = FloodsubRpc;
#[inline]
fn listen_protocol(&self) -> Self::InboundProtocol {
self.config.clone()
}
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgrade<TSubstream>>::Output
) {
if self.shutting_down {
return ()
}
self.substreams.push(SubstreamState::WaitingInput(protocol))
}
fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgrade<TSubstream>>::Output,
message: Self::OutboundOpenInfo
) {
if self.shutting_down {
return ()
}
self.substreams.push(SubstreamState::PendingSend(protocol, message))
}
#[inline]
fn inject_event(&mut self, message: FloodsubRpc) {
self.send_queue.push(message);
}
#[inline]
fn inject_inbound_closed(&mut self) {}
#[inline]
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {}
#[inline]
fn connection_keep_alive(&self) -> bool {
!self.substreams.is_empty()
}
#[inline]
fn shutdown(&mut self) {
self.shutting_down = true;
for n in (0..self.substreams.len()).rev() {
let substream = self.substreams.swap_remove(n);
self.substreams.push(SubstreamState::Closing(substream.into_substream()));
}
}
fn poll(
&mut self,
) -> Poll<
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
io::Error,
> {
if !self.send_queue.is_empty() {
let message = self.send_queue.remove(0);
return Ok(Async::Ready(
ProtocolsHandlerEvent::OutboundSubstreamRequest {
info: message,
upgrade: self.config.clone(),
},
));
}
for n in (0..self.substreams.len()).rev() {
let mut substream = self.substreams.swap_remove(n);
loop {
substream = match substream {
SubstreamState::WaitingInput(mut substream) => match substream.poll() {
Ok(Async::Ready(Some(message))) => {
self.substreams
.push(SubstreamState::WaitingInput(substream));
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(message)));
}
Ok(Async::Ready(None)) => SubstreamState::Closing(substream),
Ok(Async::NotReady) => {
self.substreams
.push(SubstreamState::WaitingInput(substream));
return Ok(Async::NotReady);
}
Err(_) => SubstreamState::Closing(substream),
},
SubstreamState::PendingSend(mut substream, message) => {
match substream.start_send(message)? {
AsyncSink::Ready => SubstreamState::PendingFlush(substream),
AsyncSink::NotReady(message) => {
self.substreams
.push(SubstreamState::PendingSend(substream, message));
return Ok(Async::NotReady);
}
}
}
SubstreamState::PendingFlush(mut substream) => {
match substream.poll_complete()? {
Async::Ready(()) => SubstreamState::Closing(substream),
Async::NotReady => {
self.substreams
.push(SubstreamState::PendingFlush(substream));
return Ok(Async::NotReady);
}
}
}
SubstreamState::Closing(mut substream) => match substream.close() {
Ok(Async::Ready(())) => break,
Ok(Async::NotReady) => {
self.substreams.push(SubstreamState::Closing(substream));
return Ok(Async::NotReady);
}
Err(_) => return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)),
},
}
}
}
Ok(Async::NotReady)
}
}
impl<TSubstream> fmt::Debug for FloodsubHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_struct("FloodsubHandler")
.field("shutting_down", &self.shutting_down)
.field("substreams", &self.substreams.len())
.field("send_queue", &self.send_queue.len())
.finish()
}
}

View File

@ -18,13 +18,12 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::handler::FloodsubHandler;
use crate::protocol::{FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction};
use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction};
use crate::topic::{Topic, TopicHash};
use cuckoofilter::CuckooFilter;
use futures::prelude::*;
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p_core::{protocols_handler::ProtocolsHandler, PeerId};
use libp2p_core::{protocols_handler::ProtocolsHandler, protocols_handler::OneShotHandler, PeerId};
use rand;
use smallvec::SmallVec;
use std::{collections::VecDeque, iter, marker::PhantomData};
@ -176,11 +175,11 @@ impl<TSubstream, TTopology> NetworkBehaviour<TTopology> for Floodsub<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
type ProtocolsHandler = FloodsubHandler<TSubstream>;
type ProtocolsHandler = OneShotHandler<TSubstream, FloodsubConfig, FloodsubRpc, InnerMessage>;
type OutEvent = FloodsubEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
FloodsubHandler::new()
Default::default()
}
fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) {
@ -209,8 +208,14 @@ where
fn inject_node_event(
&mut self,
propagation_source: PeerId,
event: FloodsubRpc,
event: InnerMessage,
) {
// We ignore successful sends event.
let event = match event {
InnerMessage::Rx(event) => event,
InnerMessage::Sent => return,
};
// Update connected peers topics
for subscription in event.subscriptions {
let remote_peer_topics = self.connected_peers
@ -300,6 +305,28 @@ where
}
}
/// Transmission between the `OneShotHandler` and the `FloodsubHandler`.
pub enum InnerMessage {
/// We received an RPC from a remote.
Rx(FloodsubRpc),
/// We successfully sent an RPC request.
Sent,
}
impl From<FloodsubRpc> for InnerMessage {
#[inline]
fn from(rpc: FloodsubRpc) -> InnerMessage {
InnerMessage::Rx(rpc)
}
}
impl From<()> for InnerMessage {
#[inline]
fn from(_: ()) -> InnerMessage {
InnerMessage::Sent
}
}
/// Event that can happen on the floodsub behaviour.
#[derive(Debug)]
pub enum FloodsubEvent {

View File

@ -34,7 +34,6 @@ extern crate tokio_codec;
extern crate tokio_io;
extern crate unsigned_varint;
pub mod handler;
pub mod protocol;
mod layer;

View File

@ -18,19 +18,19 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::{BufMut, BytesMut};
use crate::rpc_proto;
use crate::topic::TopicHash;
use futures::future;
use bytes::BytesMut;
use futures::{future, stream, Future, Stream};
use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, PeerId};
use protobuf::Message as ProtobufMessage;
use std::{io, iter};
use tokio_codec::{Decoder, Encoder, Framed};
use tokio_codec::{Decoder, FramedRead};
use tokio_io::{AsyncRead, AsyncWrite};
use unsigned_varint::codec;
/// Implementation of `ConnectionUpgrade` for the floodsub protocol.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct FloodsubConfig {}
impl FloodsubConfig {
@ -53,29 +53,20 @@ impl UpgradeInfo for FloodsubConfig {
impl<TSocket> InboundUpgrade<TSocket> for FloodsubConfig
where
TSocket: AsyncRead + AsyncWrite,
TSocket: AsyncRead,
{
type Output = Framed<TSocket, FloodsubCodec>;
type Output = FloodsubRpc;
type Error = io::Error;
type Future = future::FutureResult<Self::Output, Self::Error>;
type Future = future::MapErr<future::AndThen<stream::StreamFuture<FramedRead<TSocket, FloodsubCodec>>, Result<FloodsubRpc, (io::Error, FramedRead<TSocket, FloodsubCodec>)>, fn((Option<FloodsubRpc>, FramedRead<TSocket, FloodsubCodec>)) -> Result<FloodsubRpc, (io::Error, FramedRead<TSocket, FloodsubCodec>)>>, fn((io::Error, FramedRead<TSocket, FloodsubCodec>)) -> io::Error>;
#[inline]
fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
future::ok(Framed::new(socket, FloodsubCodec { length_prefix: Default::default() }))
}
}
impl<TSocket> OutboundUpgrade<TSocket> for FloodsubConfig
where
TSocket: AsyncRead + AsyncWrite,
{
type Output = Framed<TSocket, FloodsubCodec>;
type Error = io::Error;
type Future = future::FutureResult<Self::Output, Self::Error>;
#[inline]
fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
future::ok(Framed::new(socket, FloodsubCodec { length_prefix: Default::default() }))
FramedRead::new(socket, FloodsubCodec { length_prefix: Default::default() })
.into_future()
.and_then::<fn(_) -> _, _>(|(val, socket)| {
val.ok_or_else(move || (io::ErrorKind::UnexpectedEof.into(), socket))
})
.map_err(|(err, _)| err)
}
}
@ -85,50 +76,6 @@ pub struct FloodsubCodec {
length_prefix: codec::UviBytes,
}
impl Encoder for FloodsubCodec {
type Item = FloodsubRpc;
type Error = io::Error;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
let mut proto = rpc_proto::RPC::new();
for message in item.messages.into_iter() {
let mut msg = rpc_proto::Message::new();
msg.set_from(message.source.into_bytes());
msg.set_data(message.data);
msg.set_seqno(message.sequence_number);
msg.set_topicIDs(
message
.topics
.into_iter()
.map(TopicHash::into_string)
.collect(),
);
proto.mut_publish().push(msg);
}
for topic in item.subscriptions.into_iter() {
let mut subscription = rpc_proto::RPC_SubOpts::new();
subscription.set_subscribe(topic.action == FloodsubSubscriptionAction::Subscribe);
subscription.set_topicid(topic.topic.into_string());
proto.mut_subscriptions().push(subscription);
}
let msg_size = proto.compute_size();
// Reserve enough space for the data and the length. The length has a maximum of 32 bits,
// which means that 5 bytes is enough for the variable-length integer.
dst.reserve(msg_size as usize + 5);
proto
.write_length_delimited_to_writer(&mut dst.by_ref().writer())
.expect(
"there is no situation in which the protobuf message can be invalid, and \
writing to a BytesMut never fails as we reserved enough space beforehand",
);
Ok(())
}
}
impl Decoder for FloodsubCodec {
type Item = FloodsubRpc;
type Error = io::Error;
@ -152,7 +99,7 @@ impl Decoder for FloodsubCodec {
topics: publish
.take_topicIDs()
.into_iter()
.map(|topic| TopicHash::from_raw(topic))
.map(TopicHash::from_raw)
.collect(),
});
}
@ -184,6 +131,66 @@ pub struct FloodsubRpc {
pub subscriptions: Vec<FloodsubSubscription>,
}
impl UpgradeInfo for FloodsubRpc {
type Info = &'static [u8];
type InfoIter = iter::Once<Self::Info>;
#[inline]
fn protocol_info(&self) -> Self::InfoIter {
iter::once(b"/floodsub/1.0.0")
}
}
impl<TSocket> OutboundUpgrade<TSocket> for FloodsubRpc
where
TSocket: AsyncWrite,
{
type Output = ();
type Error = io::Error;
type Future = future::Map<future::AndThen<tokio_io::io::WriteAll<TSocket, Vec<u8>>, tokio_io::io::Shutdown<TSocket>, fn((TSocket, Vec<u8>)) -> tokio_io::io::Shutdown<TSocket>>, fn(TSocket) -> ()>;
#[inline]
fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future {
let bytes = self.into_length_delimited_bytes();
tokio_io::io::write_all(socket, bytes)
.and_then::<fn(_) -> _, _>(|(socket, _)| tokio_io::io::shutdown(socket))
.map(|_| ())
}
}
impl FloodsubRpc {
/// Turns this `FloodsubRpc` into a message that can be sent to a substream.
fn into_length_delimited_bytes(self) -> Vec<u8> {
let mut proto = rpc_proto::RPC::new();
for message in self.messages {
let mut msg = rpc_proto::Message::new();
msg.set_from(message.source.into_bytes());
msg.set_data(message.data);
msg.set_seqno(message.sequence_number);
msg.set_topicIDs(
message
.topics
.into_iter()
.map(TopicHash::into_string)
.collect(),
);
proto.mut_publish().push(msg);
}
for topic in self.subscriptions {
let mut subscription = rpc_proto::RPC_SubOpts::new();
subscription.set_subscribe(topic.action == FloodsubSubscriptionAction::Subscribe);
subscription.set_topicid(topic.topic.into_string());
proto.mut_subscriptions().push(subscription);
}
proto
.write_length_delimited_to_bytes()
.expect("there is no situation in which the protobuf message can be invalid")
}
}
/// A message received by the floodsub system.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FloodsubMessage {