Add an error associated type to ProtocolsHandler (#795)

This commit is contained in:
Pierre Krieger
2018-12-28 15:11:35 +01:00
committed by GitHub
parent d10cafa804
commit 7798e23e78
13 changed files with 36 additions and 27 deletions

View File

@ -27,7 +27,7 @@ use crate::{
} }
}; };
use futures::prelude::*; use futures::prelude::*;
use std::{io, marker::PhantomData}; use std::marker::PhantomData;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use void::Void; use void::Void;
@ -53,6 +53,7 @@ where
{ {
type InEvent = Void; type InEvent = Void;
type OutEvent = Void; type OutEvent = Void;
type Error = Void;
type Substream = TSubstream; type Substream = TSubstream;
type InboundProtocol = DeniedUpgrade; type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = DeniedUpgrade; type OutboundProtocol = DeniedUpgrade;
@ -97,7 +98,7 @@ where
&mut self, &mut self,
) -> Poll< ) -> Poll<
Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>, Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>,
io::Error, Void,
> { > {
if self.shutting_down { if self.shutting_down {
Ok(Async::Ready(None)) Ok(Async::Ready(None))

View File

@ -26,7 +26,7 @@ use crate::{
} }
}; };
use futures::prelude::*; use futures::prelude::*;
use std::{io, marker::PhantomData}; use std::marker::PhantomData;
/// Wrapper around a protocol handler that turns the input event into something else. /// Wrapper around a protocol handler that turns the input event into something else.
pub struct MapInEvent<TProtoHandler, TNewIn, TMap> { pub struct MapInEvent<TProtoHandler, TNewIn, TMap> {
@ -54,6 +54,7 @@ where
{ {
type InEvent = TNewIn; type InEvent = TNewIn;
type OutEvent = TProtoHandler::OutEvent; type OutEvent = TProtoHandler::OutEvent;
type Error = TProtoHandler::Error;
type Substream = TProtoHandler::Substream; type Substream = TProtoHandler::Substream;
type InboundProtocol = TProtoHandler::InboundProtocol; type InboundProtocol = TProtoHandler::InboundProtocol;
type OutboundProtocol = TProtoHandler::OutboundProtocol; type OutboundProtocol = TProtoHandler::OutboundProtocol;
@ -108,7 +109,7 @@ where
&mut self, &mut self,
) -> Poll< ) -> Poll<
Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>, Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>,
io::Error, Self::Error,
> { > {
self.inner.poll() self.inner.poll()
} }

View File

@ -26,7 +26,6 @@ use crate::{
} }
}; };
use futures::prelude::*; use futures::prelude::*;
use std::io;
/// Wrapper around a protocol handler that turns the output event into something else. /// Wrapper around a protocol handler that turns the output event into something else.
pub struct MapOutEvent<TProtoHandler, TMap> { pub struct MapOutEvent<TProtoHandler, TMap> {
@ -52,6 +51,7 @@ where
{ {
type InEvent = TProtoHandler::InEvent; type InEvent = TProtoHandler::InEvent;
type OutEvent = TNewOut; type OutEvent = TNewOut;
type Error = TProtoHandler::Error;
type Substream = TProtoHandler::Substream; type Substream = TProtoHandler::Substream;
type InboundProtocol = TProtoHandler::InboundProtocol; type InboundProtocol = TProtoHandler::InboundProtocol;
type OutboundProtocol = TProtoHandler::OutboundProtocol; type OutboundProtocol = TProtoHandler::OutboundProtocol;
@ -104,7 +104,7 @@ where
&mut self, &mut self,
) -> Poll< ) -> Poll<
Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>, Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>,
io::Error, Self::Error,
> { > {
Ok(self.inner.poll()?.map(|ev| { Ok(self.inner.poll()?.map(|ev| {
ev.map(|ev| match ev { ev.map(|ev| match ev {

View File

@ -24,7 +24,7 @@ use crate::upgrade::{
UpgradeError, UpgradeError,
}; };
use futures::prelude::*; use futures::prelude::*;
use std::{error, fmt, io, time::Duration}; use std::{error, fmt, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
pub use self::dummy::DummyProtocolsHandler; pub use self::dummy::DummyProtocolsHandler;
@ -89,6 +89,8 @@ pub trait ProtocolsHandler {
type InEvent; type InEvent;
/// Custom event that can be produced by the handler and that will be returned to the outside. /// Custom event that can be produced by the handler and that will be returned to the outside.
type OutEvent; type OutEvent;
/// Error that can happen when polling.
type Error: error::Error;
/// The type of the substream that contains the raw data. /// The type of the substream that contains the raw data.
type Substream: AsyncRead + AsyncWrite; type Substream: AsyncRead + AsyncWrite;
/// The upgrade for the protocol or protocols handled by this handler. /// The upgrade for the protocol or protocols handled by this handler.
@ -144,7 +146,7 @@ pub trait ProtocolsHandler {
/// > **Note**: If this handler is combined with other handlers, as soon as `poll()` returns /// > **Note**: If this handler is combined with other handlers, as soon as `poll()` returns
/// > `Ok(Async::Ready(None))`, all the other handlers will receive a call to /// > `Ok(Async::Ready(None))`, all the other handlers will receive a call to
/// > `shutdown()` and will eventually be closed and destroyed. /// > `shutdown()` and will eventually be closed and destroyed.
fn poll(&mut self) -> Poll<Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>, io::Error>; fn poll(&mut self) -> Poll<Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>, Self::Error>;
/// Adds a closure that turns the input event into something else. /// Adds a closure that turns the input event into something else.
#[inline] #[inline]

View File

@ -29,7 +29,7 @@ use crate::{
} }
}; };
use futures::prelude::*; use futures::prelude::*;
use std::{io, time::Duration}; use std::time::Duration;
use tokio_timer::Timeout; use tokio_timer::Timeout;
/// Prototype for a `NodeHandlerWrapper`. /// Prototype for a `NodeHandlerWrapper`.
@ -123,7 +123,7 @@ where
{ {
type InEvent = TProtoHandler::InEvent; type InEvent = TProtoHandler::InEvent;
type OutEvent = TProtoHandler::OutEvent; type OutEvent = TProtoHandler::OutEvent;
type Error = io::Error; // TODO: better error type type Error = TProtoHandler::Error;
type Substream = TProtoHandler::Substream; type Substream = TProtoHandler::Substream;
// The first element of the tuple is the unique upgrade identifier // The first element of the tuple is the unique upgrade identifier
// (see `unique_dial_upgrade_id`). // (see `unique_dial_upgrade_id`).
@ -198,7 +198,7 @@ where
self.handler.shutdown(); self.handler.shutdown();
} }
fn poll(&mut self) -> Poll<Option<NodeHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>>, io::Error> { fn poll(&mut self) -> Poll<Option<NodeHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>>, Self::Error> {
// Continue negotiation of newly-opened substreams on the listening side. // Continue negotiation of newly-opened substreams on the listening side.
// We remove each element from `negotiating_in` one by one and add them back if not ready. // We remove each element from `negotiating_in` one by one and add them back if not ready.
for n in (0..self.negotiating_in.len()).rev() { for n in (0..self.negotiating_in.len()).rev() {

View File

@ -31,7 +31,6 @@ use crate::{
} }
}; };
use futures::prelude::*; use futures::prelude::*;
use std::io;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
/// Implementation of `ProtocolsHandler` that combines two protocols into one. /// Implementation of `ProtocolsHandler` that combines two protocols into one.
@ -65,6 +64,7 @@ where
{ {
type InEvent = EitherOutput<TProto1::InEvent, TProto2::InEvent>; type InEvent = EitherOutput<TProto1::InEvent, TProto2::InEvent>;
type OutEvent = EitherOutput<TProto1::OutEvent, TProto2::OutEvent>; type OutEvent = EitherOutput<TProto1::OutEvent, TProto2::OutEvent>;
type Error = EitherError<TProto1::Error, TProto2::Error>;
type Substream = TSubstream; type Substream = TSubstream;
type InboundProtocol = SelectUpgrade<TProto1::InboundProtocol, TProto2::InboundProtocol>; type InboundProtocol = SelectUpgrade<TProto1::InboundProtocol, TProto2::InboundProtocol>;
type OutboundProtocol = EitherUpgrade<TProto1::OutboundProtocol, TProto2::OutboundProtocol>; type OutboundProtocol = EitherUpgrade<TProto1::OutboundProtocol, TProto2::OutboundProtocol>;
@ -161,8 +161,8 @@ where
self.proto2.shutdown(); self.proto2.shutdown();
} }
fn poll(&mut self) -> Poll<Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>, io::Error> { fn poll(&mut self) -> Poll<Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>, Self::Error> {
match self.proto1.poll()? { match self.proto1.poll().map_err(EitherError::A)? {
Async::Ready(Some(ProtocolsHandlerEvent::Custom(event))) => { Async::Ready(Some(ProtocolsHandlerEvent::Custom(event))) => {
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(EitherOutput::First(event))))); return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(EitherOutput::First(event)))));
}, },
@ -176,7 +176,7 @@ where
Async::NotReady => () Async::NotReady => ()
}; };
match self.proto2.poll()? { match self.proto2.poll().map_err(EitherError::B)? {
Async::Ready(Some(ProtocolsHandlerEvent::Custom(event))) => { Async::Ready(Some(ProtocolsHandlerEvent::Custom(event))) => {
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(EitherOutput::Second(event))))); return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(EitherOutput::Second(event)))));
}, },

View File

@ -45,7 +45,7 @@ where TTransport: Transport,
<<TBehaviour as NetworkBehaviour<TTopology>>::ProtocolsHandler as ProtocolsHandler>::InEvent, <<TBehaviour as NetworkBehaviour<TTopology>>::ProtocolsHandler as ProtocolsHandler>::InEvent,
<<TBehaviour as NetworkBehaviour<TTopology>>::ProtocolsHandler as ProtocolsHandler>::OutEvent, <<TBehaviour as NetworkBehaviour<TTopology>>::ProtocolsHandler as ProtocolsHandler>::OutEvent,
NodeHandlerWrapper<TBehaviour::ProtocolsHandler>, NodeHandlerWrapper<TBehaviour::ProtocolsHandler>,
io::Error, <<TBehaviour as NetworkBehaviour<TTopology>>::ProtocolsHandler as ProtocolsHandler>::Error,
>, >,
/// Handles which nodes to connect to and how to handle the events sent back by the protocol /// Handles which nodes to connect to and how to handle the events sent back by the protocol
@ -97,6 +97,7 @@ where TBehaviour: NetworkBehaviour<TTopology>,
TBehaviour::ProtocolsHandler: ProtocolsHandler<Substream = Substream<TMuxer>> + Send + 'static, TBehaviour::ProtocolsHandler: ProtocolsHandler<Substream = Substream<TMuxer>> + Send + 'static,
<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InEvent: Send + 'static, <TBehaviour::ProtocolsHandler as ProtocolsHandler>::InEvent: Send + 'static,
<TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutEvent: Send + 'static, <TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutEvent: Send + 'static,
<TBehaviour::ProtocolsHandler as ProtocolsHandler>::Error: Send + 'static,
<TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary <TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol: InboundUpgrade<Substream<TMuxer>> + Send + 'static, <TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol: InboundUpgrade<Substream<TMuxer>> + Send + 'static,
<<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::Info: Send + 'static, <<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::Info: Send + 'static,
@ -213,6 +214,7 @@ where TBehaviour: NetworkBehaviour<TTopology>,
TBehaviour::ProtocolsHandler: ProtocolsHandler<Substream = Substream<TMuxer>> + Send + 'static, TBehaviour::ProtocolsHandler: ProtocolsHandler<Substream = Substream<TMuxer>> + Send + 'static,
<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InEvent: Send + 'static, <TBehaviour::ProtocolsHandler as ProtocolsHandler>::InEvent: Send + 'static,
<TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutEvent: Send + 'static, <TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutEvent: Send + 'static,
<TBehaviour::ProtocolsHandler as ProtocolsHandler>::Error: Send + 'static,
<TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary <TBehaviour::ProtocolsHandler as ProtocolsHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol: InboundUpgrade<Substream<TMuxer>> + Send + 'static, <TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol: InboundUpgrade<Substream<TMuxer>> + Send + 'static,
<<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Substream<TMuxer>>>::Future: Send + 'static, <<TBehaviour::ProtocolsHandler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Substream<TMuxer>>>::Future: Send + 'static,

View File

@ -105,6 +105,7 @@ where
{ {
type InEvent = FloodsubRpc; type InEvent = FloodsubRpc;
type OutEvent = FloodsubRpc; type OutEvent = FloodsubRpc;
type Error = io::Error;
type Substream = TSubstream; type Substream = TSubstream;
type InboundProtocol = FloodsubConfig; type InboundProtocol = FloodsubConfig;
type OutboundProtocol = FloodsubConfig; type OutboundProtocol = FloodsubConfig;

View File

@ -25,7 +25,6 @@ use libp2p_core::{
upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade} upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade}
}; };
use smallvec::SmallVec; use smallvec::SmallVec;
use std::io;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use void::{Void, unreachable}; use void::{Void, unreachable};
@ -59,6 +58,7 @@ where
{ {
type InEvent = Void; type InEvent = Void;
type OutEvent = IdentifySender<TSubstream>; type OutEvent = IdentifySender<TSubstream>;
type Error = Void;
type Substream = TSubstream; type Substream = TSubstream;
type InboundProtocol = IdentifyProtocolConfig; type InboundProtocol = IdentifyProtocolConfig;
type OutboundProtocol = DeniedUpgrade; type OutboundProtocol = DeniedUpgrade;
@ -104,7 +104,7 @@ where
Self::OutEvent, Self::OutEvent,
>, >,
>, >,
io::Error, Self::Error,
> { > {
if !self.pending_result.is_empty() { if !self.pending_result.is_empty() {
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom( return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(

View File

@ -26,7 +26,7 @@ use libp2p_core::{
}; };
use std::{io, marker::PhantomData, time::{Duration, Instant}}; use std::{io, marker::PhantomData, time::{Duration, Instant}};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay; use tokio_timer::{self, Delay};
use void::{Void, unreachable}; use void::{Void, unreachable};
/// Delay between the moment we connect and the first time we identify. /// Delay between the moment we connect and the first time we identify.
@ -81,6 +81,7 @@ where
{ {
type InEvent = Void; type InEvent = Void;
type OutEvent = PeriodicIdHandlerEvent; type OutEvent = PeriodicIdHandlerEvent;
type Error = tokio_timer::Error;
type Substream = TSubstream; type Substream = TSubstream;
type InboundProtocol = DeniedUpgrade; type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = IdentifyProtocolConfig; type OutboundProtocol = IdentifyProtocolConfig;
@ -132,7 +133,7 @@ where
PeriodicIdHandlerEvent, PeriodicIdHandlerEvent,
>, >,
>, >,
io::Error, Self::Error,
> { > {
if let Some(pending_result) = self.pending_result.take() { if let Some(pending_result) = self.pending_result.take() {
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom( return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(
@ -146,15 +147,14 @@ where
}; };
// Poll the future that fires when we need to identify the node again. // Poll the future that fires when we need to identify the node again.
match next_id.poll() { match next_id.poll()? {
Ok(Async::NotReady) => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
Ok(Async::Ready(())) => { Async::Ready(()) => {
next_id.reset(Instant::now() + DELAY_TO_NEXT_ID); next_id.reset(Instant::now() + DELAY_TO_NEXT_ID);
let upgrade = self.config.clone(); let upgrade = self.config.clone();
let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info: () }; let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info: () };
Ok(Async::Ready(Some(ev))) Ok(Async::Ready(Some(ev)))
} }
Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)),
} }
} }
} }

View File

@ -343,6 +343,7 @@ where
{ {
type InEvent = KademliaHandlerIn<TUserData>; type InEvent = KademliaHandlerIn<TUserData>;
type OutEvent = KademliaHandlerEvent<TUserData>; type OutEvent = KademliaHandlerEvent<TUserData>;
type Error = io::Error; // TODO: better error type?
type Substream = TSubstream; type Substream = TSubstream;
type InboundProtocol = upgrade::EitherUpgrade<KademliaProtocolConfig, upgrade::DeniedUpgrade>; type InboundProtocol = upgrade::EitherUpgrade<KademliaProtocolConfig, upgrade::DeniedUpgrade>;
type OutboundProtocol = KademliaProtocolConfig; type OutboundProtocol = KademliaProtocolConfig;

View File

@ -33,7 +33,7 @@ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay; use tokio_timer::{self, Delay};
use void::{Void, unreachable}; use void::{Void, unreachable};
/// Protocol handler that handles pinging the remote at a regular period. /// Protocol handler that handles pinging the remote at a regular period.
@ -153,6 +153,7 @@ where
{ {
type InEvent = Void; type InEvent = Void;
type OutEvent = OutEvent; type OutEvent = OutEvent;
type Error = io::Error; // TODO: more precise error type
type Substream = TSubstream; type Substream = TSubstream;
type InboundProtocol = DeniedUpgrade; type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = Ping<Instant>; type OutboundProtocol = Ping<Instant>;

View File

@ -30,7 +30,6 @@ use libp2p_core::{
upgrade::DeniedUpgrade upgrade::DeniedUpgrade
}; };
use log::warn; use log::warn;
use std::io;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use void::{Void, unreachable}; use void::{Void, unreachable};
@ -72,6 +71,7 @@ where
{ {
type InEvent = Void; type InEvent = Void;
type OutEvent = Void; type OutEvent = Void;
type Error = Void;
type Substream = TSubstream; type Substream = TSubstream;
type InboundProtocol = Ping<()>; type InboundProtocol = Ping<()>;
type OutboundProtocol = DeniedUpgrade; type OutboundProtocol = DeniedUpgrade;
@ -118,7 +118,7 @@ where
&mut self, &mut self,
) -> Poll< ) -> Poll<
Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>, Option<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>>,
io::Error, Self::Error,
> { > {
// Removes each substream one by one, and pushes them back if they're not ready (which // Removes each substream one by one, and pushes them back if they're not ready (which
// should be the case 99% of the time). // should be the case 99% of the time).