diff --git a/examples/kademlia.rs b/examples/kademlia.rs
index 4c932727..f2c08d06 100644
--- a/examples/kademlia.rs
+++ b/examples/kademlia.rs
@@ -172,6 +172,10 @@ fn main() {
                             .collect::<Vec<_>>();
                         responder.respond(result);
                     },
+                    KadIncomingRequest::GetProviders { .. } => {
+                    },
+                    KadIncomingRequest::AddProvider { .. } => {
+                    },
                     KadIncomingRequest::PingPong => {
                     }
                 };
diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml
index 1228ef61..12a85722 100644
--- a/protocols/kad/Cargo.toml
+++ b/protocols/kad/Cargo.toml
@@ -17,6 +17,7 @@ libp2p-ping = { path = "../../protocols/ping" }
 libp2p-core = { path = "../../core" }
 log = "0.4"
 multiaddr = { path = "../../misc/multiaddr" }
+multihash = { path = "../../misc/multihash" }
 parking_lot = "0.6"
 protobuf = "2.0.2"
 rand = "0.4.2"
diff --git a/protocols/kad/src/kad_server.rs b/protocols/kad/src/kad_server.rs
index d75f167a..2a9c3e43 100644
--- a/protocols/kad/src/kad_server.rs
+++ b/protocols/kad/src/kad_server.rs
@@ -37,6 +37,7 @@ use bytes::Bytes;
 use futures::sync::{mpsc, oneshot};
 use futures::{future, Future, Sink, stream, Stream};
 use libp2p_core::{ConnectionUpgrade, Endpoint, PeerId};
+use multihash::Multihash;
 use protocol::{self, KadMsg, KademliaProtocolConfig, KadPeer};
 use std::collections::VecDeque;
 use std::io::{Error as IoError, ErrorKind as IoErrorKind};
@@ -109,7 +110,7 @@ impl KadConnecController {
         searched_key: &PeerId,
     ) -> impl Future<Item = Vec<KadPeer>, Error = IoError> {
         let message = protocol::KadMsg::FindNodeReq {
-            key: searched_key.clone().into_bytes(),
+            key: searched_key.clone().into(),
         };
 
         let (tx, rx) = oneshot::channel();
@@ -142,6 +143,63 @@ impl KadConnecController {
         future::Either::A(future)
     }
 
+    /// Sends a `GET_PROVIDERS` query to the node and provides a future that will contain the response.
+    // TODO: future item could be `impl Iterator` instead
+    pub fn get_providers(
+        &self,
+        searched_key: &Multihash,
+    ) -> impl Future<Item = (Vec<KadPeer>, Vec<KadPeer>), Error = IoError> {
+        let message = protocol::KadMsg::GetProvidersReq {
+            key: searched_key.clone(),
+        };
+
+        let (tx, rx) = oneshot::channel();
+
+        match self.inner.unbounded_send((message, tx)) {
+            Ok(()) => (),
+            Err(_) => {
+                let fut = future::err(IoError::new(
+                    IoErrorKind::ConnectionAborted,
+                    "connection to remote has aborted",
+                ));
+
+                return future::Either::B(fut);
+            }
+        };
+
+        let future = rx.map_err(|_| {
+            IoError::new(
+                IoErrorKind::ConnectionAborted,
+                "connection to remote has aborted",
+            )
+        }).and_then(|msg| match msg {
+            KadMsg::GetProvidersRes { closer_peers, provider_peers } => Ok((closer_peers, provider_peers)),
+            _ => Err(IoError::new(
+                IoErrorKind::InvalidData,
+                "invalid response type received from the remote",
+            )),
+        });
+
+        future::Either::A(future)
+    }
+
+    /// Sends an `ADD_PROVIDER` message to the node.
+    pub fn add_provider(&self, key: Multihash, provider_peer: KadPeer) -> Result<(), IoError> {
+        // Dummy channel, as the `tx` is going to be dropped anyway.
+        let (tx, _rx) = oneshot::channel();
+        let message = protocol::KadMsg::AddProvider {
+            key,
+            provider_peer,
+        };
+        match self.inner.unbounded_send((message, tx)) {
+            Ok(()) => Ok(()),
+            Err(_) => Err(IoError::new(
+                IoErrorKind::ConnectionAborted,
+                "connection to remote has aborted",
+            )),
+        }
+    }
+
     /// Sends a `PING` query to the node. Because of the way the protocol is designed, there is
     /// no way to differentiate between a ping and a pong. Therefore this function doesn't return a
     /// future, and the only way to be notified of the result is through the stream.
@@ -168,10 +226,29 @@ pub enum KadIncomingRequest {
         responder: KadFindNodeRespond,
     },
 
-    // TODO: PutValue and FindValue
+    /// Find the nodes closest to `searched` and return the known providers for `searched`.
+    GetProviders {
+        /// The value being searched.
+        searched: Multihash,
+        /// Object to use to respond to the request.
+        responder: KadGetProvidersRespond,
+    },
+
+    /// Registers a provider for the given key.
+    ///
+    /// The local node is supposed to remember this and return the provider on a `GetProviders`
+    /// request for the given key.
+    AddProvider {
+        /// The key of the provider.
+        key: Multihash,
+        /// The provider to register.
+        provider_peer: KadPeer,
+    },
 
     /// Received either a ping or a pong.
     PingPong,
+
+    // TODO: PutValue and FindValue
 }
 
 /// Object used to respond to `FindNode` queries from remotes.
@@ -190,6 +267,24 @@ impl KadFindNodeRespond {
         }
     }
 }
 
+/// Object used to respond to `GetProviders` queries from remotes.
+pub struct KadGetProvidersRespond {
+    inner: oneshot::Sender<KadMsg>,
+}
+
+impl KadGetProvidersRespond {
+    /// Respond to the `GetProviders` request.
+    pub fn respond<Ic, Ip>(self, closest_peers: Ic, providers: Ip)
+        where Ic: IntoIterator<Item = KadPeer>,
+              Ip: IntoIterator<Item = KadPeer>,
+    {
+        let _ = self.inner.send(KadMsg::GetProvidersRes {
+            closer_peers: closest_peers.into_iter().collect(),
+            provider_peers: providers.into_iter().collect(),
+        });
+    }
+}
+
 // Builds a controller and stream from a stream/sink of raw messages.
 fn build_from_sink_stream<'a, S>(connec: S) -> (KadConnecController, Box<Stream<Item = KadIncomingRequest, Error = IoError> + Send + 'a>)
     where S: Sink<SinkItem = KadMsg, SinkError = IoError> + Stream<Item = KadMsg, Error = IoError> + Send + 'a
@@ -281,9 +376,11 @@ where
                     });
                     Box::new(future)
                 },
-                Some(EventSource::LocalRequest(message @ KadMsg::PutValue { .. }, _)) => {
-                    // A `PutValue` request. Contrary to other types of messages, this one
-                    // doesn't expect any answer and therefore we ignore the sender.
+                Some(EventSource::LocalRequest(message @ KadMsg::PutValue { .. }, _)) |
+                Some(EventSource::LocalRequest(message @ KadMsg::AddProvider { .. }, _)) => {
+                    // A `PutValue` or `AddProvider` request. Contrary to other types of
+                    // messages, these ones don't expect any answer and therefore we ignore
+                    // the sender.
                     let future = kad_sink
                         .send(message)
                         .map(move |kad_sink| {
@@ -340,8 +437,9 @@ where
                     }
                 }
                 Some(EventSource::Remote(message @ KadMsg::FindNodeRes { .. }))
-                | Some(EventSource::Remote(message @ KadMsg::GetValueRes { .. })) => {
-                    // `FindNodeRes` or `GetValueRes` received on the socket.
+                | Some(EventSource::Remote(message @ KadMsg::GetValueRes { .. }))
+                | Some(EventSource::Remote(message @ KadMsg::GetProvidersRes { .. })) => {
+                    // `FindNodeRes`, `GetValueRes` or `GetProvidersRes` received on the socket.
                     // Send it back through `send_back_queue`.
                     if let Some(send_back) = send_back_queue.pop_front() {
                         let _ = send_back.send(message);
@@ -356,22 +454,13 @@ where
                        Box::new(future)
                    }
                }
-                Some(EventSource::Remote(KadMsg::FindNodeReq { key, .. })) => {
-                    let peer_id = match PeerId::from_bytes(key) {
-                        Ok(id) => id,
-                        Err(key) => {
-                            debug!("Ignoring FIND_NODE request with invalid key: {:?}", key);
-                            let future = future::err(IoError::new(IoErrorKind::InvalidData, "invalid key in FIND_NODE"));
-                            return Box::new(future);
-                        }
-                    };
-
+                Some(EventSource::Remote(KadMsg::FindNodeReq { key })) => {
                     let (tx, rx) = oneshot::channel();
                     let _ = responders_tx.unbounded_send(rx);
                     let future = future::ok({
                         let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished);
                         let rq = KadIncomingRequest::FindNode {
-                            searched: peer_id,
+                            searched: key,
                             responder: KadFindNodeRespond {
                                 inner: tx
                             }
@@ -381,6 +470,30 @@ where
 
                     Box::new(future)
                 }
+                Some(EventSource::Remote(KadMsg::GetProvidersReq { key })) => {
+                    let (tx, rx) = oneshot::channel();
+                    let _ = responders_tx.unbounded_send(rx);
+                    let future = future::ok({
+                        let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished);
+                        let rq = KadIncomingRequest::GetProviders {
+                            searched: key,
+                            responder: KadGetProvidersRespond {
+                                inner: tx
+                            }
+                        };
+                        (Some(rq), state)
+                    });
+
+                    Box::new(future)
+                }
+                Some(EventSource::Remote(KadMsg::AddProvider { key, provider_peer })) => {
+                    let future = future::ok({
+                        let state = (events, kad_sink, responders_tx, send_back_queue, expected_pongs, finished);
+                        let rq = KadIncomingRequest::AddProvider { key, provider_peer };
+                        (Some(rq), state)
+                    });
+                    Box::new(future) as Box<_>
+                }
                 Some(EventSource::Remote(KadMsg::GetValueReq { .. })) => {
                     warn!("GET_VALUE requests are not implemented yet");
                     let future = future::err(IoError::new(IoErrorKind::Other,
diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs
index f448ce21..883bd1b1 100644
--- a/protocols/kad/src/lib.rs
+++ b/protocols/kad/src/lib.rs
@@ -69,6 +69,7 @@ extern crate libp2p_core;
 #[macro_use]
 extern crate log;
 extern crate multiaddr;
+extern crate multihash;
 extern crate parking_lot;
 extern crate protobuf;
 extern crate rand;
diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs
index dda91901..64120182 100644
--- a/protocols/kad/src/protocol.rs
+++ b/protocols/kad/src/protocol.rs
@@ -28,6 +28,7 @@ use bytes::{Bytes, BytesMut};
 use futures::{future, sink, Sink, stream, Stream};
 use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr, PeerId};
+use multihash::Multihash;
 use protobuf::{self, Message};
 use protobuf_structs;
 use std::io::{Error as IoError, ErrorKind as IoErrorKind};
@@ -175,34 +176,62 @@ where
 pub enum KadMsg {
     /// Ping request or response.
     Ping,
+
     /// Target must save the given record, can be queried later with `GetValueReq`.
     PutValue {
         /// Identifier of the record.
-        key: Vec<u8>,
+        key: Multihash,
         /// The record itself.
         record: (), //record: protobuf_structs::record::Record, // TODO: no
     },
+
     GetValueReq {
         /// Identifier of the record.
-        key: Vec<u8>,
+        key: Multihash,
     },
+
     GetValueRes {
         /// Identifier of the returned record.
-        key: Vec<u8>,
+        key: Multihash,
         record: (), //record: Option<protobuf_structs::record::Record>, // TODO: no
         closer_peers: Vec<KadPeer>,
     },
+
     /// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes
     /// returned is not specified, but should be around 20.
     FindNodeReq {
         /// Identifier of the node.
-        key: Vec<u8>,
+        key: PeerId,
     },
+
     /// Response to a `FindNodeReq`.
     FindNodeRes {
         /// Results of the request.
         closer_peers: Vec<KadPeer>,
     },
+
+    /// Same as `FindNodeReq`, but should also return the entries of the local providers list for
+    /// this key.
+    GetProvidersReq {
+        /// Identifier being searched.
+        key: Multihash,
+    },
+
+    /// Response to a `GetProvidersReq`.
+    GetProvidersRes {
+        /// Nodes closest to the key.
+        closer_peers: Vec<KadPeer>,
+        /// Known providers for this key.
+        provider_peers: Vec<KadPeer>,
+    },
+
+    /// Indicates that this list of providers is known for this key.
+    AddProvider {
+        /// Key for which we should add providers.
+        key: Multihash,
+        /// Known provider for this key.
+        provider_peer: KadPeer,
+    },
 }
 
 // Turns a type-safe kadmelia message into the corresponding row protobuf message.
@@ -216,14 +245,14 @@ fn msg_to_proto(kad_msg: KadMsg) -> protobuf_structs::dht::Message {
         KadMsg::PutValue { key, .. } => {
             let mut msg = protobuf_structs::dht::Message::new();
             msg.set_field_type(protobuf_structs::dht::Message_MessageType::PUT_VALUE);
-            msg.set_key(key);
+            msg.set_key(key.into_bytes());
             //msg.set_record(record); // TODO:
             msg
         }
         KadMsg::GetValueReq { key } => {
             let mut msg = protobuf_structs::dht::Message::new();
             msg.set_field_type(protobuf_structs::dht::Message_MessageType::GET_VALUE);
-            msg.set_key(key);
+            msg.set_key(key.into_bytes());
             msg.set_clusterLevelRaw(10);
             msg
         }
@@ -231,7 +260,7 @@ fn msg_to_proto(kad_msg: KadMsg) -> protobuf_structs::dht::Message {
         KadMsg::FindNodeReq { key } => {
             let mut msg = protobuf_structs::dht::Message::new();
             msg.set_field_type(protobuf_structs::dht::Message_MessageType::FIND_NODE);
-            msg.set_key(key);
+            msg.set_key(key.into_bytes());
             msg.set_clusterLevelRaw(10);
             msg
         }
@@ -247,6 +276,36 @@ fn msg_to_proto(kad_msg: KadMsg) -> protobuf_structs::dht::Message {
             }
             msg
         }
+        KadMsg::GetProvidersReq { key } => {
+            let mut msg = protobuf_structs::dht::Message::new();
+            msg.set_field_type(protobuf_structs::dht::Message_MessageType::GET_PROVIDERS);
+            msg.set_key(key.into_bytes());
+            msg.set_clusterLevelRaw(10);
+            msg
+        }
+        KadMsg::GetProvidersRes { closer_peers, provider_peers } => {
+            // TODO: if empty, the remote will think it's a request
+            // TODO: not good, possibly exposed in the API
+            assert!(!closer_peers.is_empty());
+            let mut msg = protobuf_structs::dht::Message::new();
+            msg.set_field_type(protobuf_structs::dht::Message_MessageType::GET_PROVIDERS);
+            msg.set_clusterLevelRaw(9);
+            for peer in closer_peers {
+                msg.mut_closerPeers().push(peer.into());
+            }
+            for peer in provider_peers {
+                msg.mut_providerPeers().push(peer.into());
+            }
+            msg
+        }
+        KadMsg::AddProvider { key, provider_peer } => {
+            let mut msg = protobuf_structs::dht::Message::new();
+            msg.set_field_type(protobuf_structs::dht::Message_MessageType::ADD_PROVIDER);
+            msg.set_clusterLevelRaw(10);
+            msg.set_key(key.into_bytes());
+            msg.mut_providerPeers().push(provider_peer.into());
+            msg
+        }
     }
 }
 
@@ -256,7 +315,8 @@ fn proto_to_msg(mut message: protobuf_structs::dht::Message) -> Result<KadMsg, IoError> {
         protobuf_structs::dht::Message_MessageType::PING => Ok(KadMsg::Ping),
 
         protobuf_structs::dht::Message_MessageType::PUT_VALUE => {
-            let key = message.take_key();
+            let key = Multihash::from_bytes(message.take_key())
+                .map_err(|err| IoError::new(IoErrorKind::InvalidData, err))?;
             let _record = message.take_record();
             Ok(KadMsg::PutValue {
                 key: key,
@@ -265,14 +325,17 @@ fn proto_to_msg(mut message: protobuf_structs::dht::Message) -> Result<KadMsg, IoError> {
         }
 
         protobuf_structs::dht::Message_MessageType::GET_VALUE => {
-            let key = message.take_key();
+            let key = Multihash::from_bytes(message.take_key())
+                .map_err(|err| IoError::new(IoErrorKind::InvalidData, err))?;
             Ok(KadMsg::GetValueReq { key: key })
         }
 
         protobuf_structs::dht::Message_MessageType::FIND_NODE => {
             if message.get_closerPeers().is_empty() {
+                let key = PeerId::from_bytes(message.take_key())
+                    .map_err(|_| IoError::new(IoErrorKind::InvalidData, "invalid peer id in FIND_NODE"))?;
                 Ok(KadMsg::FindNodeReq {
-                    key: message.take_key(),
+                    key,
                 })
             } else {
@@ -290,14 +353,56 @@ fn proto_to_msg(mut message: protobuf_structs::dht::Message) -> Result<KadMsg, IoError> {
-        protobuf_structs::dht::Message_MessageType::GET_PROVIDERS |
-        protobuf_structs::dht::Message_MessageType::ADD_PROVIDER => {
-            // These messages don't seem to be used in the protocol in practice, so if we receive
-            // them we suppose that it's a mistake in the protocol usage.
-            Err(IoError::new(
-                IoErrorKind::InvalidData,
-                "received an unsupported kad message type",
-            ))
-        }
+        protobuf_structs::dht::Message_MessageType::GET_PROVIDERS => {
+            if message.get_closerPeers().is_empty() {
+                let key = Multihash::from_bytes(message.take_key())
+                    .map_err(|err| IoError::new(IoErrorKind::InvalidData, err))?;
+                Ok(KadMsg::GetProvidersReq {
+                    key,
+                })
+
+            } else {
+                // TODO: for now we don't parse the peer properly, so it is possible that we get
+                //       parsing errors for peers even when they are valid ; we ignore these
+                //       errors for now, but ultimately we should just error altogether
+                let closer_peers = message.mut_closerPeers()
+                    .iter_mut()
+                    .filter_map(|peer| KadPeer::from_peer(peer).ok())
+                    .collect::<Vec<_>>();
+                let provider_peers = message.mut_providerPeers()
+                    .iter_mut()
+                    .filter_map(|peer| KadPeer::from_peer(peer).ok())
+                    .collect::<Vec<_>>();
+
+                Ok(KadMsg::GetProvidersRes {
+                    closer_peers,
+                    provider_peers,
+                })
+            }
+        }
+
+        protobuf_structs::dht::Message_MessageType::ADD_PROVIDER => {
+            // TODO: for now we don't parse the peer properly, so it is possible that we get
+            //       parsing errors for peers even when they are valid ; we ignore these
+            //       errors for now, but ultimately we should just error altogether
+            let provider_peer = message.mut_providerPeers()
+                .iter_mut()
+                .filter_map(|peer| KadPeer::from_peer(peer).ok())
+                .next();
+
+            if let Some(provider_peer) = provider_peer {
+                let key = Multihash::from_bytes(message.take_key())
+                    .map_err(|err| IoError::new(IoErrorKind::InvalidData, err))?;
+                Ok(KadMsg::AddProvider {
+                    key,
+                    provider_peer,
+                })
+            } else {
+                Err(IoError::new(
+                    IoErrorKind::InvalidData,
+                    "received an ADD_PROVIDER message with no valid peer",
+                ))
+            }
+        }
     }
 }
@@ -310,6 +415,7 @@ mod tests {
     use self::libp2p_tcp_transport::TcpConfig;
     use futures::{Future, Sink, Stream};
     use libp2p_core::{Transport, PeerId, PublicKey};
+    use multihash::{encode, Hash};
     use protocol::{KadConnectionType, KadMsg, KademliaProtocolConfig, KadPeer};
     use std::sync::mpsc;
     use std::thread;
@@ -321,14 +427,14 @@ mod tests {
         test_one(KadMsg::Ping);
         test_one(KadMsg::PutValue {
-            key: vec![1, 2, 3, 4],
+            key: encode(Hash::SHA2256, &[1, 2, 3, 4]).unwrap(),
             record: (),
         });
         test_one(KadMsg::GetValueReq {
-            key: vec![10, 11, 12],
+            key: encode(Hash::SHA2256, &[10, 11, 12]).unwrap(),
         });
         test_one(KadMsg::FindNodeReq {
-            key: vec![9, 12, 0, 245, 245, 201, 28, 95],
+            key: PeerId::from_public_key(PublicKey::Rsa(vec![9, 12, 0, 245, 245, 201, 28, 95]))
         });
         test_one(KadMsg::FindNodeRes {
             closer_peers: vec![
                 KadPeer {
@@ -339,6 +445,33 @@ mod tests {
                 },
             ],
         });
+        test_one(KadMsg::GetProvidersReq {
+            key: encode(Hash::SHA2256, &[9, 12, 0, 245, 245, 201, 28, 95]).unwrap(),
+        });
+        test_one(KadMsg::GetProvidersRes {
+            closer_peers: vec![
+                KadPeer {
+                    node_id: PeerId::from_public_key(PublicKey::Rsa(vec![93, 80, 12, 250])),
+                    multiaddrs: vec!["/ip4/100.101.102.103/tcp/20105".parse().unwrap()],
+                    connection_ty: KadConnectionType::Connected,
+                },
+            ],
+            provider_peers: vec![
+                KadPeer {
+                    node_id: PeerId::from_public_key(PublicKey::Rsa(vec![12, 90, 1, 28])),
+                    multiaddrs: vec!["/ip4/200.201.202.203/tcp/1999".parse().unwrap()],
+                    connection_ty: KadConnectionType::NotConnected,
+                },
+            ],
+        });
+        test_one(KadMsg::AddProvider {
+            key: encode(Hash::SHA2256, &[9, 12, 0, 245, 245, 201, 28, 95]).unwrap(),
+            provider_peer: KadPeer {
+                node_id: PeerId::from_public_key(PublicKey::Rsa(vec![5, 6, 7, 8])),
+                multiaddrs: vec!["/ip4/9.1.2.3/udp/23".parse().unwrap()],
+                connection_ty: KadConnectionType::Connected,
+            },
+        });
 
         // TODO: all messages
 
         fn test_one(msg_server: KadMsg) {
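
Note (not part of the patch): the example above leaves the new `GetProviders` and `AddProvider` arms empty. A minimal sketch of how a node could back them with an in-memory provider store follows. The `KadIncomingRequest` variants and the `responder.respond(closer_peers, provider_peers)` signature come from this patch; the `HashMap` store, the `handle_request` helper, the import paths, and the assumption that `KadPeer` is `Clone` and `Multihash` is usable as a `HashMap` key are illustrative assumptions only.

    use std::collections::HashMap;

    use libp2p_kad::{KadIncomingRequest, KadPeer}; // assumed re-exports from the kad crate root
    use multihash::Multihash;

    /// Illustrative handler for the two new incoming request types.
    fn handle_request(
        request: KadIncomingRequest,
        providers: &mut HashMap<Multihash, Vec<KadPeer>>,
        known_peers: &[KadPeer],
    ) {
        match request {
            KadIncomingRequest::GetProviders { searched, responder } => {
                // Look up the providers we have recorded for this key.
                let provider_peers = providers
                    .get(&searched)
                    .map(|peers| peers.clone())
                    .unwrap_or_default();
                // A real node would pick the peers closest to `searched` from its
                // routing table; this sketch simply returns every peer it knows.
                responder.respond(known_peers.to_vec(), provider_peers);
            }
            KadIncomingRequest::AddProvider { key, provider_peer } => {
                // Remember the announcement so later GET_PROVIDERS queries can return it.
                providers.entry(key).or_insert_with(Vec::new).push(provider_peer);
            }
            // FindNode and PingPong handling omitted from this sketch.
            _ => {}
        }
    }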