Fix floodsub with new futures (#1249)

This commit is contained in:
Pierre Krieger
2019-10-07 11:32:47 +02:00
committed by GitHub
parent 73aa27827f
commit 6667fb8016
2 changed files with 15 additions and 12 deletions

View File

@ -230,7 +230,7 @@ impl<TSubstream> Floodsub<TSubstream> {
impl<TSubstream> NetworkBehaviour for Floodsub<TSubstream> impl<TSubstream> NetworkBehaviour for Floodsub<TSubstream>
where where
TSubstream: AsyncRead + AsyncWrite + Unpin, TSubstream: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{ {
type ProtocolsHandler = OneShotHandler<TSubstream, FloodsubConfig, FloodsubRpc, InnerMessage>; type ProtocolsHandler = OneShotHandler<TSubstream, FloodsubConfig, FloodsubRpc, InnerMessage>;
type OutEvent = FloodsubEvent; type OutEvent = FloodsubEvent;

View File

@ -23,7 +23,7 @@ use crate::topic::TopicHash;
use futures::prelude::*; use futures::prelude::*;
use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, PeerId, upgrade}; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, PeerId, upgrade};
use protobuf::{ProtobufError, Message as ProtobufMessage}; use protobuf::{ProtobufError, Message as ProtobufMessage};
use std::{error, fmt, io, iter}; use std::{error, fmt, io, iter, pin::Pin};
/// Implementation of `ConnectionUpgrade` for the floodsub protocol. /// Implementation of `ConnectionUpgrade` for the floodsub protocol.
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
@ -49,15 +49,15 @@ impl UpgradeInfo for FloodsubConfig {
impl<TSocket> InboundUpgrade<TSocket> for FloodsubConfig impl<TSocket> InboundUpgrade<TSocket> for FloodsubConfig
where where
TSocket: AsyncRead + AsyncWrite + Unpin, TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{ {
type Output = FloodsubRpc; type Output = FloodsubRpc;
type Error = FloodsubDecodeError; type Error = FloodsubDecodeError;
type Future = upgrade::ReadOneThen<upgrade::Negotiated<TSocket>, (), fn(Vec<u8>, ()) -> Result<FloodsubRpc, FloodsubDecodeError>>; type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
#[inline] fn upgrade_inbound(self, mut socket: upgrade::Negotiated<TSocket>, _: Self::Info) -> Self::Future {
fn upgrade_inbound(self, socket: upgrade::Negotiated<TSocket>, _: Self::Info) -> Self::Future { Box::pin(async move {
upgrade::read_one_then(socket, 2048, (), |packet, ()| { let packet = upgrade::read_one(&mut socket, 2048).await?;
let mut rpc: rpc_proto::RPC = protobuf::parse_from_bytes(&packet)?; let mut rpc: rpc_proto::RPC = protobuf::parse_from_bytes(&packet)?;
let mut messages = Vec::with_capacity(rpc.get_publish().len()); let mut messages = Vec::with_capacity(rpc.get_publish().len());
@ -164,16 +164,19 @@ impl UpgradeInfo for FloodsubRpc {
impl<TSocket> OutboundUpgrade<TSocket> for FloodsubRpc impl<TSocket> OutboundUpgrade<TSocket> for FloodsubRpc
where where
TSocket: AsyncWrite + AsyncRead + Unpin, TSocket: AsyncWrite + AsyncRead + Send + Unpin + 'static,
{ {
type Output = (); type Output = ();
type Error = io::Error; type Error = io::Error;
type Future = upgrade::WriteOne<upgrade::Negotiated<TSocket>>; type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
#[inline] #[inline]
fn upgrade_outbound(self, socket: upgrade::Negotiated<TSocket>, _: Self::Info) -> Self::Future { fn upgrade_outbound(self, mut socket: upgrade::Negotiated<TSocket>, _: Self::Info) -> Self::Future {
let bytes = self.into_bytes(); Box::pin(async move {
upgrade::write_one(socket, bytes) let bytes = self.into_bytes();
upgrade::write_one(&mut socket, bytes).await?;
Ok(())
})
} }
} }