Some Kademlia code cleanup. (#1101)

This commit is contained in:
Roman Borschel 2019-05-08 10:10:58 +02:00 committed by GitHub
parent 8537eb38b9
commit 61b236172b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 151 additions and 241 deletions

View File

@ -70,9 +70,6 @@ where
/// We haven't started opening the outgoing substream yet. /// We haven't started opening the outgoing substream yet.
/// Contains the request we want to send, and the user data if we expect an answer. /// Contains the request we want to send, and the user data if we expect an answer.
OutPendingOpen(KadRequestMsg, Option<TUserData>), OutPendingOpen(KadRequestMsg, Option<TUserData>),
/// We are waiting for the outgoing substream to be upgraded.
/// Contains the request we want to send, and the user data if we expect an answer.
OutPendingUpgrade(KadRequestMsg, Option<TUserData>),
/// Waiting to send a message to the remote. /// Waiting to send a message to the remote.
OutPendingSend( OutPendingSend(
KadOutStreamSink<TSubstream>, KadOutStreamSink<TSubstream>,
@ -111,7 +108,6 @@ where
fn try_close(self) -> AsyncSink<Self> { fn try_close(self) -> AsyncSink<Self> {
match self { match self {
SubstreamState::OutPendingOpen(_, _) SubstreamState::OutPendingOpen(_, _)
| SubstreamState::OutPendingUpgrade(_, _)
| SubstreamState::OutReportError(_, _) => AsyncSink::Ready, | SubstreamState::OutReportError(_, _) => AsyncSink::Ready,
SubstreamState::OutPendingSend(mut stream, _, _) SubstreamState::OutPendingSend(mut stream, _, _)
| SubstreamState::OutPendingFlush(mut stream, _) | SubstreamState::OutPendingFlush(mut stream, _)
@ -405,11 +401,8 @@ where
request_id, request_id,
} => { } => {
let pos = self.substreams.iter().position(|state| match state { let pos = self.substreams.iter().position(|state| match state {
SubstreamState::InWaitingUser(ref conn_id, _) SubstreamState::InWaitingUser(ref conn_id, _) =>
if conn_id == &request_id.connec_unique_id => conn_id == &request_id.connec_unique_id,
{
true
}
_ => false, _ => false,
}); });
@ -562,11 +555,6 @@ where
}; };
(None, Some(ev), false) (None, Some(ev), false)
} }
SubstreamState::OutPendingUpgrade(msg, user_data) => (
Some(SubstreamState::OutPendingUpgrade(msg, user_data)),
None,
false,
),
SubstreamState::OutPendingSend(mut substream, msg, user_data) => { SubstreamState::OutPendingSend(mut substream, msg, user_data) => {
match substream.start_send(msg) { match substream.start_send(msg) {
Ok(AsyncSink::Ready) => ( Ok(AsyncSink::Ready) => (

View File

@ -19,43 +19,11 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
//! Kademlia protocol. Allows peer discovery, records store and records fetch. //! Kademlia protocol. Allows peer discovery, records store and records fetch.
//!
//! # Usage
//!
//! Usage is done in the following steps:
//!
//! - Build a `KadSystemConfig` and a `KadConnecConfig` object that contain the way you want the
//! Kademlia protocol to behave.
//!
//! - Create a swarm that upgrades incoming connections with the `KadConnecConfig`.
//!
//! - Build a `KadSystem` from the `KadSystemConfig`. This requires passing a closure that provides
//! the Kademlia controller of a peer.
//!
//! - You can perform queries using the `KadSystem`.
//!
// TODO: we allow dead_code for now because this library contains a lot of unused code that will // TODO: we allow dead_code for now because this library contains a lot of unused code that will
// be useful later for record store // be useful later for record store
#![allow(dead_code)] #![allow(dead_code)]
// # Crate organization
//
// The crate contains three levels of abstractions over the Kademlia protocol.
//
// - The first level of abstraction is in `protocol`. The API of this module lets you turn a raw
// bytes stream (`AsyncRead + AsyncWrite`) into a `Sink + Stream` of raw but strongly-typed
// Kademlia messages.
//
// - The second level of abstraction is in `kad_server`. Its API lets you upgrade a connection and
// obtain a future (that must be driven to completion), plus a controller. Processing the future
// will automatically respond to Kad requests received by the remote. The controller lets you
// send your own requests to this remote and obtain strongly-typed responses.
//
// - The third level of abstraction is in `high_level`. This module only provides the
// `KademliaSystem`.
//
pub use self::behaviour::{Kademlia, KademliaOut}; pub use self::behaviour::{Kademlia, KademliaOut};
pub use self::kbucket::KBucketsPeerId; pub use self::kbucket::KBucketsPeerId;
pub use self::protocol::KadConnectionType; pub use self::protocol::KadConnectionType;

View File

@ -18,23 +18,24 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
//! Provides the `KadRequestMsg` and `KadResponseMsg` enums of all the possible messages //! The Kademlia connection protocol upgrade and associated message types.
//! transmitted with the Kademlia protocol, and the `KademliaProtocolConfig` connection upgrade.
//! //!
//! The upgrade's output a `Sink + Stream` of messages. //! The connection protocol upgrade is provided by [`KademliaProtocolConfig`], with the
//! //! request and response types [`KadRequestMsg`] and [`KadResponseMsg`], respectively.
//! The `Stream` component is used to poll the underlying transport, and the `Sink` component is //! The upgrade's output is a `Sink + Stream` of messages. The `Stream` component is used
//! used to send messages. //! to poll the underlying transport for incoming messages, and the `Sink` component
//! is used to send messages to remote peers.
use bytes::BytesMut; use bytes::BytesMut;
use crate::protobuf_structs; use codec::UviBytes;
use futures::{future, sink, stream, Sink, Stream}; use crate::protobuf_structs::dht as proto;
use libp2p_core::{InboundUpgrade, Multiaddr, OutboundUpgrade, PeerId, UpgradeInfo, upgrade::Negotiated}; use futures::{future::{self, FutureResult}, sink, stream, Sink, Stream};
use libp2p_core::{Multiaddr, PeerId};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated};
use multihash::Multihash; use multihash::Multihash;
use protobuf::{self, Message}; use protobuf::{self, Message};
use std::convert::TryFrom; use std::convert::TryFrom;
use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::{io, iter};
use std::iter;
use tokio_codec::Framed; use tokio_codec::Framed;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use unsigned_varint::codec; use unsigned_varint::codec;
@ -52,10 +53,10 @@ pub enum KadConnectionType {
CannotConnect = 3, CannotConnect = 3,
} }
impl From<protobuf_structs::dht::Message_ConnectionType> for KadConnectionType { impl From<proto::Message_ConnectionType> for KadConnectionType {
#[inline] #[inline]
fn from(raw: protobuf_structs::dht::Message_ConnectionType) -> KadConnectionType { fn from(raw: proto::Message_ConnectionType) -> KadConnectionType {
use crate::protobuf_structs::dht::Message_ConnectionType::{ use proto::Message_ConnectionType::{
CAN_CONNECT, CANNOT_CONNECT, CONNECTED, NOT_CONNECTED CAN_CONNECT, CANNOT_CONNECT, CONNECTED, NOT_CONNECTED
}; };
match raw { match raw {
@ -67,10 +68,10 @@ impl From<protobuf_structs::dht::Message_ConnectionType> for KadConnectionType {
} }
} }
impl Into<protobuf_structs::dht::Message_ConnectionType> for KadConnectionType { impl Into<proto::Message_ConnectionType> for KadConnectionType {
#[inline] #[inline]
fn into(self) -> protobuf_structs::dht::Message_ConnectionType { fn into(self) -> proto::Message_ConnectionType {
use crate::protobuf_structs::dht::Message_ConnectionType::{ use proto::Message_ConnectionType::{
CAN_CONNECT, CANNOT_CONNECT, CONNECTED, NOT_CONNECTED CAN_CONNECT, CANNOT_CONNECT, CONNECTED, NOT_CONNECTED
}; };
match self { match self {
@ -93,19 +94,19 @@ pub struct KadPeer {
pub connection_ty: KadConnectionType, pub connection_ty: KadConnectionType,
} }
impl KadPeer { // Builds a `KadPeer` from a corresponding protobuf message.
// Builds a `KadPeer` from its raw protobuf equivalent. impl TryFrom<&mut proto::Message_Peer> for KadPeer {
// TODO: use TryFrom once stable type Error = io::Error;
fn from_peer(peer: &mut protobuf_structs::dht::Message_Peer) -> Result<KadPeer, IoError> {
fn try_from(peer: &mut proto::Message_Peer) -> Result<KadPeer, Self::Error> {
// TODO: this is in fact a CID; not sure if this should be handled in `from_bytes` or // TODO: this is in fact a CID; not sure if this should be handled in `from_bytes` or
// as a special case here // as a special case here
let node_id = PeerId::from_bytes(peer.get_id().to_vec()) let node_id = PeerId::from_bytes(peer.get_id().to_vec())
.map_err(|_| IoError::new(IoErrorKind::InvalidData, "invalid peer id"))?; .map_err(|_| invalid_data("invalid peer id"))?;
let mut addrs = Vec::with_capacity(peer.get_addrs().len()); let mut addrs = Vec::with_capacity(peer.get_addrs().len());
for addr in peer.take_addrs().into_iter() { for addr in peer.take_addrs().into_iter() {
let as_ma = Multiaddr::try_from(addr) let as_ma = Multiaddr::try_from(addr).map_err(invalid_data)?;
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err))?;
addrs.push(as_ma); addrs.push(as_ma);
} }
debug_assert_eq!(addrs.len(), addrs.capacity()); debug_assert_eq!(addrs.len(), addrs.capacity());
@ -120,9 +121,9 @@ impl KadPeer {
} }
} }
impl Into<protobuf_structs::dht::Message_Peer> for KadPeer { impl Into<proto::Message_Peer> for KadPeer {
fn into(self) -> protobuf_structs::dht::Message_Peer { fn into(self) -> proto::Message_Peer {
let mut out = protobuf_structs::dht::Message_Peer::new(); let mut out = proto::Message_Peer::new();
out.set_id(self.node_id.into_bytes()); out.set_id(self.node_id.into_bytes());
for addr in self.multiaddrs { for addr in self.multiaddrs {
out.mut_addrs().push(addr.to_vec()); out.mut_addrs().push(addr.to_vec());
@ -155,23 +156,22 @@ where
C: AsyncRead + AsyncWrite, C: AsyncRead + AsyncWrite,
{ {
type Output = KadInStreamSink<Negotiated<C>>; type Output = KadInStreamSink<Negotiated<C>>;
type Future = future::FutureResult<Self::Output, IoError>; type Future = FutureResult<Self::Output, io::Error>;
type Error = IoError; type Error = io::Error;
#[inline] #[inline]
fn upgrade_inbound(self, incoming: Negotiated<C>, _: Self::Info) -> Self::Future { fn upgrade_inbound(self, incoming: Negotiated<C>, _: Self::Info) -> Self::Future {
let mut codec = codec::UviBytes::default(); let mut codec = UviBytes::default();
codec.set_max_len(4096); codec.set_max_len(4096);
future::ok( future::ok(
Framed::new(incoming, codec) Framed::new(incoming, codec)
.from_err::<IoError>() .from_err()
.with::<_, fn(_) -> _, _>(|response| -> Result<_, IoError> { .with::<_, fn(_) -> _, _>(|response| {
let proto_struct = resp_msg_to_proto(response); let proto_struct = resp_msg_to_proto(response);
proto_struct.write_to_bytes() proto_struct.write_to_bytes().map_err(invalid_data)
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err.to_string()))
}) })
.and_then::<fn(_) -> _, _>(|bytes: BytesMut| { .and_then::<fn(_) -> _, _>(|bytes| {
let request = protobuf::parse_from_bytes(&bytes)?; let request = protobuf::parse_from_bytes(&bytes)?;
proto_to_req_msg(request) proto_to_req_msg(request)
}), }),
@ -184,25 +184,22 @@ where
C: AsyncRead + AsyncWrite, C: AsyncRead + AsyncWrite,
{ {
type Output = KadOutStreamSink<Negotiated<C>>; type Output = KadOutStreamSink<Negotiated<C>>;
type Future = future::FutureResult<Self::Output, IoError>; type Future = FutureResult<Self::Output, io::Error>;
type Error = IoError; type Error = io::Error;
#[inline] #[inline]
fn upgrade_outbound(self, incoming: Negotiated<C>, _: Self::Info) -> Self::Future { fn upgrade_outbound(self, incoming: Negotiated<C>, _: Self::Info) -> Self::Future {
let mut codec = codec::UviBytes::default(); let mut codec = UviBytes::default();
codec.set_max_len(4096); codec.set_max_len(4096);
future::ok( future::ok(
Framed::new(incoming, codec) Framed::new(incoming, codec)
.from_err::<IoError>() .from_err()
.with::<_, fn(_) -> _, _>(|request| -> Result<_, IoError> { .with::<_, fn(_) -> _, _>(|request| {
let proto_struct = req_msg_to_proto(request); let proto_struct = req_msg_to_proto(request);
match proto_struct.write_to_bytes() { proto_struct.write_to_bytes().map_err(invalid_data)
Ok(msg) => Ok(msg),
Err(err) => Err(IoError::new(IoErrorKind::Other, err.to_string())),
}
}) })
.and_then::<fn(_) -> _, _>(|bytes: BytesMut| { .and_then::<fn(_) -> _, _>(|bytes| {
let response = protobuf::parse_from_bytes(&bytes)?; let response = protobuf::parse_from_bytes(&bytes)?;
proto_to_resp_msg(response) proto_to_resp_msg(response)
}), }),
@ -211,27 +208,20 @@ where
} }
/// Sink of responses and stream of requests. /// Sink of responses and stream of requests.
pub type KadInStreamSink<S> = stream::AndThen< pub type KadInStreamSink<S> = KadStreamSink<S, KadResponseMsg, KadRequestMsg>;
sink::With<
stream::FromErr<Framed<S, codec::UviBytes<Vec<u8>>>, IoError>,
KadResponseMsg,
fn(KadResponseMsg) -> Result<Vec<u8>, IoError>,
Result<Vec<u8>, IoError>,
>,
fn(BytesMut) -> Result<KadRequestMsg, IoError>,
Result<KadRequestMsg, IoError>,
>;
/// Sink of requests and stream of responses. /// Sink of requests and stream of responses.
pub type KadOutStreamSink<S> = stream::AndThen< pub type KadOutStreamSink<S> = KadStreamSink<S, KadRequestMsg, KadResponseMsg>;
pub type KadStreamSink<S, A, B> = stream::AndThen<
sink::With< sink::With<
stream::FromErr<Framed<S, codec::UviBytes<Vec<u8>>>, IoError>, stream::FromErr<Framed<S, UviBytes<Vec<u8>>>, io::Error>,
KadRequestMsg, A,
fn(KadRequestMsg) -> Result<Vec<u8>, IoError>, fn(A) -> Result<Vec<u8>, io::Error>,
Result<Vec<u8>, IoError>, Result<Vec<u8>, io::Error>,
>, >,
fn(BytesMut) -> Result<KadResponseMsg, IoError>, fn(BytesMut) -> Result<B, io::Error>,
Result<KadResponseMsg, IoError>, Result<B, io::Error>,
>; >;
/// Request that we can send to a peer or that we received from a peer. /// Request that we can send to a peer or that we received from a peer.
@ -284,31 +274,31 @@ pub enum KadResponseMsg {
}, },
} }
// Turns a type-safe Kadmelia message into the corresponding raw protobuf message. /// Converts a `KadRequestMsg` into the corresponding protobuf message for sending.
fn req_msg_to_proto(kad_msg: KadRequestMsg) -> protobuf_structs::dht::Message { fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message {
match kad_msg { match kad_msg {
KadRequestMsg::Ping => { KadRequestMsg::Ping => {
let mut msg = protobuf_structs::dht::Message::new(); let mut msg = proto::Message::new();
msg.set_field_type(protobuf_structs::dht::Message_MessageType::PING); msg.set_field_type(proto::Message_MessageType::PING);
msg msg
} }
KadRequestMsg::FindNode { key } => { KadRequestMsg::FindNode { key } => {
let mut msg = protobuf_structs::dht::Message::new(); let mut msg = proto::Message::new();
msg.set_field_type(protobuf_structs::dht::Message_MessageType::FIND_NODE); msg.set_field_type(proto::Message_MessageType::FIND_NODE);
msg.set_key(key.into_bytes()); msg.set_key(key.into_bytes());
msg.set_clusterLevelRaw(10); msg.set_clusterLevelRaw(10);
msg msg
} }
KadRequestMsg::GetProviders { key } => { KadRequestMsg::GetProviders { key } => {
let mut msg = protobuf_structs::dht::Message::new(); let mut msg = proto::Message::new();
msg.set_field_type(protobuf_structs::dht::Message_MessageType::GET_PROVIDERS); msg.set_field_type(proto::Message_MessageType::GET_PROVIDERS);
msg.set_key(key.into_bytes()); msg.set_key(key.into_bytes());
msg.set_clusterLevelRaw(10); msg.set_clusterLevelRaw(10);
msg msg
} }
KadRequestMsg::AddProvider { key, provider_peer } => { KadRequestMsg::AddProvider { key, provider_peer } => {
let mut msg = protobuf_structs::dht::Message::new(); let mut msg = proto::Message::new();
msg.set_field_type(protobuf_structs::dht::Message_MessageType::ADD_PROVIDER); msg.set_field_type(proto::Message_MessageType::ADD_PROVIDER);
msg.set_clusterLevelRaw(10); msg.set_clusterLevelRaw(10);
msg.set_key(key.into_bytes()); msg.set_key(key.into_bytes());
msg.mut_providerPeers().push(provider_peer.into()); msg.mut_providerPeers().push(provider_peer.into());
@ -317,17 +307,17 @@ fn req_msg_to_proto(kad_msg: KadRequestMsg) -> protobuf_structs::dht::Message {
} }
} }
// Turns a type-safe Kadmelia message into the corresponding raw protobuf message. /// Converts a `KadResponseMsg` into the corresponding protobuf message for sending.
fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> protobuf_structs::dht::Message { fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message {
match kad_msg { match kad_msg {
KadResponseMsg::Pong => { KadResponseMsg::Pong => {
let mut msg = protobuf_structs::dht::Message::new(); let mut msg = proto::Message::new();
msg.set_field_type(protobuf_structs::dht::Message_MessageType::PING); msg.set_field_type(proto::Message_MessageType::PING);
msg msg
} }
KadResponseMsg::FindNode { closer_peers } => { KadResponseMsg::FindNode { closer_peers } => {
let mut msg = protobuf_structs::dht::Message::new(); let mut msg = proto::Message::new();
msg.set_field_type(protobuf_structs::dht::Message_MessageType::FIND_NODE); msg.set_field_type(proto::Message_MessageType::FIND_NODE);
msg.set_clusterLevelRaw(9); msg.set_clusterLevelRaw(9);
for peer in closer_peers { for peer in closer_peers {
msg.mut_closerPeers().push(peer.into()); msg.mut_closerPeers().push(peer.into());
@ -338,8 +328,8 @@ fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> protobuf_structs::dht::Message
closer_peers, closer_peers,
provider_peers, provider_peers,
} => { } => {
let mut msg = protobuf_structs::dht::Message::new(); let mut msg = proto::Message::new();
msg.set_field_type(protobuf_structs::dht::Message_MessageType::GET_PROVIDERS); msg.set_field_type(proto::Message_MessageType::GET_PROVIDERS);
msg.set_clusterLevelRaw(9); msg.set_clusterLevelRaw(9);
for peer in closer_peers { for peer in closer_peers {
msg.mut_closerPeers().push(peer.into()); msg.mut_closerPeers().push(peer.into());
@ -352,96 +342,80 @@ fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> protobuf_structs::dht::Message
} }
} }
/// Turns a raw Kademlia message into a type-safe message. /// Converts a received protobuf message into a corresponding `KadRequestMsg`.
fn proto_to_req_msg(mut message: protobuf_structs::dht::Message) -> Result<KadRequestMsg, IoError> { ///
/// Fails if the protobuf message is not a valid and supported Kademlia request message.
fn proto_to_req_msg(mut message: proto::Message) -> Result<KadRequestMsg, io::Error> {
match message.get_field_type() { match message.get_field_type() {
protobuf_structs::dht::Message_MessageType::PING => Ok(KadRequestMsg::Ping), proto::Message_MessageType::PING => Ok(KadRequestMsg::Ping),
protobuf_structs::dht::Message_MessageType::PUT_VALUE => { proto::Message_MessageType::PUT_VALUE =>
Err(IoError::new( Err(invalid_data("Unsupported: PUT_VALUE message.")),
IoErrorKind::InvalidData,
"received a PUT_VALUE message, but this is not supported by rust-libp2p yet",
))
}
protobuf_structs::dht::Message_MessageType::GET_VALUE => { proto::Message_MessageType::GET_VALUE =>
Err(IoError::new( Err(invalid_data("Unsupported: GET_VALUE message.")),
IoErrorKind::InvalidData,
"received a GET_VALUE message, but this is not supported by rust-libp2p yet",
))
}
protobuf_structs::dht::Message_MessageType::FIND_NODE => { proto::Message_MessageType::FIND_NODE => {
let key = PeerId::from_bytes(message.take_key()).map_err(|_| { let key = PeerId::from_bytes(message.take_key())
IoError::new(IoErrorKind::InvalidData, "invalid peer id in FIND_NODE") .map_err(|_| invalid_data("Invalid peer id in FIND_NODE"))?;
})?;
Ok(KadRequestMsg::FindNode { key }) Ok(KadRequestMsg::FindNode { key })
} }
protobuf_structs::dht::Message_MessageType::GET_PROVIDERS => { proto::Message_MessageType::GET_PROVIDERS => {
let key = Multihash::from_bytes(message.take_key()) let key = Multihash::from_bytes(message.take_key()).map_err(invalid_data)?;
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err))?;
Ok(KadRequestMsg::GetProviders { key }) Ok(KadRequestMsg::GetProviders { key })
} }
protobuf_structs::dht::Message_MessageType::ADD_PROVIDER => { proto::Message_MessageType::ADD_PROVIDER => {
// TODO: for now we don't parse the peer properly, so it is possible that we get // TODO: for now we don't parse the peer properly, so it is possible that we get
// parsing errors for peers even when they are valid; we ignore these // parsing errors for peers even when they are valid; we ignore these
// errors for now, but ultimately we should just error altogether // errors for now, but ultimately we should just error altogether
let provider_peer = message let provider_peer = message
.mut_providerPeers() .mut_providerPeers()
.iter_mut() .iter_mut()
.filter_map(|peer| KadPeer::from_peer(peer).ok()) .find_map(|peer| KadPeer::try_from(peer).ok());
.next();
if let Some(provider_peer) = provider_peer { if let Some(provider_peer) = provider_peer {
let key = Multihash::from_bytes(message.take_key()) let key = Multihash::from_bytes(message.take_key()).map_err(invalid_data)?;
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err))?;
Ok(KadRequestMsg::AddProvider { key, provider_peer }) Ok(KadRequestMsg::AddProvider { key, provider_peer })
} else { } else {
Err(IoError::new( Err(invalid_data("ADD_PROVIDER message with no valid peer."))
IoErrorKind::InvalidData,
"received an ADD_PROVIDER message with no valid peer",
))
} }
} }
} }
} }
/// Turns a raw Kademlia message into a type-safe message. /// Converts a received protobuf message into a corresponding `KadResponseMessage`.
fn proto_to_resp_msg( ///
mut message: protobuf_structs::dht::Message, /// Fails if the protobuf message is not a valid and supported Kademlia response message.
) -> Result<KadResponseMsg, IoError> { fn proto_to_resp_msg(mut message: proto::Message) -> Result<KadResponseMsg, io::Error> {
match message.get_field_type() { match message.get_field_type() {
protobuf_structs::dht::Message_MessageType::PING => Ok(KadResponseMsg::Pong), proto::Message_MessageType::PING => Ok(KadResponseMsg::Pong),
protobuf_structs::dht::Message_MessageType::GET_VALUE => { proto::Message_MessageType::GET_VALUE =>
Err(IoError::new( Err(invalid_data("Unsupported: GET_VALUE message")),
IoErrorKind::InvalidData,
"received a GET_VALUE message, but this is not supported by rust-libp2p yet",
))
}
protobuf_structs::dht::Message_MessageType::FIND_NODE => { proto::Message_MessageType::FIND_NODE => {
let closer_peers = message let closer_peers = message
.mut_closerPeers() .mut_closerPeers()
.iter_mut() .iter_mut()
.filter_map(|peer| KadPeer::from_peer(peer).ok()) .filter_map(|peer| KadPeer::try_from(peer).ok())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
Ok(KadResponseMsg::FindNode { closer_peers }) Ok(KadResponseMsg::FindNode { closer_peers })
} }
protobuf_structs::dht::Message_MessageType::GET_PROVIDERS => { proto::Message_MessageType::GET_PROVIDERS => {
let closer_peers = message let closer_peers = message
.mut_closerPeers() .mut_closerPeers()
.iter_mut() .iter_mut()
.filter_map(|peer| KadPeer::from_peer(peer).ok()) .filter_map(|peer| KadPeer::try_from(peer).ok())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let provider_peers = message let provider_peers = message
.mut_providerPeers() .mut_providerPeers()
.iter_mut() .iter_mut()
.filter_map(|peer| KadPeer::from_peer(peer).ok()) .filter_map(|peer| KadPeer::try_from(peer).ok())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
Ok(KadResponseMsg::GetProviders { Ok(KadResponseMsg::GetProviders {
@ -450,18 +424,22 @@ fn proto_to_resp_msg(
}) })
} }
protobuf_structs::dht::Message_MessageType::PUT_VALUE => Err(IoError::new( proto::Message_MessageType::PUT_VALUE =>
IoErrorKind::InvalidData, Err(invalid_data("received an unexpected PUT_VALUE message")),
"received an unexpected PUT_VALUE message",
)),
protobuf_structs::dht::Message_MessageType::ADD_PROVIDER => Err(IoError::new( proto::Message_MessageType::ADD_PROVIDER =>
IoErrorKind::InvalidData, Err(invalid_data("received an unexpected ADD_PROVIDER message"))
"received an unexpected ADD_PROVIDER message",
)),
} }
} }
/// Creates an `io::Error` with `io::ErrorKind::InvalidData`.
fn invalid_data<E>(e: E) -> io::Error
where
E: Into<Box<dyn std::error::Error + Send + Sync>>
{
io::Error::new(io::ErrorKind::InvalidData, e)
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {

View File

@ -267,16 +267,12 @@ where
let state = self let state = self
.closest_peers .closest_peers
.iter_mut() .iter_mut()
.filter_map( .find_map(|(peer_id, state)|
|(peer_id, state)| { if peer_id == id {
if peer_id == id { Some(state)
Some(state) } else {
} else { None
None });
}
},
)
.next();
match state { match state {
Some(state @ &mut QueryPeerState::InProgress(_)) => *state = QueryPeerState::Failed, Some(state @ &mut QueryPeerState::InProgress(_)) => *state = QueryPeerState::Failed,
@ -295,67 +291,47 @@ where
let mut active_counter = 0; let mut active_counter = 0;
// While iterating over peers, count the number of queries in a row (from closer to further // While iterating over peers, count the number of queries in a row (from closer to further
// away from target) that are in the succeeded in state. // away from target) that are in the succeeded state.
// Contains `None` if the chain is broken. let mut succeeded_counter = 0;
let mut succeeded_counter = Some(0);
// Extract `self.num_results` to avoid borrowing errors with closures. // Extract `self.num_results` to avoid borrowing errors with closures.
let num_results = self.num_results; let num_results = self.num_results;
for &mut (ref peer_id, ref mut state) in self.closest_peers.iter_mut() { for &mut (ref peer_id, ref mut state) in self.closest_peers.iter_mut() {
// Start by "killing" the query if it timed out. // Start by "killing" the query if it timed out.
{ if let QueryPeerState::InProgress(timeout) = state {
let timed_out = match state { match timeout.poll() {
QueryPeerState::InProgress(timeout) => match timeout.poll() { Ok(Async::Ready(_)) | Err(_) => {
Ok(Async::Ready(_)) | Err(_) => true, *state = QueryPeerState::Failed;
Ok(Async::NotReady) => false, return Async::Ready(QueryStatePollOut::CancelRpc { peer_id });
}, }
_ => false, Ok(Async::NotReady) => {
}; active_counter += 1
if timed_out {
*state = QueryPeerState::Failed;
return Async::Ready(QueryStatePollOut::CancelRpc { peer_id });
}
}
// Increment the local counters.
match state {
QueryPeerState::InProgress(_) => {
active_counter += 1;
}
QueryPeerState::Succeeded => {
if let Some(ref mut c) = succeeded_counter {
*c += 1;
} }
} }
_ => (),
};
// We have enough results; the query is done.
if succeeded_counter
.as_ref()
.map(|&c| c >= num_results)
.unwrap_or(false)
{
return Async::Ready(QueryStatePollOut::Finished);
} }
// Dial the node if it needs dialing. if let QueryPeerState::Succeeded = state {
let need_connect = match state { succeeded_counter += 1;
QueryPeerState::NotContacted => match self.stage { // If we have enough results; the query is done.
QueryStage::Iterating { .. } => active_counter < self.parallelism, if succeeded_counter >= num_results {
QueryStage::Frozen => true, // TODO: as an optimization, could be false if we're not trying to find peers return Async::Ready(QueryStatePollOut::Finished)
}, }
_ => false, }
};
if need_connect { if let QueryPeerState::NotContacted = state {
let delay = Delay::new(Instant::now() + self.rpc_timeout); let connect = match self.stage {
*state = QueryPeerState::InProgress(delay); QueryStage::Frozen => true,
return Async::Ready(QueryStatePollOut::SendRpc { QueryStage::Iterating {..} => active_counter < self.parallelism,
peer_id, };
query_target: &self.target, if connect {
}); let delay = Delay::new(Instant::now() + self.rpc_timeout);
*state = QueryPeerState::InProgress(delay);
return Async::Ready(QueryStatePollOut::SendRpc {
peer_id,
query_target: &self.target,
});
}
} }
} }