// Copyright 2018 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), // to deal in the Software without restriction, including without limitation // the rights to use, copy, modify, merge, publish, distribute, sublicense, // and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. use crate::protocol::{ KadInStreamSink, KadOutStreamSink, KadPeer, KadRequestMsg, KadResponseMsg, KademliaProtocolConfig, }; use crate::record::{self, Record}; use futures::prelude::*; use libp2p_swarm::{ NegotiatedSubstream, KeepAlive, SubstreamProtocol, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr }; use libp2p_core::{ either::EitherOutput, upgrade::{self, InboundUpgrade, OutboundUpgrade} }; use log::trace; use std::{error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration}; use wasm_timer::Instant; /// Protocol handler that handles Kademlia communications with the remote. /// /// The handler will automatically open a Kademlia substream with the remote for each request we /// make. /// /// It also handles requests made by the remote. pub struct KademliaHandler { /// Configuration for the Kademlia protocol. config: KademliaHandlerConfig, /// Next unique ID of a connection. next_connec_unique_id: UniqueConnecId, /// List of active substreams with the state they are in. substreams: Vec>, /// Until when to keep the connection alive. keep_alive: KeepAlive, } /// Configuration of a [`KademliaHandler`]. #[derive(Debug, Clone)] pub struct KademliaHandlerConfig { /// Configuration of the wire protocol. pub protocol_config: KademliaProtocolConfig, /// If false, we deny incoming requests. pub allow_listening: bool, /// Time after which we close an idle connection. pub idle_timeout: Duration, } /// State of an active substream, opened either by us or by the remote. enum SubstreamState { /// We haven't started opening the outgoing substream yet. /// Contains the request we want to send, and the user data if we expect an answer. OutPendingOpen(KadRequestMsg, Option), /// Waiting to send a message to the remote. OutPendingSend( KadOutStreamSink, KadRequestMsg, Option, ), /// Waiting to flush the substream so that the data arrives to the remote. OutPendingFlush(KadOutStreamSink, Option), /// Waiting for an answer back from the remote. // TODO: add timeout OutWaitingAnswer(KadOutStreamSink, TUserData), /// An error happened on the substream and we should report the error to the user. OutReportError(KademliaHandlerQueryErr, TUserData), /// The substream is being closed. OutClosing(KadOutStreamSink), /// Waiting for a request from the remote. InWaitingMessage(UniqueConnecId, KadInStreamSink), /// Waiting for the user to send a `KademliaHandlerIn` event containing the response. InWaitingUser(UniqueConnecId, KadInStreamSink), /// Waiting to send an answer back to the remote. InPendingSend(UniqueConnecId, KadInStreamSink, KadResponseMsg), /// Waiting to flush an answer back to the remote. InPendingFlush(UniqueConnecId, KadInStreamSink), /// The substream is being closed. InClosing(KadInStreamSink), } impl SubstreamState { /// Tries to close the substream. /// /// If the substream is not ready to be closed, returns it back. fn try_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { match self { SubstreamState::OutPendingOpen(_, _) | SubstreamState::OutReportError(_, _) => Poll::Ready(()), SubstreamState::OutPendingSend(ref mut stream, _, _) | SubstreamState::OutPendingFlush(ref mut stream, _) | SubstreamState::OutWaitingAnswer(ref mut stream, _) | SubstreamState::OutClosing(ref mut stream) => match Sink::poll_close(Pin::new(stream), cx) { Poll::Ready(_) => Poll::Ready(()), Poll::Pending => Poll::Pending, }, SubstreamState::InWaitingMessage(_, ref mut stream) | SubstreamState::InWaitingUser(_, ref mut stream) | SubstreamState::InPendingSend(_, ref mut stream, _) | SubstreamState::InPendingFlush(_, ref mut stream) | SubstreamState::InClosing(ref mut stream) => match Sink::poll_close(Pin::new(stream), cx) { Poll::Ready(_) => Poll::Ready(()), Poll::Pending => Poll::Pending, }, } } } /// Event produced by the Kademlia handler. #[derive(Debug)] pub enum KademliaHandlerEvent { /// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes /// returned is not specified, but should be around 20. FindNodeReq { /// The key for which to locate the closest nodes. key: Vec, /// Identifier of the request. Needs to be passed back when answering. request_id: KademliaRequestId, }, /// Response to an `KademliaHandlerIn::FindNodeReq`. FindNodeRes { /// Results of the request. closer_peers: Vec, /// The user data passed to the `FindNodeReq`. user_data: TUserData, }, /// Same as `FindNodeReq`, but should also return the entries of the local providers list for /// this key. GetProvidersReq { /// The key for which providers are requested. key: record::Key, /// Identifier of the request. Needs to be passed back when answering. request_id: KademliaRequestId, }, /// Response to an `KademliaHandlerIn::GetProvidersReq`. GetProvidersRes { /// Nodes closest to the key. closer_peers: Vec, /// Known providers for this key. provider_peers: Vec, /// The user data passed to the `GetProvidersReq`. user_data: TUserData, }, /// An error happened when performing a query. QueryError { /// The error that happened. error: KademliaHandlerQueryErr, /// The user data passed to the query. user_data: TUserData, }, /// The peer announced itself as a provider of a key. AddProvider { /// The key for which the peer is a provider of the associated value. key: record::Key, /// The peer that is the provider of the value for `key`. provider: KadPeer, }, /// Request to get a value from the dht records GetRecord { /// Key for which we should look in the dht key: record::Key, /// Identifier of the request. Needs to be passed back when answering. request_id: KademliaRequestId, }, /// Response to a `KademliaHandlerIn::GetRecord`. GetRecordRes { /// The result is present if the key has been found record: Option, /// Nodes closest to the key. closer_peers: Vec, /// The user data passed to the `GetValue`. user_data: TUserData, }, /// Request to put a value in the dht records PutRecord { record: Record, /// Identifier of the request. Needs to be passed back when answering. request_id: KademliaRequestId, }, /// Response to a request to store a record. PutRecordRes { /// The key of the stored record. key: record::Key, /// The value of the stored record. value: Vec, /// The user data passed to the `PutValue`. user_data: TUserData, } } /// Error that can happen when requesting an RPC query. #[derive(Debug)] pub enum KademliaHandlerQueryErr { /// Error while trying to perform the query. Upgrade(ProtocolsHandlerUpgrErr), /// Received an answer that doesn't correspond to the request. UnexpectedMessage, /// I/O error in the substream. Io(io::Error), } impl fmt::Display for KademliaHandlerQueryErr { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { KademliaHandlerQueryErr::Upgrade(err) => { write!(f, "Error while performing Kademlia query: {}", err) }, KademliaHandlerQueryErr::UnexpectedMessage => { write!(f, "Remote answered our Kademlia RPC query with the wrong message type") }, KademliaHandlerQueryErr::Io(err) => { write!(f, "I/O error during a Kademlia RPC query: {}", err) }, } } } impl error::Error for KademliaHandlerQueryErr { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { KademliaHandlerQueryErr::Upgrade(err) => Some(err), KademliaHandlerQueryErr::UnexpectedMessage => None, KademliaHandlerQueryErr::Io(err) => Some(err), } } } impl From> for KademliaHandlerQueryErr { fn from(err: ProtocolsHandlerUpgrErr) -> Self { KademliaHandlerQueryErr::Upgrade(err) } } /// Event to send to the handler. #[derive(Debug, Clone)] pub enum KademliaHandlerIn { /// Resets the (sub)stream associated with the given request ID, /// thus signaling an error to the remote. /// /// Explicitly resetting the (sub)stream associated with a request /// can be used as an alternative to letting requests simply time /// out on the remote peer, thus potentially avoiding some delay /// for the query on the remote. Reset(KademliaRequestId), /// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes /// returned is not specified, but should be around 20. FindNodeReq { /// Identifier of the node. key: Vec, /// Custom user data. Passed back in the out event when the results arrive. user_data: TUserData, }, /// Response to a `FindNodeReq`. FindNodeRes { /// Results of the request. closer_peers: Vec, /// Identifier of the request that was made by the remote. /// /// It is a logic error to use an id of the handler of a different node. request_id: KademliaRequestId, }, /// Same as `FindNodeReq`, but should also return the entries of the local providers list for /// this key. GetProvidersReq { /// Identifier being searched. key: record::Key, /// Custom user data. Passed back in the out event when the results arrive. user_data: TUserData, }, /// Response to a `GetProvidersReq`. GetProvidersRes { /// Nodes closest to the key. closer_peers: Vec, /// Known providers for this key. provider_peers: Vec, /// Identifier of the request that was made by the remote. /// /// It is a logic error to use an id of the handler of a different node. request_id: KademliaRequestId, }, /// Indicates that this provider is known for this key. /// /// The API of the handler doesn't expose any event that allows you to know whether this /// succeeded. AddProvider { /// Key for which we should add providers. key: record::Key, /// Known provider for this key. provider: KadPeer, }, /// Request to retrieve a record from the DHT. GetRecord { /// The key of the record. key: record::Key, /// Custom data. Passed back in the out event when the results arrive. user_data: TUserData, }, /// Response to a `GetRecord` request. GetRecordRes { /// The value that might have been found in our storage. record: Option, /// Nodes that are closer to the key we were searching for. closer_peers: Vec, /// Identifier of the request that was made by the remote. request_id: KademliaRequestId, }, /// Put a value into the dht records. PutRecord { record: Record, /// Custom data. Passed back in the out event when the results arrive. user_data: TUserData, }, /// Response to a `PutRecord`. PutRecordRes { /// Key of the value that was put. key: record::Key, /// Value that was put. value: Vec, /// Identifier of the request that was made by the remote. request_id: KademliaRequestId, } } /// Unique identifier for a request. Must be passed back in order to answer a request from /// the remote. #[derive(Debug, Clone, PartialEq, Eq)] pub struct KademliaRequestId { /// Unique identifier for an incoming connection. connec_unique_id: UniqueConnecId, } /// Unique identifier for a connection. #[derive(Debug, Copy, Clone, PartialEq, Eq)] struct UniqueConnecId(u64); impl KademliaHandler { /// Create a [`KademliaHandler`] using the given configuration. pub fn new(config: KademliaHandlerConfig) -> Self { let keep_alive = KeepAlive::Until(Instant::now() + config.idle_timeout); KademliaHandler { config, next_connec_unique_id: UniqueConnecId(0), substreams: Vec::new(), keep_alive, } } } impl Default for KademliaHandler { fn default() -> Self { KademliaHandler::new(Default::default()) } } impl ProtocolsHandler for KademliaHandler where TUserData: Clone + Send + 'static, { type InEvent = KademliaHandlerIn; type OutEvent = KademliaHandlerEvent; type Error = io::Error; // TODO: better error type? type InboundProtocol = upgrade::EitherUpgrade; type OutboundProtocol = KademliaProtocolConfig; // Message of the request to send to the remote, and user data if we expect an answer. type OutboundOpenInfo = (KadRequestMsg, Option); type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { if self.config.allow_listening { SubstreamProtocol::new(self.config.protocol_config.clone(), ()).map_upgrade(upgrade::EitherUpgrade::A) } else { SubstreamProtocol::new(upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade), ()) } } fn inject_fully_negotiated_outbound( &mut self, protocol: >::Output, (msg, user_data): Self::OutboundOpenInfo, ) { self.substreams .push(SubstreamState::OutPendingSend(protocol, msg, user_data)); } fn inject_fully_negotiated_inbound( &mut self, protocol: >::Output, (): Self::InboundOpenInfo ) { // If `self.allow_listening` is false, then we produced a `DeniedUpgrade` and `protocol` // is a `Void`. let protocol = match protocol { EitherOutput::First(p) => p, EitherOutput::Second(p) => void::unreachable(p), }; debug_assert!(self.config.allow_listening); let connec_unique_id = self.next_connec_unique_id; self.next_connec_unique_id.0 += 1; self.substreams .push(SubstreamState::InWaitingMessage(connec_unique_id, protocol)); } fn inject_event(&mut self, message: KademliaHandlerIn) { match message { KademliaHandlerIn::Reset(request_id) => { let pos = self.substreams.iter().position(|state| match state { SubstreamState::InWaitingUser(conn_id, _) => conn_id == &request_id.connec_unique_id, _ => false, }); if let Some(pos) = pos { // TODO: we don't properly close down the substream let waker = futures::task::noop_waker(); let mut cx = Context::from_waker(&waker); let _ = self.substreams.remove(pos).try_close(&mut cx); } } KademliaHandlerIn::FindNodeReq { key, user_data } => { let msg = KadRequestMsg::FindNode { key }; self.substreams.push(SubstreamState::OutPendingOpen(msg, Some(user_data.clone()))); } KademliaHandlerIn::FindNodeRes { closer_peers, request_id, } => { let pos = self.substreams.iter().position(|state| match state { SubstreamState::InWaitingUser(ref conn_id, _) => conn_id == &request_id.connec_unique_id, _ => false, }); if let Some(pos) = pos { let (conn_id, substream) = match self.substreams.remove(pos) { SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream), _ => unreachable!(), }; let msg = KadResponseMsg::FindNode { closer_peers: closer_peers.clone(), }; self.substreams .push(SubstreamState::InPendingSend(conn_id, substream, msg)); } } KademliaHandlerIn::GetProvidersReq { key, user_data } => { let msg = KadRequestMsg::GetProviders { key }; self.substreams .push(SubstreamState::OutPendingOpen(msg, Some(user_data.clone()))); } KademliaHandlerIn::GetProvidersRes { closer_peers, provider_peers, request_id, } => { let pos = self.substreams.iter().position(|state| match state { SubstreamState::InWaitingUser(ref conn_id, _) if conn_id == &request_id.connec_unique_id => { true } _ => false, }); if let Some(pos) = pos { let (conn_id, substream) = match self.substreams.remove(pos) { SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream), _ => unreachable!(), }; let msg = KadResponseMsg::GetProviders { closer_peers: closer_peers.clone(), provider_peers: provider_peers.clone(), }; self.substreams .push(SubstreamState::InPendingSend(conn_id, substream, msg)); } } KademliaHandlerIn::AddProvider { key, provider } => { let msg = KadRequestMsg::AddProvider { key, provider }; self.substreams.push(SubstreamState::OutPendingOpen(msg, None)); } KademliaHandlerIn::GetRecord { key, user_data } => { let msg = KadRequestMsg::GetValue { key }; self.substreams.push(SubstreamState::OutPendingOpen(msg, Some(user_data))); } KademliaHandlerIn::PutRecord { record, user_data } => { let msg = KadRequestMsg::PutValue { record }; self.substreams .push(SubstreamState::OutPendingOpen(msg, Some(user_data))); } KademliaHandlerIn::GetRecordRes { record, closer_peers, request_id, } => { let pos = self.substreams.iter().position(|state| match state { SubstreamState::InWaitingUser(ref conn_id, _) => conn_id == &request_id.connec_unique_id, _ => false, }); if let Some(pos) = pos { let (conn_id, substream) = match self.substreams.remove(pos) { SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream), _ => unreachable!(), }; let msg = KadResponseMsg::GetValue { record, closer_peers: closer_peers.clone(), }; self.substreams .push(SubstreamState::InPendingSend(conn_id, substream, msg)); } } KademliaHandlerIn::PutRecordRes { key, request_id, value, } => { let pos = self.substreams.iter().position(|state| match state { SubstreamState::InWaitingUser(ref conn_id, _) if conn_id == &request_id.connec_unique_id => { true } _ => false, }); if let Some(pos) = pos { let (conn_id, substream) = match self.substreams.remove(pos) { SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream), _ => unreachable!(), }; let msg = KadResponseMsg::PutValue { key, value, }; self.substreams .push(SubstreamState::InPendingSend(conn_id, substream, msg)); } } } } fn inject_dial_upgrade_error( &mut self, (_, user_data): Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr, ) { // TODO: cache the fact that the remote doesn't support kademlia at all, so that we don't // continue trying if let Some(user_data) = user_data { self.substreams .push(SubstreamState::OutReportError(error.into(), user_data)); } } fn connection_keep_alive(&self) -> KeepAlive { self.keep_alive } fn poll( &mut self, cx: &mut Context<'_>, ) -> Poll< ProtocolsHandlerEvent, > { if self.substreams.is_empty() { return Poll::Pending; } // We remove each element from `substreams` one by one and add them back. for n in (0..self.substreams.len()).rev() { let mut substream = self.substreams.swap_remove(n); loop { match advance_substream(substream, self.config.protocol_config.clone(), cx) { (Some(new_state), Some(event), _) => { self.substreams.push(new_state); return Poll::Ready(event); } (None, Some(event), _) => { if self.substreams.is_empty() { self.keep_alive = KeepAlive::Until(Instant::now() + self.config.idle_timeout); } return Poll::Ready(event); } (Some(new_state), None, false) => { self.substreams.push(new_state); break; } (Some(new_state), None, true) => { substream = new_state; continue; } (None, None, _) => { break; } } } } if self.substreams.is_empty() { // We destroyed all substreams in this function. self.keep_alive = KeepAlive::Until(Instant::now() + self.config.idle_timeout); } else { self.keep_alive = KeepAlive::Yes; } Poll::Pending } } impl Default for KademliaHandlerConfig { fn default() -> Self { KademliaHandlerConfig { protocol_config: Default::default(), allow_listening: true, idle_timeout: Duration::from_secs(10), } } } /// Advances one substream. /// /// Returns the new state for that substream, an event to generate, and whether the substream /// should be polled again. fn advance_substream( state: SubstreamState, upgrade: KademliaProtocolConfig, cx: &mut Context<'_>, ) -> ( Option>, Option< ProtocolsHandlerEvent< KademliaProtocolConfig, (KadRequestMsg, Option), KademliaHandlerEvent, io::Error, >, >, bool, ) { match state { SubstreamState::OutPendingOpen(msg, user_data) => { let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(upgrade, (msg, user_data)) }; (None, Some(ev), false) } SubstreamState::OutPendingSend(mut substream, msg, user_data) => { match Sink::poll_ready(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => { match Sink::start_send(Pin::new(&mut substream), msg) { Ok(()) => ( Some(SubstreamState::OutPendingFlush(substream, user_data)), None, true, ), Err(error) => { let event = if let Some(user_data) = user_data { Some(ProtocolsHandlerEvent::Custom(KademliaHandlerEvent::QueryError { error: KademliaHandlerQueryErr::Io(error), user_data })) } else { None }; (None, event, false) } } }, Poll::Pending => ( Some(SubstreamState::OutPendingSend(substream, msg, user_data)), None, false, ), Poll::Ready(Err(error)) => { let event = if let Some(user_data) = user_data { Some(ProtocolsHandlerEvent::Custom(KademliaHandlerEvent::QueryError { error: KademliaHandlerQueryErr::Io(error), user_data })) } else { None }; (None, event, false) } } } SubstreamState::OutPendingFlush(mut substream, user_data) => { match Sink::poll_flush(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => { if let Some(user_data) = user_data { ( Some(SubstreamState::OutWaitingAnswer(substream, user_data)), None, true, ) } else { (Some(SubstreamState::OutClosing(substream)), None, true) } } Poll::Pending => ( Some(SubstreamState::OutPendingFlush(substream, user_data)), None, false, ), Poll::Ready(Err(error)) => { let event = if let Some(user_data) = user_data { Some(ProtocolsHandlerEvent::Custom(KademliaHandlerEvent::QueryError { error: KademliaHandlerQueryErr::Io(error), user_data, })) } else { None }; (None, event, false) } } } SubstreamState::OutWaitingAnswer(mut substream, user_data) => match Stream::poll_next(Pin::new(&mut substream), cx) { Poll::Ready(Some(Ok(msg))) => { let new_state = SubstreamState::OutClosing(substream); let event = process_kad_response(msg, user_data); ( Some(new_state), Some(ProtocolsHandlerEvent::Custom(event)), true, ) } Poll::Pending => ( Some(SubstreamState::OutWaitingAnswer(substream, user_data)), None, false, ), Poll::Ready(Some(Err(error))) => { let event = KademliaHandlerEvent::QueryError { error: KademliaHandlerQueryErr::Io(error), user_data, }; (None, Some(ProtocolsHandlerEvent::Custom(event)), false) } Poll::Ready(None) => { let event = KademliaHandlerEvent::QueryError { error: KademliaHandlerQueryErr::Io(io::ErrorKind::UnexpectedEof.into()), user_data, }; (None, Some(ProtocolsHandlerEvent::Custom(event)), false) } }, SubstreamState::OutReportError(error, user_data) => { let event = KademliaHandlerEvent::QueryError { error, user_data }; (None, Some(ProtocolsHandlerEvent::Custom(event)), false) } SubstreamState::OutClosing(mut stream) => match Sink::poll_close(Pin::new(&mut stream), cx) { Poll::Ready(Ok(())) => (None, None, false), Poll::Pending => (Some(SubstreamState::OutClosing(stream)), None, false), Poll::Ready(Err(_)) => (None, None, false), }, SubstreamState::InWaitingMessage(id, mut substream) => match Stream::poll_next(Pin::new(&mut substream), cx) { Poll::Ready(Some(Ok(msg))) => { if let Ok(ev) = process_kad_request(msg, id) { ( Some(SubstreamState::InWaitingUser(id, substream)), Some(ProtocolsHandlerEvent::Custom(ev)), false, ) } else { (Some(SubstreamState::InClosing(substream)), None, true) } } Poll::Pending => ( Some(SubstreamState::InWaitingMessage(id, substream)), None, false, ), Poll::Ready(None) => { trace!("Inbound substream: EOF"); (None, None, false) } Poll::Ready(Some(Err(e))) => { trace!("Inbound substream error: {:?}", e); (None, None, false) }, }, SubstreamState::InWaitingUser(id, substream) => ( Some(SubstreamState::InWaitingUser(id, substream)), None, false, ), SubstreamState::InPendingSend(id, mut substream, msg) => match Sink::poll_ready(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => match Sink::start_send(Pin::new(&mut substream), msg) { Ok(()) => ( Some(SubstreamState::InPendingFlush(id, substream)), None, true, ), Err(_) => (None, None, false), }, Poll::Pending => ( Some(SubstreamState::InPendingSend(id, substream, msg)), None, false, ), Poll::Ready(Err(_)) => (None, None, false), } SubstreamState::InPendingFlush(id, mut substream) => match Sink::poll_flush(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => ( Some(SubstreamState::InWaitingMessage(id, substream)), None, true, ), Poll::Pending => ( Some(SubstreamState::InPendingFlush(id, substream)), None, false, ), Poll::Ready(Err(_)) => (None, None, false), }, SubstreamState::InClosing(mut stream) => match Sink::poll_close(Pin::new(&mut stream), cx) { Poll::Ready(Ok(())) => (None, None, false), Poll::Pending => (Some(SubstreamState::InClosing(stream)), None, false), Poll::Ready(Err(_)) => (None, None, false), }, } } /// Processes a Kademlia message that's expected to be a request from a remote. fn process_kad_request( event: KadRequestMsg, connec_unique_id: UniqueConnecId, ) -> Result, io::Error> { match event { KadRequestMsg::Ping => { // TODO: implement; although in practice the PING message is never // used, so we may consider removing it altogether Err(io::Error::new( io::ErrorKind::InvalidData, "the PING Kademlia message is not implemented", )) } KadRequestMsg::FindNode { key } => Ok(KademliaHandlerEvent::FindNodeReq { key, request_id: KademliaRequestId { connec_unique_id }, }), KadRequestMsg::GetProviders { key } => Ok(KademliaHandlerEvent::GetProvidersReq { key, request_id: KademliaRequestId { connec_unique_id }, }), KadRequestMsg::AddProvider { key, provider } => { Ok(KademliaHandlerEvent::AddProvider { key, provider }) } KadRequestMsg::GetValue { key } => Ok(KademliaHandlerEvent::GetRecord { key, request_id: KademliaRequestId { connec_unique_id }, }), KadRequestMsg::PutValue { record } => Ok(KademliaHandlerEvent::PutRecord { record, request_id: KademliaRequestId { connec_unique_id }, }) } } /// Process a Kademlia message that's supposed to be a response to one of our requests. fn process_kad_response( event: KadResponseMsg, user_data: TUserData, ) -> KademliaHandlerEvent { // TODO: must check that the response corresponds to the request match event { KadResponseMsg::Pong => { // We never send out pings. KademliaHandlerEvent::QueryError { error: KademliaHandlerQueryErr::UnexpectedMessage, user_data, } } KadResponseMsg::FindNode { closer_peers } => { KademliaHandlerEvent::FindNodeRes { closer_peers, user_data, } }, KadResponseMsg::GetProviders { closer_peers, provider_peers, } => KademliaHandlerEvent::GetProvidersRes { closer_peers, provider_peers, user_data, }, KadResponseMsg::GetValue { record, closer_peers, } => KademliaHandlerEvent::GetRecordRes { record, closer_peers, user_data, }, KadResponseMsg::PutValue { key, value, .. } => { KademliaHandlerEvent::PutRecordRes { key, value, user_data, } } } }