mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-23 23:01:33 +00:00
More precise error passed to inject_dial_upgrade_error (#771)
* More precise error passed to inject_dial_upgrade_error * Fix concerns * Fix panic proof
This commit is contained in:
@ -19,7 +19,7 @@
|
|||||||
// DEALINGS IN THE SOFTWARE.
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent},
|
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr},
|
||||||
upgrade::{
|
upgrade::{
|
||||||
InboundUpgrade,
|
InboundUpgrade,
|
||||||
OutboundUpgrade,
|
OutboundUpgrade,
|
||||||
@ -82,7 +82,7 @@ where
|
|||||||
fn inject_event(&mut self, _: Self::InEvent) {}
|
fn inject_event(&mut self, _: Self::InEvent) {}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: io::Error) {}
|
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn inject_inbound_closed(&mut self) {}
|
fn inject_inbound_closed(&mut self) {}
|
||||||
|
@ -19,7 +19,7 @@
|
|||||||
// DEALINGS IN THE SOFTWARE.
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent},
|
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr},
|
||||||
upgrade::{
|
upgrade::{
|
||||||
InboundUpgrade,
|
InboundUpgrade,
|
||||||
OutboundUpgrade,
|
OutboundUpgrade,
|
||||||
@ -89,7 +89,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: io::Error) {
|
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {
|
||||||
self.inner.inject_dial_upgrade_error(info, error)
|
self.inner.inject_dial_upgrade_error(info, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,7 +19,7 @@
|
|||||||
// DEALINGS IN THE SOFTWARE.
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent},
|
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr},
|
||||||
upgrade::{
|
upgrade::{
|
||||||
InboundUpgrade,
|
InboundUpgrade,
|
||||||
OutboundUpgrade,
|
OutboundUpgrade,
|
||||||
@ -85,7 +85,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: io::Error) {
|
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {
|
||||||
self.inner.inject_dial_upgrade_error(info, error)
|
self.inner.inject_dial_upgrade_error(info, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,9 +21,10 @@
|
|||||||
use crate::upgrade::{
|
use crate::upgrade::{
|
||||||
InboundUpgrade,
|
InboundUpgrade,
|
||||||
OutboundUpgrade,
|
OutboundUpgrade,
|
||||||
|
UpgradeError,
|
||||||
};
|
};
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use std::{io, time::Duration};
|
use std::{error, fmt, io, time::Duration};
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
pub use self::dummy::DummyProtocolsHandler;
|
pub use self::dummy::DummyProtocolsHandler;
|
||||||
@ -124,7 +125,7 @@ pub trait ProtocolsHandler {
|
|||||||
fn inject_event(&mut self, event: Self::InEvent);
|
fn inject_event(&mut self, event: Self::InEvent);
|
||||||
|
|
||||||
/// Indicates to the handler that upgrading a substream to the given protocol has failed.
|
/// Indicates to the handler that upgrading a substream to the given protocol has failed.
|
||||||
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: io::Error);
|
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>);
|
||||||
|
|
||||||
/// Indicates to the handler that the inbound part of the muxer has been closed, and that
|
/// Indicates to the handler that the inbound part of the muxer has been closed, and that
|
||||||
/// therefore no more inbound substreams will be produced.
|
/// therefore no more inbound substreams will be produced.
|
||||||
@ -273,3 +274,50 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Error that can happen on an outbound substream opening attempt.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum ProtocolsHandlerUpgrErr<TUpgrErr> {
|
||||||
|
/// The opening attempt timed out before the negotiation was fully completed.
|
||||||
|
Timeout,
|
||||||
|
/// There was an error in the timer used.
|
||||||
|
Timer,
|
||||||
|
/// The remote muxer denied the attempt to open a substream.
|
||||||
|
MuxerDeniedSubstream,
|
||||||
|
/// Error while upgrading the substream to the protocol we want.
|
||||||
|
Upgrade(UpgradeError<TUpgrErr>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TUpgrErr> fmt::Display for ProtocolsHandlerUpgrErr<TUpgrErr>
|
||||||
|
where
|
||||||
|
TUpgrErr: fmt::Display,
|
||||||
|
{
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
ProtocolsHandlerUpgrErr::Timeout => {
|
||||||
|
write!(f, "Timeout error while opening a substream")
|
||||||
|
},
|
||||||
|
ProtocolsHandlerUpgrErr::Timer => {
|
||||||
|
write!(f, "Timer error while opening a substream")
|
||||||
|
},
|
||||||
|
ProtocolsHandlerUpgrErr::MuxerDeniedSubstream => {
|
||||||
|
write!(f, "Remote muxer denied our attempt to open a substream")
|
||||||
|
},
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(err) => write!(f, "{}", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TUpgrErr> error::Error for ProtocolsHandlerUpgrErr<TUpgrErr>
|
||||||
|
where
|
||||||
|
TUpgrErr: error::Error + 'static
|
||||||
|
{
|
||||||
|
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||||
|
match self {
|
||||||
|
ProtocolsHandlerUpgrErr::Timeout => None,
|
||||||
|
ProtocolsHandlerUpgrErr::Timer => None,
|
||||||
|
ProtocolsHandlerUpgrErr::MuxerDeniedSubstream => None,
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(err) => Some(err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -20,7 +20,7 @@
|
|||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent},
|
nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent},
|
||||||
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent},
|
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr},
|
||||||
upgrade::{
|
upgrade::{
|
||||||
self,
|
self,
|
||||||
OutboundUpgrade,
|
OutboundUpgrade,
|
||||||
@ -185,7 +185,7 @@ where
|
|||||||
|
|
||||||
self.queued_dial_upgrades.remove(pos);
|
self.queued_dial_upgrades.remove(pos);
|
||||||
self.handler
|
self.handler
|
||||||
.inject_dial_upgrade_error(user_data.1, io::ErrorKind::ConnectionReset.into());
|
.inject_dial_upgrade_error(user_data.1, ProtocolsHandlerUpgrErr::MuxerDeniedSubstream);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
@ -224,8 +224,18 @@ where
|
|||||||
self.negotiating_out.push((upgr_info, in_progress));
|
self.negotiating_out.push((upgr_info, in_progress));
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
let msg = format!("Error while upgrading: {:?}", err);
|
let err = if err.is_elapsed() {
|
||||||
let err = io::Error::new(io::ErrorKind::Other, msg);
|
ProtocolsHandlerUpgrErr::Timeout
|
||||||
|
} else if err.is_timer() {
|
||||||
|
ProtocolsHandlerUpgrErr::Timer
|
||||||
|
} else {
|
||||||
|
debug_assert!(err.is_inner());
|
||||||
|
let err = err.into_inner().expect("Timeout error is one of {elapsed, \
|
||||||
|
timer, inner}; is_inner and is_elapsed are both false; error is \
|
||||||
|
inner; QED");
|
||||||
|
ProtocolsHandlerUpgrErr::Upgrade(err)
|
||||||
|
};
|
||||||
|
|
||||||
self.handler.inject_dial_upgrade_error(upgr_info, err);
|
self.handler.inject_dial_upgrade_error(upgr_info, err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,13 +19,15 @@
|
|||||||
// DEALINGS IN THE SOFTWARE.
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
either::EitherError,
|
||||||
either::EitherOutput,
|
either::EitherOutput,
|
||||||
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent},
|
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr},
|
||||||
upgrade::{
|
upgrade::{
|
||||||
InboundUpgrade,
|
InboundUpgrade,
|
||||||
OutboundUpgrade,
|
OutboundUpgrade,
|
||||||
EitherUpgrade,
|
EitherUpgrade,
|
||||||
SelectUpgrade
|
SelectUpgrade,
|
||||||
|
UpgradeError,
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
@ -112,10 +114,44 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: io::Error) {
|
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {
|
||||||
match info {
|
match (info, error) {
|
||||||
EitherOutput::First(info) => self.proto1.inject_dial_upgrade_error(info, error),
|
(EitherOutput::First(info), ProtocolsHandlerUpgrErr::MuxerDeniedSubstream) => {
|
||||||
EitherOutput::Second(info) => self.proto2.inject_dial_upgrade_error(info, error),
|
self.proto1.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::MuxerDeniedSubstream)
|
||||||
|
},
|
||||||
|
(EitherOutput::First(info), ProtocolsHandlerUpgrErr::Timer) => {
|
||||||
|
self.proto1.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timer)
|
||||||
|
},
|
||||||
|
(EitherOutput::First(info), ProtocolsHandlerUpgrErr::Timeout) => {
|
||||||
|
self.proto1.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timeout)
|
||||||
|
},
|
||||||
|
(EitherOutput::First(info), ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err))) => {
|
||||||
|
self.proto1.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)))
|
||||||
|
},
|
||||||
|
(EitherOutput::First(info), ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(err)))) => {
|
||||||
|
self.proto1.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)))
|
||||||
|
},
|
||||||
|
(EitherOutput::First(_), ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(_)))) => {
|
||||||
|
panic!("Wrong API usage; the upgrade error doesn't match the outbound open info");
|
||||||
|
},
|
||||||
|
(EitherOutput::Second(info), ProtocolsHandlerUpgrErr::MuxerDeniedSubstream) => {
|
||||||
|
self.proto2.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::MuxerDeniedSubstream)
|
||||||
|
},
|
||||||
|
(EitherOutput::Second(info), ProtocolsHandlerUpgrErr::Timeout) => {
|
||||||
|
self.proto2.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timeout)
|
||||||
|
},
|
||||||
|
(EitherOutput::Second(info), ProtocolsHandlerUpgrErr::Timer) => {
|
||||||
|
self.proto2.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timer)
|
||||||
|
},
|
||||||
|
(EitherOutput::Second(info), ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err))) => {
|
||||||
|
self.proto2.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)))
|
||||||
|
},
|
||||||
|
(EitherOutput::Second(info), ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(err)))) => {
|
||||||
|
self.proto2.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)))
|
||||||
|
},
|
||||||
|
(EitherOutput::Second(_), ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(_)))) => {
|
||||||
|
panic!("Wrong API usage; the upgrade error doesn't match the outbound open info");
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,8 +28,6 @@ pub enum UpgradeError<E> {
|
|||||||
Select(ProtocolChoiceError),
|
Select(ProtocolChoiceError),
|
||||||
/// Error during the post-negotiation handshake.
|
/// Error during the post-negotiation handshake.
|
||||||
Apply(E),
|
Apply(E),
|
||||||
#[doc(hidden)]
|
|
||||||
__Nonexhaustive
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<E> UpgradeError<E>
|
impl<E> UpgradeError<E>
|
||||||
@ -49,7 +47,6 @@ impl<E> UpgradeError<E> {
|
|||||||
match self {
|
match self {
|
||||||
UpgradeError::Select(e) => UpgradeError::Select(e),
|
UpgradeError::Select(e) => UpgradeError::Select(e),
|
||||||
UpgradeError::Apply(e) => UpgradeError::Apply(f(e)),
|
UpgradeError::Apply(e) => UpgradeError::Apply(f(e)),
|
||||||
UpgradeError::__Nonexhaustive => UpgradeError::__Nonexhaustive
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -69,7 +66,6 @@ where
|
|||||||
match self {
|
match self {
|
||||||
UpgradeError::Select(e) => write!(f, "select error: {}", e),
|
UpgradeError::Select(e) => write!(f, "select error: {}", e),
|
||||||
UpgradeError::Apply(e) => write!(f, "upgrade apply error: {}", e),
|
UpgradeError::Apply(e) => write!(f, "upgrade apply error: {}", e),
|
||||||
UpgradeError::__Nonexhaustive => f.write_str("__Nonexhaustive")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -82,7 +78,6 @@ where
|
|||||||
match self {
|
match self {
|
||||||
UpgradeError::Select(e) => Some(e),
|
UpgradeError::Select(e) => Some(e),
|
||||||
UpgradeError::Apply(e) => Some(e),
|
UpgradeError::Apply(e) => Some(e),
|
||||||
UpgradeError::__Nonexhaustive => None
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,7 @@ use crate::protocol::{FloodsubCodec, FloodsubConfig, FloodsubRpc};
|
|||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use libp2p_core::{
|
use libp2p_core::{
|
||||||
ProtocolsHandler, ProtocolsHandlerEvent,
|
ProtocolsHandler, ProtocolsHandlerEvent,
|
||||||
|
protocols_handler::ProtocolsHandlerUpgrErr,
|
||||||
upgrade::{InboundUpgrade, OutboundUpgrade}
|
upgrade::{InboundUpgrade, OutboundUpgrade}
|
||||||
};
|
};
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
@ -144,7 +145,7 @@ where
|
|||||||
fn inject_inbound_closed(&mut self) {}
|
fn inject_inbound_closed(&mut self) {}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: io::Error) {}
|
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn shutdown(&mut self) {
|
fn shutdown(&mut self) {
|
||||||
|
@ -23,7 +23,7 @@ use crate::periodic_id_handler::{PeriodicIdHandler, PeriodicIdHandlerEvent};
|
|||||||
use crate::protocol::{IdentifyInfo, IdentifySender, IdentifySenderFuture};
|
use crate::protocol::{IdentifyInfo, IdentifySender, IdentifySenderFuture};
|
||||||
use crate::topology::IdentifyTopology;
|
use crate::topology::IdentifyTopology;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use libp2p_core::protocols_handler::{ProtocolsHandler, ProtocolsHandlerSelect};
|
use libp2p_core::protocols_handler::{ProtocolsHandler, ProtocolsHandlerSelect, ProtocolsHandlerUpgrErr};
|
||||||
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
|
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
|
||||||
use libp2p_core::{Multiaddr, PeerId, either::EitherOutput, topology::Topology};
|
use libp2p_core::{Multiaddr, PeerId, either::EitherOutput, topology::Topology};
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
@ -192,6 +192,6 @@ pub enum IdentifyEvent {
|
|||||||
/// Peer that we fail to identify.
|
/// Peer that we fail to identify.
|
||||||
peer_id: PeerId,
|
peer_id: PeerId,
|
||||||
/// The error that happened.
|
/// The error that happened.
|
||||||
error: io::Error,
|
error: ProtocolsHandlerUpgrErr<io::Error>,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -21,8 +21,8 @@
|
|||||||
use crate::protocol::{IdentifySender, IdentifyProtocolConfig};
|
use crate::protocol::{IdentifySender, IdentifyProtocolConfig};
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use libp2p_core::{
|
use libp2p_core::{
|
||||||
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent},
|
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr},
|
||||||
upgrade::{DeniedUpgrade, InboundUpgrade}
|
upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade}
|
||||||
};
|
};
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
use std::io;
|
use std::io;
|
||||||
@ -87,7 +87,7 @@ where
|
|||||||
fn inject_inbound_closed(&mut self) {}
|
fn inject_inbound_closed(&mut self) {}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: io::Error) {}
|
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn shutdown(&mut self) {
|
fn shutdown(&mut self) {
|
||||||
|
@ -21,7 +21,7 @@
|
|||||||
use crate::protocol::{RemoteInfo, IdentifyProtocolConfig};
|
use crate::protocol::{RemoteInfo, IdentifyProtocolConfig};
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use libp2p_core::{
|
use libp2p_core::{
|
||||||
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent},
|
protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr},
|
||||||
upgrade::{DeniedUpgrade, OutboundUpgrade}
|
upgrade::{DeniedUpgrade, OutboundUpgrade}
|
||||||
};
|
};
|
||||||
use std::{io, marker::PhantomData, time::{Duration, Instant}};
|
use std::{io, marker::PhantomData, time::{Duration, Instant}};
|
||||||
@ -59,7 +59,7 @@ pub enum PeriodicIdHandlerEvent {
|
|||||||
/// We obtained identification information from the remote
|
/// We obtained identification information from the remote
|
||||||
Identified(RemoteInfo),
|
Identified(RemoteInfo),
|
||||||
/// Failed to identify the remote.
|
/// Failed to identify the remote.
|
||||||
IdentificationError(io::Error),
|
IdentificationError(ProtocolsHandlerUpgrErr<io::Error>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<TSubstream> PeriodicIdHandler<TSubstream> {
|
impl<TSubstream> PeriodicIdHandler<TSubstream> {
|
||||||
@ -110,7 +110,7 @@ where
|
|||||||
fn inject_inbound_closed(&mut self) {}
|
fn inject_inbound_closed(&mut self) {}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, err: io::Error) {
|
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, err: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {
|
||||||
self.pending_result = Some(PeriodicIdHandlerEvent::IdentificationError(err));
|
self.pending_result = Some(PeriodicIdHandlerEvent::IdentificationError(err));
|
||||||
if let Some(ref mut next_id) = self.next_id {
|
if let Some(ref mut next_id) = self.next_id {
|
||||||
next_id.reset(Instant::now() + TRY_AGAIN_ON_ERR);
|
next_id.reset(Instant::now() + TRY_AGAIN_ON_ERR);
|
||||||
|
@ -19,14 +19,14 @@
|
|||||||
// DEALINGS IN THE SOFTWARE.
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use libp2p_core::protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent};
|
use libp2p_core::protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr};
|
||||||
use libp2p_core::{upgrade, either::EitherOutput, InboundUpgrade, OutboundUpgrade, PeerId};
|
use libp2p_core::{upgrade, either::EitherOutput, InboundUpgrade, OutboundUpgrade, PeerId};
|
||||||
use multihash::Multihash;
|
use multihash::Multihash;
|
||||||
use protocol::{
|
use protocol::{
|
||||||
KadInStreamSink, KadOutStreamSink, KadPeer, KadRequestMsg, KadResponseMsg,
|
KadInStreamSink, KadOutStreamSink, KadPeer, KadRequestMsg, KadResponseMsg,
|
||||||
KademliaProtocolConfig,
|
KademliaProtocolConfig,
|
||||||
};
|
};
|
||||||
use std::io;
|
use std::{error, fmt, io};
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
/// Protocol handler that handles Kademlia communications with the remote.
|
/// Protocol handler that handles Kademlia communications with the remote.
|
||||||
@ -80,7 +80,7 @@ where
|
|||||||
// TODO: add timeout
|
// TODO: add timeout
|
||||||
OutWaitingAnswer(KadOutStreamSink<TSubstream>, TUserData),
|
OutWaitingAnswer(KadOutStreamSink<TSubstream>, TUserData),
|
||||||
/// An error happened on the substream and we should report the error to the user.
|
/// An error happened on the substream and we should report the error to the user.
|
||||||
OutReportError(io::Error, TUserData),
|
OutReportError(KademliaHandlerQueryErr, TUserData),
|
||||||
/// The substream is being closed.
|
/// The substream is being closed.
|
||||||
OutClosing(KadOutStreamSink<TSubstream>),
|
OutClosing(KadOutStreamSink<TSubstream>),
|
||||||
/// Waiting for a request from the remote.
|
/// Waiting for a request from the remote.
|
||||||
@ -168,7 +168,7 @@ pub enum KademliaHandlerEvent<TUserData> {
|
|||||||
/// An error happened when performing a query.
|
/// An error happened when performing a query.
|
||||||
QueryError {
|
QueryError {
|
||||||
/// The error that happened.
|
/// The error that happened.
|
||||||
error: io::Error,
|
error: KademliaHandlerQueryErr,
|
||||||
/// The user data passed to the query.
|
/// The user data passed to the query.
|
||||||
user_data: TUserData,
|
user_data: TUserData,
|
||||||
},
|
},
|
||||||
@ -182,6 +182,50 @@ pub enum KademliaHandlerEvent<TUserData> {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Error that can happen when requesting an RPC query.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum KademliaHandlerQueryErr {
|
||||||
|
/// Error while trying to perform the query.
|
||||||
|
Upgrade(ProtocolsHandlerUpgrErr<io::Error>),
|
||||||
|
/// Received an answer that doesn't correspond to the request.
|
||||||
|
UnexpectedMessage,
|
||||||
|
/// I/O error in the substream.
|
||||||
|
Io(io::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for KademliaHandlerQueryErr {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
KademliaHandlerQueryErr::Upgrade(err) => {
|
||||||
|
write!(f, "Error while performing Kademlia query: {}", err)
|
||||||
|
},
|
||||||
|
KademliaHandlerQueryErr::UnexpectedMessage => {
|
||||||
|
write!(f, "Remote answered our Kademlia RPC query with the wrong message type")
|
||||||
|
},
|
||||||
|
KademliaHandlerQueryErr::Io(err) => {
|
||||||
|
write!(f, "I/O error during a Kademlia RPC query: {}", err)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl error::Error for KademliaHandlerQueryErr {
|
||||||
|
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||||
|
match self {
|
||||||
|
KademliaHandlerQueryErr::Upgrade(err) => Some(err),
|
||||||
|
KademliaHandlerQueryErr::UnexpectedMessage => None,
|
||||||
|
KademliaHandlerQueryErr::Io(err) => Some(err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<ProtocolsHandlerUpgrErr<io::Error>> for KademliaHandlerQueryErr {
|
||||||
|
#[inline]
|
||||||
|
fn from(err: ProtocolsHandlerUpgrErr<io::Error>) -> Self {
|
||||||
|
KademliaHandlerQueryErr::Upgrade(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Event to send to the handler.
|
/// Event to send to the handler.
|
||||||
pub enum KademliaHandlerIn<TUserData> {
|
pub enum KademliaHandlerIn<TUserData> {
|
||||||
/// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes
|
/// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes
|
||||||
@ -434,13 +478,13 @@ where
|
|||||||
fn inject_dial_upgrade_error(
|
fn inject_dial_upgrade_error(
|
||||||
&mut self,
|
&mut self,
|
||||||
(_, user_data): Self::OutboundOpenInfo,
|
(_, user_data): Self::OutboundOpenInfo,
|
||||||
error: io::Error,
|
error: ProtocolsHandlerUpgrErr<io::Error>,
|
||||||
) {
|
) {
|
||||||
// TODO: cache the fact that the remote doesn't support kademlia at all, so that we don't
|
// TODO: cache the fact that the remote doesn't support kademlia at all, so that we don't
|
||||||
// continue trying
|
// continue trying
|
||||||
if let Some(user_data) = user_data {
|
if let Some(user_data) = user_data {
|
||||||
self.substreams
|
self.substreams
|
||||||
.push(SubstreamState::OutReportError(error, user_data));
|
.push(SubstreamState::OutReportError(error.into(), user_data));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -553,8 +597,10 @@ where
|
|||||||
),
|
),
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
let event = if let Some(user_data) = user_data {
|
let event = if let Some(user_data) = user_data {
|
||||||
let ev = KademliaHandlerEvent::QueryError { error, user_data };
|
Some(ProtocolsHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
|
||||||
Some(ProtocolsHandlerEvent::Custom(ev))
|
error: KademliaHandlerQueryErr::Io(error),
|
||||||
|
user_data
|
||||||
|
}))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
@ -583,8 +629,10 @@ where
|
|||||||
),
|
),
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
let event = if let Some(user_data) = user_data {
|
let event = if let Some(user_data) = user_data {
|
||||||
let ev = KademliaHandlerEvent::QueryError { error, user_data };
|
Some(ProtocolsHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
|
||||||
Some(ProtocolsHandlerEvent::Custom(ev))
|
error: KademliaHandlerQueryErr::Io(error),
|
||||||
|
user_data,
|
||||||
|
}))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
@ -609,12 +657,17 @@ where
|
|||||||
false,
|
false,
|
||||||
),
|
),
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
let event = KademliaHandlerEvent::QueryError { error, user_data };
|
let event = KademliaHandlerEvent::QueryError {
|
||||||
|
error: KademliaHandlerQueryErr::Io(error),
|
||||||
|
user_data,
|
||||||
|
};
|
||||||
(None, Some(ProtocolsHandlerEvent::Custom(event)), false)
|
(None, Some(ProtocolsHandlerEvent::Custom(event)), false)
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(None)) => {
|
Ok(Async::Ready(None)) => {
|
||||||
let error = io::Error::new(io::ErrorKind::Other, "unexpected EOF");
|
let event = KademliaHandlerEvent::QueryError {
|
||||||
let event = KademliaHandlerEvent::QueryError { error, user_data };
|
error: KademliaHandlerQueryErr::Io(io::ErrorKind::UnexpectedEof.into()),
|
||||||
|
user_data,
|
||||||
|
};
|
||||||
(None, Some(ProtocolsHandlerEvent::Custom(event)), false)
|
(None, Some(ProtocolsHandlerEvent::Custom(event)), false)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -722,12 +775,8 @@ fn process_kad_response<TUserData>(
|
|||||||
match event {
|
match event {
|
||||||
KadResponseMsg::Pong => {
|
KadResponseMsg::Pong => {
|
||||||
// We never send out pings.
|
// We never send out pings.
|
||||||
let err = io::Error::new(
|
|
||||||
io::ErrorKind::InvalidData,
|
|
||||||
"received unexpected PONG message",
|
|
||||||
);
|
|
||||||
KademliaHandlerEvent::QueryError {
|
KademliaHandlerEvent::QueryError {
|
||||||
error: err,
|
error: KademliaHandlerQueryErr::UnexpectedMessage,
|
||||||
user_data,
|
user_data,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,7 @@ use libp2p_core::{
|
|||||||
OutboundUpgrade,
|
OutboundUpgrade,
|
||||||
ProtocolsHandler,
|
ProtocolsHandler,
|
||||||
ProtocolsHandlerEvent,
|
ProtocolsHandlerEvent,
|
||||||
|
protocols_handler::ProtocolsHandlerUpgrErr,
|
||||||
upgrade::DeniedUpgrade
|
upgrade::DeniedUpgrade
|
||||||
};
|
};
|
||||||
use log::warn;
|
use log::warn;
|
||||||
@ -183,7 +184,7 @@ where
|
|||||||
fn inject_inbound_closed(&mut self) {}
|
fn inject_inbound_closed(&mut self) {}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: io::Error) {
|
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {
|
||||||
// In case of error while upgrading, there's not much we can do except shut down.
|
// In case of error while upgrading, there's not much we can do except shut down.
|
||||||
// TODO: we assume that the error is about ping not being supported, which is not
|
// TODO: we assume that the error is about ping not being supported, which is not
|
||||||
// necessarily the case
|
// necessarily the case
|
||||||
|
@ -23,8 +23,10 @@ use arrayvec::ArrayVec;
|
|||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use libp2p_core::{
|
use libp2p_core::{
|
||||||
InboundUpgrade,
|
InboundUpgrade,
|
||||||
|
OutboundUpgrade,
|
||||||
ProtocolsHandler,
|
ProtocolsHandler,
|
||||||
ProtocolsHandlerEvent,
|
ProtocolsHandlerEvent,
|
||||||
|
protocols_handler::ProtocolsHandlerUpgrErr,
|
||||||
upgrade::DeniedUpgrade
|
upgrade::DeniedUpgrade
|
||||||
};
|
};
|
||||||
use log::warn;
|
use log::warn;
|
||||||
@ -101,7 +103,7 @@ where
|
|||||||
fn inject_inbound_closed(&mut self) {}
|
fn inject_inbound_closed(&mut self) {}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: io::Error) {}
|
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn shutdown(&mut self) {
|
fn shutdown(&mut self) {
|
||||||
|
Reference in New Issue
Block a user