Fix regression w.r.t. reporting of dial errors. (#1493)

* Fix regression w.r.t. reporting of dial errors.

PR [1440] introduced a regression w.r.t. the reporting of
dial errors. In particular, if a connection attempt fails
due to an invalid remote peer ID, any remaining addresses
for the same peer would not be tried (intentional) but
the dial failure would not be reported to the behaviour,
causing e.g. libp2p-kad queries to potentially stall.

In hindsight, I figured it is better to preserve the
previous behaviour to still try alternative addresses
of the peer even on invalid peer ID errors on an earlier
address. In particular because in the context of libp2p-kad
it is not uncommon for peers to report localhost addresses
while the local node actually has e.g. an ipfs node running
on that address, obviously with a different peer ID, which
is the scenario causing frequent invalid peer ID (mismatch)
errors when running the ipfs-kad example.

This commit thus restores the previous behaviour w.r.t.
trying all remaining addresses on invalid peer ID errors
as well as making sure `inject_dial_error` is always
called when the last attempt failed.

[1440]: https://github.com/libp2p/rust-libp2p/pull/1440.

* Remove an fmt::Debug requirement.
This commit is contained in:
Roman Borschel
2020-03-16 16:53:21 +01:00
committed by GitHub
parent 96cd509c60
commit 58ee13b630
8 changed files with 96 additions and 109 deletions

View File

@ -29,6 +29,10 @@ pub enum ConnectionError<THandlerErr> {
// TODO: Eventually this should also be a custom error?
IO(io::Error),
/// The connection was dropped because the connection limit
/// for a peer has been reached.
ConnectionLimit(ConnectionLimit),
/// The connection handler produced an error.
Handler(THandlerErr),
}
@ -44,6 +48,8 @@ where
write!(f, "Connection error: I/O error: {}", err),
ConnectionError::Handler(err) =>
write!(f, "Connection error: Handler error: {}", err),
ConnectionError::ConnectionLimit(l) =>
write!(f, "Connection error: Connection limit: {}.", l)
}
}
}
@ -57,6 +63,7 @@ where
match self {
ConnectionError::IO(err) => Some(err),
ConnectionError::Handler(err) => Some(err),
ConnectionError::ConnectionLimit(..) => None,
}
}
}
@ -71,10 +78,6 @@ pub enum PendingConnectionError<TTransErr> {
/// match the one that was expected or is otherwise invalid.
InvalidPeerId,
/// The pending connection was successfully negotiated but dropped
/// because the connection limit for a peer has been reached.
ConnectionLimit(ConnectionLimit),
/// An I/O error occurred on the connection.
// TODO: Eventually this should also be a custom error?
IO(io::Error),
@ -93,8 +96,6 @@ where
write!(f, "Pending connection: Transport error: {}", err),
PendingConnectionError::InvalidPeerId =>
write!(f, "Pending connection: Invalid peer ID."),
PendingConnectionError::ConnectionLimit(l) =>
write!(f, "Pending connection: Connection limit: {}.", l)
}
}
}
@ -109,7 +110,6 @@ where
PendingConnectionError::IO(err) => Some(err),
PendingConnectionError::Transport(err) => Some(err),
PendingConnectionError::InvalidPeerId => None,
PendingConnectionError::ConnectionLimit(..) => None,
}
}
}

View File

@ -112,7 +112,7 @@ pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TC
error: PendingConnectionError<TTransErr>,
/// The handler that was supposed to handle the connection,
/// if the connection failed before the handler was consumed.
handler: Option<THandler>,
handler: THandler,
/// The (expected) peer of the failed connection.
peer: Option<TPeerId>,
/// A reference to the pool that managed the connection.
@ -222,6 +222,7 @@ where
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TPeerId: Clone + Send + 'static,
{
let endpoint = info.to_connected_point();
if let Some(limit) = self.limits.max_pending_incoming {
@ -263,7 +264,7 @@ where
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TPeerId: Clone,
TPeerId: Clone + Send + 'static,
{
self.limits.check_outgoing(|| self.iter_pending_outgoing().count())?;
let endpoint = info.to_connected_point();
@ -298,14 +299,32 @@ where
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TPeerId: Clone + Send + 'static,
{
// Validate the received peer ID as the last step of the pending connection
// future, so that these errors can be raised before the `handler` is consumed
// by the background task, which happens when this future resolves to an
// "established" connection.
let future = future.and_then({
let endpoint = endpoint.clone();
let expected_peer = peer.clone();
let local_id = self.local_id.clone();
move |(info, muxer)| {
if let Some(peer) = expected_peer {
if &peer != info.peer_id() {
return future::err(PendingConnectionError::InvalidPeerId)
}
}
if &local_id == info.peer_id() {
return future::err(PendingConnectionError::InvalidPeerId)
}
let connected = Connected { info, endpoint };
future::ready(Ok((connected, muxer)))
}
});
let id = self.manager.add_pending(future, handler);
self.pending.insert(id, (endpoint, peer));
id
@ -536,7 +555,7 @@ where
PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId>
> where
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Clone,
TPeerId: Clone,
TPeerId: Clone
{
loop {
let item = match self.manager.poll(cx) {
@ -551,7 +570,7 @@ where
id,
endpoint,
error,
handler: Some(handler),
handler,
peer,
pool: self
})
@ -581,39 +600,25 @@ where
.map_or(0, |conns| conns.len());
if let Err(e) = self.limits.check_established(current) {
let connected = entry.close();
return Poll::Ready(PoolEvent::PendingConnectionError {
let num_established = e.current;
return Poll::Ready(PoolEvent::ConnectionError {
id,
endpoint: connected.endpoint,
peer: Some(connected.info.peer_id().clone()),
error: PendingConnectionError::ConnectionLimit(e),
connected,
error: ConnectionError::ConnectionLimit(e),
num_established,
pool: self,
handler: None,
})
}
// Check peer ID.
if let Some(peer) = peer {
if &peer != entry.connected().peer_id() {
let connected = entry.close();
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
endpoint: connected.endpoint,
peer: Some(connected.info.peer_id().clone()),
error: PendingConnectionError::InvalidPeerId,
pool: self,
handler: None,
})
// Peer ID checks must already have happened. See `add_pending`.
if cfg!(debug_assertions) {
if &self.local_id == entry.connected().peer_id() {
panic!("Unexpected local peer ID for remote.");
}
if let Some(peer) = peer {
if &peer != entry.connected().peer_id() {
panic!("Unexpected peer ID mismatch.");
}
}
}
if &self.local_id == entry.connected().peer_id() {
let connected = entry.close();
return Poll::Ready(PoolEvent::PendingConnectionError {
id,
endpoint: connected.endpoint,
peer: Some(connected.info.peer_id().clone()),
error: PendingConnectionError::InvalidPeerId,
pool: self,
handler: None,
})
}
// Add the connection to the pool.
let peer = entry.connected().peer_id().clone();