From 61b236172b4441c98ed6a62836e023f53ea0fe41 Mon Sep 17 00:00:00 2001 From: Roman Borschel Date: Wed, 8 May 2019 10:10:58 +0200 Subject: [PATCH] Some Kademlia code cleanup. (#1101) --- protocols/kad/src/handler.rs | 16 +-- protocols/kad/src/lib.rs | 32 ----- protocols/kad/src/protocol.rs | 248 ++++++++++++++++------------------ protocols/kad/src/query.rs | 96 +++++-------- 4 files changed, 151 insertions(+), 241 deletions(-) diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 461243c9..5fbbfe74 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -70,9 +70,6 @@ where /// We haven't started opening the outgoing substream yet. /// Contains the request we want to send, and the user data if we expect an answer. OutPendingOpen(KadRequestMsg, Option), - /// We are waiting for the outgoing substream to be upgraded. - /// Contains the request we want to send, and the user data if we expect an answer. - OutPendingUpgrade(KadRequestMsg, Option), /// Waiting to send a message to the remote. OutPendingSend( KadOutStreamSink, @@ -111,7 +108,6 @@ where fn try_close(self) -> AsyncSink { match self { SubstreamState::OutPendingOpen(_, _) - | SubstreamState::OutPendingUpgrade(_, _) | SubstreamState::OutReportError(_, _) => AsyncSink::Ready, SubstreamState::OutPendingSend(mut stream, _, _) | SubstreamState::OutPendingFlush(mut stream, _) @@ -405,11 +401,8 @@ where request_id, } => { let pos = self.substreams.iter().position(|state| match state { - SubstreamState::InWaitingUser(ref conn_id, _) - if conn_id == &request_id.connec_unique_id => - { - true - } + SubstreamState::InWaitingUser(ref conn_id, _) => + conn_id == &request_id.connec_unique_id, _ => false, }); @@ -562,11 +555,6 @@ where }; (None, Some(ev), false) } - SubstreamState::OutPendingUpgrade(msg, user_data) => ( - Some(SubstreamState::OutPendingUpgrade(msg, user_data)), - None, - false, - ), SubstreamState::OutPendingSend(mut substream, msg, user_data) => { match substream.start_send(msg) { Ok(AsyncSink::Ready) => ( diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs index 6d523f8e..37c336de 100644 --- a/protocols/kad/src/lib.rs +++ b/protocols/kad/src/lib.rs @@ -19,43 +19,11 @@ // DEALINGS IN THE SOFTWARE. //! Kademlia protocol. Allows peer discovery, records store and records fetch. -//! -//! # Usage -//! -//! Usage is done in the following steps: -//! -//! - Build a `KadSystemConfig` and a `KadConnecConfig` object that contain the way you want the -//! Kademlia protocol to behave. -//! -//! - Create a swarm that upgrades incoming connections with the `KadConnecConfig`. -//! -//! - Build a `KadSystem` from the `KadSystemConfig`. This requires passing a closure that provides -//! the Kademlia controller of a peer. -//! -//! - You can perform queries using the `KadSystem`. -//! // TODO: we allow dead_code for now because this library contains a lot of unused code that will // be useful later for record store #![allow(dead_code)] -// # Crate organization -// -// The crate contains three levels of abstractions over the Kademlia protocol. -// -// - The first level of abstraction is in `protocol`. The API of this module lets you turn a raw -// bytes stream (`AsyncRead + AsyncWrite`) into a `Sink + Stream` of raw but strongly-typed -// Kademlia messages. -// -// - The second level of abstraction is in `kad_server`. Its API lets you upgrade a connection and -// obtain a future (that must be driven to completion), plus a controller. Processing the future -// will automatically respond to Kad requests received by the remote. The controller lets you -// send your own requests to this remote and obtain strongly-typed responses. -// -// - The third level of abstraction is in `high_level`. This module only provides the -// `KademliaSystem`. -// - pub use self::behaviour::{Kademlia, KademliaOut}; pub use self::kbucket::KBucketsPeerId; pub use self::protocol::KadConnectionType; diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index e4101d2c..43aa7b74 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -18,23 +18,24 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -//! Provides the `KadRequestMsg` and `KadResponseMsg` enums of all the possible messages -//! transmitted with the Kademlia protocol, and the `KademliaProtocolConfig` connection upgrade. +//! The Kademlia connection protocol upgrade and associated message types. //! -//! The upgrade's output a `Sink + Stream` of messages. -//! -//! The `Stream` component is used to poll the underlying transport, and the `Sink` component is -//! used to send messages. +//! The connection protocol upgrade is provided by [`KademliaProtocolConfig`], with the +//! request and response types [`KadRequestMsg`] and [`KadResponseMsg`], respectively. +//! The upgrade's output is a `Sink + Stream` of messages. The `Stream` component is used +//! to poll the underlying transport for incoming messages, and the `Sink` component +//! is used to send messages to remote peers. use bytes::BytesMut; -use crate::protobuf_structs; -use futures::{future, sink, stream, Sink, Stream}; -use libp2p_core::{InboundUpgrade, Multiaddr, OutboundUpgrade, PeerId, UpgradeInfo, upgrade::Negotiated}; +use codec::UviBytes; +use crate::protobuf_structs::dht as proto; +use futures::{future::{self, FutureResult}, sink, stream, Sink, Stream}; +use libp2p_core::{Multiaddr, PeerId}; +use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated}; use multihash::Multihash; use protobuf::{self, Message}; use std::convert::TryFrom; -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use std::iter; +use std::{io, iter}; use tokio_codec::Framed; use tokio_io::{AsyncRead, AsyncWrite}; use unsigned_varint::codec; @@ -52,10 +53,10 @@ pub enum KadConnectionType { CannotConnect = 3, } -impl From for KadConnectionType { +impl From for KadConnectionType { #[inline] - fn from(raw: protobuf_structs::dht::Message_ConnectionType) -> KadConnectionType { - use crate::protobuf_structs::dht::Message_ConnectionType::{ + fn from(raw: proto::Message_ConnectionType) -> KadConnectionType { + use proto::Message_ConnectionType::{ CAN_CONNECT, CANNOT_CONNECT, CONNECTED, NOT_CONNECTED }; match raw { @@ -67,10 +68,10 @@ impl From for KadConnectionType { } } -impl Into for KadConnectionType { +impl Into for KadConnectionType { #[inline] - fn into(self) -> protobuf_structs::dht::Message_ConnectionType { - use crate::protobuf_structs::dht::Message_ConnectionType::{ + fn into(self) -> proto::Message_ConnectionType { + use proto::Message_ConnectionType::{ CAN_CONNECT, CANNOT_CONNECT, CONNECTED, NOT_CONNECTED }; match self { @@ -93,19 +94,19 @@ pub struct KadPeer { pub connection_ty: KadConnectionType, } -impl KadPeer { - // Builds a `KadPeer` from its raw protobuf equivalent. - // TODO: use TryFrom once stable - fn from_peer(peer: &mut protobuf_structs::dht::Message_Peer) -> Result { +// Builds a `KadPeer` from a corresponding protobuf message. +impl TryFrom<&mut proto::Message_Peer> for KadPeer { + type Error = io::Error; + + fn try_from(peer: &mut proto::Message_Peer) -> Result { // TODO: this is in fact a CID; not sure if this should be handled in `from_bytes` or // as a special case here let node_id = PeerId::from_bytes(peer.get_id().to_vec()) - .map_err(|_| IoError::new(IoErrorKind::InvalidData, "invalid peer id"))?; + .map_err(|_| invalid_data("invalid peer id"))?; let mut addrs = Vec::with_capacity(peer.get_addrs().len()); for addr in peer.take_addrs().into_iter() { - let as_ma = Multiaddr::try_from(addr) - .map_err(|err| IoError::new(IoErrorKind::InvalidData, err))?; + let as_ma = Multiaddr::try_from(addr).map_err(invalid_data)?; addrs.push(as_ma); } debug_assert_eq!(addrs.len(), addrs.capacity()); @@ -120,9 +121,9 @@ impl KadPeer { } } -impl Into for KadPeer { - fn into(self) -> protobuf_structs::dht::Message_Peer { - let mut out = protobuf_structs::dht::Message_Peer::new(); +impl Into for KadPeer { + fn into(self) -> proto::Message_Peer { + let mut out = proto::Message_Peer::new(); out.set_id(self.node_id.into_bytes()); for addr in self.multiaddrs { out.mut_addrs().push(addr.to_vec()); @@ -155,23 +156,22 @@ where C: AsyncRead + AsyncWrite, { type Output = KadInStreamSink>; - type Future = future::FutureResult; - type Error = IoError; + type Future = FutureResult; + type Error = io::Error; #[inline] fn upgrade_inbound(self, incoming: Negotiated, _: Self::Info) -> Self::Future { - let mut codec = codec::UviBytes::default(); + let mut codec = UviBytes::default(); codec.set_max_len(4096); future::ok( Framed::new(incoming, codec) - .from_err::() - .with::<_, fn(_) -> _, _>(|response| -> Result<_, IoError> { + .from_err() + .with::<_, fn(_) -> _, _>(|response| { let proto_struct = resp_msg_to_proto(response); - proto_struct.write_to_bytes() - .map_err(|err| IoError::new(IoErrorKind::InvalidData, err.to_string())) + proto_struct.write_to_bytes().map_err(invalid_data) }) - .and_then:: _, _>(|bytes: BytesMut| { + .and_then:: _, _>(|bytes| { let request = protobuf::parse_from_bytes(&bytes)?; proto_to_req_msg(request) }), @@ -184,25 +184,22 @@ where C: AsyncRead + AsyncWrite, { type Output = KadOutStreamSink>; - type Future = future::FutureResult; - type Error = IoError; + type Future = FutureResult; + type Error = io::Error; #[inline] fn upgrade_outbound(self, incoming: Negotiated, _: Self::Info) -> Self::Future { - let mut codec = codec::UviBytes::default(); + let mut codec = UviBytes::default(); codec.set_max_len(4096); future::ok( Framed::new(incoming, codec) - .from_err::() - .with::<_, fn(_) -> _, _>(|request| -> Result<_, IoError> { + .from_err() + .with::<_, fn(_) -> _, _>(|request| { let proto_struct = req_msg_to_proto(request); - match proto_struct.write_to_bytes() { - Ok(msg) => Ok(msg), - Err(err) => Err(IoError::new(IoErrorKind::Other, err.to_string())), - } + proto_struct.write_to_bytes().map_err(invalid_data) }) - .and_then:: _, _>(|bytes: BytesMut| { + .and_then:: _, _>(|bytes| { let response = protobuf::parse_from_bytes(&bytes)?; proto_to_resp_msg(response) }), @@ -211,27 +208,20 @@ where } /// Sink of responses and stream of requests. -pub type KadInStreamSink = stream::AndThen< - sink::With< - stream::FromErr>>, IoError>, - KadResponseMsg, - fn(KadResponseMsg) -> Result, IoError>, - Result, IoError>, - >, - fn(BytesMut) -> Result, - Result, ->; +pub type KadInStreamSink = KadStreamSink; /// Sink of requests and stream of responses. -pub type KadOutStreamSink = stream::AndThen< +pub type KadOutStreamSink = KadStreamSink; + +pub type KadStreamSink = stream::AndThen< sink::With< - stream::FromErr>>, IoError>, - KadRequestMsg, - fn(KadRequestMsg) -> Result, IoError>, - Result, IoError>, + stream::FromErr>>, io::Error>, + A, + fn(A) -> Result, io::Error>, + Result, io::Error>, >, - fn(BytesMut) -> Result, - Result, + fn(BytesMut) -> Result, + Result, >; /// Request that we can send to a peer or that we received from a peer. @@ -284,31 +274,31 @@ pub enum KadResponseMsg { }, } -// Turns a type-safe Kadmelia message into the corresponding raw protobuf message. -fn req_msg_to_proto(kad_msg: KadRequestMsg) -> protobuf_structs::dht::Message { +/// Converts a `KadRequestMsg` into the corresponding protobuf message for sending. +fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message { match kad_msg { KadRequestMsg::Ping => { - let mut msg = protobuf_structs::dht::Message::new(); - msg.set_field_type(protobuf_structs::dht::Message_MessageType::PING); + let mut msg = proto::Message::new(); + msg.set_field_type(proto::Message_MessageType::PING); msg } KadRequestMsg::FindNode { key } => { - let mut msg = protobuf_structs::dht::Message::new(); - msg.set_field_type(protobuf_structs::dht::Message_MessageType::FIND_NODE); + let mut msg = proto::Message::new(); + msg.set_field_type(proto::Message_MessageType::FIND_NODE); msg.set_key(key.into_bytes()); msg.set_clusterLevelRaw(10); msg } KadRequestMsg::GetProviders { key } => { - let mut msg = protobuf_structs::dht::Message::new(); - msg.set_field_type(protobuf_structs::dht::Message_MessageType::GET_PROVIDERS); + let mut msg = proto::Message::new(); + msg.set_field_type(proto::Message_MessageType::GET_PROVIDERS); msg.set_key(key.into_bytes()); msg.set_clusterLevelRaw(10); msg } KadRequestMsg::AddProvider { key, provider_peer } => { - let mut msg = protobuf_structs::dht::Message::new(); - msg.set_field_type(protobuf_structs::dht::Message_MessageType::ADD_PROVIDER); + let mut msg = proto::Message::new(); + msg.set_field_type(proto::Message_MessageType::ADD_PROVIDER); msg.set_clusterLevelRaw(10); msg.set_key(key.into_bytes()); msg.mut_providerPeers().push(provider_peer.into()); @@ -317,17 +307,17 @@ fn req_msg_to_proto(kad_msg: KadRequestMsg) -> protobuf_structs::dht::Message { } } -// Turns a type-safe Kadmelia message into the corresponding raw protobuf message. -fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> protobuf_structs::dht::Message { +/// Converts a `KadResponseMsg` into the corresponding protobuf message for sending. +fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message { match kad_msg { KadResponseMsg::Pong => { - let mut msg = protobuf_structs::dht::Message::new(); - msg.set_field_type(protobuf_structs::dht::Message_MessageType::PING); + let mut msg = proto::Message::new(); + msg.set_field_type(proto::Message_MessageType::PING); msg } KadResponseMsg::FindNode { closer_peers } => { - let mut msg = protobuf_structs::dht::Message::new(); - msg.set_field_type(protobuf_structs::dht::Message_MessageType::FIND_NODE); + let mut msg = proto::Message::new(); + msg.set_field_type(proto::Message_MessageType::FIND_NODE); msg.set_clusterLevelRaw(9); for peer in closer_peers { msg.mut_closerPeers().push(peer.into()); @@ -338,8 +328,8 @@ fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> protobuf_structs::dht::Message closer_peers, provider_peers, } => { - let mut msg = protobuf_structs::dht::Message::new(); - msg.set_field_type(protobuf_structs::dht::Message_MessageType::GET_PROVIDERS); + let mut msg = proto::Message::new(); + msg.set_field_type(proto::Message_MessageType::GET_PROVIDERS); msg.set_clusterLevelRaw(9); for peer in closer_peers { msg.mut_closerPeers().push(peer.into()); @@ -352,96 +342,80 @@ fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> protobuf_structs::dht::Message } } -/// Turns a raw Kademlia message into a type-safe message. -fn proto_to_req_msg(mut message: protobuf_structs::dht::Message) -> Result { +/// Converts a received protobuf message into a corresponding `KadRequestMsg`. +/// +/// Fails if the protobuf message is not a valid and supported Kademlia request message. +fn proto_to_req_msg(mut message: proto::Message) -> Result { match message.get_field_type() { - protobuf_structs::dht::Message_MessageType::PING => Ok(KadRequestMsg::Ping), + proto::Message_MessageType::PING => Ok(KadRequestMsg::Ping), - protobuf_structs::dht::Message_MessageType::PUT_VALUE => { - Err(IoError::new( - IoErrorKind::InvalidData, - "received a PUT_VALUE message, but this is not supported by rust-libp2p yet", - )) - } + proto::Message_MessageType::PUT_VALUE => + Err(invalid_data("Unsupported: PUT_VALUE message.")), - protobuf_structs::dht::Message_MessageType::GET_VALUE => { - Err(IoError::new( - IoErrorKind::InvalidData, - "received a GET_VALUE message, but this is not supported by rust-libp2p yet", - )) - } + proto::Message_MessageType::GET_VALUE => + Err(invalid_data("Unsupported: GET_VALUE message.")), - protobuf_structs::dht::Message_MessageType::FIND_NODE => { - let key = PeerId::from_bytes(message.take_key()).map_err(|_| { - IoError::new(IoErrorKind::InvalidData, "invalid peer id in FIND_NODE") - })?; + proto::Message_MessageType::FIND_NODE => { + let key = PeerId::from_bytes(message.take_key()) + .map_err(|_| invalid_data("Invalid peer id in FIND_NODE"))?; Ok(KadRequestMsg::FindNode { key }) } - protobuf_structs::dht::Message_MessageType::GET_PROVIDERS => { - let key = Multihash::from_bytes(message.take_key()) - .map_err(|err| IoError::new(IoErrorKind::InvalidData, err))?; + proto::Message_MessageType::GET_PROVIDERS => { + let key = Multihash::from_bytes(message.take_key()).map_err(invalid_data)?; Ok(KadRequestMsg::GetProviders { key }) } - protobuf_structs::dht::Message_MessageType::ADD_PROVIDER => { + proto::Message_MessageType::ADD_PROVIDER => { // TODO: for now we don't parse the peer properly, so it is possible that we get // parsing errors for peers even when they are valid; we ignore these // errors for now, but ultimately we should just error altogether let provider_peer = message .mut_providerPeers() .iter_mut() - .filter_map(|peer| KadPeer::from_peer(peer).ok()) - .next(); + .find_map(|peer| KadPeer::try_from(peer).ok()); if let Some(provider_peer) = provider_peer { - let key = Multihash::from_bytes(message.take_key()) - .map_err(|err| IoError::new(IoErrorKind::InvalidData, err))?; + let key = Multihash::from_bytes(message.take_key()).map_err(invalid_data)?; Ok(KadRequestMsg::AddProvider { key, provider_peer }) } else { - Err(IoError::new( - IoErrorKind::InvalidData, - "received an ADD_PROVIDER message with no valid peer", - )) + Err(invalid_data("ADD_PROVIDER message with no valid peer.")) } } } } -/// Turns a raw Kademlia message into a type-safe message. -fn proto_to_resp_msg( - mut message: protobuf_structs::dht::Message, -) -> Result { +/// Converts a received protobuf message into a corresponding `KadResponseMessage`. +/// +/// Fails if the protobuf message is not a valid and supported Kademlia response message. +fn proto_to_resp_msg(mut message: proto::Message) -> Result { match message.get_field_type() { - protobuf_structs::dht::Message_MessageType::PING => Ok(KadResponseMsg::Pong), + proto::Message_MessageType::PING => Ok(KadResponseMsg::Pong), - protobuf_structs::dht::Message_MessageType::GET_VALUE => { - Err(IoError::new( - IoErrorKind::InvalidData, - "received a GET_VALUE message, but this is not supported by rust-libp2p yet", - )) - } + proto::Message_MessageType::GET_VALUE => + Err(invalid_data("Unsupported: GET_VALUE message")), - protobuf_structs::dht::Message_MessageType::FIND_NODE => { + proto::Message_MessageType::FIND_NODE => { let closer_peers = message .mut_closerPeers() .iter_mut() - .filter_map(|peer| KadPeer::from_peer(peer).ok()) + .filter_map(|peer| KadPeer::try_from(peer).ok()) .collect::>(); Ok(KadResponseMsg::FindNode { closer_peers }) } - protobuf_structs::dht::Message_MessageType::GET_PROVIDERS => { + proto::Message_MessageType::GET_PROVIDERS => { let closer_peers = message .mut_closerPeers() .iter_mut() - .filter_map(|peer| KadPeer::from_peer(peer).ok()) + .filter_map(|peer| KadPeer::try_from(peer).ok()) .collect::>(); + let provider_peers = message .mut_providerPeers() .iter_mut() - .filter_map(|peer| KadPeer::from_peer(peer).ok()) + .filter_map(|peer| KadPeer::try_from(peer).ok()) .collect::>(); Ok(KadResponseMsg::GetProviders { @@ -450,18 +424,22 @@ fn proto_to_resp_msg( }) } - protobuf_structs::dht::Message_MessageType::PUT_VALUE => Err(IoError::new( - IoErrorKind::InvalidData, - "received an unexpected PUT_VALUE message", - )), + proto::Message_MessageType::PUT_VALUE => + Err(invalid_data("received an unexpected PUT_VALUE message")), - protobuf_structs::dht::Message_MessageType::ADD_PROVIDER => Err(IoError::new( - IoErrorKind::InvalidData, - "received an unexpected ADD_PROVIDER message", - )), + proto::Message_MessageType::ADD_PROVIDER => + Err(invalid_data("received an unexpected ADD_PROVIDER message")) } } +/// Creates an `io::Error` with `io::ErrorKind::InvalidData`. +fn invalid_data(e: E) -> io::Error +where + E: Into> +{ + io::Error::new(io::ErrorKind::InvalidData, e) +} + #[cfg(test)] mod tests { diff --git a/protocols/kad/src/query.rs b/protocols/kad/src/query.rs index 4f8089a8..7bf6a0cb 100644 --- a/protocols/kad/src/query.rs +++ b/protocols/kad/src/query.rs @@ -267,16 +267,12 @@ where let state = self .closest_peers .iter_mut() - .filter_map( - |(peer_id, state)| { - if peer_id == id { - Some(state) - } else { - None - } - }, - ) - .next(); + .find_map(|(peer_id, state)| + if peer_id == id { + Some(state) + } else { + None + }); match state { Some(state @ &mut QueryPeerState::InProgress(_)) => *state = QueryPeerState::Failed, @@ -295,67 +291,47 @@ where let mut active_counter = 0; // While iterating over peers, count the number of queries in a row (from closer to further - // away from target) that are in the succeeded in state. - // Contains `None` if the chain is broken. - let mut succeeded_counter = Some(0); + // away from target) that are in the succeeded state. + let mut succeeded_counter = 0; // Extract `self.num_results` to avoid borrowing errors with closures. let num_results = self.num_results; for &mut (ref peer_id, ref mut state) in self.closest_peers.iter_mut() { // Start by "killing" the query if it timed out. - { - let timed_out = match state { - QueryPeerState::InProgress(timeout) => match timeout.poll() { - Ok(Async::Ready(_)) | Err(_) => true, - Ok(Async::NotReady) => false, - }, - _ => false, - }; - if timed_out { - *state = QueryPeerState::Failed; - return Async::Ready(QueryStatePollOut::CancelRpc { peer_id }); - } - } - - // Increment the local counters. - match state { - QueryPeerState::InProgress(_) => { - active_counter += 1; - } - QueryPeerState::Succeeded => { - if let Some(ref mut c) = succeeded_counter { - *c += 1; + if let QueryPeerState::InProgress(timeout) = state { + match timeout.poll() { + Ok(Async::Ready(_)) | Err(_) => { + *state = QueryPeerState::Failed; + return Async::Ready(QueryStatePollOut::CancelRpc { peer_id }); + } + Ok(Async::NotReady) => { + active_counter += 1 } } - _ => (), - }; - - // We have enough results; the query is done. - if succeeded_counter - .as_ref() - .map(|&c| c >= num_results) - .unwrap_or(false) - { - return Async::Ready(QueryStatePollOut::Finished); } - // Dial the node if it needs dialing. - let need_connect = match state { - QueryPeerState::NotContacted => match self.stage { - QueryStage::Iterating { .. } => active_counter < self.parallelism, - QueryStage::Frozen => true, // TODO: as an optimization, could be false if we're not trying to find peers - }, - _ => false, - }; + if let QueryPeerState::Succeeded = state { + succeeded_counter += 1; + // If we have enough results; the query is done. + if succeeded_counter >= num_results { + return Async::Ready(QueryStatePollOut::Finished) + } + } - if need_connect { - let delay = Delay::new(Instant::now() + self.rpc_timeout); - *state = QueryPeerState::InProgress(delay); - return Async::Ready(QueryStatePollOut::SendRpc { - peer_id, - query_target: &self.target, - }); + if let QueryPeerState::NotContacted = state { + let connect = match self.stage { + QueryStage::Frozen => true, + QueryStage::Iterating {..} => active_counter < self.parallelism, + }; + if connect { + let delay = Delay::new(Instant::now() + self.rpc_timeout); + *state = QueryPeerState::InProgress(delay); + return Async::Ready(QueryStatePollOut::SendRpc { + peer_id, + query_target: &self.target, + }); + } } }