mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-13 10:01:25 +00:00
Implement GET_PROVIDERS/ADD_PROVIDER Kademlia messages (#530)
* Implement GET_PROVIDERS/ADD_PROVIDER Kademlia messages * Use multihash and peer id in protocol.rs * Fix Kademlia example
This commit is contained in:
@ -172,6 +172,10 @@ fn main() {
|
|||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
responder.respond(result);
|
responder.respond(result);
|
||||||
},
|
},
|
||||||
|
KadIncomingRequest::GetProviders { .. } => {
|
||||||
|
},
|
||||||
|
KadIncomingRequest::AddProvider { .. } => {
|
||||||
|
},
|
||||||
KadIncomingRequest::PingPong => {
|
KadIncomingRequest::PingPong => {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -17,6 +17,7 @@ libp2p-ping = { path = "../../protocols/ping" }
|
|||||||
libp2p-core = { path = "../../core" }
|
libp2p-core = { path = "../../core" }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
multiaddr = { path = "../../misc/multiaddr" }
|
multiaddr = { path = "../../misc/multiaddr" }
|
||||||
|
multihash = { path = "../../misc/multihash" }
|
||||||
parking_lot = "0.6"
|
parking_lot = "0.6"
|
||||||
protobuf = "2.0.2"
|
protobuf = "2.0.2"
|
||||||
rand = "0.4.2"
|
rand = "0.4.2"
|
||||||
|
@ -37,6 +37,7 @@ use bytes::Bytes;
|
|||||||
use futures::sync::{mpsc, oneshot};
|
use futures::sync::{mpsc, oneshot};
|
||||||
use futures::{future, Future, Sink, stream, Stream};
|
use futures::{future, Future, Sink, stream, Stream};
|
||||||
use libp2p_core::{ConnectionUpgrade, Endpoint, PeerId};
|
use libp2p_core::{ConnectionUpgrade, Endpoint, PeerId};
|
||||||
|
use multihash::Multihash;
|
||||||
use protocol::{self, KadMsg, KademliaProtocolConfig, KadPeer};
|
use protocol::{self, KadMsg, KademliaProtocolConfig, KadPeer};
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||||
@ -109,7 +110,7 @@ impl KadConnecController {
|
|||||||
searched_key: &PeerId,
|
searched_key: &PeerId,
|
||||||
) -> impl Future<Item = Vec<KadPeer>, Error = IoError> {
|
) -> impl Future<Item = Vec<KadPeer>, Error = IoError> {
|
||||||
let message = protocol::KadMsg::FindNodeReq {
|
let message = protocol::KadMsg::FindNodeReq {
|
||||||
key: searched_key.clone().into_bytes(),
|
key: searched_key.clone().into(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
@ -142,6 +143,63 @@ impl KadConnecController {
|
|||||||
future::Either::A(future)
|
future::Either::A(future)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Sends a `GET_PROVIDERS` query to the node and provides a future that will contain the response.
|
||||||
|
// TODO: future item could be `impl Iterator` instead
|
||||||
|
pub fn get_providers(
|
||||||
|
&self,
|
||||||
|
searched_key: &Multihash,
|
||||||
|
) -> impl Future<Item = (Vec<KadPeer>, Vec<KadPeer>), Error = IoError> {
|
||||||
|
let message = protocol::KadMsg::GetProvidersReq {
|
||||||
|
key: searched_key.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
match self.inner.unbounded_send((message, tx)) {
|
||||||
|
Ok(()) => (),
|
||||||
|
Err(_) => {
|
||||||
|
let fut = future::err(IoError::new(
|
||||||
|
IoErrorKind::ConnectionAborted,
|
||||||
|
"connection to remote has aborted",
|
||||||
|
));
|
||||||
|
|
||||||
|
return future::Either::B(fut);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let future = rx.map_err(|_| {
|
||||||
|
IoError::new(
|
||||||
|
IoErrorKind::ConnectionAborted,
|
||||||
|
"connection to remote has aborted",
|
||||||
|
)
|
||||||
|
}).and_then(|msg| match msg {
|
||||||
|
KadMsg::GetProvidersRes { closer_peers, provider_peers } => Ok((closer_peers, provider_peers)),
|
||||||
|
_ => Err(IoError::new(
|
||||||
|
IoErrorKind::InvalidData,
|
||||||
|
"invalid response type received from the remote",
|
||||||
|
)),
|
||||||
|
});
|
||||||
|
|
||||||
|
future::Either::A(future)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sends an `ADD_PROVIDER` message to the node.
|
||||||
|
pub fn add_provider(&self, key: Multihash, provider_peer: KadPeer) -> Result<(), IoError> {
|
||||||
|
// Dummy channel, as the `tx` is going to be dropped anyway.
|
||||||
|
let (tx, _rx) = oneshot::channel();
|
||||||
|
let message = protocol::KadMsg::AddProvider {
|
||||||
|
key,
|
||||||
|
provider_peer,
|
||||||
|
};
|
||||||
|
match self.inner.unbounded_send((message, tx)) {
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(_) => Err(IoError::new(
|
||||||
|
IoErrorKind::ConnectionAborted,
|
||||||
|
"connection to remote has aborted",
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Sends a `PING` query to the node. Because of the way the protocol is designed, there is
|
/// Sends a `PING` query to the node. Because of the way the protocol is designed, there is
|
||||||
/// no way to differentiate between a ping and a pong. Therefore this function doesn't return a
|
/// no way to differentiate between a ping and a pong. Therefore this function doesn't return a
|
||||||
/// future, and the only way to be notified of the result is through the stream.
|
/// future, and the only way to be notified of the result is through the stream.
|
||||||
@ -168,10 +226,29 @@ pub enum KadIncomingRequest {
|
|||||||
responder: KadFindNodeRespond,
|
responder: KadFindNodeRespond,
|
||||||
},
|
},
|
||||||
|
|
||||||
// TODO: PutValue and FindValue
|
/// Find the nodes closest to `searched` and return the known providers for `searched`.
|
||||||
|
GetProviders {
|
||||||
|
/// The value being searched.
|
||||||
|
searched: Multihash,
|
||||||
|
/// Object to use to respond to the request.
|
||||||
|
responder: KadGetProvidersRespond,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// Registers a provider for the given key.
|
||||||
|
///
|
||||||
|
/// The local node is supposed to remember this and return the provider on a `GetProviders`
|
||||||
|
/// request for the given key.
|
||||||
|
AddProvider {
|
||||||
|
/// The key of the provider.
|
||||||
|
key: Multihash,
|
||||||
|
/// The provider to register.
|
||||||
|
provider_peer: KadPeer,
|
||||||
|
},
|
||||||
|
|
||||||
/// Received either a ping or a pong.
|
/// Received either a ping or a pong.
|
||||||
PingPong,
|
PingPong,
|
||||||
|
|
||||||
|
// TODO: PutValue and FindValue
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Object used to respond to `FindNode` queries from remotes.
|
/// Object used to respond to `FindNode` queries from remotes.
|
||||||
@ -190,6 +267,24 @@ impl KadFindNodeRespond {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Object used to respond to `GetProviders` queries from remotes.
|
||||||
|
pub struct KadGetProvidersRespond {
|
||||||
|
inner: oneshot::Sender<KadMsg>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl KadGetProvidersRespond {
|
||||||
|
/// Respond to the `GetProviders` request.
|
||||||
|
pub fn respond<Ic, Ip>(self, closest_peers: Ic, providers: Ip)
|
||||||
|
where Ic: IntoIterator<Item = protocol::KadPeer>,
|
||||||
|
Ip: IntoIterator<Item = protocol::KadPeer>,
|
||||||
|
{
|
||||||
|
let _ = self.inner.send(KadMsg::GetProvidersRes {
|
||||||
|
closer_peers: closest_peers.into_iter().collect(),
|
||||||
|
provider_peers: providers.into_iter().collect(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Builds a controller and stream from a stream/sink of raw messages.
|
// Builds a controller and stream from a stream/sink of raw messages.
|
||||||
fn build_from_sink_stream<'a, S>(connec: S) -> (KadConnecController, Box<Stream<Item = KadIncomingRequest, Error = IoError> + Send + 'a>)
|
fn build_from_sink_stream<'a, S>(connec: S) -> (KadConnecController, Box<Stream<Item = KadIncomingRequest, Error = IoError> + Send + 'a>)
|
||||||
where S: Sink<SinkItem = KadMsg, SinkError = IoError> + Stream<Item = KadMsg, Error = IoError> + Send + 'a
|
where S: Sink<SinkItem = KadMsg, SinkError = IoError> + Stream<Item = KadMsg, Error = IoError> + Send + 'a
|
||||||
@ -281,9 +376,11 @@ where
|
|||||||
});
|
});
|
||||||
Box::new(future)
|
Box::new(future)
|
||||||
},
|
},
|
||||||
Some(EventSource::LocalRequest(message @ KadMsg::PutValue { .. }, _)) => {
|
Some(EventSource::LocalRequest(message @ KadMsg::PutValue { .. }, _)) |
|
||||||
// A `PutValue` request. Contrary to other types of messages, this one
|
Some(EventSource::LocalRequest(message @ KadMsg::AddProvider { .. }, _)) => {
|
||||||
// doesn't expect any answer and therefore we ignore the sender.
|
// A `PutValue` or `AddProvider` request. Contrary to other types of
|
||||||
|
// messages, these ones don't expect any answer and therefore we ignore
|
||||||
|
// the sender.
|
||||||
let future = kad_sink
|
let future = kad_sink
|
||||||
.send(message)
|
.send(message)
|
||||||
.map(move |kad_sink| {
|
.map(move |kad_sink| {
|
||||||
@ -340,8 +437,9 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Some(EventSource::Remote(message @ KadMsg::FindNodeRes { .. }))
|
Some(EventSource::Remote(message @ KadMsg::FindNodeRes { .. }))
|
||||||
| Some(EventSource::Remote(message @ KadMsg::GetValueRes { .. })) => {
|
| Some(EventSource::Remote(message @ KadMsg::GetValueRes { .. }))
|
||||||
// `FindNodeRes` or `GetValueRes` received on the socket.
|
| Some(EventSource::Remote(message @ KadMsg::GetProvidersRes { .. })) => {
|
||||||
|
// `FindNodeRes`, `GetValueRes` or `GetProvidersRes` received on the socket.
|
||||||
// Send it back through `send_back_queue`.
|
// Send it back through `send_back_queue`.
|
||||||
if let Some(send_back) = send_back_queue.pop_front() {
|
if let Some(send_back) = send_back_queue.pop_front() {
|
||||||
let _ = send_back.send(message);
|
let _ = send_back.send(message);
|
||||||
@ -356,22 +454,13 @@ where
|
|||||||
Box::new(future)
|
Box::new(future)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Some(EventSource::Remote(KadMsg::FindNodeReq { key, .. })) => {
|
Some(EventSource::Remote(KadMsg::FindNodeReq { key })) => {
|
||||||
let peer_id = match PeerId::from_bytes(key) {
|
|
||||||
Ok(id) => id,
|
|
||||||
Err(key) => {
|
|
||||||
debug!("Ignoring FIND_NODE request with invalid key: {:?}", key);
|
|
||||||
let future = future::err(IoError::new(IoErrorKind::InvalidData, "invalid key in FIND_NODE"));
|
|
||||||
return Box::new(future);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
let _ = responders_tx.unbounded_send(rx);
|
let _ = responders_tx.unbounded_send(rx);
|
||||||
let future = future::ok({
|
let future = future::ok({
|
||||||
let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished);
|
let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished);
|
||||||
let rq = KadIncomingRequest::FindNode {
|
let rq = KadIncomingRequest::FindNode {
|
||||||
searched: peer_id,
|
searched: key,
|
||||||
responder: KadFindNodeRespond {
|
responder: KadFindNodeRespond {
|
||||||
inner: tx
|
inner: tx
|
||||||
}
|
}
|
||||||
@ -381,6 +470,30 @@ where
|
|||||||
|
|
||||||
Box::new(future)
|
Box::new(future)
|
||||||
}
|
}
|
||||||
|
Some(EventSource::Remote(KadMsg::GetProvidersReq { key })) => {
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let _ = responders_tx.unbounded_send(rx);
|
||||||
|
let future = future::ok({
|
||||||
|
let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished);
|
||||||
|
let rq = KadIncomingRequest::GetProviders {
|
||||||
|
searched: key,
|
||||||
|
responder: KadGetProvidersRespond {
|
||||||
|
inner: tx
|
||||||
|
}
|
||||||
|
};
|
||||||
|
(Some(rq), state)
|
||||||
|
});
|
||||||
|
|
||||||
|
Box::new(future)
|
||||||
|
}
|
||||||
|
Some(EventSource::Remote(KadMsg::AddProvider { key, provider_peer })) => {
|
||||||
|
let future = future::ok({
|
||||||
|
let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished);
|
||||||
|
let rq = KadIncomingRequest::AddProvider { key, provider_peer };
|
||||||
|
(Some(rq), state)
|
||||||
|
});
|
||||||
|
Box::new(future) as Box<_>
|
||||||
|
}
|
||||||
Some(EventSource::Remote(KadMsg::GetValueReq { .. })) => {
|
Some(EventSource::Remote(KadMsg::GetValueReq { .. })) => {
|
||||||
warn!("GET_VALUE requests are not implemented yet");
|
warn!("GET_VALUE requests are not implemented yet");
|
||||||
let future = future::err(IoError::new(IoErrorKind::Other,
|
let future = future::err(IoError::new(IoErrorKind::Other,
|
||||||
|
@ -69,6 +69,7 @@ extern crate libp2p_core;
|
|||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
extern crate multiaddr;
|
extern crate multiaddr;
|
||||||
|
extern crate multihash;
|
||||||
extern crate parking_lot;
|
extern crate parking_lot;
|
||||||
extern crate protobuf;
|
extern crate protobuf;
|
||||||
extern crate rand;
|
extern crate rand;
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
use futures::{future, sink, Sink, stream, Stream};
|
use futures::{future, sink, Sink, stream, Stream};
|
||||||
use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr, PeerId};
|
use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr, PeerId};
|
||||||
|
use multihash::Multihash;
|
||||||
use protobuf::{self, Message};
|
use protobuf::{self, Message};
|
||||||
use protobuf_structs;
|
use protobuf_structs;
|
||||||
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||||
@ -175,34 +176,62 @@ where
|
|||||||
pub enum KadMsg {
|
pub enum KadMsg {
|
||||||
/// Ping request or response.
|
/// Ping request or response.
|
||||||
Ping,
|
Ping,
|
||||||
|
|
||||||
/// Target must save the given record, can be queried later with `GetValueReq`.
|
/// Target must save the given record, can be queried later with `GetValueReq`.
|
||||||
PutValue {
|
PutValue {
|
||||||
/// Identifier of the record.
|
/// Identifier of the record.
|
||||||
key: Vec<u8>,
|
key: Multihash,
|
||||||
/// The record itself.
|
/// The record itself.
|
||||||
record: (), //record: protobuf_structs::record::Record, // TODO: no
|
record: (), //record: protobuf_structs::record::Record, // TODO: no
|
||||||
},
|
},
|
||||||
|
|
||||||
GetValueReq {
|
GetValueReq {
|
||||||
/// Identifier of the record.
|
/// Identifier of the record.
|
||||||
key: Vec<u8>,
|
key: Multihash,
|
||||||
},
|
},
|
||||||
|
|
||||||
GetValueRes {
|
GetValueRes {
|
||||||
/// Identifier of the returned record.
|
/// Identifier of the returned record.
|
||||||
key: Vec<u8>,
|
key: Multihash,
|
||||||
record: (), //record: Option<protobuf_structs::record::Record>, // TODO: no
|
record: (), //record: Option<protobuf_structs::record::Record>, // TODO: no
|
||||||
closer_peers: Vec<KadPeer>,
|
closer_peers: Vec<KadPeer>,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes
|
/// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes
|
||||||
/// returned is not specified, but should be around 20.
|
/// returned is not specified, but should be around 20.
|
||||||
FindNodeReq {
|
FindNodeReq {
|
||||||
/// Identifier of the node.
|
/// Identifier of the node.
|
||||||
key: Vec<u8>,
|
key: PeerId,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// Response to a `FindNodeReq`.
|
/// Response to a `FindNodeReq`.
|
||||||
FindNodeRes {
|
FindNodeRes {
|
||||||
/// Results of the request.
|
/// Results of the request.
|
||||||
closer_peers: Vec<KadPeer>,
|
closer_peers: Vec<KadPeer>,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// Same as `FindNodeReq`, but should also return the entries of the local providers list for
|
||||||
|
/// this key.
|
||||||
|
GetProvidersReq {
|
||||||
|
/// Identifier being searched.
|
||||||
|
key: Multihash,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// Response to a `FindNodeReq`.
|
||||||
|
GetProvidersRes {
|
||||||
|
/// Nodes closest to the key.
|
||||||
|
closer_peers: Vec<KadPeer>,
|
||||||
|
/// Known providers for this key.
|
||||||
|
provider_peers: Vec<KadPeer>,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// Indicates that this list of providers is known for this key.
|
||||||
|
AddProvider {
|
||||||
|
/// Key for which we should add providers.
|
||||||
|
key: Multihash,
|
||||||
|
/// Known provider for this key.
|
||||||
|
provider_peer: KadPeer,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Turns a type-safe kadmelia message into the corresponding row protobuf message.
|
// Turns a type-safe kadmelia message into the corresponding row protobuf message.
|
||||||
@ -216,14 +245,14 @@ fn msg_to_proto(kad_msg: KadMsg) -> protobuf_structs::dht::Message {
|
|||||||
KadMsg::PutValue { key, .. } => {
|
KadMsg::PutValue { key, .. } => {
|
||||||
let mut msg = protobuf_structs::dht::Message::new();
|
let mut msg = protobuf_structs::dht::Message::new();
|
||||||
msg.set_field_type(protobuf_structs::dht::Message_MessageType::PUT_VALUE);
|
msg.set_field_type(protobuf_structs::dht::Message_MessageType::PUT_VALUE);
|
||||||
msg.set_key(key);
|
msg.set_key(key.into_bytes());
|
||||||
//msg.set_record(record); // TODO:
|
//msg.set_record(record); // TODO:
|
||||||
msg
|
msg
|
||||||
}
|
}
|
||||||
KadMsg::GetValueReq { key } => {
|
KadMsg::GetValueReq { key } => {
|
||||||
let mut msg = protobuf_structs::dht::Message::new();
|
let mut msg = protobuf_structs::dht::Message::new();
|
||||||
msg.set_field_type(protobuf_structs::dht::Message_MessageType::GET_VALUE);
|
msg.set_field_type(protobuf_structs::dht::Message_MessageType::GET_VALUE);
|
||||||
msg.set_key(key);
|
msg.set_key(key.into_bytes());
|
||||||
msg.set_clusterLevelRaw(10);
|
msg.set_clusterLevelRaw(10);
|
||||||
msg
|
msg
|
||||||
}
|
}
|
||||||
@ -231,7 +260,7 @@ fn msg_to_proto(kad_msg: KadMsg) -> protobuf_structs::dht::Message {
|
|||||||
KadMsg::FindNodeReq { key } => {
|
KadMsg::FindNodeReq { key } => {
|
||||||
let mut msg = protobuf_structs::dht::Message::new();
|
let mut msg = protobuf_structs::dht::Message::new();
|
||||||
msg.set_field_type(protobuf_structs::dht::Message_MessageType::FIND_NODE);
|
msg.set_field_type(protobuf_structs::dht::Message_MessageType::FIND_NODE);
|
||||||
msg.set_key(key);
|
msg.set_key(key.into_bytes());
|
||||||
msg.set_clusterLevelRaw(10);
|
msg.set_clusterLevelRaw(10);
|
||||||
msg
|
msg
|
||||||
}
|
}
|
||||||
@ -247,6 +276,36 @@ fn msg_to_proto(kad_msg: KadMsg) -> protobuf_structs::dht::Message {
|
|||||||
}
|
}
|
||||||
msg
|
msg
|
||||||
}
|
}
|
||||||
|
KadMsg::GetProvidersReq { key } => {
|
||||||
|
let mut msg = protobuf_structs::dht::Message::new();
|
||||||
|
msg.set_field_type(protobuf_structs::dht::Message_MessageType::GET_PROVIDERS);
|
||||||
|
msg.set_key(key.into_bytes());
|
||||||
|
msg.set_clusterLevelRaw(10);
|
||||||
|
msg
|
||||||
|
}
|
||||||
|
KadMsg::GetProvidersRes { closer_peers, provider_peers } => {
|
||||||
|
// TODO: if empty, the remote will think it's a request
|
||||||
|
// TODO: not good, possibly exposed in the API
|
||||||
|
assert!(!closer_peers.is_empty());
|
||||||
|
let mut msg = protobuf_structs::dht::Message::new();
|
||||||
|
msg.set_field_type(protobuf_structs::dht::Message_MessageType::GET_PROVIDERS);
|
||||||
|
msg.set_clusterLevelRaw(9);
|
||||||
|
for peer in closer_peers {
|
||||||
|
msg.mut_closerPeers().push(peer.into());
|
||||||
|
}
|
||||||
|
for peer in provider_peers {
|
||||||
|
msg.mut_providerPeers().push(peer.into());
|
||||||
|
}
|
||||||
|
msg
|
||||||
|
}
|
||||||
|
KadMsg::AddProvider { key, provider_peer } => {
|
||||||
|
let mut msg = protobuf_structs::dht::Message::new();
|
||||||
|
msg.set_field_type(protobuf_structs::dht::Message_MessageType::ADD_PROVIDER);
|
||||||
|
msg.set_clusterLevelRaw(10);
|
||||||
|
msg.set_key(key.into_bytes());
|
||||||
|
msg.mut_providerPeers().push(provider_peer.into());
|
||||||
|
msg
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -256,7 +315,8 @@ fn proto_to_msg(mut message: protobuf_structs::dht::Message) -> Result<KadMsg, I
|
|||||||
protobuf_structs::dht::Message_MessageType::PING => Ok(KadMsg::Ping),
|
protobuf_structs::dht::Message_MessageType::PING => Ok(KadMsg::Ping),
|
||||||
|
|
||||||
protobuf_structs::dht::Message_MessageType::PUT_VALUE => {
|
protobuf_structs::dht::Message_MessageType::PUT_VALUE => {
|
||||||
let key = message.take_key();
|
let key = Multihash::from_bytes(message.take_key())
|
||||||
|
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err))?;
|
||||||
let _record = message.take_record();
|
let _record = message.take_record();
|
||||||
Ok(KadMsg::PutValue {
|
Ok(KadMsg::PutValue {
|
||||||
key: key,
|
key: key,
|
||||||
@ -265,14 +325,17 @@ fn proto_to_msg(mut message: protobuf_structs::dht::Message) -> Result<KadMsg, I
|
|||||||
}
|
}
|
||||||
|
|
||||||
protobuf_structs::dht::Message_MessageType::GET_VALUE => {
|
protobuf_structs::dht::Message_MessageType::GET_VALUE => {
|
||||||
let key = message.take_key();
|
let key = Multihash::from_bytes(message.take_key())
|
||||||
|
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err))?;
|
||||||
Ok(KadMsg::GetValueReq { key: key })
|
Ok(KadMsg::GetValueReq { key: key })
|
||||||
}
|
}
|
||||||
|
|
||||||
protobuf_structs::dht::Message_MessageType::FIND_NODE => {
|
protobuf_structs::dht::Message_MessageType::FIND_NODE => {
|
||||||
if message.get_closerPeers().is_empty() {
|
if message.get_closerPeers().is_empty() {
|
||||||
|
let key = PeerId::from_bytes(message.take_key())
|
||||||
|
.map_err(|_| IoError::new(IoErrorKind::InvalidData, "invalid peer id in FIND_NODE"))?;
|
||||||
Ok(KadMsg::FindNodeReq {
|
Ok(KadMsg::FindNodeReq {
|
||||||
key: message.take_key(),
|
key,
|
||||||
})
|
})
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
@ -290,17 +353,59 @@ fn proto_to_msg(mut message: protobuf_structs::dht::Message) -> Result<KadMsg, I
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protobuf_structs::dht::Message_MessageType::GET_PROVIDERS
|
protobuf_structs::dht::Message_MessageType::GET_PROVIDERS => {
|
||||||
| protobuf_structs::dht::Message_MessageType::ADD_PROVIDER => {
|
if message.get_closerPeers().is_empty() {
|
||||||
// These messages don't seem to be used in the protocol in practice, so if we receive
|
let key = Multihash::from_bytes(message.take_key())
|
||||||
// them we suppose that it's a mistake in the protocol usage.
|
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err))?;
|
||||||
|
Ok(KadMsg::GetProvidersReq {
|
||||||
|
key,
|
||||||
|
})
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// TODO: for now we don't parse the peer properly, so it is possible that we get
|
||||||
|
// parsing errors for peers even when they are valid ; we ignore these
|
||||||
|
// errors for now, but ultimately we should just error altogether
|
||||||
|
let closer_peers = message.mut_closerPeers()
|
||||||
|
.iter_mut()
|
||||||
|
.filter_map(|peer| KadPeer::from_peer(peer).ok())
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let provider_peers = message.mut_providerPeers()
|
||||||
|
.iter_mut()
|
||||||
|
.filter_map(|peer| KadPeer::from_peer(peer).ok())
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
Ok(KadMsg::GetProvidersRes {
|
||||||
|
closer_peers,
|
||||||
|
provider_peers,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protobuf_structs::dht::Message_MessageType::ADD_PROVIDER => {
|
||||||
|
// TODO: for now we don't parse the peer properly, so it is possible that we get
|
||||||
|
// parsing errors for peers even when they are valid ; we ignore these
|
||||||
|
// errors for now, but ultimately we should just error altogether
|
||||||
|
let provider_peer = message.mut_providerPeers()
|
||||||
|
.iter_mut()
|
||||||
|
.filter_map(|peer| KadPeer::from_peer(peer).ok())
|
||||||
|
.next();
|
||||||
|
|
||||||
|
if let Some(provider_peer) = provider_peer {
|
||||||
|
let key = Multihash::from_bytes(message.take_key())
|
||||||
|
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err))?;
|
||||||
|
Ok(KadMsg::AddProvider {
|
||||||
|
key,
|
||||||
|
provider_peer,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
Err(IoError::new(
|
Err(IoError::new(
|
||||||
IoErrorKind::InvalidData,
|
IoErrorKind::InvalidData,
|
||||||
"received an unsupported kad message type",
|
"received an ADD_PROVIDER message with no valid peer",
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
@ -310,6 +415,7 @@ mod tests {
|
|||||||
use self::libp2p_tcp_transport::TcpConfig;
|
use self::libp2p_tcp_transport::TcpConfig;
|
||||||
use futures::{Future, Sink, Stream};
|
use futures::{Future, Sink, Stream};
|
||||||
use libp2p_core::{Transport, PeerId, PublicKey};
|
use libp2p_core::{Transport, PeerId, PublicKey};
|
||||||
|
use multihash::{encode, Hash};
|
||||||
use protocol::{KadConnectionType, KadMsg, KademliaProtocolConfig, KadPeer};
|
use protocol::{KadConnectionType, KadMsg, KademliaProtocolConfig, KadPeer};
|
||||||
use std::sync::mpsc;
|
use std::sync::mpsc;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
@ -321,14 +427,14 @@ mod tests {
|
|||||||
|
|
||||||
test_one(KadMsg::Ping);
|
test_one(KadMsg::Ping);
|
||||||
test_one(KadMsg::PutValue {
|
test_one(KadMsg::PutValue {
|
||||||
key: vec![1, 2, 3, 4],
|
key: encode(Hash::SHA2256, &[1, 2, 3, 4]).unwrap(),
|
||||||
record: (),
|
record: (),
|
||||||
});
|
});
|
||||||
test_one(KadMsg::GetValueReq {
|
test_one(KadMsg::GetValueReq {
|
||||||
key: vec![10, 11, 12],
|
key: encode(Hash::SHA2256, &[10, 11, 12]).unwrap(),
|
||||||
});
|
});
|
||||||
test_one(KadMsg::FindNodeReq {
|
test_one(KadMsg::FindNodeReq {
|
||||||
key: vec![9, 12, 0, 245, 245, 201, 28, 95],
|
key: PeerId::from_public_key(PublicKey::Rsa(vec![9, 12, 0, 245, 245, 201, 28, 95]))
|
||||||
});
|
});
|
||||||
test_one(KadMsg::FindNodeRes {
|
test_one(KadMsg::FindNodeRes {
|
||||||
closer_peers: vec![
|
closer_peers: vec![
|
||||||
@ -339,6 +445,33 @@ mod tests {
|
|||||||
},
|
},
|
||||||
],
|
],
|
||||||
});
|
});
|
||||||
|
test_one(KadMsg::GetProvidersReq {
|
||||||
|
key: encode(Hash::SHA2256, &[9, 12, 0, 245, 245, 201, 28, 95]).unwrap(),
|
||||||
|
});
|
||||||
|
test_one(KadMsg::GetProvidersRes {
|
||||||
|
closer_peers: vec![
|
||||||
|
KadPeer {
|
||||||
|
node_id: PeerId::from_public_key(PublicKey::Rsa(vec![93, 80, 12, 250])),
|
||||||
|
multiaddrs: vec!["/ip4/100.101.102.103/tcp/20105".parse().unwrap()],
|
||||||
|
connection_ty: KadConnectionType::Connected,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
provider_peers: vec![
|
||||||
|
KadPeer {
|
||||||
|
node_id: PeerId::from_public_key(PublicKey::Rsa(vec![12, 90, 1, 28])),
|
||||||
|
multiaddrs: vec!["/ip4/200.201.202.203/tcp/1999".parse().unwrap()],
|
||||||
|
connection_ty: KadConnectionType::NotConnected,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
});
|
||||||
|
test_one(KadMsg::AddProvider {
|
||||||
|
key: encode(Hash::SHA2256, &[9, 12, 0, 245, 245, 201, 28, 95]).unwrap(),
|
||||||
|
provider_peer: KadPeer {
|
||||||
|
node_id: PeerId::from_public_key(PublicKey::Rsa(vec![5, 6, 7, 8])),
|
||||||
|
multiaddrs: vec!["/ip4/9.1.2.3/udp/23".parse().unwrap()],
|
||||||
|
connection_ty: KadConnectionType::Connected,
|
||||||
|
},
|
||||||
|
});
|
||||||
// TODO: all messages
|
// TODO: all messages
|
||||||
|
|
||||||
fn test_one(msg_server: KadMsg) {
|
fn test_one(msg_server: KadMsg) {
|
||||||
|
Reference in New Issue
Block a user