diff --git a/core/src/protocols_handler/dummy.rs b/core/src/protocols_handler/dummy.rs index 113c31c9..f400fddc 100644 --- a/core/src/protocols_handler/dummy.rs +++ b/core/src/protocols_handler/dummy.rs @@ -27,7 +27,7 @@ use crate::{ } }; use futures::prelude::*; -use std::{io, marker::PhantomData}; +use std::marker::PhantomData; use tokio_io::{AsyncRead, AsyncWrite}; use void::Void; @@ -53,6 +53,7 @@ where { type InEvent = Void; type OutEvent = Void; + type Error = Void; type Substream = TSubstream; type InboundProtocol = DeniedUpgrade; type OutboundProtocol = DeniedUpgrade; @@ -97,7 +98,7 @@ where &mut self, ) -> Poll< Option>, - io::Error, + Void, > { if self.shutting_down { Ok(Async::Ready(None)) diff --git a/core/src/protocols_handler/map_in.rs b/core/src/protocols_handler/map_in.rs index 93b97810..3a4861a8 100644 --- a/core/src/protocols_handler/map_in.rs +++ b/core/src/protocols_handler/map_in.rs @@ -26,7 +26,7 @@ use crate::{ } }; use futures::prelude::*; -use std::{io, marker::PhantomData}; +use std::marker::PhantomData; /// Wrapper around a protocol handler that turns the input event into something else. pub struct MapInEvent { @@ -54,6 +54,7 @@ where { type InEvent = TNewIn; type OutEvent = TProtoHandler::OutEvent; + type Error = TProtoHandler::Error; type Substream = TProtoHandler::Substream; type InboundProtocol = TProtoHandler::InboundProtocol; type OutboundProtocol = TProtoHandler::OutboundProtocol; @@ -108,7 +109,7 @@ where &mut self, ) -> Poll< Option>, - io::Error, + Self::Error, > { self.inner.poll() } diff --git a/core/src/protocols_handler/map_out.rs b/core/src/protocols_handler/map_out.rs index e7c3e6d9..98103fc2 100644 --- a/core/src/protocols_handler/map_out.rs +++ b/core/src/protocols_handler/map_out.rs @@ -26,7 +26,6 @@ use crate::{ } }; use futures::prelude::*; -use std::io; /// Wrapper around a protocol handler that turns the output event into something else. pub struct MapOutEvent { @@ -52,6 +51,7 @@ where { type InEvent = TProtoHandler::InEvent; type OutEvent = TNewOut; + type Error = TProtoHandler::Error; type Substream = TProtoHandler::Substream; type InboundProtocol = TProtoHandler::InboundProtocol; type OutboundProtocol = TProtoHandler::OutboundProtocol; @@ -104,7 +104,7 @@ where &mut self, ) -> Poll< Option>, - io::Error, + Self::Error, > { Ok(self.inner.poll()?.map(|ev| { ev.map(|ev| match ev { diff --git a/core/src/protocols_handler/mod.rs b/core/src/protocols_handler/mod.rs index a8b7690c..2889141d 100644 --- a/core/src/protocols_handler/mod.rs +++ b/core/src/protocols_handler/mod.rs @@ -24,7 +24,7 @@ use crate::upgrade::{ UpgradeError, }; use futures::prelude::*; -use std::{error, fmt, io, time::Duration}; +use std::{error, fmt, time::Duration}; use tokio_io::{AsyncRead, AsyncWrite}; pub use self::dummy::DummyProtocolsHandler; @@ -89,6 +89,8 @@ pub trait ProtocolsHandler { type InEvent; /// Custom event that can be produced by the handler and that will be returned to the outside. type OutEvent; + /// Error that can happen when polling. + type Error: error::Error; /// The type of the substream that contains the raw data. type Substream: AsyncRead + AsyncWrite; /// The upgrade for the protocol or protocols handled by this handler. @@ -144,7 +146,7 @@ pub trait ProtocolsHandler { /// > **Note**: If this handler is combined with other handlers, as soon as `poll()` returns /// > `Ok(Async::Ready(None))`, all the other handlers will receive a call to /// > `shutdown()` and will eventually be closed and destroyed. - fn poll(&mut self) -> Poll>, io::Error>; + fn poll(&mut self) -> Poll>, Self::Error>; /// Adds a closure that turns the input event into something else. #[inline] diff --git a/core/src/protocols_handler/node_handler.rs b/core/src/protocols_handler/node_handler.rs index af38b841..8f709090 100644 --- a/core/src/protocols_handler/node_handler.rs +++ b/core/src/protocols_handler/node_handler.rs @@ -29,7 +29,7 @@ use crate::{ } }; use futures::prelude::*; -use std::{io, time::Duration}; +use std::time::Duration; use tokio_timer::Timeout; /// Prototype for a `NodeHandlerWrapper`. @@ -123,7 +123,7 @@ where { type InEvent = TProtoHandler::InEvent; type OutEvent = TProtoHandler::OutEvent; - type Error = io::Error; // TODO: better error type + type Error = TProtoHandler::Error; type Substream = TProtoHandler::Substream; // The first element of the tuple is the unique upgrade identifier // (see `unique_dial_upgrade_id`). @@ -198,7 +198,7 @@ where self.handler.shutdown(); } - fn poll(&mut self) -> Poll>, io::Error> { + fn poll(&mut self) -> Poll>, Self::Error> { // Continue negotiation of newly-opened substreams on the listening side. // We remove each element from `negotiating_in` one by one and add them back if not ready. for n in (0..self.negotiating_in.len()).rev() { diff --git a/core/src/protocols_handler/select.rs b/core/src/protocols_handler/select.rs index 196ef411..86a27f8d 100644 --- a/core/src/protocols_handler/select.rs +++ b/core/src/protocols_handler/select.rs @@ -31,7 +31,6 @@ use crate::{ } }; use futures::prelude::*; -use std::io; use tokio_io::{AsyncRead, AsyncWrite}; /// Implementation of `ProtocolsHandler` that combines two protocols into one. @@ -65,6 +64,7 @@ where { type InEvent = EitherOutput; type OutEvent = EitherOutput; + type Error = EitherError; type Substream = TSubstream; type InboundProtocol = SelectUpgrade; type OutboundProtocol = EitherUpgrade; @@ -161,8 +161,8 @@ where self.proto2.shutdown(); } - fn poll(&mut self) -> Poll>, io::Error> { - match self.proto1.poll()? { + fn poll(&mut self) -> Poll>, Self::Error> { + match self.proto1.poll().map_err(EitherError::A)? { Async::Ready(Some(ProtocolsHandlerEvent::Custom(event))) => { return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(EitherOutput::First(event))))); }, @@ -176,7 +176,7 @@ where Async::NotReady => () }; - match self.proto2.poll()? { + match self.proto2.poll().map_err(EitherError::B)? { Async::Ready(Some(ProtocolsHandlerEvent::Custom(event))) => { return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(EitherOutput::Second(event))))); }, diff --git a/core/src/swarm.rs b/core/src/swarm.rs index 02c99b7e..24b76dee 100644 --- a/core/src/swarm.rs +++ b/core/src/swarm.rs @@ -45,7 +45,7 @@ where TTransport: Transport, <>::ProtocolsHandler as ProtocolsHandler>::InEvent, <>::ProtocolsHandler as ProtocolsHandler>::OutEvent, NodeHandlerWrapper, - io::Error, + <>::ProtocolsHandler as ProtocolsHandler>::Error, >, /// Handles which nodes to connect to and how to handle the events sent back by the protocol @@ -97,6 +97,7 @@ where TBehaviour: NetworkBehaviour, TBehaviour::ProtocolsHandler: ProtocolsHandler> + Send + 'static, ::InEvent: Send + 'static, ::OutEvent: Send + 'static, + ::Error: Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary ::InboundProtocol: InboundUpgrade> + Send + 'static, <::InboundProtocol as UpgradeInfo>::Info: Send + 'static, @@ -213,6 +214,7 @@ where TBehaviour: NetworkBehaviour, TBehaviour::ProtocolsHandler: ProtocolsHandler> + Send + 'static, ::InEvent: Send + 'static, ::OutEvent: Send + 'static, + ::Error: Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary ::InboundProtocol: InboundUpgrade> + Send + 'static, <::InboundProtocol as InboundUpgrade>>::Future: Send + 'static, diff --git a/protocols/floodsub/src/handler.rs b/protocols/floodsub/src/handler.rs index 0b663a39..c084b8cf 100644 --- a/protocols/floodsub/src/handler.rs +++ b/protocols/floodsub/src/handler.rs @@ -105,6 +105,7 @@ where { type InEvent = FloodsubRpc; type OutEvent = FloodsubRpc; + type Error = io::Error; type Substream = TSubstream; type InboundProtocol = FloodsubConfig; type OutboundProtocol = FloodsubConfig; diff --git a/protocols/identify/src/listen_handler.rs b/protocols/identify/src/listen_handler.rs index a7f9982a..650b4780 100644 --- a/protocols/identify/src/listen_handler.rs +++ b/protocols/identify/src/listen_handler.rs @@ -25,7 +25,6 @@ use libp2p_core::{ upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade} }; use smallvec::SmallVec; -use std::io; use tokio_io::{AsyncRead, AsyncWrite}; use void::{Void, unreachable}; @@ -59,6 +58,7 @@ where { type InEvent = Void; type OutEvent = IdentifySender; + type Error = Void; type Substream = TSubstream; type InboundProtocol = IdentifyProtocolConfig; type OutboundProtocol = DeniedUpgrade; @@ -104,7 +104,7 @@ where Self::OutEvent, >, >, - io::Error, + Self::Error, > { if !self.pending_result.is_empty() { return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom( diff --git a/protocols/identify/src/periodic_id_handler.rs b/protocols/identify/src/periodic_id_handler.rs index b76036af..ef0dd500 100644 --- a/protocols/identify/src/periodic_id_handler.rs +++ b/protocols/identify/src/periodic_id_handler.rs @@ -26,7 +26,7 @@ use libp2p_core::{ }; use std::{io, marker::PhantomData, time::{Duration, Instant}}; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_timer::Delay; +use tokio_timer::{self, Delay}; use void::{Void, unreachable}; /// Delay between the moment we connect and the first time we identify. @@ -81,6 +81,7 @@ where { type InEvent = Void; type OutEvent = PeriodicIdHandlerEvent; + type Error = tokio_timer::Error; type Substream = TSubstream; type InboundProtocol = DeniedUpgrade; type OutboundProtocol = IdentifyProtocolConfig; @@ -132,7 +133,7 @@ where PeriodicIdHandlerEvent, >, >, - io::Error, + Self::Error, > { if let Some(pending_result) = self.pending_result.take() { return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom( @@ -146,15 +147,14 @@ where }; // Poll the future that fires when we need to identify the node again. - match next_id.poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(())) => { + match next_id.poll()? { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(()) => { next_id.reset(Instant::now() + DELAY_TO_NEXT_ID); let upgrade = self.config.clone(); let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info: () }; Ok(Async::Ready(Some(ev))) } - Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)), } } } diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 8c1fadad..20c78a83 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -343,6 +343,7 @@ where { type InEvent = KademliaHandlerIn; type OutEvent = KademliaHandlerEvent; + type Error = io::Error; // TODO: better error type? type Substream = TSubstream; type InboundProtocol = upgrade::EitherUpgrade; type OutboundProtocol = KademliaProtocolConfig; diff --git a/protocols/ping/src/dial_handler.rs b/protocols/ping/src/dial_handler.rs index 5acfa1a5..abd7bfbe 100644 --- a/protocols/ping/src/dial_handler.rs +++ b/protocols/ping/src/dial_handler.rs @@ -33,7 +33,7 @@ use std::{ time::{Duration, Instant}, }; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_timer::Delay; +use tokio_timer::{self, Delay}; use void::{Void, unreachable}; /// Protocol handler that handles pinging the remote at a regular period. @@ -153,6 +153,7 @@ where { type InEvent = Void; type OutEvent = OutEvent; + type Error = io::Error; // TODO: more precise error type type Substream = TSubstream; type InboundProtocol = DeniedUpgrade; type OutboundProtocol = Ping; diff --git a/protocols/ping/src/listen_handler.rs b/protocols/ping/src/listen_handler.rs index 10a966ff..43c318a1 100644 --- a/protocols/ping/src/listen_handler.rs +++ b/protocols/ping/src/listen_handler.rs @@ -30,7 +30,6 @@ use libp2p_core::{ upgrade::DeniedUpgrade }; use log::warn; -use std::io; use tokio_io::{AsyncRead, AsyncWrite}; use void::{Void, unreachable}; @@ -72,6 +71,7 @@ where { type InEvent = Void; type OutEvent = Void; + type Error = Void; type Substream = TSubstream; type InboundProtocol = Ping<()>; type OutboundProtocol = DeniedUpgrade; @@ -118,7 +118,7 @@ where &mut self, ) -> Poll< Option>, - io::Error, + Self::Error, > { // Removes each substream one by one, and pushes them back if they're not ready (which // should be the case 99% of the time).