Add an Error associated type to transports (#835)

* Add an Error associated type to transports

* Improve raw swarm a bit

* Rename map_other to map

* Use source() instead of cause()

* RawSwarmIncErr -> IncomingError
This commit is contained in:
Pierre Krieger
2019-01-10 11:27:06 +01:00
committed by GitHub
parent f55a8bc2f3
commit dbff125df2
30 changed files with 798 additions and 682 deletions

View File

@ -275,19 +275,21 @@ pub enum EitherListenStream<A, B> {
impl<AStream, BStream, AInner, BInner> Stream for EitherListenStream<AStream, BStream>
where
AStream: Stream<Item = (AInner, Multiaddr), Error = IoError>,
BStream: Stream<Item = (BInner, Multiaddr), Error = IoError>,
AStream: Stream<Item = (AInner, Multiaddr)>,
BStream: Stream<Item = (BInner, Multiaddr)>,
{
type Item = (EitherFuture<AInner, BInner>, Multiaddr);
type Error = IoError;
type Error = EitherError<AStream::Error, BStream::Error>;
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self {
EitherListenStream::First(a) => a.poll()
.map(|i| (i.map(|v| (v.map(|(o, addr)| (EitherFuture::First(o), addr)))))),
.map(|i| (i.map(|v| (v.map(|(o, addr)| (EitherFuture::First(o), addr))))))
.map_err(EitherError::A),
EitherListenStream::Second(a) => a.poll()
.map(|i| (i.map(|v| (v.map(|(o, addr)| (EitherFuture::Second(o), addr)))))),
.map(|i| (i.map(|v| (v.map(|(o, addr)| (EitherFuture::Second(o), addr))))))
.map_err(EitherError::B),
}
}
}
@ -302,17 +304,17 @@ pub enum EitherFuture<A, B> {
impl<AFuture, BFuture, AInner, BInner> Future for EitherFuture<AFuture, BFuture>
where
AFuture: Future<Item = AInner, Error = IoError>,
BFuture: Future<Item = BInner, Error = IoError>,
AFuture: Future<Item = AInner>,
BFuture: Future<Item = BInner>,
{
type Item = EitherOutput<AInner, BInner>;
type Error = IoError;
type Error = EitherError<AFuture::Error, BFuture::Error>;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
EitherFuture::First(a) => a.poll().map(|v| v.map(EitherOutput::First)),
EitherFuture::Second(a) => a.poll().map(|v| v.map(EitherOutput::Second)),
EitherFuture::First(a) => a.poll().map(|v| v.map(EitherOutput::First)).map_err(EitherError::A),
EitherFuture::Second(a) => a.poll().map(|v| v.map(EitherOutput::Second)).map_err(EitherError::B),
}
}
}

View File

@ -20,11 +20,10 @@
//! Manage listening on multiple multiaddresses at once.
use crate::{Multiaddr, Transport, transport::TransportError};
use futures::prelude::*;
use std::fmt;
use std::{collections::VecDeque, fmt};
use void::Void;
use crate::{Multiaddr, Transport};
use std::collections::VecDeque;
/// Implementation of `futures::Stream` that allows listening on multiaddresses.
///
@ -151,15 +150,14 @@ where
/// Start listening on a multiaddress.
///
/// Returns an error if the transport doesn't support the given multiaddress.
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<Multiaddr, Multiaddr>
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<Multiaddr, TransportError<TTrans::Error>>
where
TTrans: Clone,
{
let (listener, new_addr) = self
.transport
.clone()
.listen_on(addr)
.map_err(|(_, addr)| addr)?;
.listen_on(addr)?;
self.listeners.push_back(Listener {
listener,
@ -324,7 +322,7 @@ mod tests {
let mut listeners = ListenersStream::new(rx);
listeners.listen_on("/memory".parse().unwrap()).unwrap();
let dial = tx.dial("/memory".parse().unwrap()).unwrap_or_else(|_| panic!());
let dial = tx.dial("/memory".parse().unwrap()).unwrap();
let future = listeners
.into_future()

View File

@ -37,7 +37,8 @@ use crate::{
node::Substream
},
nodes::listeners::{ListenersEvent, ListenersStream},
transport::Transport
transport::Transport,
transport::TransportError,
};
use fnv::FnvHashMap;
use futures::{prelude::*, future};
@ -45,7 +46,6 @@ use std::{
collections::hash_map::{Entry, OccupiedEntry},
error,
fmt,
io::{Error as IoError, ErrorKind as IoErrorKind}
};
mod tests;
@ -60,7 +60,7 @@ where
listeners: ListenersStream<TTrans>,
/// The nodes currently active.
active_nodes: CollectionStream<TInEvent, TOutEvent, THandler, RawSwarmReachError, THandlerErr>,
active_nodes: CollectionStream<TInEvent, TOutEvent, THandler, RawSwarmReachError<TTrans::Error>, THandlerErr>,
/// The reach attempts of the swarm.
/// This needs to be a separate struct in order to handle multiple mutable borrows issues.
@ -122,7 +122,7 @@ where
/// Address used to send back data to the remote.
send_back_addr: Multiaddr,
/// The error that happened.
error: IoError,
error: IncomingError<TTrans::Error>,
},
/// A new connection to a peer has been opened.
@ -180,7 +180,7 @@ where
multiaddr: Multiaddr,
/// The error that happened.
error: RawSwarmReachError,
error: RawSwarmReachError<TTrans::Error>,
},
/// Failed to reach a peer that we were trying to dial.
@ -189,7 +189,7 @@ where
multiaddr: Multiaddr,
/// The error that happened.
error: IoError,
error: TransportError<TTrans::Error>,
/// The handler that was passed to `dial()`.
handler: THandler,
@ -208,6 +208,7 @@ impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> fmt::Debug for RawS
where
TOutEvent: fmt::Debug,
TTrans: Transport,
TTrans::Error: fmt::Debug,
THandlerErr: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
@ -283,10 +284,10 @@ where
/// Error that can happen when trying to reach a node.
#[derive(Debug)]
pub enum RawSwarmReachError {
pub enum RawSwarmReachError<TTransErr> {
/// Error in the transport layer.
// TODO: better error type
Transport(IoError),
// TODO: is a TransportError correct here?
Transport(TransportError<TTransErr>),
/// We successfully reached the peer, but there was a mismatch between the expected id and the
/// actual id of the peer.
PeerIdMismatch {
@ -295,7 +296,9 @@ pub enum RawSwarmReachError {
}
}
impl fmt::Display for RawSwarmReachError {
impl<TTransErr> fmt::Display for RawSwarmReachError<TTransErr>
where TTransErr: fmt::Display
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
RawSwarmReachError::Transport(err) => write!(f, "{}", err),
@ -306,7 +309,9 @@ impl fmt::Display for RawSwarmReachError {
}
}
impl error::Error for RawSwarmReachError {
impl<TTransErr> error::Error for RawSwarmReachError<TTransErr>
where TTransErr: error::Error + 'static
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
RawSwarmReachError::Transport(err) => Some(err),
@ -315,6 +320,41 @@ impl error::Error for RawSwarmReachError {
}
}
/// Error that can happen on an incoming connection.
#[derive(Debug)]
pub enum IncomingError<TTransErr> {
/// Error in the transport layer.
// TODO: just TTransError should be enough?
Transport(TransportError<TTransErr>),
/// Denied the incoming connection because we're already connected to this peer as a dialer
/// and we have a higher priority than the remote.
DeniedLowerPriority,
}
impl<TTransErr> fmt::Display for IncomingError<TTransErr>
where TTransErr: fmt::Display
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
IncomingError::Transport(err) => write!(f, "{}", err),
IncomingError::DeniedLowerPriority => {
write!(f, "Denied because of lower priority")
},
}
}
}
impl<TTransErr> error::Error for IncomingError<TTransErr>
where TTransErr: error::Error + 'static
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
IncomingError::Transport(err) => Some(err),
IncomingError::DeniedLowerPriority => None,
}
}
}
/// A new connection arrived on a listener.
pub struct IncomingConnectionEvent<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a>
where TTrans: Transport
@ -326,7 +366,7 @@ where TTrans: Transport
/// Address used to send back data to the remote.
send_back_addr: Multiaddr,
/// Reference to the `active_nodes` field of the swarm.
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, RawSwarmReachError, THandlerErr>,
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, RawSwarmReachError<TTrans::Error>, THandlerErr>,
/// Reference to the `other_reach_attempts` field of the swarm.
other_reach_attempts: &'a mut Vec<(ReachAttemptId, ConnectedPoint)>,
}
@ -335,6 +375,7 @@ impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr>
IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Error: Send + 'static,
TTrans::ListenerUpgrade: Send + 'static,
THandler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
@ -357,7 +398,7 @@ where
{
let connected_point = self.to_connected_point();
let handler = builder(self.info());
let id = self.active_nodes.add_reach_attempt(self.upgrade.map_err(RawSwarmReachError::Transport), handler);
let id = self.active_nodes.add_reach_attempt(self.upgrade.map_err(|err| RawSwarmReachError::Transport(TransportError::Other(err))), handler);
self.other_reach_attempts.push((
id,
connected_point,
@ -510,7 +551,7 @@ where
/// Start listening on the given multiaddress.
#[inline]
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<Multiaddr, TransportError<TTrans::Error>> {
self.listeners.listen_on(addr)
}
@ -550,9 +591,10 @@ where
/// Dials a multiaddress without knowing the peer ID we're going to obtain.
///
/// The second parameter is the handler to use if we manage to reach a node.
pub fn dial(&mut self, addr: Multiaddr, handler: THandler) -> Result<(), Multiaddr>
pub fn dial(&mut self, addr: Multiaddr, handler: THandler) -> Result<(), TransportError<TTrans::Error>>
where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
@ -560,13 +602,10 @@ where
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
let future = match self.transport().clone().dial(addr.clone()) {
Ok(fut) => fut,
Err((_, addr)) => return Err(addr),
};
let future = self.transport().clone().dial(addr.clone())?;
let connected_point = ConnectedPoint::Dialer { address: addr };
let reach_id = self.active_nodes.add_reach_attempt(future.map_err(RawSwarmReachError::Transport), handler);
let reach_id = self.active_nodes.add_reach_attempt(future.map_err(|err| RawSwarmReachError::Transport(TransportError::Other(err))), handler);
self.reach_attempts.other_reach_attempts.push((reach_id, connected_point));
Ok(())
}
@ -658,6 +697,7 @@ where
where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Dial: Send + 'static,
TTrans::Error: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TMuxer::Substream: Send,
@ -668,7 +708,7 @@ where
Ok(fut) => {
let expected_peer_id = peer_id.clone();
let fut = fut
.map_err(RawSwarmReachError::Transport)
.map_err(|err| RawSwarmReachError::Transport(TransportError::Other(err)))
.and_then(move |(actual_peer_id, muxer)| {
if actual_peer_id == expected_peer_id {
Ok((actual_peer_id, muxer))
@ -678,10 +718,8 @@ where
});
self.active_nodes.add_reach_attempt(fut, handler)
},
Err((_, addr)) => {
// TODO: better error reporting
let msg = format!("unsupported multiaddr {}", addr);
let fut = future::err(RawSwarmReachError::Transport(IoError::new(IoErrorKind::Other, msg)));
Err(err) => {
let fut = future::err(RawSwarmReachError::Transport(err));
self.active_nodes.add_reach_attempt(fut, handler)
},
};
@ -702,6 +740,7 @@ where
pub fn poll(&mut self) -> Async<RawSwarmEvent<TTrans, TInEvent, TOutEvent, THandler, THandlerErr>>
where
TTrans: Transport<Output = (PeerId, TMuxer)>,
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
TTrans::ListenerUpgrade: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
@ -829,7 +868,7 @@ impl<THandler> Default for ActionItem<THandler> {
/// > panics will likely happen.
fn handle_node_reached<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr>(
reach_attempts: &mut ReachAttempts,
event: CollectionReachEvent<TInEvent, TOutEvent, THandler, RawSwarmReachError, THandlerErr>
event: CollectionReachEvent<TInEvent, TOutEvent, THandler, RawSwarmReachError<TTrans::Error>, THandlerErr>
) -> (ActionItem<THandler>, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>)
where
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
@ -856,8 +895,7 @@ where
return (Default::default(), RawSwarmEvent::IncomingConnectionError {
listen_addr,
send_back_addr,
error: IoError::new(IoErrorKind::PermissionDenied,
"refused incoming connection".to_string()),
error: IncomingError::DeniedLowerPriority,
});
}
}
@ -959,7 +997,7 @@ fn has_dial_prio(local: &PeerId, other: &PeerId) -> bool {
fn handle_reach_error<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>(
reach_attempts: &mut ReachAttempts,
reach_id: ReachAttemptId,
error: RawSwarmReachError,
error: RawSwarmReachError<TTrans::Error>,
handler: THandler,
) -> (ActionItem<THandler>, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>)
where TTrans: Transport
@ -1017,6 +1055,7 @@ where TTrans: Transport
});
}
ConnectedPoint::Listener { listen_addr, send_back_addr } => {
let error = IncomingError::Transport(error);
return (Default::default(), RawSwarmEvent::IncomingConnectionError { listen_addr, send_back_addr, error });
}
}
@ -1041,7 +1080,7 @@ where
Connected(PeerConnected<'a, TInEvent>),
/// We are currently attempting to connect to this peer.
PendingConnect(PeerPendingConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>),
PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>),
/// We are not connected to this peer at all.
///
@ -1081,6 +1120,7 @@ impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr>
Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
where
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
@ -1102,7 +1142,7 @@ where
/// If a connection is pending, returns the `PeerPendingConnect`.
#[inline]
pub fn as_pending_connect(self) -> Option<PeerPendingConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>> {
pub fn as_pending_connect(self) -> Option<PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>> {
match self {
Peer::PendingConnect(peer) => Some(peer),
_ => None,
@ -1122,13 +1162,9 @@ where
///
/// If we reach a peer but the `PeerId` doesn't correspond to the one we're expecting, then
/// the whole connection is immediately closed.
///
/// > **Note**: It is possible that the attempt reaches a node that doesn't have the peer id
/// > that we are expecting, in which case the handler will be used for this "wrong"
/// > node.
#[inline]
pub fn or_connect(self, addr: Multiaddr, handler: THandler)
-> Result<PeerPotentialConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>, Self>
-> PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
{
self.or_connect_with(move |_| addr, handler)
}
@ -1138,40 +1174,40 @@ where
///
/// If we reach a peer but the `PeerId` doesn't correspond to the one we're expecting, then
/// the whole connection is immediately closed.
///
/// > **Note**: It is possible that the attempt reaches a node that doesn't have the peer id
/// > that we are expecting, in which case the handler will be used for this "wrong"
/// > node.
#[inline]
pub fn or_connect_with<TFn>(self, addr: TFn, handler: THandler)
-> Result<PeerPotentialConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>, Self>
-> PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
where
TFn: FnOnce(&PeerId) -> Multiaddr,
{
match self {
Peer::Connected(peer) => Ok(PeerPotentialConnect::Connected(peer)),
Peer::PendingConnect(peer) => Ok(PeerPotentialConnect::PendingConnect(peer)),
Peer::Connected(peer) => PeerPotentialConnect::Connected(peer),
Peer::PendingConnect(peer) => PeerPotentialConnect::PendingConnect(peer),
Peer::NotConnected(peer) => {
let addr = addr(&peer.peer_id);
match peer.connect(addr, handler) {
Ok(peer) => Ok(PeerPotentialConnect::PendingConnect(peer)),
Err(peer) => Err(Peer::NotConnected(peer)),
}
PeerPotentialConnect::PendingConnect(peer.connect(addr, handler))
}
}
}
}
/// Peer we are potentially going to connect to.
pub enum PeerPotentialConnect<'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a> {
pub enum PeerPotentialConnect<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a>
where
TTrans: Transport
{
/// We are connected to this peer.
Connected(PeerConnected<'a, TInEvent>),
/// We are currently attempting to connect to this peer.
PendingConnect(PeerPendingConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>),
PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>),
}
impl<'a, TInEvent, TOutEvent, THandler, THandlerErr> PeerPotentialConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr> {
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
where
TTrans: Transport
{
/// Closes the connection or the connection attempt.
///
/// If the connection was active, returns the list of outbound substream openings that were
@ -1196,7 +1232,7 @@ impl<'a, TInEvent, TOutEvent, THandler, THandlerErr> PeerPotentialConnect<'a, TI
/// If a connection is pending, returns the `PeerPendingConnect`.
#[inline]
pub fn as_pending_connect(self) -> Option<PeerPendingConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>> {
pub fn as_pending_connect(self) -> Option<PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>> {
match self {
PeerPotentialConnect::PendingConnect(peer) => Some(peer),
_ => None,
@ -1242,12 +1278,18 @@ impl<'a, TInEvent> PeerConnected<'a, TInEvent> {
/// Access to a peer we are attempting to connect to.
#[derive(Debug)]
pub struct PeerPendingConnect<'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a> {
pub struct PeerPendingConnect<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a>
where
TTrans: Transport
{
attempt: OccupiedEntry<'a, PeerId, OutReachAttempt>,
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, RawSwarmReachError, THandlerErr>,
active_nodes: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, RawSwarmReachError<TTrans::Error>, THandlerErr>,
}
impl<'a, TInEvent, TOutEvent, THandler, THandlerErr> PeerPendingConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr> {
impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
where
TTrans: Transport
{
/// Interrupt this connection attempt.
// TODO: consider returning a PeerNotConnected; however that is really pain in terms of
// borrows
@ -1302,6 +1344,7 @@ impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr>
PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
where
TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
@ -1317,7 +1360,9 @@ where
/// If we reach a peer but the `PeerId` doesn't correspond to the one we're expecting, then
/// the whole connection is immediately closed.
#[inline]
pub fn connect(self, addr: Multiaddr, handler: THandler) -> Result<PeerPendingConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>, Self> {
pub fn connect(self, addr: Multiaddr, handler: THandler)
-> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
{
self.connect_inner(handler, addr, Vec::new())
}
@ -1325,13 +1370,13 @@ where
///
/// The multiaddresses passed as parameter will be tried one by one.
///
/// If the iterator is empty, TODO: what to do? at the moment we unwrap
/// Returns an error if the iterator is empty.
///
/// If we reach a peer but the `PeerId` doesn't correspond to the one we're expecting, then
/// the whole connection is immediately closed.
#[inline]
pub fn connect_iter<TIter>(self, addrs: TIter, handler: THandler)
-> Result<PeerPendingConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>, Self>
-> Result<PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>, Self>
where
TIter: IntoIterator<Item = Multiaddr>,
{
@ -1341,15 +1386,15 @@ where
None => return Err(self)
};
let rest = addrs.collect();
self.connect_inner(handler, first, rest)
Ok(self.connect_inner(handler, first, rest))
}
/// Inner implementation of `connect`.
fn connect_inner(self, handler: THandler, first: Multiaddr, rest: Vec<Multiaddr>)
-> Result<PeerPendingConnect<'a, TInEvent, TOutEvent, THandler, THandlerErr>, Self>
-> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>
{
self.nodes.start_dial_out(self.peer_id.clone(), handler, first, rest);
Ok(PeerPendingConnect {
PeerPendingConnect {
attempt: match self.nodes.reach_attempts.out_reach_attempts.entry(self.peer_id) {
Entry::Occupied(e) => e,
Entry::Vacant(_) => {
@ -1357,6 +1402,6 @@ where
},
},
active_nodes: &mut self.nodes.active_nodes,
})
}
}
}

View File

@ -198,8 +198,7 @@ fn querying_for_pending_peer() {
assert_matches!(peer, Peer::NotConnected(PeerNotConnected{ .. }));
let addr = "/memory".parse().expect("bad multiaddr");
let pending_peer = peer.as_not_connected().unwrap().connect(addr, Handler::default());
assert!(pending_peer.is_ok());
assert_matches!(pending_peer, Ok(PeerPendingConnect { .. } ));
assert_matches!(pending_peer, PeerPendingConnect { .. });
}
#[test]
@ -307,8 +306,7 @@ fn known_peer_that_is_unreachable_yields_dial_error() {
assert_matches!(peer, Peer::NotConnected(PeerNotConnected{ .. }));
let addr = "/memory".parse::<Multiaddr>().expect("bad multiaddr");
let pending_peer = peer.as_not_connected().unwrap().connect(addr, Handler::default());
assert!(pending_peer.is_ok());
assert_matches!(pending_peer, Ok(PeerPendingConnect { .. } ));
assert_matches!(pending_peer, PeerPendingConnect { .. });
}
let mut rt = Runtime::new().unwrap();
// Drive it forward until we hear back from the node.
@ -349,7 +347,7 @@ fn yields_node_error_when_there_is_an_error_after_successful_connect() {
let mut handler = Handler::default();
// Force an error
handler.next_states = vec![ HandlerState::Err ];
peer.as_not_connected().unwrap().connect(addr, handler).expect("can connect unconnected peer");
peer.as_not_connected().unwrap().connect(addr, handler);
}
// Ensure we run on a single thread
@ -403,7 +401,7 @@ fn yields_node_closed_when_the_node_closes_after_successful_connect() {
let mut handler = Handler::default();
// Force handler to close
handler.next_states = vec![ HandlerState::Ready(None) ];
peer.as_not_connected().unwrap().connect(addr, handler).expect("can connect unconnected peer");
peer.as_not_connected().unwrap().connect(addr, handler);
}
// Ensure we run on a single thread

View File

@ -51,6 +51,7 @@ use crate::{
},
protocols_handler::{NodeHandlerWrapper, ProtocolsHandler},
topology::Topology,
transport::TransportError,
topology::DisconnectReason,
};
use futures::prelude::*;
@ -115,6 +116,7 @@ where TBehaviour: NetworkBehaviour<TTopology>,
<TMuxer as StreamMuxer>::OutboundSubstream: Send + 'static,
<TMuxer as StreamMuxer>::Substream: Send + 'static,
TTransport: Transport<Output = (PeerId, TMuxer)> + Clone,
TTransport::Error: Send + 'static,
TTransport::Listener: Send + 'static,
TTransport::ListenerUpgrade: Send + 'static,
TTransport::Dial: Send + 'static,
@ -171,7 +173,7 @@ where TBehaviour: NetworkBehaviour<TTopology>,
/// Returns an error if the address is not supported.
/// On success, returns an alternative version of the address.
#[inline]
pub fn listen_on(me: &mut Self, addr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
pub fn listen_on(me: &mut Self, addr: Multiaddr) -> Result<Multiaddr, TransportError<TTransport::Error>> {
let result = me.raw_swarm.listen_on(addr);
if let Ok(ref addr) = result {
me.listened_addrs.push(addr.clone());
@ -183,7 +185,7 @@ where TBehaviour: NetworkBehaviour<TTopology>,
///
/// Returns an error if the address is not supported.
#[inline]
pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), Multiaddr> {
pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), TransportError<TTransport::Error>> {
let handler = me.behaviour.new_handler();
me.raw_swarm.dial(addr, handler.into_node_handler())
}
@ -232,6 +234,7 @@ where TBehaviour: NetworkBehaviour<TTopology>,
<TMuxer as StreamMuxer>::OutboundSubstream: Send + 'static,
<TMuxer as StreamMuxer>::Substream: Send + 'static,
TTransport: Transport<Output = (PeerId, TMuxer)> + Clone,
TTransport::Error: Send + 'static,
TTransport::Listener: Send + 'static,
TTransport::ListenerUpgrade: Send + 'static,
TTransport::Dial: Send + 'static,

View File

@ -28,7 +28,7 @@ use futures::{
stream,
};
use std::io;
use crate::{Multiaddr, PeerId, Transport};
use crate::{Multiaddr, PeerId, Transport, transport::TransportError};
use crate::tests::dummy_muxer::DummyMuxer;
#[derive(Debug, PartialEq, Clone)]
@ -68,11 +68,12 @@ impl DummyTransport {
}
impl Transport for DummyTransport {
type Output = (PeerId, DummyMuxer);
type Error = io::Error;
type Listener = Box<Stream<Item=(Self::ListenerUpgrade, Multiaddr), Error=io::Error> + Send>;
type ListenerUpgrade = FutureResult<Self::Output, io::Error>;
type Dial = Box<Future<Item = Self::Output, Error = io::Error> + Send>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError<Self::Error>>
where
Self: Sized,
{
@ -95,11 +96,11 @@ impl Transport for DummyTransport {
}
})
}
ListenerState::Error => Err((self, addr2)),
ListenerState::Error => Err(TransportError::MultiaddrNotSupported(addr)),
}
}
fn dial(self, _addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)>
fn dial(self, _addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>
where
Self: Sized,
{

View File

@ -18,10 +18,11 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::{nodes::raw_swarm::ConnectedPoint, transport::Transport};
use crate::either::EitherError;
use crate::{nodes::raw_swarm::ConnectedPoint, transport::Transport, transport::TransportError};
use futures::{future::Either, prelude::*, try_ready};
use multiaddr::Multiaddr;
use std::io;
use std::error;
/// See the `Transport::and_then` method.
#[derive(Debug, Clone)]
@ -38,22 +39,19 @@ impl<T, C, F, O> Transport for AndThen<T, C>
where
T: Transport,
C: FnOnce(T::Output, ConnectedPoint) -> F + Clone,
F: IntoFuture<Item = O, Error = io::Error>
F: IntoFuture<Item = O>,
F::Error: error::Error,
{
type Output = O;
type Error = EitherError<T::Error, F::Error>;
type Listener = AndThenStream<T::Listener, C>;
type ListenerUpgrade = AndThenFuture<T::ListenerUpgrade, C, F::Future>;
type Dial = AndThenFuture<T::Dial, C, F::Future>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
let (listening_stream, new_addr) = match self.transport.listen_on(addr) {
Ok((l, new_addr)) => (l, new_addr),
Err((transport, addr)) => {
let builder = AndThen { transport, fun: self.fun };
return Err((builder, addr));
}
};
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError<Self::Error>> {
let (listening_stream, new_addr) = self.transport.listen_on(addr)
.map_err(|err| err.map(EitherError::A))?;
// Try to negotiate the protocol.
// Note that failing to negotiate a protocol will never produce a future with an error.
@ -69,14 +67,9 @@ where
}
#[inline]
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
let dialed_fut = match self.transport.dial(addr.clone()) {
Ok(f) => f,
Err((transport, addr)) => {
let builder = AndThen { transport, fun: self.fun };
return Err((builder, addr));
}
};
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let dialed_fut = self.transport.dial(addr.clone())
.map_err(|err| err.map(EitherError::A))?;
let connected_point = ConnectedPoint::Dialer { address: addr };
@ -98,20 +91,20 @@ where
///
/// Applies a function to every stream item.
#[derive(Debug, Clone)]
pub struct AndThenStream<T, F> { stream: T, listen_addr: Multiaddr, fun: F }
pub struct AndThenStream<TListener, TMap> { stream: TListener, listen_addr: Multiaddr, fun: TMap }
impl<T, F, A, B, X> Stream for AndThenStream<T, F>
impl<TListener, TMap, TTransOut, TMapOut, TListUpgr, TTransErr> Stream for AndThenStream<TListener, TMap>
where
T: Stream<Item = (X, Multiaddr)>,
X: Future<Item = A>,
F: FnOnce(A, ConnectedPoint) -> B + Clone,
B: IntoFuture<Error = X::Error>
TListener: Stream<Item = (TListUpgr, Multiaddr), Error = TTransErr>,
TListUpgr: Future<Item = TTransOut, Error = TTransErr>,
TMap: FnOnce(TTransOut, ConnectedPoint) -> TMapOut + Clone,
TMapOut: IntoFuture
{
type Item = (AndThenFuture<X, F, B::Future>, Multiaddr);
type Error = T::Error;
type Item = (AndThenFuture<TListUpgr, TMap, TMapOut::Future>, Multiaddr);
type Error = EitherError<TTransErr, TMapOut::Error>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.stream.poll()? {
match self.stream.poll().map_err(EitherError::A)? {
Async::Ready(Some((future, addr))) => {
let f = self.fun.clone();
let p = ConnectedPoint::Listener {
@ -134,29 +127,29 @@ where
///
/// Applies a function to the result of the inner future.
#[derive(Debug)]
pub struct AndThenFuture<T, F, U> {
inner: Either<T, U>,
args: Option<(F, ConnectedPoint)>
pub struct AndThenFuture<TFut, TMap, TMapOut> {
inner: Either<TFut, TMapOut>,
args: Option<(TMap, ConnectedPoint)>
}
impl<T, A, F, B> Future for AndThenFuture<T, F, B::Future>
impl<TFut, TMap, TMapOut> Future for AndThenFuture<TFut, TMap, TMapOut::Future>
where
T: Future<Item = A>,
F: FnOnce(A, ConnectedPoint) -> B,
B: IntoFuture<Error = T::Error>
TFut: Future,
TMap: FnOnce(TFut::Item, ConnectedPoint) -> TMapOut,
TMapOut: IntoFuture
{
type Item = <B::Future as Future>::Item;
type Error = T::Error;
type Item = <TMapOut::Future as Future>::Item;
type Error = EitherError<TFut::Error, TMapOut::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let future = match self.inner {
Either::A(ref mut future) => {
let item = try_ready!(future.poll());
let item = try_ready!(future.poll().map_err(EitherError::A));
let (f, a) = self.args.take().expect("AndThenFuture has already finished.");
f(item, a).into_future()
}
Either::B(ref mut future) => return future.poll()
Either::B(ref mut future) => return future.poll().map_err(EitherError::B)
};
self.inner = Either::B(future);

View File

@ -18,16 +18,14 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::transport::{Transport, TransportError};
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::fmt;
use std::io::Error as IoError;
use std::sync::Arc;
use crate::transport::Transport;
use std::{error, fmt, sync::Arc};
/// See the `Transport::boxed` method.
#[inline]
pub fn boxed<T>(transport: T) -> Boxed<T::Output>
pub fn boxed<T>(transport: T) -> Boxed<T::Output, T::Error>
where
T: Transport + Clone + Send + Sync + 'static,
T::Dial: Send + 'static,
@ -39,37 +37,34 @@ where
}
}
pub type Dial<O> = Box<Future<Item = O, Error = IoError> + Send>;
pub type Listener<O> = Box<Stream<Item = (ListenerUpgrade<O>, Multiaddr), Error = IoError> + Send>;
pub type ListenerUpgrade<O> = Box<Future<Item = O, Error = IoError> + Send>;
pub type Incoming<O> = Box<Future<Item = (IncomingUpgrade<O>, Multiaddr), Error = IoError> + Send>;
pub type IncomingUpgrade<O> = Box<Future<Item = O, Error = IoError> + Send>;
pub type Dial<O, E> = Box<Future<Item = O, Error = E> + Send>;
pub type Listener<O, E> = Box<Stream<Item = (ListenerUpgrade<O, E>, Multiaddr), Error = E> + Send>;
pub type ListenerUpgrade<O, E> = Box<Future<Item = O, Error = E> + Send>;
trait Abstract<O> {
fn listen_on(&self, addr: Multiaddr) -> Result<(Listener<O>, Multiaddr), Multiaddr>;
fn dial(&self, addr: Multiaddr) -> Result<Dial<O>, Multiaddr>;
trait Abstract<O, E> {
fn listen_on(&self, addr: Multiaddr) -> Result<(Listener<O, E>, Multiaddr), TransportError<E>>;
fn dial(&self, addr: Multiaddr) -> Result<Dial<O, E>, TransportError<E>>;
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;
}
impl<T, O> Abstract<O> for T
impl<T, O, E> Abstract<O, E> for T
where
T: Transport<Output = O> + Clone + 'static,
T: Transport<Output = O, Error = E> + Clone + 'static,
E: error::Error,
T::Dial: Send + 'static,
T::Listener: Send + 'static,
T::ListenerUpgrade: Send + 'static,
{
fn listen_on(&self, addr: Multiaddr) -> Result<(Listener<O>, Multiaddr), Multiaddr> {
let (listener, new_addr) =
Transport::listen_on(self.clone(), addr).map_err(|(_, addr)| addr)?;
fn listen_on(&self, addr: Multiaddr) -> Result<(Listener<O, E>, Multiaddr), TransportError<E>> {
let (listener, new_addr) = Transport::listen_on(self.clone(), addr)?;
let fut = listener.map(|(upgrade, addr)| {
(Box::new(upgrade) as ListenerUpgrade<O>, addr)
(Box::new(upgrade) as ListenerUpgrade<O, E>, addr)
});
Ok((Box::new(fut) as Box<_>, new_addr))
}
fn dial(&self, addr: Multiaddr) -> Result<Dial<O>, Multiaddr> {
let fut = Transport::dial(self.clone(), addr)
.map_err(|(_, addr)| addr)?;
fn dial(&self, addr: Multiaddr) -> Result<Dial<O, E>, TransportError<E>> {
let fut = Transport::dial(self.clone(), addr)?;
Ok(Box::new(fut) as Box<_>)
}
@ -80,17 +75,17 @@ where
}
/// See the `Transport::boxed` method.
pub struct Boxed<O> {
inner: Arc<Abstract<O> + Send + Sync>,
pub struct Boxed<O, E> {
inner: Arc<Abstract<O, E> + Send + Sync>,
}
impl<O> fmt::Debug for Boxed<O> {
impl<O, E> fmt::Debug for Boxed<O, E> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "BoxedTransport")
}
}
impl<O> Clone for Boxed<O> {
impl<O, E> Clone for Boxed<O, E> {
#[inline]
fn clone(&self) -> Self {
Boxed {
@ -99,26 +94,23 @@ impl<O> Clone for Boxed<O> {
}
}
impl<O> Transport for Boxed<O> {
impl<O, E> Transport for Boxed<O, E>
where E: error::Error,
{
type Output = O;
type Listener = Listener<O>;
type ListenerUpgrade = ListenerUpgrade<O>;
type Dial = Dial<O>;
type Error = E;
type Listener = Listener<O, E>;
type ListenerUpgrade = ListenerUpgrade<O, E>;
type Dial = Dial<O, E>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
match self.inner.listen_on(addr) {
Ok(listen) => Ok(listen),
Err(addr) => Err((self, addr)),
}
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError<Self::Error>> {
self.inner.listen_on(addr)
}
#[inline]
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
match self.inner.dial(addr) {
Ok(dial) => Ok(dial),
Err(addr) => Err((self, addr)),
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.inner.dial(addr)
}
#[inline]

View File

@ -18,9 +18,9 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::either::{EitherListenStream, EitherOutput, EitherFuture};
use crate::either::{EitherListenStream, EitherOutput, EitherError, EitherFuture};
use crate::transport::{Transport, TransportError};
use multiaddr::Multiaddr;
use crate::transport::Transport;
/// Struct returned by `or_transport()`.
#[derive(Debug, Copy, Clone)]
@ -34,36 +34,45 @@ impl<A, B> OrTransport<A, B> {
impl<A, B> Transport for OrTransport<A, B>
where
A: Transport,
B: Transport,
A: Transport,
{
type Output = EitherOutput<A::Output, B::Output>;
type Error = EitherError<A::Error, B::Error>;
type Listener = EitherListenStream<A::Listener, B::Listener>;
type ListenerUpgrade = EitherFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
type Dial = EitherFuture<A::Dial, B::Dial>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
let (first, addr) = match self.0.listen_on(addr) {
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError<Self::Error>> {
let addr = match self.0.listen_on(addr) {
Ok((connec, addr)) => return Ok((EitherListenStream::First(connec), addr)),
Err(err) => err,
Err(TransportError::MultiaddrNotSupported(addr)) => addr,
Err(TransportError::Other(err)) => return Err(TransportError::Other(EitherError::A(err))),
};
match self.1.listen_on(addr) {
Ok((connec, addr)) => Ok((EitherListenStream::Second(connec), addr)),
Err((second, addr)) => Err((OrTransport(first, second), addr)),
}
let addr = match self.1.listen_on(addr) {
Ok((connec, addr)) => return Ok((EitherListenStream::Second(connec), addr)),
Err(TransportError::MultiaddrNotSupported(addr)) => addr,
Err(TransportError::Other(err)) => return Err(TransportError::Other(EitherError::B(err))),
};
Err(TransportError::MultiaddrNotSupported(addr))
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
let (first, addr) = match self.0.dial(addr) {
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let addr = match self.0.dial(addr) {
Ok(connec) => return Ok(EitherFuture::First(connec)),
Err(err) => err,
Err(TransportError::MultiaddrNotSupported(addr)) => addr,
Err(TransportError::Other(err)) => return Err(TransportError::Other(EitherError::A(err))),
};
match self.1.dial(addr) {
Ok(connec) => Ok(EitherFuture::Second(connec)),
Err((second, addr)) => Err((OrTransport(first, second), addr)),
}
let addr = match self.1.dial(addr) {
Ok(connec) => return Ok(EitherFuture::Second(connec)),
Err(TransportError::MultiaddrNotSupported(addr)) => addr,
Err(TransportError::Other(err)) => return Err(TransportError::Other(EitherError::B(err))),
};
Err(TransportError::MultiaddrNotSupported(addr))
}
#[inline]

View File

@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::{nodes::raw_swarm::ConnectedPoint, transport::Transport};
use crate::{nodes::raw_swarm::ConnectedPoint, transport::Transport, transport::TransportError};
use futures::{prelude::*, try_ready};
use multiaddr::Multiaddr;
@ -39,35 +39,28 @@ where
F: FnOnce(T::Output, ConnectedPoint) -> D + Clone
{
type Output = D;
type Error = T::Error;
type Listener = MapStream<T::Listener, F>;
type ListenerUpgrade = MapFuture<T::ListenerUpgrade, F>;
type Dial = MapFuture<T::Dial, F>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
match self.transport.listen_on(addr) {
Ok((stream, listen_addr)) => {
let stream = MapStream {
stream,
listen_addr: listen_addr.clone(),
fun: self.fun
};
Ok((stream, listen_addr))
}
Err((transport, addr)) => Err((Map { transport, fun: self.fun }, addr)),
}
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError<Self::Error>> {
let (stream, listen_addr) = self.transport.listen_on(addr)?;
let stream = MapStream {
stream,
listen_addr: listen_addr.clone(),
fun: self.fun
};
Ok((stream, listen_addr))
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
match self.transport.dial(addr.clone()) {
Ok(future) => {
let p = ConnectedPoint::Dialer { address: addr };
Ok(MapFuture {
inner: future,
args: Some((self.fun, p))
})
}
Err((transport, addr)) => Err((Map { transport, fun: self.fun }, addr)),
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let future = self.transport.dial(addr.clone())?;
let p = ConnectedPoint::Dialer { address: addr };
Ok(MapFuture {
inner: future,
args: Some((self.fun, p))
})
}
#[inline]

View File

@ -18,10 +18,10 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::transport::Transport;
use futures::{prelude::*, try_ready};
use crate::transport::{Transport, TransportError};
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use std::error;
/// See `Transport::map_err`.
#[derive(Debug, Copy, Clone)]
@ -38,17 +38,19 @@ impl<T, F> MapErr<T, F> {
}
}
impl<T, F> Transport for MapErr<T, F>
impl<T, F, TErr> Transport for MapErr<T, F>
where
T: Transport,
F: FnOnce(IoError) -> IoError + Clone,
F: FnOnce(T::Error) -> TErr + Clone,
TErr: error::Error,
{
type Output = T::Output;
type Error = TErr;
type Listener = MapErrListener<T, F>;
type ListenerUpgrade = MapErrListenerUpgrade<T, F>;
type Dial = MapErrDial<T, F>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError<Self::Error>> {
let map = self.map;
match self.transport.listen_on(addr) {
@ -56,16 +58,16 @@ where
let stream = MapErrListener { inner: stream, map };
Ok((stream, listen_addr))
}
Err((transport, addr)) => Err((MapErr { transport, map }, addr)),
Err(err) => Err(err.map(move |err| map(err))),
}
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let map = self.map;
match self.transport.dial(addr) {
Ok(future) => Ok(MapErrDial { inner: future, map: Some(map) }),
Err((transport, addr)) => Err((MapErr { transport, map }, addr)),
Err(err) => Err(err.map(move |err| map(err))),
}
}
@ -82,19 +84,22 @@ where T: Transport {
map: F,
}
impl<T, F> Stream for MapErrListener<T, F>
impl<T, F, TErr> Stream for MapErrListener<T, F>
where T: Transport,
F: FnOnce(IoError) -> IoError + Clone,
F: FnOnce(T::Error) -> TErr + Clone,
TErr: error::Error,
{
type Item = (MapErrListenerUpgrade<T, F>, Multiaddr);
type Error = IoError;
type Error = TErr;
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match try_ready!(self.inner.poll()) {
Some((value, addr)) => Ok(Async::Ready(
match self.inner.poll() {
Ok(Async::Ready(Some((value, addr)))) => Ok(Async::Ready(
Some((MapErrListenerUpgrade { inner: value, map: Some(self.map.clone()) }, addr)))),
None => Ok(Async::Ready(None))
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err((self.map.clone())(err)),
}
}
}
@ -106,12 +111,12 @@ where T: Transport {
map: Option<F>,
}
impl<T, F> Future for MapErrListenerUpgrade<T, F>
impl<T, F, TErr> Future for MapErrListenerUpgrade<T, F>
where T: Transport,
F: FnOnce(IoError) -> IoError,
F: FnOnce(T::Error) -> TErr,
{
type Item = T::Output;
type Error = IoError;
type Error = TErr;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -130,19 +135,18 @@ where T: Transport,
/// Dialing future for `MapErr`.
pub struct MapErrDial<T, F>
where T: Transport,
F: FnOnce(IoError) -> IoError,
where T: Transport
{
inner: T::Dial,
map: Option<F>,
}
impl<T, F> Future for MapErrDial<T, F>
impl<T, F, TErr> Future for MapErrDial<T, F>
where T: Transport,
F: FnOnce(IoError) -> IoError,
F: FnOnce(T::Error) -> TErr,
{
type Item = T::Output;
type Error = IoError;
type Error = TErr;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {

View File

@ -1,98 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::transport::Transport;
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::io;
/// See `Transport::map_err_dial`.
#[derive(Debug, Copy, Clone)]
pub struct MapErrDial<T, F> { transport: T, fun: F }
impl<T, F> MapErrDial<T, F> {
/// Internal function that builds a `MapErrDial`.
#[inline]
pub(crate) fn new(transport: T, fun: F) -> MapErrDial<T, F> {
MapErrDial { transport, fun }
}
}
impl<T, F> Transport for MapErrDial<T, F>
where
T: Transport,
F: FnOnce(io::Error, Multiaddr) -> io::Error
{
type Output = T::Output;
type Listener = T::Listener;
type ListenerUpgrade = T::ListenerUpgrade;
type Dial = MapErrFuture<T::Dial, F>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
let fun = self.fun;
self.transport.listen_on(addr)
.map_err(move |(transport, addr)| (MapErrDial { transport, fun }, addr))
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
match self.transport.dial(addr.clone()) {
Ok(future) => Ok(MapErrFuture {
inner: future,
args: Some((self.fun, addr))
}),
Err((transport, addr)) => Err((MapErrDial { transport, fun: self.fun }, addr))
}
}
#[inline]
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.nat_traversal(server, observed)
}
}
/// Custom `Future` to avoid boxing.
///
/// Applies a function to the inner future's error type.
#[derive(Debug, Clone)]
pub struct MapErrFuture<T, F> {
inner: T,
args: Option<(F, Multiaddr)>,
}
impl<T, E, F, A> Future for MapErrFuture<T, F>
where
T: Future<Error = E>,
F: FnOnce(E, Multiaddr) -> A
{
type Item = T::Item;
type Error = A;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(x)) => Ok(Async::Ready(x)),
Err(e) => {
let (f, a) = self.args.take().expect("MapErrFuture has already finished.");
Err(f(e, a))
}
}
}
}

View File

@ -18,13 +18,13 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::{Transport, transport::TransportError};
use bytes::{Bytes, IntoBuf};
use futures::{future::{self, FutureResult}, prelude::*, stream, sync::mpsc};
use multiaddr::{Protocol, Multiaddr};
use parking_lot::Mutex;
use rw_stream_sink::RwStreamSink;
use std::{io, sync::Arc};
use crate::Transport;
use std::{error, fmt, sync::Arc};
/// Builds a new pair of `Transport`s. The dialer can reach the listener by dialing `/memory`.
#[inline]
@ -52,17 +52,18 @@ impl<T> Clone for Dialer<T> {
impl<T: IntoBuf + Send + 'static> Transport for Dialer<T> {
type Output = Channel<T>;
type Listener = Box<Stream<Item=(Self::ListenerUpgrade, Multiaddr), Error=io::Error> + Send>;
type ListenerUpgrade = FutureResult<Self::Output, io::Error>;
type Dial = Box<Future<Item=Self::Output, Error=io::Error> + Send>;
type Error = MemoryTransportError;
type Listener = Box<Stream<Item=(Self::ListenerUpgrade, Multiaddr), Error=MemoryTransportError> + Send>;
type ListenerUpgrade = FutureResult<Self::Output, MemoryTransportError>;
type Dial = Box<Future<Item=Self::Output, Error=MemoryTransportError> + Send>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
Err((self, addr))
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError<Self::Error>> {
Err(TransportError::MultiaddrNotSupported(addr))
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
if !is_memory_addr(&addr) {
return Err((self, addr))
return Err(TransportError::MultiaddrNotSupported(addr))
}
let (a_tx, a_rx) = mpsc::unbounded();
let (b_tx, b_rx) = mpsc::unbounded();
@ -70,7 +71,7 @@ impl<T: IntoBuf + Send + 'static> Transport for Dialer<T> {
let b = Chan { incoming: b_rx, outgoing: a_tx };
let future = self.0.send(b)
.map(move |_| a.into())
.map_err(|_| io::ErrorKind::ConnectionRefused.into());
.map_err(|_| MemoryTransportError::RemoteClosed);
Ok(Box::new(future))
}
@ -83,6 +84,24 @@ impl<T: IntoBuf + Send + 'static> Transport for Dialer<T> {
}
}
/// Error that can be produced from the `MemoryTransport`.
#[derive(Debug, Copy, Clone)]
pub enum MemoryTransportError {
/// The other side of the transport has been closed earlier.
RemoteClosed,
}
impl fmt::Display for MemoryTransportError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
MemoryTransportError::RemoteClosed =>
write!(f, "The other side of the memory transport has been closed."),
}
}
}
impl error::Error for MemoryTransportError {}
/// Receiving end of the memory transport.
pub struct Listener<T = Bytes>(Arc<Mutex<mpsc::UnboundedReceiver<Chan<T>>>>);
@ -94,13 +113,14 @@ impl<T> Clone for Listener<T> {
impl<T: IntoBuf + Send + 'static> Transport for Listener<T> {
type Output = Channel<T>;
type Listener = Box<Stream<Item=(Self::ListenerUpgrade, Multiaddr), Error=io::Error> + Send>;
type ListenerUpgrade = FutureResult<Self::Output, io::Error>;
type Dial = Box<Future<Item=Self::Output, Error=io::Error> + Send>;
type Error = MemoryTransportError;
type Listener = Box<Stream<Item=(Self::ListenerUpgrade, Multiaddr), Error=MemoryTransportError> + Send>;
type ListenerUpgrade = FutureResult<Self::Output, MemoryTransportError>;
type Dial = Box<Future<Item=Self::Output, Error=MemoryTransportError> + Send>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError<Self::Error>> {
if !is_memory_addr(&addr) {
return Err((self, addr))
return Err(TransportError::MultiaddrNotSupported(addr));
}
let addr2 = addr.clone();
let receiver = self.0.clone();
@ -113,8 +133,8 @@ impl<T: IntoBuf + Send + 'static> Transport for Listener<T> {
}
#[inline]
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
Err((self, addr))
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
Err(TransportError::MultiaddrNotSupported(addr))
}
#[inline]
@ -154,31 +174,31 @@ pub struct Chan<T = Bytes> {
impl<T> Stream for Chan<T> {
type Item = T;
type Error = io::Error;
type Error = MemoryTransportError;
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.incoming.poll().map_err(|()| io::ErrorKind::ConnectionReset.into())
self.incoming.poll().map_err(|()| MemoryTransportError::RemoteClosed)
}
}
impl<T> Sink for Chan<T> {
type SinkItem = T;
type SinkError = io::Error;
type SinkError = MemoryTransportError;
#[inline]
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
self.outgoing.start_send(item).map_err(|_| io::ErrorKind::ConnectionReset.into())
self.outgoing.start_send(item).map_err(|_| MemoryTransportError::RemoteClosed)
}
#[inline]
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
self.outgoing.poll_complete().map_err(|_| io::ErrorKind::ConnectionReset.into())
self.outgoing.poll_complete().map_err(|_| MemoryTransportError::RemoteClosed)
}
#[inline]
fn close(&mut self) -> Poll<(), Self::SinkError> {
self.outgoing.close().map_err(|_| io::ErrorKind::ConnectionReset.into())
self.outgoing.close().map_err(|_| MemoryTransportError::RemoteClosed)
}
}

View File

@ -29,7 +29,7 @@
use crate::{InboundUpgrade, OutboundUpgrade, nodes::raw_swarm::ConnectedPoint};
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::io::Error as IoError;
use std::{error, fmt};
use std::time::Duration;
use tokio_io::{AsyncRead, AsyncWrite};
@ -38,7 +38,6 @@ pub mod boxed;
pub mod choice;
pub mod map;
pub mod map_err;
pub mod map_err_dial;
pub mod memory;
pub mod timeout;
pub mod upgrade;
@ -60,39 +59,43 @@ pub use self::upgrade::Upgrade;
pub trait Transport {
/// The raw connection to a peer.
type Output;
/// Error that can happen when dialing or listening.
type Error: error::Error;
/// The listener produces incoming connections.
///
/// An item should be produced whenever a connection is received at the lowest level of the
/// transport stack. The item is a `Future` that is signalled once some pre-processing has
/// taken place, and that connection has been upgraded to the wanted protocols.
type Listener: Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>;
type Listener: Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = Self::Error>;
/// After a connection has been received, we may need to do some asynchronous pre-processing
/// on it (e.g. an intermediary protocol negotiation). While this pre-processing takes place, we
/// want to be able to continue polling on the listener.
type ListenerUpgrade: Future<Item = Self::Output, Error = IoError>;
/// on it (e.g. an intermediary protocol negotiation). While this pre-processing takes place,
/// we want to be able to continue polling on the listener.
type ListenerUpgrade: Future<Item = Self::Output, Error = Self::Error>;
/// A future which indicates that we are currently dialing to a peer.
type Dial: Future<Item = Self::Output, Error = IoError>;
type Dial: Future<Item = Self::Output, Error = Self::Error>;
/// Listen on the given multiaddr. Returns a stream of incoming connections, plus a modified
/// version of the `Multiaddr`. This new `Multiaddr` is the one that that should be advertised
/// to other nodes, instead of the one passed as parameter.
///
/// Returns the address back if it isn't supported.
///
/// > **Note**: The reason why we need to change the `Multiaddr` on success is to handle
/// > situations such as turning `/ip4/127.0.0.1/tcp/0` into
/// > `/ip4/127.0.0.1/tcp/<actual port>`.
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError<Self::Error>>
where
Self: Sized;
/// Dial the given multi-addr.
///
/// Returns either a future which may resolve to a connection, or gives back the multiaddress.
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)>
/// Returns either a future which may resolve to a connection.
///
/// If `MultiaddrNotSupported` is returned, then caller can try another implementation of
/// `Transport` if there is any. If instead an error is returned, then we assume that there is
/// no point in trying another `Transport`.
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>
where
Self: Sized;
@ -114,7 +117,7 @@ pub trait Transport {
/// Turns this `Transport` into an abstract boxed transport.
#[inline]
fn boxed(self) -> boxed::Boxed<Self::Output>
fn boxed(self) -> boxed::Boxed<Self::Output, Self::Error>
where Self: Sized + Clone + Send + Sync + 'static,
Self::Dial: Send + 'static,
Self::Listener: Send + 'static,
@ -135,26 +138,14 @@ pub trait Transport {
/// Applies a function on the errors generated by the futures of the `Transport`.
#[inline]
fn map_err<F>(self, map_err: F) -> map_err::MapErr<Self, F>
fn map_err<F, TNewErr>(self, map_err: F) -> map_err::MapErr<Self, F>
where
Self: Sized,
F: FnOnce(IoError) -> IoError + Clone
F: FnOnce(Self::Error) -> TNewErr + Clone
{
map_err::MapErr::new(self, map_err)
}
/// Applies a function on the errors generated by the futures of the `Transport` when dialing.
///
/// Contrary to `map_err`, this gives access to the `Multiaddr` that we tried to dial.
#[inline]
fn map_err_dial<F>(self, map_err: F) -> map_err_dial::MapErrDial<Self, F>
where
Self: Sized,
F: FnOnce(IoError, Multiaddr) -> IoError
{
map_err_dial::MapErrDial::new(self, map_err)
}
/// Builds a new struct that implements `Transport` that contains both `self` and `other`.
///
/// The returned object will redirect its calls to `self`, except that if `listen_on` or `dial`
@ -193,7 +184,7 @@ pub trait Transport {
where
Self: Sized,
C: FnOnce(Self::Output, ConnectedPoint) -> F + Clone,
F: IntoFuture<Item = O, Error = IoError>
F: IntoFuture<Item = O>
{
and_then::AndThen::new(self, upgrade)
}
@ -228,3 +219,48 @@ pub trait Transport {
timeout::TransportTimeout::with_ingoing_timeout(self, timeout)
}
}
/// Error that can happen when dialing or listening.
#[derive(Debug, Clone)]
pub enum TransportError<TErr> {
/// The `Multiaddr` passed as parameter is not supported.
///
/// Contains back the same address.
MultiaddrNotSupported(Multiaddr),
/// Any other error that the `Transport` may produce.
Other(TErr),
}
impl<TErr> TransportError<TErr> {
/// Applies a map to the `Other` variant.
#[inline]
pub fn map<TNewErr>(self, map: impl FnOnce(TErr) -> TNewErr) -> TransportError<TNewErr> {
match self {
TransportError::MultiaddrNotSupported(addr) => TransportError::MultiaddrNotSupported(addr),
TransportError::Other(err) => TransportError::Other(map(err)),
}
}
}
impl<TErr> fmt::Display for TransportError<TErr>
where TErr: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
TransportError::MultiaddrNotSupported(addr) => write!(f, "Multiaddr is not supported: {}", addr),
TransportError::Other(err) => write!(f, "{}", err),
}
}
}
impl<TErr> error::Error for TransportError<TErr>
where TErr: error::Error + 'static,
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
TransportError::MultiaddrNotSupported(_) => None,
TransportError::Other(err) => Some(err),
}
}
}

View File

@ -23,11 +23,10 @@
//! The timeout includes the upgrading process.
// TODO: add example
use crate::{Multiaddr, Transport};
use crate::{Multiaddr, Transport, transport::TransportError};
use futures::{try_ready, Async, Future, Poll, Stream};
use log::debug;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::time::Duration;
use std::{error, fmt, time::Duration};
use tokio_timer::Timeout;
use tokio_timer::timeout::Error as TimeoutError;
@ -77,49 +76,32 @@ impl<InnerTrans> TransportTimeout<InnerTrans> {
impl<InnerTrans> Transport for TransportTimeout<InnerTrans>
where
InnerTrans: Transport,
InnerTrans::Error: 'static,
{
type Output = InnerTrans::Output;
type Error = TransportTimeoutError<InnerTrans::Error>;
type Listener = TimeoutListener<InnerTrans::Listener>;
type ListenerUpgrade = TokioTimerMapErr<Timeout<InnerTrans::ListenerUpgrade>>;
type Dial = TokioTimerMapErr<Timeout<InnerTrans::Dial>>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
match self.inner.listen_on(addr) {
Ok((listener, addr)) => {
let listener = TimeoutListener {
inner: listener,
timeout: self.incoming_timeout,
};
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError<Self::Error>> {
let (listener, addr) = self.inner.listen_on(addr)
.map_err(|err| err.map(TransportTimeoutError::Other))?;
Ok((listener, addr))
}
Err((inner, addr)) => {
let transport = TransportTimeout {
inner,
outgoing_timeout: self.outgoing_timeout,
incoming_timeout: self.incoming_timeout,
};
let listener = TimeoutListener {
inner: listener,
timeout: self.incoming_timeout,
};
Err((transport, addr))
}
}
Ok((listener, addr))
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
match self.inner.dial(addr) {
Ok(dial) => Ok(TokioTimerMapErr {
inner: Timeout::new(dial, self.outgoing_timeout),
}),
Err((inner, addr)) => {
let transport = TransportTimeout {
inner,
outgoing_timeout: self.outgoing_timeout,
incoming_timeout: self.incoming_timeout,
};
Err((transport, addr))
}
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let dial = self.inner.dial(addr)
.map_err(|err| err.map(TransportTimeoutError::Other))?;
Ok(TokioTimerMapErr {
inner: Timeout::new(dial, self.outgoing_timeout),
})
}
#[inline]
@ -140,10 +122,10 @@ where
InnerStream: Stream<Item = (O, Multiaddr)>,
{
type Item = (TokioTimerMapErr<Timeout<O>>, Multiaddr);
type Error = InnerStream::Error;
type Error = TransportTimeoutError<InnerStream::Error>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let poll_out = try_ready!(self.inner.poll());
let poll_out = try_ready!(self.inner.poll().map_err(TransportTimeoutError::Other));
if let Some((inner_fut, addr)) = poll_out {
let fut = TokioTimerMapErr {
inner: Timeout::new(inner_fut, self.timeout),
@ -155,7 +137,8 @@ where
}
}
/// Wraps around a `Future`. Turns the error type from `TimeoutError<IoError>` to `IoError`.
/// Wraps around a `Future`. Turns the error type from `TimeoutError<Err>` to
/// `TransportTimeoutError<Err>`.
// TODO: can be replaced with `impl Future` once `impl Trait` are fully stable in Rust
// (https://github.com/rust-lang/rust/issues/34511)
#[must_use = "futures do nothing unless polled"]
@ -163,26 +146,60 @@ pub struct TokioTimerMapErr<InnerFut> {
inner: InnerFut,
}
impl<InnerFut> Future for TokioTimerMapErr<InnerFut>
impl<InnerFut, TErr> Future for TokioTimerMapErr<InnerFut>
where
InnerFut: Future<Error = TimeoutError<IoError>>,
InnerFut: Future<Error = TimeoutError<TErr>>,
{
type Item = InnerFut::Item;
type Error = IoError;
type Error = TransportTimeoutError<TErr>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll().map_err(|err: TimeoutError<IoError>| {
self.inner.poll().map_err(|err: TimeoutError<TErr>| {
if err.is_inner() {
err.into_inner().expect("ensured by is_inner()")
TransportTimeoutError::Other(err.into_inner().expect("ensured by is_inner()"))
} else if err.is_elapsed() {
debug!("timeout elapsed for connection");
IoErrorKind::TimedOut.into()
TransportTimeoutError::Timeout
} else {
assert!(err.is_timer());
debug!("tokio timer error in timeout wrapper");
let err = err.into_timer().expect("ensure by is_timer()");
IoError::new(IoErrorKind::Other, err)
TransportTimeoutError::TimerError
}
})
}
}
/// Error that can be produced by the `TransportTimeout` layer.
#[derive(Debug, Copy, Clone)]
pub enum TransportTimeoutError<TErr> {
/// The transport timed out.
Timeout,
/// An error happened in the timer.
TimerError,
/// Other kind of error.
Other(TErr),
}
impl<TErr> fmt::Display for TransportTimeoutError<TErr>
where TErr: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
TransportTimeoutError::Timeout => write!(f, "Timeout has been reached"),
TransportTimeoutError::TimerError => write!(f, "Error in the timer"),
TransportTimeoutError::Other(err) => write!(f, "{}", err),
}
}
}
impl<TErr> error::Error for TransportTimeoutError<TErr>
where TErr: error::Error + 'static,
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
TransportTimeoutError::Timeout => None,
TransportTimeoutError::TimerError => None,
TransportTimeoutError::Other(err) => Some(err),
}
}
}

View File

@ -20,6 +20,7 @@
use crate::{
transport::Transport,
transport::TransportError,
upgrade::{
OutboundUpgrade,
InboundUpgrade,
@ -32,6 +33,7 @@ use crate::{
};
use futures::{future::Either, prelude::*, try_ready};
use multiaddr::Multiaddr;
use std::{error, fmt};
use tokio_io::{AsyncRead, AsyncWrite};
#[derive(Debug, Copy, Clone)]
@ -43,36 +45,34 @@ impl<T, U> Upgrade<T, U> {
}
}
impl<D, U, O, E> Transport for Upgrade<D, U>
impl<D, U, O, TUpgrErr> Transport for Upgrade<D, U>
where
D: Transport,
D::Output: AsyncRead + AsyncWrite,
U: InboundUpgrade<D::Output, Output = O, Error = E>,
U: OutboundUpgrade<D::Output, Output = O, Error = E> + Clone,
E: std::error::Error + Send + Sync + 'static
D::Error: 'static,
U: InboundUpgrade<D::Output, Output = O, Error = TUpgrErr>,
U: OutboundUpgrade<D::Output, Output = O, Error = TUpgrErr> + Clone,
TUpgrErr: std::error::Error + Send + Sync + 'static // TODO: remove bounds
{
type Output = O;
type Error = TransportUpgradeError<D::Error, TUpgrErr>;
type Listener = ListenerStream<D::Listener, U>;
type ListenerUpgrade = ListenerUpgradeFuture<D::ListenerUpgrade, U>;
type Dial = DialUpgradeFuture<D::Dial, U>;
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
match self.inner.dial(addr.clone()) {
Ok(outbound) => Ok(DialUpgradeFuture {
future: outbound,
upgrade: Either::A(Some(self.upgrade))
}),
Err((dialer, addr)) => Err((Upgrade::new(dialer, self.upgrade), addr))
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let outbound = self.inner.dial(addr.clone())
.map_err(|err| err.map(TransportUpgradeError::Transport))?;
Ok(DialUpgradeFuture {
future: outbound,
upgrade: Either::A(Some(self.upgrade))
})
}
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
match self.inner.listen_on(addr) {
Ok((inbound, addr)) =>
Ok((ListenerStream { stream: inbound, upgrade: self.upgrade }, addr)),
Err((listener, addr)) =>
Err((Upgrade::new(listener, self.upgrade), addr))
}
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError<Self::Error>> {
let (inbound, addr) = self.inner.listen_on(addr)
.map_err(|err| err.map(TransportUpgradeError::Transport))?;
Ok((ListenerStream { stream: inbound, upgrade: self.upgrade }, addr))
}
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
@ -80,6 +80,41 @@ where
}
}
/// Error produced by a transport upgrade.
#[derive(Debug)]
pub enum TransportUpgradeError<TTransErr, TUpgrErr> {
/// Error in the transport.
Transport(TTransErr),
/// Error while upgrading to a protocol.
Upgrade(UpgradeError<TUpgrErr>),
}
impl<TTransErr, TUpgrErr> fmt::Display for TransportUpgradeError<TTransErr, TUpgrErr>
where
TTransErr: fmt::Display,
TUpgrErr: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
TransportUpgradeError::Transport(e) => write!(f, "Transport error: {}", e),
TransportUpgradeError::Upgrade(e) => write!(f, "Upgrade error: {}", e),
}
}
}
impl<TTransErr, TUpgrErr> error::Error for TransportUpgradeError<TTransErr, TUpgrErr>
where
TTransErr: error::Error + 'static,
TUpgrErr: error::Error + 'static,
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
TransportUpgradeError::Transport(e) => Some(e),
TransportUpgradeError::Upgrade(e) => Some(e),
}
}
}
pub struct DialUpgradeFuture<T, U>
where
T: Future,
@ -92,23 +127,23 @@ where
impl<T, U> Future for DialUpgradeFuture<T, U>
where
T: Future<Error = std::io::Error>,
T: Future,
T::Item: AsyncRead + AsyncWrite,
U: OutboundUpgrade<T::Item>,
U::Error: std::error::Error + Send + Sync + 'static
{
type Item = U::Output;
type Error = std::io::Error;
type Error = TransportUpgradeError<T::Error, U::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let next = match self.upgrade {
Either::A(ref mut up) => {
let x = try_ready!(self.future.poll());
let x = try_ready!(self.future.poll().map_err(TransportUpgradeError::Transport));
let u = up.take().expect("DialUpgradeFuture is constructed with Either::A(Some).");
Either::B(apply_outbound(x, u))
}
Either::B(ref mut up) => return up.poll().map_err(UpgradeError::into_io_error)
Either::B(ref mut up) => return up.poll().map_err(TransportUpgradeError::Upgrade)
};
self.upgrade = next
}
@ -128,10 +163,10 @@ where
U: InboundUpgrade<F::Item> + Clone
{
type Item = (ListenerUpgradeFuture<F, U>, Multiaddr);
type Error = T::Error;
type Error = TransportUpgradeError<T::Error, U::Error>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match try_ready!(self.stream.poll()) {
match try_ready!(self.stream.poll().map_err(TransportUpgradeError::Transport)) {
Some((x, a)) => {
let f = ListenerUpgradeFuture {
future: x,
@ -156,23 +191,23 @@ where
impl<T, U> Future for ListenerUpgradeFuture<T, U>
where
T: Future<Error = std::io::Error>,
T: Future,
T::Item: AsyncRead + AsyncWrite,
U: InboundUpgrade<T::Item>,
U::Error: std::error::Error + Send + Sync + 'static
{
type Item = U::Output;
type Error = std::io::Error;
type Error = TransportUpgradeError<T::Error, U::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let next = match self.upgrade {
Either::A(ref mut up) => {
let x = try_ready!(self.future.poll());
let x = try_ready!(self.future.poll().map_err(TransportUpgradeError::Transport));
let u = up.take().expect("ListenerUpgradeFuture is constructed with Either::A(Some).");
Either::B(apply_inbound(x, u))
}
Either::B(ref mut up) => return up.poll().map_err(UpgradeError::into_io_error)
Either::B(ref mut up) => return up.poll().map_err(TransportUpgradeError::Upgrade)
};
self.upgrade = next
}

View File

@ -30,15 +30,6 @@ pub enum UpgradeError<E> {
Apply(E),
}
impl<E> UpgradeError<E>
where
E: std::error::Error + Send + Sync + 'static
{
pub fn into_io_error(self) -> std::io::Error {
std::io::Error::new(std::io::ErrorKind::Other, self)
}
}
impl<E> UpgradeError<E> {
pub fn map_err<F, T>(self, f: F) -> UpgradeError<T>
where
@ -72,9 +63,9 @@ where
impl<E> std::error::Error for UpgradeError<E>
where
E: std::error::Error
E: std::error::Error + 'static
{
fn cause(&self) -> Option<&dyn std::error::Error> {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
UpgradeError::Select(e) => Some(e),
UpgradeError::Apply(e) => Some(e),