Embed the topology in the NetworkBehaviour (#889)

* Embed the topology in the NetworkBehaviour

* Put topologies inside of Floodsub and Kad

* Fix core tests

* Fix chat example

* More work

* Some cleanup

* Restore external addresses system
Pierre Krieger, 2019-01-26 23:57:53 +01:00 (committed by GitHub)
parent 30c082dfe5
commit df923526ca
21 changed files with 818 additions and 749 deletions
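The user-visible effect of this change is that a `Swarm` is now built from a transport, a behaviour and the local `PeerId`: there is no separate topology object to create and hand over any more, and each behaviour keeps its own view of the network. A minimal sketch of the new construction path, mirroring the chat example further down in this diff (`behaviour` stands for any value implementing the reworked `NetworkBehaviour` trait):

    let local_key = libp2p::secio::SecioKeyPair::ed25519_generated().unwrap();
    let local_peer_id = local_key.to_peer_id();
    let transport = libp2p::build_development_transport(local_key);

    // Previously: Swarm::new(transport, behaviour, MemoryTopology::empty(local_pub_key))
    let mut swarm = libp2p::Swarm::new(transport, behaviour, local_peer_id);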


@ -77,7 +77,6 @@ pub mod muxing;
pub mod nodes;
pub mod protocols_handler;
pub mod swarm;
pub mod topology;
pub mod transport;
pub mod upgrade;


@ -42,7 +42,7 @@
//!
use crate::{
Transport, Multiaddr, PublicKey, PeerId, InboundUpgrade, OutboundUpgrade, UpgradeInfo, ProtocolName,
Transport, Multiaddr, PeerId, InboundUpgrade, OutboundUpgrade, UpgradeInfo, ProtocolName,
muxing::StreamMuxer,
nodes::{
handled_node::NodeHandler,
@ -50,9 +50,7 @@ use crate::{
raw_swarm::{RawSwarm, RawSwarmEvent}
},
protocols_handler::{NodeHandlerWrapperBuilder, NodeHandlerWrapper, IntoProtocolsHandler, ProtocolsHandler},
topology::Topology,
transport::TransportError,
topology::DisconnectReason,
};
use futures::prelude::*;
use smallvec::SmallVec;
@ -61,36 +59,36 @@ use std::{fmt, io, ops::{Deref, DerefMut}};
pub use crate::nodes::raw_swarm::ConnectedPoint;
/// Contains the state of the network, plus the way it should behave.
pub struct Swarm<TTransport, TBehaviour, TTopology>
pub struct Swarm<TTransport, TBehaviour>
where TTransport: Transport,
TBehaviour: NetworkBehaviour<TTopology>,
TBehaviour: NetworkBehaviour,
{
raw_swarm: RawSwarm<
TTransport,
<<<TBehaviour as NetworkBehaviour<TTopology>>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
<<<TBehaviour as NetworkBehaviour<TTopology>>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
NodeHandlerWrapperBuilder<TBehaviour::ProtocolsHandler>,
<<<TBehaviour as NetworkBehaviour<TTopology>>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::Error,
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::Error,
>,
/// Handles which nodes to connect to and how to handle the events sent back by the protocol
/// handlers.
behaviour: TBehaviour,
/// Holds the topology of the network. In other words all the nodes that we think exist, even
/// if we're not connected to them.
topology: TTopology,
/// List of protocols that the behaviour says it supports.
supported_protocols: SmallVec<[Vec<u8>; 16]>,
/// List of multiaddresses we're listening on.
listened_addrs: SmallVec<[Multiaddr; 8]>,
/// List of multiaddresses we're listening on, after accounting for external IP addresses and
/// similar mechanisms.
external_addrs: SmallVec<[Multiaddr; 8]>,
}
impl<TTransport, TBehaviour, TTopology> Deref for Swarm<TTransport, TBehaviour, TTopology>
impl<TTransport, TBehaviour> Deref for Swarm<TTransport, TBehaviour>
where TTransport: Transport,
TBehaviour: NetworkBehaviour<TTopology>,
TBehaviour: NetworkBehaviour,
{
type Target = TBehaviour;
@ -100,9 +98,9 @@ where TTransport: Transport,
}
}
impl<TTransport, TBehaviour, TTopology> DerefMut for Swarm<TTransport, TBehaviour, TTopology>
impl<TTransport, TBehaviour> DerefMut for Swarm<TTransport, TBehaviour>
where TTransport: Transport,
TBehaviour: NetworkBehaviour<TTopology>,
TBehaviour: NetworkBehaviour,
{
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
@ -110,8 +108,8 @@ where TTransport: Transport,
}
}
impl<TTransport, TBehaviour, TMuxer, TTopology> Swarm<TTransport, TBehaviour, TTopology>
where TBehaviour: NetworkBehaviour<TTopology>,
impl<TTransport, TBehaviour, TMuxer> Swarm<TTransport, TBehaviour>
where TBehaviour: NetworkBehaviour,
TMuxer: StreamMuxer + Send + Sync + 'static,
<TMuxer as StreamMuxer>::OutboundSubstream: Send + 'static,
<TMuxer as StreamMuxer>::Substream: Send + 'static,
@ -139,29 +137,12 @@ where TBehaviour: NetworkBehaviour<TTopology>,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade<Substream<TMuxer>>>::Future: Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade<Substream<TMuxer>>>::Error: fmt::Debug + Send + 'static,
<NodeHandlerWrapper<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler> as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
TTopology: Topology,
{
/// Builds a new `Swarm`.
#[inline]
pub fn new(transport: TTransport, mut behaviour: TBehaviour, topology: TTopology) -> Self {
let supported_protocols = behaviour
.new_handler()
.into_handler(topology.local_peer_id())
.listen_protocol()
.protocol_info()
.into_iter()
.map(|info| info.protocol_name().to_vec())
.collect();
let raw_swarm = RawSwarm::new(transport, topology.local_peer_id().clone());
Swarm {
raw_swarm,
behaviour,
topology,
supported_protocols,
listened_addrs: SmallVec::new(),
}
pub fn new(transport: TTransport, behaviour: TBehaviour, local_peer_id: PeerId) -> Self {
SwarmBuilder::new(transport, behaviour, local_peer_id)
.build()
}
/// Returns the transport passed when building this object.
@ -198,7 +179,7 @@ where TBehaviour: NetworkBehaviour<TTopology>,
/// peer.
#[inline]
pub fn dial(me: &mut Self, peer_id: PeerId) {
let addrs = me.topology.addresses_of_peer(&peer_id);
let addrs = me.behaviour.addresses_of_peer(&peer_id);
let handler = me.behaviour.new_handler().into_node_handler_builder();
if let Some(peer) = me.raw_swarm.peer(peer_id).as_not_connected() {
let _ = peer.connect_iter(addrs, handler);
@ -216,22 +197,10 @@ where TBehaviour: NetworkBehaviour<TTopology>,
pub fn local_peer_id(me: &Self) -> &PeerId {
&me.raw_swarm.local_peer_id()
}
/// Returns the topology of the swarm.
#[inline]
pub fn topology(me: &Self) -> &TTopology {
&me.topology
}
/// Returns the topology of the swarm.
#[inline]
pub fn topology_mut(me: &mut Self) -> &mut TTopology {
&mut me.topology
}
}
impl<TTransport, TBehaviour, TMuxer, TTopology> Stream for Swarm<TTransport, TBehaviour, TTopology>
where TBehaviour: NetworkBehaviour<TTopology>,
impl<TTransport, TBehaviour, TMuxer> Stream for Swarm<TTransport, TBehaviour>
where TBehaviour: NetworkBehaviour,
TMuxer: StreamMuxer + Send + Sync + 'static,
<TMuxer as StreamMuxer>::OutboundSubstream: Send + 'static,
<TMuxer as StreamMuxer>::Substream: Send + 'static,
@ -259,7 +228,6 @@ where TBehaviour: NetworkBehaviour<TTopology>,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter: Send + 'static,
<<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static,
<NodeHandlerWrapper<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler> as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
TTopology: Topology,
{
type Item = TBehaviour::OutEvent;
type Error = io::Error;
@ -275,20 +243,15 @@ where TBehaviour: NetworkBehaviour<TTopology>,
self.behaviour.inject_node_event(peer_id, event);
},
Async::Ready(RawSwarmEvent::Connected { peer_id, endpoint }) => {
self.topology.set_connected(&peer_id, &endpoint);
self.behaviour.inject_connected(peer_id, endpoint);
},
Async::Ready(RawSwarmEvent::NodeClosed { peer_id, endpoint }) => {
self.topology.set_disconnected(&peer_id, &endpoint, DisconnectReason::Graceful);
self.behaviour.inject_disconnected(&peer_id, endpoint);
},
Async::Ready(RawSwarmEvent::NodeError { peer_id, endpoint, .. }) => {
self.topology.set_disconnected(&peer_id, &endpoint, DisconnectReason::Error);
self.behaviour.inject_disconnected(&peer_id, endpoint);
},
Async::Ready(RawSwarmEvent::Replaced { peer_id, closed_endpoint, endpoint }) => {
self.topology.set_disconnected(&peer_id, &closed_endpoint, DisconnectReason::Replaced);
self.topology.set_connected(&peer_id, &endpoint);
self.behaviour.inject_disconnected(&peer_id, closed_endpoint);
self.behaviour.inject_connected(peer_id, endpoint);
},
@ -298,20 +261,17 @@ where TBehaviour: NetworkBehaviour<TTopology>,
},
Async::Ready(RawSwarmEvent::ListenerClosed { .. }) => {},
Async::Ready(RawSwarmEvent::IncomingConnectionError { .. }) => {},
Async::Ready(RawSwarmEvent::DialError { multiaddr, .. }) => {
self.topology.set_unreachable(&multiaddr);
},
Async::Ready(RawSwarmEvent::UnknownPeerDialError { multiaddr, .. }) => {
self.topology.set_unreachable(&multiaddr);
},
Async::Ready(RawSwarmEvent::DialError { .. }) => {},
Async::Ready(RawSwarmEvent::UnknownPeerDialError { .. }) => {},
}
let behaviour_poll = {
let transport = self.raw_swarm.transport();
let mut parameters = PollParameters {
topology: &mut self.topology,
local_peer_id: &mut self.raw_swarm.local_peer_id(),
supported_protocols: &self.supported_protocols,
listened_addrs: &self.listened_addrs,
external_addrs: &self.external_addrs,
nat_traversal: &move |a, b| transport.nat_traversal(a, b),
};
self.behaviour.poll(&mut parameters)
@ -335,7 +295,9 @@ where TBehaviour: NetworkBehaviour<TTopology>,
}
},
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => {
self.topology.add_local_external_addrs(self.raw_swarm.nat_traversal(&address));
for addr in self.raw_swarm.nat_traversal(&address) {
self.external_addrs.push(addr);
}
},
}
}
@ -346,7 +308,7 @@ where TBehaviour: NetworkBehaviour<TTopology>,
///
/// This trait has been designed to be composable. Multiple implementations can be combined into
/// one that handles all the behaviours at once.
pub trait NetworkBehaviour<TTopology> {
pub trait NetworkBehaviour {
/// Handler for all the protocols the network supports.
type ProtocolsHandler: IntoProtocolsHandler;
/// Event generated by the swarm.
@ -355,6 +317,10 @@ pub trait NetworkBehaviour<TTopology> {
/// Builds a new `ProtocolsHandler`.
fn new_handler(&mut self) -> Self::ProtocolsHandler;
/// Addresses that this behaviour is aware of for this specific peer, and that may allow
/// reaching the peer.
fn addresses_of_peer(&self, peer_id: &PeerId) -> Vec<Multiaddr>;
/// Indicates to the behaviour that we connected to the node with the given peer id through the
/// given endpoint.
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint);
@ -376,7 +342,7 @@ pub trait NetworkBehaviour<TTopology> {
/// Polls for things that swarm should do.
///
/// This API mimics the API of the `Stream` trait.
fn poll(&mut self, topology: &mut PollParameters<TTopology>) -> Async<NetworkBehaviourAction<<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>;
fn poll(&mut self, topology: &mut PollParameters) -> Async<NetworkBehaviourAction<<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>;
}
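With the `TTopology` parameter gone, a behaviour that wants the swarm to be able to dial peers keeps addresses in its own storage and returns them from the new `addresses_of_peer()` method. A minimal sketch under that assumption; the `AddressBook` type is hypothetical and only illustrates the pattern that Floodsub, Kademlia and Mdns follow further down:

    use futures::prelude::*;
    use libp2p_core::protocols_handler::{DummyProtocolsHandler, ProtocolsHandler};
    use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
    use libp2p_core::{Multiaddr, PeerId};
    use std::collections::HashMap;
    use std::marker::PhantomData;
    use tokio_io::{AsyncRead, AsyncWrite};

    /// Hypothetical behaviour that embeds its own "topology": a plain address book.
    pub struct AddressBook<TSubstream> {
        addresses: HashMap<PeerId, Vec<Multiaddr>>,
        marker: PhantomData<TSubstream>,
    }

    impl<TSubstream> NetworkBehaviour for AddressBook<TSubstream>
    where TSubstream: AsyncRead + AsyncWrite
    {
        type ProtocolsHandler = DummyProtocolsHandler<TSubstream>;
        type OutEvent = ();

        fn new_handler(&mut self) -> Self::ProtocolsHandler {
            DummyProtocolsHandler::default()
        }

        // Replaces `Topology::addresses_of_peer`: the swarm asks the behaviour
        // which addresses to try when dialing this peer.
        fn addresses_of_peer(&self, peer_id: &PeerId) -> Vec<Multiaddr> {
            self.addresses.get(peer_id).map(|v| v.clone()).unwrap_or_else(Vec::new)
        }

        fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {}
        fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {}
        fn inject_node_event(&mut self, _: PeerId,
            _: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent) {}

        fn poll(&mut self, _: &mut PollParameters)
            -> Async<NetworkBehaviourAction<<Self::ProtocolsHandler as ProtocolsHandler>::InEvent, Self::OutEvent>>
        {
            Async::NotReady
        }
    }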
/// Used when deriving `NetworkBehaviour`. When deriving `NetworkBehaviour`, must be implemented
@ -390,20 +356,15 @@ pub trait NetworkBehaviourEventProcess<TEvent> {
/// Parameters passed to `poll()`, that the `NetworkBehaviour` has access to.
// TODO: #[derive(Debug)]
pub struct PollParameters<'a, TTopology: 'a> {
topology: &'a mut TTopology,
pub struct PollParameters<'a: 'a> {
local_peer_id: &'a PeerId,
supported_protocols: &'a [Vec<u8>],
listened_addrs: &'a [Multiaddr],
external_addrs: &'a [Multiaddr],
nat_traversal: &'a dyn Fn(&Multiaddr, &Multiaddr) -> Option<Multiaddr>,
}
impl<'a, TTopology> PollParameters<'a, TTopology> {
/// Returns a reference to the topology of the network.
#[inline]
pub fn topology(&mut self) -> &mut TTopology {
&mut self.topology
}
impl<'a> PollParameters<'a> {
/// Returns the list of protocols the behaviour supports when a remote negotiates a protocol on
/// an inbound substream.
///
@ -422,28 +383,16 @@ impl<'a, TTopology> PollParameters<'a, TTopology> {
}
/// Returns the list of the addresses nodes can use to reach us.
// TODO: should return references
#[inline]
pub fn external_addresses<'b>(&'b mut self) -> impl ExactSizeIterator<Item = Multiaddr> + 'b
where TTopology: Topology
{
let local_peer_id = self.topology.local_peer_id().clone();
self.topology.addresses_of_peer(&local_peer_id).into_iter()
}
/// Returns the public key of the local node.
#[inline]
pub fn local_public_key(&self) -> &PublicKey
where TTopology: Topology
{
self.topology.local_public_key()
pub fn external_addresses<'b>(&'b mut self) -> impl ExactSizeIterator<Item = Multiaddr> + 'b {
self.external_addrs.iter().cloned()
}
/// Returns the peer id of the local node.
#[inline]
pub fn local_peer_id(&self) -> &PeerId
where TTopology: Topology
{
self.topology.local_peer_id()
pub fn local_peer_id(&self) -> &PeerId {
self.local_peer_id
}
/// Calls the `nat_traversal` method on the underlying transport of the `Swarm`.
@ -493,18 +442,15 @@ pub enum NetworkBehaviourAction<TInEvent, TOutEvent> {
},
}
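Inside `poll()`, the information a behaviour used to pull out of the topology now comes either from `PollParameters` or from the behaviour's own fields, and external addresses flow the other way: the behaviour reports an address observed by a remote, and the swarm runs it through the transport's `nat_traversal()` and appends the results to its `external_addrs` list. A hedged sketch of a `poll()` body; `pending_observed_addr` is a hypothetical field, not something defined in this diff:

    fn poll(&mut self, params: &mut PollParameters)
        -> Async<NetworkBehaviourAction<<Self::ProtocolsHandler as ProtocolsHandler>::InEvent, Self::OutEvent>>
    {
        // Data that previously lived in the topology is now read from the swarm itself.
        let _our_external_addrs: Vec<Multiaddr> = params.external_addresses().collect();
        let _local_id: PeerId = params.local_peer_id().clone();

        // An address a remote observed for us is handed back to the swarm.
        if let Some(address) = self.pending_observed_addr.take() {
            return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address });
        }

        Async::NotReady
    }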
pub struct SwarmBuilder <TTransport, TBehaviour, TTopology>
where TTransport: Transport,
TBehaviour: NetworkBehaviour<TTopology>
{
pub struct SwarmBuilder<TTransport, TBehaviour> {
incoming_limit: Option<u32>,
topology: TTopology,
local_peer_id: PeerId,
transport: TTransport,
behaviour: TBehaviour,
}
impl<TTransport, TBehaviour, TMuxer, TTopology> SwarmBuilder<TTransport, TBehaviour, TTopology>
where TBehaviour: NetworkBehaviour<TTopology>,
impl<TTransport, TBehaviour, TMuxer> SwarmBuilder<TTransport, TBehaviour>
where TBehaviour: NetworkBehaviour,
TMuxer: StreamMuxer + Send + Sync + 'static,
<TMuxer as StreamMuxer>::OutboundSubstream: Send + 'static,
<TMuxer as StreamMuxer>::Substream: Send + 'static,
@ -532,59 +478,53 @@ where TBehaviour: NetworkBehaviour<TTopology>,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade<Substream<TMuxer>>>::Future: Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade<Substream<TMuxer>>>::Error: fmt::Debug + Send + 'static,
<NodeHandlerWrapper<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler> as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
TTopology: Topology,
{
pub fn new(transport: TTransport, behaviour: TBehaviour,
topology:TTopology) -> Self {
pub fn new(transport: TTransport, behaviour: TBehaviour, local_peer_id: PeerId) -> Self {
SwarmBuilder {
incoming_limit: None,
transport: transport,
topology: topology,
behaviour: behaviour,
local_peer_id,
transport,
behaviour,
}
}
pub fn incoming_limit(mut self, incoming_limit: Option<u32>) -> Self
{
pub fn incoming_limit(mut self, incoming_limit: Option<u32>) -> Self {
self.incoming_limit = incoming_limit;
self
}
pub fn build(mut self) ->
Swarm<TTransport, TBehaviour, TTopology>
{
pub fn build(mut self) -> Swarm<TTransport, TBehaviour> {
let supported_protocols = self.behaviour
.new_handler()
.into_handler(self.topology.local_peer_id())
.into_handler(&self.local_peer_id)
.listen_protocol()
.protocol_info()
.into_iter()
.map(|info| info.protocol_name().to_vec())
.collect();
let raw_swarm = RawSwarm::new_with_incoming_limit(self.transport,
self.topology.local_peer_id().clone(),
self.incoming_limit);
let raw_swarm = RawSwarm::new_with_incoming_limit(self.transport, self.local_peer_id, self.incoming_limit);
Swarm {
raw_swarm,
behaviour: self.behaviour,
topology: self.topology,
supported_protocols,
listened_addrs: SmallVec::new(),
external_addrs: SmallVec::new(),
}
}
}
#[cfg(test)]
mod tests {
use crate::nodes::raw_swarm::RawSwarm;
use crate::peer_id::PeerId;
use crate::protocols_handler::{DummyProtocolsHandler, ProtocolsHandler};
use crate::public_key::PublicKey;
use crate::tests::dummy_transport::DummyTransport;
use crate::topology::MemoryTopology;
use futures::prelude::*;
use multiaddr::Multiaddr;
use rand::random;
use smallvec::SmallVec;
use std::marker::PhantomData;
@ -600,7 +540,7 @@ mod tests {
trait TSubstream: AsyncRead + AsyncWrite {}
impl <TSubstream, TTopology> NetworkBehaviour<TTopology>
impl<TSubstream> NetworkBehaviour
for DummyBehaviour<TSubstream>
where TSubstream: AsyncRead + AsyncWrite
{
@ -611,6 +551,10 @@ mod tests {
DummyProtocolsHandler::default()
}
fn addresses_of_peer(&self, _: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}
fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {}
fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {}
@ -618,7 +562,7 @@ mod tests {
fn inject_node_event(&mut self, _: PeerId,
_: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent) {}
fn poll(&mut self, _:&mut PollParameters<TTopology>) ->
fn poll(&mut self, _: &mut PollParameters) ->
Async<NetworkBehaviourAction<<Self::ProtocolsHandler as
ProtocolsHandler>::InEvent, Self::OutEvent>>
{
@ -638,10 +582,9 @@ mod tests {
fn test_build_swarm() {
let id = get_random_id();
let transport = DummyTransport::new();
let topology = MemoryTopology::empty(id);
let behaviour = DummyBehaviour{marker: PhantomData};
let swarm = SwarmBuilder::new(transport, behaviour,
topology).incoming_limit(Some(4)).build();
id.into_peer_id()).incoming_limit(Some(4)).build();
assert_eq!(swarm.raw_swarm.incoming_limit(), Some(4));
}
@ -649,9 +592,8 @@ mod tests {
fn test_build_swarm_with_max_listeners_none() {
let id = get_random_id();
let transport = DummyTransport::new();
let topology = MemoryTopology::empty(id);
let behaviour = DummyBehaviour{marker: PhantomData};
let swarm = SwarmBuilder::new(transport, behaviour, topology)
let swarm = SwarmBuilder::new(transport, behaviour, id.into_peer_id())
.build();
assert!(swarm.raw_swarm.incoming_limit().is_none())


@ -1,151 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! A *network topology* is a collection of nodes that are part of the network or that we think
//! are part of the network. In other words, it is essentially a container whose layout is
//! optimized for certain operations.
//!
//! In libp2p, a *topology* is any struct that implements at least the `Topology` trait. In order
//! to build a `Swarm`, you have to give to it ownership of a type that implements this trait.
//!
//! In order to use some protocols defined outside of `libp2p-core` (such as Kademlia) in your
//! `Swarm`, you will have to implement additional traits on your topology.
//!
//! While the `MemoryTopology` is provided as a ready-to-go topology that is suitable for quick
//! prototyping, it shouldn't be used in an actual high-performance production software.
use std::collections::HashMap;
use crate::{swarm::ConnectedPoint, Multiaddr, PeerId, PublicKey};
/// Storage for the network topology.
///
/// The topology should also store information about the local node, including its public key, its
/// `PeerId`, and the addresses it's advertising.
pub trait Topology {
/// Returns the addresses to try use to reach the given peer.
///
/// > **Note**: Keep in mind that `peer` can be the local node.
fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr>;
/// Returns the `PeerId` of the local node.
fn local_peer_id(&self) -> &PeerId;
/// Returns the public key of the local node.
fn local_public_key(&self) -> &PublicKey;
/// Adds an address that other nodes can use to connect to our local node.
///
/// > **Note**: Should later be returned when calling `addresses_of_peer()` with the `PeerId`
/// > of the local node.
fn add_local_external_addrs<TIter>(&mut self, addrs: TIter)
where TIter: Iterator<Item = Multiaddr>;
/// Indicates to the topology that we have successfully connected to the given address with the
/// given `PeerId`.
fn set_connected(&mut self, _peer_id: &PeerId, _addr: &ConnectedPoint) {}
/// Indicates to the topology that we have been disconnected from the given address with the
/// given `PeerId`.
fn set_disconnected(&mut self, _peer_id: &PeerId, _addr: &ConnectedPoint, _reason: DisconnectReason) {}
/// Indicates to the topology that we have failed to reach the given address.
fn set_unreachable(&mut self, _addr: &Multiaddr) {}
}
/// Reason why the peer has been disconnected.
#[derive(Debug, Copy, Clone)]
pub enum DisconnectReason {
Error,
Graceful,
Replaced,
}
/// Topology of the network stored in memory.
pub struct MemoryTopology {
list: HashMap<PeerId, Vec<Multiaddr>>,
local_peer_id: PeerId,
local_public_key: PublicKey,
}
impl MemoryTopology {
/// Creates an empty topology.
#[inline]
pub fn empty(pubkey: PublicKey) -> MemoryTopology {
let local_peer_id = pubkey.clone().into_peer_id();
MemoryTopology {
list: Default::default(),
local_peer_id,
local_public_key: pubkey,
}
}
/// Returns true if the topology is empty.
#[inline]
pub fn is_empty(&self) -> bool {
self.list.is_empty()
}
/// Adds an address to the topology.
#[inline]
pub fn add_address(&mut self, peer: PeerId, addr: Multiaddr) {
let addrs = self.list.entry(peer).or_insert_with(|| Vec::new());
if addrs.iter().all(|a| a != &addr) {
addrs.push(addr);
}
}
/// Returns a list of all the known peers in the topology.
#[inline]
pub fn peers(&self) -> impl Iterator<Item = &PeerId> {
self.list.keys()
}
/// Returns an iterator to all the entries in the topology.
#[inline]
pub fn iter(&self) -> impl Iterator<Item = (&PeerId, &Multiaddr)> {
self.list.iter().flat_map(|(p, l)| l.iter().map(move |ma| (p, ma)))
}
}
impl Topology for MemoryTopology {
fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
self.list.get(peer).map(|v| v.clone()).unwrap_or(Vec::new())
}
fn add_local_external_addrs<TIter>(&mut self, addrs: TIter)
where TIter: Iterator<Item = Multiaddr>
{
for addr in addrs {
let id = self.local_peer_id.clone();
self.add_address(id, addr);
}
}
#[inline]
fn local_peer_id(&self) -> &PeerId {
&self.local_peer_id
}
#[inline]
fn local_public_key(&self) -> &PublicKey {
&self.local_public_key
}
}


@ -67,8 +67,8 @@ fn main() {
// Create a random PeerId
let local_key = secio::SecioKeyPair::ed25519_generated().unwrap();
let local_pub_key = local_key.to_public_key();
println!("Local peer id: {:?}", local_pub_key.clone().into_peer_id());
let local_peer_id = local_key.to_peer_id();
println!("Local peer id: {:?}", local_peer_id);
// Set up an encrypted DNS-enabled TCP Transport over the Mplex and Yamux protocols
let transport = libp2p::build_development_transport(local_key);
@ -84,9 +84,22 @@ fn main() {
mdns: libp2p::mdns::Mdns<TSubstream>,
}
impl<TSubstream: libp2p::tokio_io::AsyncRead + libp2p::tokio_io::AsyncWrite> libp2p::core::swarm::NetworkBehaviourEventProcess<void::Void> for MyBehaviour<TSubstream> {
fn inject_event(&mut self, _ev: void::Void) {
void::unreachable(_ev)
impl<TSubstream: libp2p::tokio_io::AsyncRead + libp2p::tokio_io::AsyncWrite> libp2p::core::swarm::NetworkBehaviourEventProcess<libp2p::mdns::MdnsEvent> for MyBehaviour<TSubstream> {
fn inject_event(&mut self, event: libp2p::mdns::MdnsEvent) {
match event {
libp2p::mdns::MdnsEvent::Discovered(list) => {
for (peer, _) in list {
self.floodsub.add_node_to_partial_view(peer);
}
},
libp2p::mdns::MdnsEvent::Expired(list) => {
for (peer, _) in list {
if !self.mdns.has_node(&peer) {
self.floodsub.remove_node_from_partial_view(&peer);
}
}
}
}
}
}
@ -102,12 +115,12 @@ fn main() {
// Create a Swarm to manage peers and events
let mut swarm = {
let mut behaviour = MyBehaviour {
floodsub: libp2p::floodsub::Floodsub::new(local_pub_key.clone().into_peer_id()),
floodsub: libp2p::floodsub::Floodsub::new(local_peer_id.clone()),
mdns: libp2p::mdns::Mdns::new().expect("Failed to create mDNS service"),
};
behaviour.floodsub.subscribe(floodsub_topic.clone());
libp2p::Swarm::new(transport, behaviour, libp2p::core::topology::MemoryTopology::empty(local_pub_key))
libp2p::Swarm::new(transport, behaviour, local_peer_id)
};
// Listen on all interfaces and whatever port the OS assigns


@ -37,23 +37,11 @@ use libp2p::{
fn main() {
// Create a random key for ourselves.
let local_key = secio::SecioKeyPair::ed25519_generated().unwrap();
let local_pub_key = local_key.to_public_key();
let local_peer_id = local_key.to_peer_id();
// Set up an encrypted DNS-enabled TCP Transport over the Mplex protocol
let transport = libp2p::build_development_transport(local_key);
// Create the topology of the network with the IPFS bootstrap nodes.
let mut topology = libp2p::core::topology::MemoryTopology::empty(local_pub_key.clone());
topology.add_address("QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse().unwrap(), "/ip4/104.131.131.82/tcp/4001".parse().unwrap());
topology.add_address("QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip4/104.236.179.241/tcp/4001".parse().unwrap());
topology.add_address("QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip4/104.236.76.40/tcp/4001".parse().unwrap());
topology.add_address("QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip4/128.199.219.111/tcp/4001".parse().unwrap());
topology.add_address("QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip4/178.62.158.247/tcp/4001".parse().unwrap());
topology.add_address("QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip6/2400:6180:0:d0::151:6001/tcp/4001".parse().unwrap());
topology.add_address("QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip6/2604:a880:1:20::203:d001/tcp/4001".parse().unwrap());
topology.add_address("QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip6/2604:a880:800:10::4a:5001/tcp/4001".parse().unwrap());
topology.add_address("QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001".parse().unwrap());
// Create a swarm to manage peers and events.
let mut swarm = {
// Create a Kademlia behaviour.
@ -61,8 +49,17 @@ fn main() {
// to insert our local node in the DHT. However here we use `without_init` because this
// example is very ephemeral and we don't want to pollute the DHT. In a real world
// application, you want to use `new` instead.
let mut behaviour = libp2p::kad::Kademlia::without_init(local_pub_key.into_peer_id());
libp2p::core::Swarm::new(transport, behaviour, topology)
let mut behaviour = libp2p::kad::Kademlia::without_init(local_peer_id.clone());
behaviour.add_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse().unwrap(), "/ip4/104.131.131.82/tcp/4001".parse().unwrap());
behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip4/104.236.179.241/tcp/4001".parse().unwrap());
behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip4/104.236.76.40/tcp/4001".parse().unwrap());
behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip4/128.199.219.111/tcp/4001".parse().unwrap());
behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip4/178.62.158.247/tcp/4001".parse().unwrap());
behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip6/2400:6180:0:d0::151:6001/tcp/4001".parse().unwrap());
behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip6/2604:a880:1:20::203:d001/tcp/4001".parse().unwrap());
behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip6/2604:a880:800:10::4a:5001/tcp/4001".parse().unwrap());
behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001".parse().unwrap());
libp2p::core::Swarm::new(transport, behaviour, local_peer_id)
};
// Order Kademlia to search for a peer.
@ -78,10 +75,11 @@ fn main() {
tokio::run(futures::future::poll_fn(move || -> Result<_, ()> {
loop {
match swarm.poll().expect("Error while polling swarm") {
Async::Ready(Some(event)) => {
println!("Result: {:#?}", event);
Async::Ready(Some(ev @ libp2p::kad::KademliaOut::FindNodeResult { .. })) => {
println!("Result: {:#?}", ev);
return Ok(Async::Ready(()));
},
Async::Ready(Some(_)) => (),
Async::Ready(None) | Async::NotReady => break,
}
}


@ -49,6 +49,7 @@ fn build(ast: &DeriveInput) -> TokenStream {
fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
let name = &ast.ident;
let (_, ty_generics, where_clause) = ast.generics.split_for_impl();
let multiaddr = quote!{::libp2p::core::Multiaddr};
let trait_to_impl = quote!{::libp2p::core::swarm::NetworkBehaviour};
let net_behv_event_proc = quote!{::libp2p::core::swarm::NetworkBehaviourEventProcess};
let either_ident = quote!{::libp2p::core::either::EitherOutput};
@ -70,25 +71,14 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
quote!{#n}
};
// Name of the type parameter that represents the topology.
let topology_generic = {
let mut n = "TTopology".to_string();
// Avoid collisions.
while ast.generics.type_params().any(|tp| tp.ident.to_string() == n) {
n.push('1');
}
let n = Ident::new(&n, name.span());
quote!{#n}
};
let poll_parameters = quote!{::libp2p::core::swarm::PollParameters<#topology_generic>};
let poll_parameters = quote!{::libp2p::core::swarm::PollParameters};
// Build the generics.
let impl_generics = {
let tp = ast.generics.type_params();
let lf = ast.generics.lifetimes();
let cst = ast.generics.const_params();
quote!{<#(#lf,)* #(#tp,)* #(#cst,)* #topology_generic, #substream_generic>}
quote!{<#(#lf,)* #(#tp,)* #(#cst,)* #substream_generic>}
};
// Build the `where ...` clause of the trait implementation.
@ -98,12 +88,12 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
.flat_map(|field| {
let ty = &field.ty;
vec![
quote!{#ty: #trait_to_impl<#topology_generic>},
quote!{Self: #net_behv_event_proc<<#ty as #trait_to_impl<#topology_generic>>::OutEvent>},
quote!{<<#ty as #trait_to_impl<#topology_generic>>::ProtocolsHandler as #into_protocols_handler>::Handler: #protocols_handler<Substream = #substream_generic>},
quote!{#ty: #trait_to_impl},
quote!{Self: #net_behv_event_proc<<#ty as #trait_to_impl>::OutEvent>},
quote!{<<#ty as #trait_to_impl>::ProtocolsHandler as #into_protocols_handler>::Handler: #protocols_handler<Substream = #substream_generic>},
// Note: this bound is required because of https://github.com/rust-lang/rust/issues/55697
quote!{<<<#ty as #trait_to_impl<#topology_generic>>::ProtocolsHandler as #into_protocols_handler>::Handler as #protocols_handler>::InboundProtocol: ::libp2p::core::InboundUpgrade<#substream_generic>},
quote!{<<<#ty as #trait_to_impl<#topology_generic>>::ProtocolsHandler as #into_protocols_handler>::Handler as #protocols_handler>::OutboundProtocol: ::libp2p::core::OutboundUpgrade<#substream_generic>},
quote!{<<<#ty as #trait_to_impl>::ProtocolsHandler as #into_protocols_handler>::Handler as #protocols_handler>::InboundProtocol: ::libp2p::core::InboundUpgrade<#substream_generic>},
quote!{<<<#ty as #trait_to_impl>::ProtocolsHandler as #into_protocols_handler>::Handler as #protocols_handler>::OutboundProtocol: ::libp2p::core::OutboundUpgrade<#substream_generic>},
]
})
.collect::<Vec<_>>();
@ -143,6 +133,20 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
out
};
// Build the list of statements to put in the body of `addresses_of_peer()`.
let addresses_of_peer_stmts = {
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
if is_ignored(&field) {
return None;
}
Some(match field.ident {
Some(ref i) => quote!{ out.extend(self.#i.addresses_of_peer(peer_id)); },
None => quote!{ out.extend(self.#field_n.addresses_of_peer(peer_id)); },
})
})
};
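For a struct with two non-ignored fields, these statements make the derived implementation concatenate what every inner behaviour knows about the peer. Roughly, using the `floodsub` and `mdns` field names from the chat example (a hand-written sketch of the generated method, with the fully-qualified paths shortened):

    fn addresses_of_peer(&self, peer_id: &PeerId) -> Vec<Multiaddr> {
        let mut out = Vec::new();
        out.extend(self.floodsub.addresses_of_peer(peer_id));
        out.extend(self.mdns.addresses_of_peer(peer_id));
        out
    }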
// Build the list of statements to put in the body of `inject_connected()`.
let inject_connected_stmts = {
let num_fields = data_struct.fields.iter().filter(|f| !is_ignored(f)).count();
@ -216,7 +220,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
continue;
}
let ty = &field.ty;
let field_info = quote!{ <#ty as #trait_to_impl<#topology_generic>>::ProtocolsHandler };
let field_info = quote!{ <#ty as #trait_to_impl>::ProtocolsHandler };
match ph_ty {
Some(ev) => ph_ty = Some(quote!{ #into_proto_select_ident<#ev, #field_info> }),
ref mut ev @ None => *ev = Some(field_info),
@ -321,7 +325,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
// Now the magic happens.
let final_quote = quote!{
impl #impl_generics #trait_to_impl<#topology_generic> for #name #ty_generics
impl #impl_generics #trait_to_impl for #name #ty_generics
#where_clause
{
type ProtocolsHandler = #protocols_handler_ty;
@ -333,6 +337,12 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
#new_handler
}
fn addresses_of_peer(&self, peer_id: &#peer_id) -> Vec<#multiaddr> {
let mut out = Vec::new();
#(#addresses_of_peer_stmts);*
out
}
#[inline]
fn inject_connected(&mut self, peer_id: #peer_id, endpoint: #connected_point) {
#(#inject_connected_stmts);*


@ -24,7 +24,7 @@ extern crate void;
/// Small utility to check that a type implements `NetworkBehaviour`.
#[allow(dead_code)]
fn require_net_behaviour<T: libp2p::core::swarm::NetworkBehaviour<libp2p::core::topology::MemoryTopology>>() {}
fn require_net_behaviour<T: libp2p::core::swarm::NetworkBehaviour>() {}
// TODO: doesn't compile
/*#[test]


@ -1,5 +1,6 @@
[package]
name = "libp2p-mdns"
edition = "2018"
version = "0.2.0"
description = "Implementation of the libp2p mDNS discovery method"
authors = ["Parity Technologies <admin@parity.io>"]
@ -13,6 +14,7 @@ data-encoding = "2.0"
dns-parser = "0.8"
futures = "0.1"
libp2p-core = { version = "0.2.0", path = "../../core" }
log = "0.4"
multiaddr = { package = "parity-multiaddr", version = "0.1.0", path = "../multiaddr" }
net2 = "0.2"
rand = "0.6"


@ -20,13 +20,14 @@
use crate::service::{MdnsService, MdnsPacket};
use futures::prelude::*;
use log::warn;
use libp2p_core::protocols_handler::{DummyProtocolsHandler, ProtocolsHandler};
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p_core::{Multiaddr, PeerId, multiaddr::Protocol, topology::MemoryTopology, topology::Topology};
use libp2p_core::{Multiaddr, PeerId, multiaddr::Protocol};
use smallvec::SmallVec;
use std::{fmt, io, iter, marker::PhantomData, time::Duration};
use std::{cmp, fmt, io, iter, marker::PhantomData, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use void::{self, Void};
use tokio_timer::Delay;
/// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and
/// reports them as `MdnsEvent`s.
@ -34,10 +35,16 @@ pub struct Mdns<TSubstream> {
/// The inner service.
service: MdnsService,
/// If `Some`, then we automatically connect to nodes we discover and this is the list of nodes
/// to connect to. Drained in `poll()`.
/// If `None`, then we don't automatically connect.
to_connect_to: Option<SmallVec<[PeerId; 8]>>,
/// List of nodes that we have discovered, the address, and when their TTL expires.
///
/// Each combination of `PeerId` and `Multiaddr` can only appear once, but the same `PeerId`
/// can appear multiple times.
discovered_nodes: SmallVec<[(PeerId, Multiaddr, Instant); 8]>,
/// Future that fires when the TTL of at least one node in `discovered_nodes` expires.
///
/// `None` if `discovered_nodes` is empty.
closest_expiration: Option<Delay>,
/// Marker to pin the generic.
marker: PhantomData<TSubstream>,
@ -48,39 +55,109 @@ impl<TSubstream> Mdns<TSubstream> {
pub fn new() -> io::Result<Mdns<TSubstream>> {
Ok(Mdns {
service: MdnsService::new()?,
to_connect_to: Some(SmallVec::new()),
discovered_nodes: SmallVec::new(),
closest_expiration: None,
marker: PhantomData,
})
}
}
/// Trait that must be implemented on the network topology for it to be usable with `Mdns`.
pub trait MdnsTopology: Topology {
/// Adds an address discovered by mDNS.
///
/// Will never be called with the local peer ID.
fn add_mdns_discovered_address(&mut self, peer: PeerId, addr: Multiaddr);
}
impl MdnsTopology for MemoryTopology {
#[inline]
fn add_mdns_discovered_address(&mut self, peer: PeerId, addr: Multiaddr) {
self.add_address(peer, addr)
/// Returns true if the given `PeerId` is in the list of nodes discovered through mDNS.
pub fn has_node(&self, peer_id: &PeerId) -> bool {
self.discovered_nodes.iter().any(|(p, _, _)| p == peer_id)
}
}
impl<TSubstream, TTopology> NetworkBehaviour<TTopology> for Mdns<TSubstream>
/// Event that can be produced by the `Mdns` behaviour.
#[derive(Debug)]
pub enum MdnsEvent {
/// Discovered nodes through mDNS.
Discovered(DiscoveredAddrsIter),
/// The given combinations of `PeerId` and `Multiaddr` have expired.
///
/// Each discovered record has a time-to-live. When this TTL expires and the address hasn't
/// been refreshed, we remove it from the list and emit it as an `Expired` event.
Expired(ExpiredAddrsIter),
}
/// Iterator that produces the list of addresses that have been discovered.
pub struct DiscoveredAddrsIter {
inner: smallvec::IntoIter<[(PeerId, Multiaddr); 4]>
}
impl Iterator for DiscoveredAddrsIter {
type Item = (PeerId, Multiaddr);
#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.inner.next()
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
impl ExactSizeIterator for DiscoveredAddrsIter {
}
impl fmt::Debug for DiscoveredAddrsIter {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("DiscoveredAddrsIter")
.finish()
}
}
/// Iterator that produces the list of addresses that have expired.
pub struct ExpiredAddrsIter {
inner: smallvec::IntoIter<[(PeerId, Multiaddr); 4]>
}
impl Iterator for ExpiredAddrsIter {
type Item = (PeerId, Multiaddr);
#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.inner.next()
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
impl ExactSizeIterator for ExpiredAddrsIter {
}
impl fmt::Debug for ExpiredAddrsIter {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("ExpiredAddrsIter")
.finish()
}
}
impl<TSubstream> NetworkBehaviour for Mdns<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
TTopology: MdnsTopology,
{
type ProtocolsHandler = DummyProtocolsHandler<TSubstream>;
type OutEvent = Void;
type OutEvent = MdnsEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
DummyProtocolsHandler::default()
}
fn addresses_of_peer(&self, peer_id: &PeerId) -> Vec<Multiaddr> {
let now = Instant::now();
self.discovered_nodes
.iter()
.filter(move |(p, _, expires)| p == peer_id && *expires > now)
.map(|(_, addr, _)| addr.clone())
.collect()
}
fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {}
fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {}
@ -95,23 +172,39 @@ where
fn poll(
&mut self,
params: &mut PollParameters<TTopology>,
params: &mut PollParameters,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
loop {
if let Some(ref mut to_connect_to) = self.to_connect_to {
if !to_connect_to.is_empty() {
let peer_id = to_connect_to.remove(0);
return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id });
} else {
to_connect_to.shrink_to_fit();
// Remove expired peers.
if let Some(ref mut closest_expiration) = self.closest_expiration {
match closest_expiration.poll() {
Ok(Async::Ready(())) => {
let now = Instant::now();
let mut expired = SmallVec::<[(PeerId, Multiaddr); 4]>::new();
while let Some(pos) = self.discovered_nodes.iter().position(|(_, _, exp)| *exp < now) {
let (peer_id, addr, _) = self.discovered_nodes.remove(pos);
expired.push((peer_id, addr));
}
if !expired.is_empty() {
let event = MdnsEvent::Expired(ExpiredAddrsIter {
inner: expired.into_iter(),
});
return Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
},
Ok(Async::NotReady) => (),
Err(err) => warn!("tokio timer has errored: {:?}", err),
}
}
// Poll the mDNS service and obtain the list of nodes discovered this round.
let discovered = loop {
let event = match self.service.poll() {
Async::Ready(ev) => ev,
Async::NotReady => return Async::NotReady,
@ -134,29 +227,52 @@ where
.chain(iter::once(obs_port))
.collect();
let mut discovered: SmallVec<[_; 4]> = SmallVec::new();
for peer in response.discovered_peers() {
if peer.id() == params.local_peer_id() {
continue;
}
let new_expiration = Instant::now() + peer.ttl();
let mut addrs = Vec::new();
for addr in peer.addresses() {
if let Some(new_addr) = params.nat_traversal(&addr, &observed) {
params.topology().add_mdns_discovered_address(peer.id().clone(), new_addr);
addrs.push(new_addr);
}
addrs.push(addr);
}
params.topology().add_mdns_discovered_address(peer.id().clone(), addr);
for addr in addrs {
if let Some((_, _, cur_expires)) = self.discovered_nodes.iter_mut()
.find(|(p, a, _)| p == peer.id() && *a == addr)
{
*cur_expires = cmp::max(*cur_expires, new_expiration);
} else {
self.discovered_nodes.push((peer.id().clone(), addr.clone(), new_expiration));
}
if let Some(ref mut to_connect_to) = self.to_connect_to {
to_connect_to.push(peer.id().clone());
discovered.push((peer.id().clone(), addr));
}
}
break discovered;
},
MdnsPacket::ServiceDiscovery(disc) => {
disc.respond(Duration::from_secs(5 * 60));
},
}
}
};
// As the final step, we need to refresh `closest_expiration`.
self.closest_expiration = self.discovered_nodes.iter()
.fold(None, |exp, &(_, _, elem_exp)| {
Some(exp.map(|exp| cmp::min(exp, elem_exp)).unwrap_or(elem_exp))
})
.map(Delay::new);
Async::Ready(NetworkBehaviourAction::GenerateEvent(MdnsEvent::Discovered(DiscoveredAddrsIter {
inner: discovered.into_iter(),
})))
}
}


@ -21,11 +21,11 @@
//! Contains methods that handle the DNS encoding and decoding capabilities not available in the
//! `dns_parser` library.
use crate::{META_QUERY_SERVICE, SERVICE_NAME};
use data_encoding;
use libp2p_core::{Multiaddr, PeerId};
use rand;
use std::{borrow::Cow, cmp, error, fmt, str, time::Duration};
use {META_QUERY_SERVICE, SERVICE_NAME};
/// Decodes a `<character-string>` (as defined by RFC1035) into a `Vec` of ASCII characters.
// TODO: better error type?


@ -52,7 +52,7 @@ const SERVICE_NAME: &'static [u8] = b"_p2p._udp.local";
/// Hardcoded name of the service used for DNS-SD.
const META_QUERY_SERVICE: &'static [u8] = b"_services._dns-sd._udp.local";
pub use self::behaviour::{Mdns, MdnsTopology};
pub use self::behaviour::{Mdns, MdnsEvent};
pub use self::service::MdnsService;
mod behaviour;


@ -451,6 +451,7 @@ impl<'a> MdnsResponse<'a> {
packet,
record_value,
peer_id,
ttl: record.ttl,
})
})
}
@ -478,6 +479,8 @@ pub struct MdnsPeer<'a> {
record_value: String,
/// Id of the peer.
peer_id: PeerId,
/// TTL of the record in seconds.
ttl: u32,
}
impl<'a> MdnsPeer<'a> {
@ -487,6 +490,12 @@ impl<'a> MdnsPeer<'a> {
&self.peer_id
}
/// Returns the requested time-to-live for the record.
#[inline]
pub fn ttl(&self) -> Duration {
Duration::from_secs(u64::from(self.ttl))
}
/// Returns the list of addresses the peer says it is listening on.
///
/// Filters out invalid addresses.


@ -21,9 +21,10 @@
use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction};
use crate::topic::{Topic, TopicHash};
use cuckoofilter::CuckooFilter;
use fnv::FnvHashSet;
use futures::prelude::*;
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p_core::{protocols_handler::ProtocolsHandler, protocols_handler::OneShotHandler, PeerId};
use libp2p_core::{protocols_handler::ProtocolsHandler, protocols_handler::OneShotHandler, Multiaddr, PeerId};
use rand;
use smallvec::SmallVec;
use std::{collections::VecDeque, iter, marker::PhantomData};
@ -39,6 +40,9 @@ pub struct Floodsub<TSubstream> {
/// Peer id of the local node. Used for the source of the messages that we publish.
local_peer_id: PeerId,
/// List of peers to send messages to.
target_peers: FnvHashSet<PeerId>,
/// List of peers the network is connected to, and the topics that they're subscribed to.
// TODO: filter out peers that don't support floodsub, so that we avoid hammering them with
// opened substreams
@ -62,12 +66,43 @@ impl<TSubstream> Floodsub<TSubstream> {
Floodsub {
events: VecDeque::new(),
local_peer_id,
target_peers: FnvHashSet::default(),
connected_peers: HashMap::new(),
subscribed_topics: SmallVec::new(),
received: CuckooFilter::new(),
marker: PhantomData,
}
}
/// Add a node to the list of nodes to propagate messages to.
#[inline]
pub fn add_node_to_partial_view(&mut self, peer_id: PeerId) {
// Send our topics to this node if we're already connected to it.
if self.connected_peers.contains_key(&peer_id) {
for topic in self.subscribed_topics.iter() {
self.events.push_back(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: FloodsubRpc {
messages: Vec::new(),
subscriptions: vec![FloodsubSubscription {
topic: topic.hash().clone(),
action: FloodsubSubscriptionAction::Subscribe,
}],
},
});
}
}
if self.target_peers.insert(peer_id.clone()) {
self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id });
}
}
/// Remove a node from the list of nodes to propagate messages to.
#[inline]
pub fn remove_node_from_partial_view(&mut self, peer_id: &PeerId) {
self.target_peers.remove(&peer_id);
}
}
impl<TSubstream> Floodsub<TSubstream> {
@ -171,7 +206,7 @@ impl<TSubstream> Floodsub<TSubstream> {
}
}
impl<TSubstream, TTopology> NetworkBehaviour<TTopology> for Floodsub<TSubstream>
impl<TSubstream> NetworkBehaviour for Floodsub<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
@ -182,8 +217,13 @@ where
Default::default()
}
fn addresses_of_peer(&self, _: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}
fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) {
// We need to send our subscriptions to the newly-connected node.
if self.target_peers.contains(&id) {
for topic in self.subscribed_topics.iter() {
self.events.push_back(NetworkBehaviourAction::SendEvent {
peer_id: id.clone(),
@ -196,6 +236,7 @@ where
},
});
}
}
self.connected_peers.insert(id.clone(), SmallVec::new());
}
@ -203,6 +244,12 @@ where
fn inject_disconnected(&mut self, id: &PeerId, _: ConnectedPoint) {
let was_in = self.connected_peers.remove(id);
debug_assert!(was_in.is_some());
// We can be disconnected by the remote in case of inactivity for example, so we always
// try to reconnect.
if self.target_peers.contains(id) {
self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id: id.clone() });
}
}
fn inject_node_event(
@ -290,7 +337,7 @@ where
fn poll(
&mut self,
_: &mut PollParameters<TTopology>,
_: &mut PollParameters,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,


@ -21,11 +21,10 @@
use crate::listen_handler::IdentifyListenHandler;
use crate::periodic_id_handler::{PeriodicIdHandler, PeriodicIdHandlerEvent};
use crate::protocol::{IdentifyInfo, IdentifySender, IdentifySenderFuture};
use crate::topology::IdentifyTopology;
use futures::prelude::*;
use libp2p_core::protocols_handler::{ProtocolsHandler, ProtocolsHandlerSelect, ProtocolsHandlerUpgrErr};
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p_core::{Multiaddr, PeerId, either::EitherOutput};
use libp2p_core::{Multiaddr, PeerId, PublicKey, either::EitherOutput};
use smallvec::SmallVec;
use std::{collections::HashMap, collections::VecDeque, io};
use tokio_io::{AsyncRead, AsyncWrite};
@ -38,6 +37,8 @@ pub struct Identify<TSubstream> {
protocol_version: String,
/// Agent version to send back to remotes.
agent_version: String,
/// The public key of the local node. To report on the wire.
local_public_key: PublicKey,
/// For each peer we're connected to, the observed address to send back to it.
observed_addresses: HashMap<PeerId, Multiaddr>,
/// List of senders to answer, with the observed multiaddr.
@ -50,10 +51,11 @@ pub struct Identify<TSubstream> {
impl<TSubstream> Identify<TSubstream> {
/// Creates an `Identify`.
pub fn new(protocol_version: String, agent_version: String) -> Self {
pub fn new(protocol_version: String, agent_version: String, local_public_key: PublicKey) -> Self {
Identify {
protocol_version,
agent_version,
local_public_key,
observed_addresses: HashMap::new(),
to_answer: SmallVec::new(),
futures: SmallVec::new(),
@ -62,10 +64,9 @@ impl<TSubstream> Identify<TSubstream> {
}
}
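Since the identify info can no longer be fetched from a topology, the caller now hands the local public key to the behaviour up front. A small usage sketch; the protocol and agent version strings are placeholders:

    let local_key = libp2p::secio::SecioKeyPair::ed25519_generated().unwrap();
    let identify = Identify::new(
        "/my-app/1.0.0".to_string(),   // protocol version (placeholder)
        "my-app/0.1.0".to_string(),    // agent version (placeholder)
        local_key.to_public_key(),     // reported back to remotes in the identify message
    );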
impl<TSubstream, TTopology> NetworkBehaviour<TTopology> for Identify<TSubstream>
impl<TSubstream> NetworkBehaviour for Identify<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
TTopology: IdentifyTopology,
{
type ProtocolsHandler = ProtocolsHandlerSelect<IdentifyListenHandler<TSubstream>, PeriodicIdHandler<TSubstream>>;
type OutEvent = IdentifyEvent;
@ -74,6 +75,10 @@ where
IdentifyListenHandler::new().select(PeriodicIdHandler::new())
}
fn addresses_of_peer(&self, _: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
let observed = match endpoint {
ConnectedPoint::Dialer { address } => address,
@ -124,7 +129,7 @@ where
fn poll(
&mut self,
params: &mut PollParameters<TTopology>,
params: &mut PollParameters,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
@ -132,12 +137,6 @@ where
>,
> {
if let Some(event) = self.events.pop_front() {
// We intercept identified events in order to insert the addresses in the topology.
if let NetworkBehaviourAction::GenerateEvent(IdentifyEvent::Identified { ref peer_id, ref info, .. }) = event {
let iter = info.listen_addrs.iter().cloned();
params.topology().add_identify_discovered_addrs(peer_id, iter);
}
return Async::Ready(event);
}
@ -150,7 +149,7 @@ where
.collect();
let send_back_info = IdentifyInfo {
public_key: params.local_public_key().clone(),
public_key: self.local_public_key.clone(),
protocol_version: self.protocol_version.clone(),
agent_version: self.agent_version.clone(),
listen_addrs: params.listened_addresses().cloned().collect(),


@ -84,7 +84,6 @@ extern crate void;
pub use self::identify::{Identify, IdentifyEvent};
pub use self::id_transport::IdentifyTransport;
pub use self::protocol::IdentifyInfo;
pub use self::topology::IdentifyTopology;
pub mod listen_handler;
pub mod periodic_id_handler;
@ -93,4 +92,3 @@ pub mod protocol;
mod identify;
mod id_transport;
mod structs_proto;
mod topology;


@ -1,43 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use libp2p_core::{Multiaddr, PeerId};
use libp2p_core::topology::{MemoryTopology, Topology};
/// Trait required on the topology for the identify system to store addresses.
pub trait IdentifyTopology: Topology {
/// Adds to the topology an address discovered through identification.
///
/// > **Note**: Will never be called with the local peer ID.
fn add_identify_discovered_addrs<TIter>(&mut self, peer: &PeerId, addr: TIter)
where
TIter: Iterator<Item = Multiaddr>;
}
impl IdentifyTopology for MemoryTopology {
fn add_identify_discovered_addrs<TIter>(&mut self, peer: &PeerId, addr: TIter)
where
TIter: Iterator<Item = Multiaddr>,
{
for addr in addr {
self.add_address(peer.clone(), addr);
}
}
}


@ -19,13 +19,13 @@
// DEALINGS IN THE SOFTWARE.
use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn, KademliaRequestId};
use crate::kbucket::{KBucketsTable, Update};
use crate::protocol::{KadConnectionType, KadPeer};
use crate::query::{QueryConfig, QueryState, QueryStatePollOut, QueryTarget};
use crate::topology::KademliaTopology;
use fnv::{FnvHashMap, FnvHashSet};
use futures::{prelude::*, stream};
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p_core::{protocols_handler::ProtocolsHandler, topology::Topology, Multiaddr, PeerId};
use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId};
use multihash::Multihash;
use rand;
use smallvec::SmallVec;
@ -35,8 +35,8 @@ use tokio_timer::Interval;
/// Network behaviour that handles Kademlia.
pub struct Kademlia<TSubstream> {
/// Peer ID of the local node.
local_peer_id: PeerId,
/// Storage for the nodes. Contains the known multiaddresses for this node.
kbuckets: KBucketsTable<PeerId, SmallVec<[Multiaddr; 4]>>,
/// All the iterative queries we are currently performing, with their ID. The last parameter
/// is the list of accumulated providers for `GET_PROVIDERS` queries.
@ -58,11 +58,15 @@ pub struct Kademlia<TSubstream> {
/// Requests received by a remote that we should fulfill as soon as possible.
remote_requests: SmallVec<[(PeerId, KademliaRequestId, QueryTarget); 4]>,
/// List of multihashes that we're providing.
/// List of values and peers that are providing them.
///
/// Note that we use a `PeerId` so that we know that it uses SHA-256. The question as to how to
/// handle more hashes should eventually be resolved.
providing_keys: SmallVec<[PeerId; 8]>,
/// Our local peer ID can be in this container.
// TODO: Note that in reality the value is a SHA-256 of the actual value (https://github.com/libp2p/rust-libp2p/issues/694)
values_providers: FnvHashMap<Multihash, SmallVec<[PeerId; 20]>>,
/// List of values that we are providing ourselves. Must be kept in sync with
/// `values_providers`.
providing_keys: FnvHashSet<Multihash>,
/// Interval to send `ADD_PROVIDER` messages to everyone.
refresh_add_providers: stream::Fuse<Interval>,
@ -80,9 +84,6 @@ pub struct Kademlia<TSubstream> {
/// Events to return when polling.
queued_events: SmallVec<[NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaOut>; 32]>,
/// List of addresses to add to the topology as soon as we are in `poll()`.
add_to_topology: SmallVec<[(PeerId, Multiaddr, KadConnectionType); 32]>,
/// List of providers to add to the topology as soon as we are in `poll()`.
add_provider: SmallVec<[(Multihash, PeerId); 32]>,
@ -121,12 +122,19 @@ impl<TSubstream> Kademlia<TSubstream> {
Self::new_inner(local_peer_id, false)
}
/// Adds a known address for the given `PeerId`.
pub fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr) {
if let Some(list) = self.kbuckets.entry_mut(peer_id) {
list.push(address);
}
}
/// Inner implementation of the constructors.
fn new_inner(local_peer_id: PeerId, initialize: bool) -> Self {
let parallelism = 3;
let mut behaviour = Kademlia {
local_peer_id: local_peer_id.clone(),
kbuckets: KBucketsTable::new(local_peer_id, Duration::from_secs(60)), // TODO: constant
queued_events: SmallVec::new(),
queries_to_starts: SmallVec::new(),
active_queries: Default::default(),
@ -134,12 +142,12 @@ impl<TSubstream> Kademlia<TSubstream> {
pending_rpcs: SmallVec::with_capacity(parallelism),
next_query_id: QueryId(0),
remote_requests: SmallVec::new(),
providing_keys: SmallVec::new(),
values_providers: FnvHashMap::default(),
providing_keys: FnvHashSet::default(),
refresh_add_providers: Interval::new_interval(Duration::from_secs(60)).fuse(), // TODO: constant
parallelism,
num_results: 20,
rpc_timeout: Duration::from_secs(8),
add_to_topology: SmallVec::new(),
add_provider: SmallVec::new(),
marker: PhantomData,
};
@ -148,7 +156,7 @@ impl<TSubstream> Kademlia<TSubstream> {
// As part of the initialization process, we start one `FIND_NODE` for each bit of the
// possible range of peer IDs.
for n in 0..256 {
let peer_id = match gen_random_id(&local_peer_id, n) {
let peer_id = match gen_random_id(behaviour.kbuckets.my_id(), n) {
Ok(p) => p,
Err(()) => continue,
};
@ -160,29 +168,16 @@ impl<TSubstream> Kademlia<TSubstream> {
behaviour
}
/// Builds a `KadPeer` structure corresponding to the local node.
fn build_local_kad_peer(&self, local_addrs: impl IntoIterator<Item = Multiaddr>) -> KadPeer {
KadPeer {
node_id: self.local_peer_id.clone(),
multiaddrs: local_addrs.into_iter().collect(),
connection_ty: KadConnectionType::Connected,
}
}
/// Builds the answer to a request.
fn build_result<TUserData, TTopology>(&self, query: QueryTarget, request_id: KademliaRequestId, parameters: &mut PollParameters<TTopology>)
fn build_result<TUserData>(&mut self, query: QueryTarget, request_id: KademliaRequestId, parameters: &mut PollParameters)
-> KademliaHandlerIn<TUserData>
where TTopology: KademliaTopology
{
let local_kad_peer = self.build_local_kad_peer(parameters.external_addresses());
match query {
QueryTarget::FindPeer(key) => {
let topology = parameters.topology();
// TODO: insert local_kad_peer somewhere?
let closer_peers = topology
.closest_peers(key.as_ref(), self.num_results)
.map(|peer_id| build_kad_peer(peer_id, topology, &self.connected_peers))
let closer_peers = self.kbuckets
.find_closest_with_self(&key)
.take(self.num_results)
.map(|peer_id| build_kad_peer(peer_id, parameters, &self.kbuckets, &self.connected_peers))
.collect();
KademliaHandlerIn::FindNodeRes {
@ -191,23 +186,17 @@ impl<TSubstream> Kademlia<TSubstream> {
}
},
QueryTarget::GetProviders(key) => {
let topology = parameters.topology();
// TODO: insert local_kad_peer somewhere?
let closer_peers = topology
.closest_peers(&key, self.num_results)
.map(|peer_id| build_kad_peer(peer_id, topology, &self.connected_peers))
let closer_peers = self.kbuckets
.find_closest_with_self(&key)
.take(self.num_results)
.map(|peer_id| build_kad_peer(peer_id, parameters, &self.kbuckets, &self.connected_peers))
.collect();
let local_node_is_providing = self.providing_keys.iter().any(|k| k == &key);
let provider_peers = topology
.get_providers(&key)
.map(|peer_id| build_kad_peer(peer_id, topology, &self.connected_peers))
.chain(if local_node_is_providing {
Some(local_kad_peer)
} else {
None
}.into_iter())
let provider_peers = self.values_providers
.get(&key)
.into_iter()
.flat_map(|peers| peers)
.map(|peer_id| build_kad_peer(peer_id.clone(), parameters, &self.kbuckets, &self.connected_peers))
.collect();
KademliaHandlerIn::GetProvidersRes {
@ -245,8 +234,11 @@ impl<TSubstream> Kademlia<TSubstream> {
/// The actual meaning of *providing* the value of a key is not defined, and is specific to
/// the value whose key is the hash.
pub fn add_providing(&mut self, key: PeerId) {
if !self.providing_keys.iter().any(|k| k == &key) {
self.providing_keys.push(key);
self.providing_keys.insert(key.clone().into());
let providers = self.values_providers.entry(key.into()).or_insert_with(Default::default);
let my_id = self.kbuckets.my_id();
if !providers.iter().any(|k| k == my_id) {
providers.push(my_id.clone());
}
// Trigger the next refresh now.
@ -258,23 +250,30 @@ impl<TSubstream> Kademlia<TSubstream> {
/// There doesn't exist any "remove provider" message to broadcast on the network, therefore we
/// will still be registered as a provider in the DHT for as long as the timeout doesn't expire.
pub fn remove_providing(&mut self, key: &Multihash) {
if let Some(position) = self.providing_keys.iter().position(|k| k == key) {
self.providing_keys.remove(position);
self.providing_keys.remove(key);
let providers = match self.values_providers.get_mut(key) {
Some(p) => p,
None => return,
};
if let Some(position) = providers.iter().position(|k| k == key) {
providers.remove(position);
providers.shrink_to_fit();
}
}
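// Hedged sketch of the provider API after this change (names illustrative):
// `content_key` is a `PeerId` (i.e. a SHA-256 multihash); keys are registered as
// `PeerId`s and unregistered by `Multihash`.
//
//     kademlia.add_providing(content_key.clone());       // start providing the key
//     kademlia.remove_providing(&content_key.into());    // stop providing it locally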
/// Internal function that starts a query.
fn start_query(&mut self, target: QueryTarget, purpose: QueryPurpose) {
let query_id = self.next_query_id.clone();
let query_id = self.next_query_id;
self.next_query_id.0 += 1;
self.queries_to_starts.push((query_id, target, purpose));
}
}
impl<TSubstream, TTopology> NetworkBehaviour<TTopology> for Kademlia<TSubstream>
impl<TSubstream> NetworkBehaviour for Kademlia<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
TTopology: KademliaTopology,
{
type ProtocolsHandler = KademliaHandler<TSubstream, QueryId>;
type OutEvent = KademliaOut;
@ -283,6 +282,13 @@ where
KademliaHandler::dial_and_listen()
}
fn addresses_of_peer(&self, peer_id: &PeerId) -> Vec<Multiaddr> {
self.kbuckets
.get(peer_id)
.map(|l| l.iter().cloned().collect::<Vec<_>>())
.unwrap_or_else(Vec::new)
}

fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) {
if let Some(pos) = self.pending_rpcs.iter().position(|(p, _)| p == &id) {
let (_, rpc) = self.pending_rpcs.remove(pos);
@ -292,6 +298,15 @@ where
});
}
match self.kbuckets.set_connected(&id) {
Update::Pending(to_ping) => {
self.queued_events.push(NetworkBehaviourAction::DialPeer {
peer_id: to_ping.clone(),
})
},
_ => ()
}
self.connected_peers.insert(id);
}
@ -317,10 +332,15 @@ where
// It is possible that we obtain a response for a query that has finished, which is
// why we may not find an entry in `self.active_queries`.
for peer in closer_peers.iter() {
for addr in peer.multiaddrs.iter() {
self.add_to_topology
.push((peer.node_id.clone(), addr.clone(), peer.connection_ty));
if let Some(entry) = self.kbuckets.entry_mut(&peer.node_id) {
entry.extend(peer.multiaddrs.iter().cloned());
}
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(KademliaOut::Discovered {
peer_id: peer.node_id.clone(),
addresses: peer.multiaddrs.clone(),
ty: peer.connection_ty,
}));
}
if let Some((query, _, _)) = self.active_queries.get_mut(&user_data) {
query.inject_rpc_result(&source, closer_peers.into_iter().map(|kp| kp.node_id))
@ -336,11 +356,17 @@ where
user_data,
} => {
for peer in closer_peers.iter().chain(provider_peers.iter()) {
for addr in peer.multiaddrs.iter() {
self.add_to_topology
.push((peer.node_id.clone(), addr.clone(), peer.connection_ty));
if let Some(entry) = self.kbuckets.entry_mut(&peer.node_id) {
entry.extend(peer.multiaddrs.iter().cloned());
}
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(KademliaOut::Discovered {
peer_id: peer.node_id.clone(),
addresses: peer.multiaddrs.clone(),
ty: peer.connection_ty,
}));
}
// It is possible that we obtain a response for a query that has finished, which is
// why we may not find an entry in `self.active_queries`.
if let Some((query, _, providers)) = self.active_queries.get_mut(&user_data) {
@ -358,10 +384,14 @@ where
}
}
KademliaHandlerEvent::AddProvider { key, provider_peer } => {
for addr in provider_peer.multiaddrs.iter() {
self.add_to_topology
.push((provider_peer.node_id.clone(), addr.clone(), provider_peer.connection_ty));
if let Some(entry) = self.kbuckets.entry_mut(&provider_peer.node_id) {
entry.extend(provider_peer.multiaddrs.iter().cloned());
}
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(KademliaOut::Discovered {
peer_id: provider_peer.node_id.clone(),
addresses: provider_peer.multiaddrs.clone(),
ty: provider_peer.connection_ty,
}));
self.add_provider.push((key, provider_peer.node_id));
return;
}
@ -370,7 +400,7 @@ where
fn poll(
&mut self,
parameters: &mut PollParameters<TTopology>,
parameters: &mut PollParameters,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
@ -378,12 +408,15 @@ where
>,
> {
// Flush the changes to the topology that we want to make.
for (peer_id, addr, connection_ty) in self.add_to_topology.drain() {
parameters.topology().add_kad_discovered_address(peer_id, addr, connection_ty);
}
self.add_to_topology.shrink_to_fit();
for (key, provider) in self.add_provider.drain() {
parameters.topology().add_provider(key, provider);
// Don't add ourselves to the providers.
if provider == *self.kbuckets.my_id() {
continue;
}
let providers = self.values_providers.entry(key).or_insert_with(Default::default);
if !providers.iter().any(|k| k == &provider) {
providers.push(provider);
}
}
self.add_provider.shrink_to_fit();
@ -392,8 +425,11 @@ where
Ok(Async::NotReady) => {},
Ok(Async::Ready(Some(_))) => {
for provided in self.providing_keys.clone().into_iter() {
let purpose = QueryPurpose::AddProvider(provided.clone().into());
self.start_query(QueryTarget::FindPeer(provided), purpose);
let purpose = QueryPurpose::AddProvider(provided.clone());
// TODO: messy because of the PeerId/Multihash division
if let Ok(key_as_peer) = PeerId::from_multihash(provided) {
self.start_query(QueryTarget::FindPeer(key_as_peer), purpose);
}
}
},
// Ignore errors.
@ -402,9 +438,9 @@ where
// Start queries that are waiting to start.
for (query_id, query_target, query_purpose) in self.queries_to_starts.drain() {
let known_closest_peers = parameters
.topology()
.closest_peers(query_target.as_hash(), self.num_results);
let known_closest_peers = self.kbuckets
.find_closest(query_target.as_hash())
.take(self.num_results);
self.active_queries.insert(
query_id,
(
@ -508,7 +544,7 @@ where
peer_id: closest,
event: KademliaHandlerIn::AddProvider {
key: key.clone(),
provider_peer: self.build_local_kad_peer(parameters.external_addresses()),
provider_peer: build_kad_peer(parameters.local_peer_id().clone(), parameters, &self.kbuckets, &self.connected_peers),
},
};
@ -526,6 +562,16 @@ where
/// Output event of the `Kademlia` behaviour.
#[derive(Debug, Clone)]
pub enum KademliaOut {
/// We have discovered a node.
Discovered {
/// Id of the node that was discovered.
peer_id: PeerId,
/// Addresses of the node.
addresses: Vec<Multiaddr>,
/// How the reporter is connected to the reported.
ty: KadConnectionType,
},
/// Result of a `FIND_NODE` iterative query.
FindNodeResult {
/// The key that we looked for in the query.
@ -579,15 +625,33 @@ fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result<PeerId, ()> {
}
/// Builds a `KadPeer` struct corresponding to the given `PeerId`.
/// The `PeerId` can be the same as the local one.
///
/// > **Note**: This is just a convenience function that doesn't do anything noteworthy.
fn build_kad_peer<TTopology>(peer_id: PeerId, topology: &mut TTopology, connected_peers: &FnvHashSet<PeerId>) -> KadPeer
where TTopology: Topology
{
let multiaddrs = topology.addresses_of_peer(&peer_id);
fn build_kad_peer(
peer_id: PeerId,
parameters: &mut PollParameters,
kbuckets: &KBucketsTable<PeerId, SmallVec<[Multiaddr; 4]>>,
connected_peers: &FnvHashSet<PeerId>
) -> KadPeer {
let is_self = peer_id == *parameters.local_peer_id();
let multiaddrs = if is_self {
let mut addrs = parameters
.listened_addresses()
.cloned()
.collect::<Vec<_>>();
addrs.extend(parameters.external_addresses());
addrs
} else {
kbuckets
.get(&peer_id)
.map(|addrs| addrs.iter().cloned().collect())
.unwrap_or_else(Vec::new)
};
// TODO: implement the other possibilities correctly
let connection_ty = if connected_peers.contains(&peer_id) {
let connection_ty = if is_self || connected_peers.contains(&peer_id) {
KadConnectionType::Connected
} else {
KadConnectionType::NotConnected

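// Hedged sketch of consuming the new `KademliaOut::Discovered` event emitted above.
// How the event reaches the caller depends on how the behaviour is embedded in a
// swarm; only the match on the enum itself comes from this file.
//
//     match event {
//         KademliaOut::Discovered { peer_id, addresses, ty } => {
//             println!("discovered {} via {:?} ({:?})", peer_id.to_base58(), addresses, ty);
//         }
//         other => println!("query result: {:?}", other),
//     }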
View File

@ -29,8 +29,8 @@
use arrayvec::ArrayVec;
use bigint::U512;
use libp2p_core::PeerId;
use multihash::Multihash;
use std::mem;
use std::slice::IterMut as SliceIterMut;
use std::time::{Duration, Instant};
use std::vec::IntoIter as VecIntoIter;
@ -40,40 +40,46 @@ pub const MAX_NODES_PER_BUCKET: usize = 20;
/// Table of k-buckets.
#[derive(Debug, Clone)]
pub struct KBucketsTable<Id, Val> {
pub struct KBucketsTable<TPeerId, TVal> {
/// Peer ID of the local node.
my_id: Id,
my_id: TPeerId,
/// The actual tables that store peers or values.
tables: Vec<KBucket<Id, Val>>,
// The timeout when pinging the first node after which we consider it unresponsive.
ping_timeout: Duration,
tables: Vec<KBucket<TPeerId, TVal>>,
/// The timeout when trying to reach the first node after which we consider it unresponsive.
unresponsive_timeout: Duration,
}
/// An individual table that stores peers or values.
#[derive(Debug, Clone)]
struct KBucket<Id, Val> {
/// Nodes are always ordered from oldest to newest.
/// Note that we will very often move elements to the end of this. No benchmarking has been
/// performed, but it is very likely that an `ArrayVec` is the most performant data structure.
nodes: ArrayVec<[Node<Id, Val>; MAX_NODES_PER_BUCKET]>,
struct KBucket<TPeerId, TVal> {
/// Nodes are always ordered from oldest to newest. The nodes we are connected to are always
/// all on top of the nodes we are not connected to.
nodes: ArrayVec<[Node<TPeerId, TVal>; MAX_NODES_PER_BUCKET]>,
/// Index in `nodes` over which all nodes are connected. Must always be <= the length
/// of `nodes`.
first_connected_pos: usize,
/// Node received when the bucket was full. Will be added to the list if the first node doesn't
/// respond in time to our ping. The second element is the time when the pending node was added.
/// If it is too old we drop the first node and add the pending node to the
/// end of the list.
pending_node: Option<(Node<Id, Val>, Instant)>,
/// respond in time to our reach attempt. The second element is the time when the pending node
/// was added. If it is too old we drop the first node and add the pending node to the end of
/// the list.
pending_node: Option<(Node<TPeerId, TVal>, Instant)>,
/// Last time this bucket was updated.
last_update: Instant,
latest_update: Instant,
}
/// A single node in a k-bucket.
#[derive(Debug, Clone)]
struct Node<Id, Val> {
id: Id,
value: Val,
struct Node<TPeerId, TVal> {
/// Id of the node.
id: TPeerId,
/// Value associated to it.
value: TVal,
}
impl<Id, Val> KBucket<Id, Val> {
impl<TPeerId, TVal> KBucket<TPeerId, TVal> {
/// Puts the kbucket into a coherent state.
/// If a node is pending and the timeout has expired, removes the first element of `nodes`
/// and puts the node back in `pending_node`.
@ -90,9 +96,9 @@ impl<Id, Val> KBucket<Id, Val> {
}
/// Trait that must be implemented on types that can be used as an identifier in a k-bucket.
pub trait KBucketsPeerId: Eq + Clone {
pub trait KBucketsPeerId<TOther = Self>: PartialEq<TOther> + Clone {
/// Computes the XOR of this value and another one. The lower the closer.
fn distance_with(&self, other: &Self) -> u32;
fn distance_with(&self, other: &TOther) -> u32;
/// Returns the number of bits that are necessary to store the distance between peer IDs.
/// Used for pre-allocations.
@ -101,6 +107,30 @@ pub trait KBucketsPeerId: Eq + Clone {
fn max_distance() -> usize;
}
impl KBucketsPeerId for PeerId {
#[inline]
fn distance_with(&self, other: &Self) -> u32 {
Multihash::distance_with(self.as_ref(), other.as_ref())
}
#[inline]
fn max_distance() -> usize {
<Multihash as KBucketsPeerId>::max_distance()
}
}
impl KBucketsPeerId<Multihash> for PeerId {
#[inline]
fn distance_with(&self, other: &Multihash) -> u32 {
Multihash::distance_with(self.as_ref(), other)
}
#[inline]
fn max_distance() -> usize {
<Multihash as KBucketsPeerId>::max_distance()
}
}
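// Hedged sketch of the generalised `KBucketsPeerId<TOther>` trait added here: a
// distance can now be computed directly between a `PeerId` and a raw `Multihash`
// (variable names are illustrative).
//
//     let dist: u32 = my_peer_id.distance_with(&target_hash);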
impl KBucketsPeerId for Multihash {
#[inline]
fn distance_with(&self, other: &Self) -> u32 {
@ -118,22 +148,23 @@ impl KBucketsPeerId for Multihash {
}
}
impl<Id, Val> KBucketsTable<Id, Val>
impl<TPeerId, TVal> KBucketsTable<TPeerId, TVal>
where
Id: KBucketsPeerId,
TPeerId: KBucketsPeerId,
{
/// Builds a new routing table.
pub fn new(my_id: Id, ping_timeout: Duration) -> Self {
pub fn new(my_id: TPeerId, unresponsive_timeout: Duration) -> Self {
KBucketsTable {
my_id: my_id,
tables: (0..Id::max_distance())
my_id,
tables: (0..TPeerId::max_distance())
.map(|_| KBucket {
nodes: ArrayVec::new(),
first_connected_pos: 0,
pending_node: None,
last_update: Instant::now(),
latest_update: Instant::now(),
})
.collect(),
ping_timeout: ping_timeout,
unresponsive_timeout,
}
}
@ -141,7 +172,7 @@ where
//
// Returns `None` if out of range, which happens if `id` is the same as the local peer id.
#[inline]
fn bucket_num(&self, id: &Id) -> Option<usize> {
fn bucket_num(&self, id: &TPeerId) -> Option<usize> {
(self.my_id.distance_with(id) as usize).checked_sub(1)
}
@ -150,26 +181,189 @@ where
/// Ordered by proximity to the local node. Closest bucket (with max. one node in it) comes
/// first.
#[inline]
pub fn buckets(&mut self) -> BucketsIter<Id, Val> {
BucketsIter(self.tables.iter_mut(), self.ping_timeout)
pub fn buckets(&mut self) -> BucketsIter<TPeerId, TVal> {
BucketsIter(self.tables.iter_mut(), self.unresponsive_timeout)
}
/// Returns the ID of the local node.
#[inline]
pub fn my_id(&self) -> &Id {
pub fn my_id(&self) -> &TPeerId {
&self.my_id
}
/// Finds the `num` nodes closest to `id`, ordered by distance.
pub fn find_closest(&mut self, id: &Id) -> VecIntoIter<Id>
/// Returns the value associated to a node, if any is present.
///
/// Does **not** include pending nodes.
pub fn get(&self, id: &TPeerId) -> Option<&TVal> {
let table = match self.bucket_num(&id) {
Some(n) => &self.tables[n],
None => return None,
};
for elem in &table.nodes {
if elem.id == *id {
return Some(&elem.value);
}
}
None
}
/// Returns the value associated to a node, if any is present.
///
/// Does **not** include pending nodes.
pub fn get_mut(&mut self, id: &TPeerId) -> Option<&mut TVal> {
let table = match self.bucket_num(&id) {
Some(n) => &mut self.tables[n],
None => return None,
};
table.flush(self.unresponsive_timeout);
for elem in &mut table.nodes {
if elem.id == *id {
return Some(&mut elem.value);
}
}
None
}
/// Returns the value associated to a node if any is present. Otherwise, tries to add the
/// node to the table in a disconnected state and return its value. Returns `None` if `id` is
/// the local peer, or if the table is full.
pub fn entry_mut(&mut self, id: &TPeerId) -> Option<&mut TVal>
where
Id: Clone,
TVal: Default,
{
if let Some((bucket, entry)) = self.entry_mut_inner(id) {
Some(&mut self.tables[bucket].nodes[entry].value)
} else {
None
}
}
/// Apparently non-lexical lifetimes still aren't working properly in some situations, so we
/// delegate `entry_mut` to this method that returns an index within `self.tables` and the
/// node index within that table.
fn entry_mut_inner(&mut self, id: &TPeerId) -> Option<(usize, usize)>
where
TVal: Default,
{
let (bucket_num, table) = match self.bucket_num(&id) {
Some(n) => (n, &mut self.tables[n]),
None => return None,
};
table.flush(self.unresponsive_timeout);
if let Some(pos) = table.nodes.iter().position(|elem| elem.id == *id) {
return Some((bucket_num, pos));
}
if !table.nodes.is_full() {
table.nodes.insert(table.first_connected_pos, Node {
id: id.clone(),
value: Default::default(),
});
table.first_connected_pos += 1;
table.latest_update = Instant::now();
return Some((bucket_num, table.first_connected_pos - 1));
}
None
}
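// Hedged sketch: `entry_mut` is what the behaviour uses to record addresses learnt
// for a peer; the value (here a list of addresses) is default-created on first
// insertion. `kbuckets`, `peer_id` and `new_addr` are illustrative names.
//
//     if let Some(addrs) = kbuckets.entry_mut(&peer_id) {
//         addrs.push(new_addr);
//     }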
/// Reports that we are connected to the given node.
///
/// This inserts the node in the k-buckets, if possible. If it is already in a k-bucket, puts
/// it above the disconnected nodes. If it is not already in a k-bucket, then the value will
/// be built with the `Default` trait.
pub fn set_connected(&mut self, id: &TPeerId) -> Update<TPeerId>
where
TVal: Default,
{
let table = match self.bucket_num(&id) {
Some(n) => &mut self.tables[n],
None => return Update::FailSelfUpdate,
};
table.flush(self.unresponsive_timeout);
if let Some(pos) = table.nodes.iter().position(|elem| elem.id == *id) {
// Node is already in the table; move it over `first_connected_pos` if necessary.
// We do a `saturating_sub(1)`, because if `first_connected_pos` is 0 then
// `pos < first_connected_pos` can never be true anyway.
if pos < table.first_connected_pos.saturating_sub(1) {
let elem = table.nodes.remove(pos);
table.first_connected_pos -= 1;
table.nodes.insert(table.first_connected_pos, elem);
}
table.latest_update = Instant::now();
Update::Updated
} else if !table.nodes.is_full() {
// Node is not in the table yet, but there's plenty of space for it.
table.nodes.insert(table.first_connected_pos, Node {
id: id.clone(),
value: Default::default(),
});
table.latest_update = Instant::now();
Update::Added
} else if table.first_connected_pos > 0 && table.pending_node.is_none() {
// Node is not in the table yet, but there could be room for it if we drop the first
// element. However, we first add the node to `pending_node` and try to reconnect
// to the oldest node.
let pending_node = Node {
id: id.clone(),
value: Default::default(),
};
table.pending_node = Some((pending_node, Instant::now()));
Update::Pending(&table.nodes[0].id)
} else {
debug_assert!(table.first_connected_pos == 0 || table.pending_node.is_some());
Update::Discarded
}
}
/// Reports that we are now disconnected from the given node.
///
/// This does *not* remove the node from the k-buckets, but moves it underneath the nodes we
/// are still connected to.
pub fn set_disconnected(&mut self, id: &TPeerId) {
let table = match self.bucket_num(&id) {
Some(n) => &mut self.tables[n],
None => return,
};
table.flush(self.unresponsive_timeout);
let pos = match table.nodes.iter().position(|elem| elem.id == *id) {
Some(pos) => pos,
None => return,
};
if pos > table.first_connected_pos {
let elem = table.nodes.remove(pos);
table.nodes.insert(table.first_connected_pos, elem);
table.first_connected_pos += 1;
} else if pos == table.first_connected_pos {
table.first_connected_pos += 1;
}
}
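// Hedged sketch of the connection-tracking flow introduced here; `dial` stands in
// for whatever mechanism the caller uses to reach a peer.
//
//     match kbuckets.set_connected(&peer_id) {
//         Update::Pending(oldest) => dial(oldest.clone()),
//         _ => {}
//     }
//     // ... and once the connection closes:
//     kbuckets.set_disconnected(&peer_id);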
/// Finds the `num` nodes closest to `id`, ordered by distance.
pub fn find_closest<TOther>(&mut self, id: &TOther) -> VecIntoIter<TPeerId>
where
TPeerId: Clone + KBucketsPeerId<TOther>,
{
// TODO: optimize
let mut out = Vec::new();
for table in self.tables.iter_mut() {
table.flush(self.ping_timeout);
if table.last_update.elapsed() > self.ping_timeout {
table.flush(self.unresponsive_timeout);
if table.latest_update.elapsed() > self.unresponsive_timeout {
continue; // ignore bucket with expired nodes
}
for node in table.nodes.iter() {
@ -181,15 +375,15 @@ where
}
/// Same as `find_closest`, but includes the local peer as well.
pub fn find_closest_with_self(&mut self, id: &Id) -> VecIntoIter<Id>
pub fn find_closest_with_self<TOther>(&mut self, id: &TOther) -> VecIntoIter<TPeerId>
where
Id: Clone,
TPeerId: Clone + KBucketsPeerId<TOther>,
{
// TODO: optimize
let mut intermediate: Vec<_> = self.find_closest(&id).collect();
let mut intermediate: Vec<_> = self.find_closest(id).collect();
if let Some(pos) = intermediate
.iter()
.position(|e| e.distance_with(&id) >= self.my_id.distance_with(&id))
.position(|e| e.distance_with(id) >= self.my_id.distance_with(id))
{
if intermediate[pos] != self.my_id {
intermediate.insert(pos, self.my_id.clone());
@ -199,69 +393,18 @@ where
}
intermediate.into_iter()
}
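// Hedged sketch: querying the table for the peers closest to a target key, as the
// behaviour does when starting an iterative query (the `20` mirrors `num_results`).
//
//     let closest: Vec<PeerId> = kbuckets.find_closest(&target_hash).take(20).collect();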
/// Marks the node as "most recent" in its bucket and modifies the value associated to it.
/// This function should be called whenever we receive a communication from a node.
pub fn update(&mut self, id: Id, value: Val) -> UpdateOutcome<Id, Val> {
let table = match self.bucket_num(&id) {
Some(n) => &mut self.tables[n],
None => return UpdateOutcome::FailSelfUpdate,
};
table.flush(self.ping_timeout);
if let Some(pos) = table.nodes.iter().position(|n| n.id == id) {
// Node is already in the bucket.
let mut existing = table.nodes.remove(pos);
let old_val = mem::replace(&mut existing.value, value);
if pos == 0 {
// If it's the first node of the bucket that we update, then we drop the node that
// was waiting for a ping.
table.nodes.truncate(MAX_NODES_PER_BUCKET - 1);
table.pending_node = None;
}
table.nodes.push(existing);
table.last_update = Instant::now();
UpdateOutcome::Refreshed(old_val)
} else if table.nodes.len() < MAX_NODES_PER_BUCKET {
// Node not yet in the bucket, but there's plenty of space.
table.nodes.push(Node {
id: id,
value: value,
});
table.last_update = Instant::now();
UpdateOutcome::Added
} else {
// Not enough space to put the node, but we can add it to the end as "pending". We
// then need to tell the caller that we want it to ping the node at the top of the
// list.
if table.pending_node.is_none() {
table.pending_node = Some((
Node {
id: id,
value: value,
},
Instant::now(),
));
UpdateOutcome::NeedPing(table.nodes[0].id.clone())
} else {
UpdateOutcome::Discarded
}
}
}
}
/// Return value of the `update()` method.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
/// Return value of the `set_connected()` method.
#[derive(Debug)]
#[must_use]
pub enum UpdateOutcome<Id, Val> {
pub enum Update<'a, TPeerId> {
/// The node has been added to the bucket.
Added,
/// The node was already in the bucket and has been refreshed.
Refreshed(Val),
/// The node wasn't added. Instead we need to ping the node passed as parameter, and call
/// `update` if it responds.
NeedPing(Id),
/// The node was already in the bucket and has been updated.
Updated,
/// The node has been added as pending. We need to try connect to the node passed as parameter.
Pending(&'a TPeerId),
/// The node wasn't added at all because a node was already pending.
Discarded,
/// Tried to update the local peer ID. This is an invalid operation.
@ -269,10 +412,10 @@ pub enum UpdateOutcome<Id, Val> {
}
/// Iterator giving access to a bucket.
pub struct BucketsIter<'a, Id: 'a, Val: 'a>(SliceIterMut<'a, KBucket<Id, Val>>, Duration);
pub struct BucketsIter<'a, TPeerId: 'a, TVal: 'a>(SliceIterMut<'a, KBucket<TPeerId, TVal>>, Duration);
impl<'a, Id: 'a, Val: 'a> Iterator for BucketsIter<'a, Id, Val> {
type Item = Bucket<'a, Id, Val>;
impl<'a, TPeerId: 'a, TVal: 'a> Iterator for BucketsIter<'a, TPeerId, TVal> {
type Item = Bucket<'a, TPeerId, TVal>;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
@ -288,12 +431,12 @@ impl<'a, Id: 'a, Val: 'a> Iterator for BucketsIter<'a, Id, Val> {
}
}
impl<'a, Id: 'a, Val: 'a> ExactSizeIterator for BucketsIter<'a, Id, Val> {}
impl<'a, TPeerId: 'a, TVal: 'a> ExactSizeIterator for BucketsIter<'a, TPeerId, TVal> {}
/// Access to a bucket.
pub struct Bucket<'a, Id: 'a, Val: 'a>(&'a mut KBucket<Id, Val>);
pub struct Bucket<'a, TPeerId: 'a, TVal: 'a>(&'a mut KBucket<TPeerId, TVal>);
impl<'a, Id: 'a, Val: 'a> Bucket<'a, Id, Val> {
impl<'a, TPeerId: 'a, TVal: 'a> Bucket<'a, TPeerId, TVal> {
/// Returns the number of entries in that bucket.
///
/// > **Note**: Keep in mind that this operation can be racy. If `update()` is called on the
@ -314,8 +457,8 @@ impl<'a, Id: 'a, Val: 'a> Bucket<'a, Id, Val> {
///
/// If the bucket is empty, this returns the time when the whole table was created.
#[inline]
pub fn last_update(&self) -> Instant {
self.0.last_update.clone()
pub fn latest_update(&self) -> Instant {
self.0.latest_update
}
}
@ -323,8 +466,7 @@ impl<'a, Id: 'a, Val: 'a> Bucket<'a, Id, Val> {
mod tests {
extern crate rand;
use self::rand::random;
use crate::kbucket::{KBucketsPeerId, KBucketsTable};
use crate::kbucket::{UpdateOutcome, MAX_NODES_PER_BUCKET};
use crate::kbucket::{KBucketsPeerId, KBucketsTable, Update, MAX_NODES_PER_BUCKET};
use multihash::{Multihash, Hash};
use std::thread;
use std::time::Duration;
@ -334,8 +476,8 @@ mod tests {
let my_id = Multihash::random(Hash::SHA2256);
let other_id = Multihash::random(Hash::SHA2256);
let mut table = KBucketsTable::new(my_id, Duration::from_secs(5));
let _ = table.update(other_id.clone(), ());
let mut table = KBucketsTable::<_, ()>::new(my_id, Duration::from_secs(5));
table.entry_mut(&other_id);
let res = table.find_closest(&other_id).collect::<Vec<_>>();
assert_eq!(res.len(), 1);
@ -346,9 +488,10 @@ mod tests {
fn update_local_id_fails() {
let my_id = Multihash::random(Hash::SHA2256);
let mut table = KBucketsTable::new(my_id.clone(), Duration::from_secs(5));
match table.update(my_id, ()) {
UpdateOutcome::FailSelfUpdate => (),
let mut table = KBucketsTable::<_, ()>::new(my_id.clone(), Duration::from_secs(5));
assert!(table.entry_mut(&my_id).is_none());
match table.set_connected(&my_id) {
Update::FailSelfUpdate => (),
_ => panic!(),
}
}
@ -367,15 +510,15 @@ mod tests {
})
.collect::<Vec<_>>();
let mut table = KBucketsTable::new(my_id, Duration::from_secs(5));
let before_update = table.buckets().map(|b| b.last_update()).collect::<Vec<_>>();
let mut table = KBucketsTable::<_, ()>::new(my_id, Duration::from_secs(5));
let before_update = table.buckets().map(|b| b.latest_update()).collect::<Vec<_>>();
thread::sleep(Duration::from_secs(2));
for &(ref id, _) in &other_ids {
let _ = table.update(id.clone(), ());
table.entry_mut(&id);
}
let after_update = table.buckets().map(|b| b.last_update()).collect::<Vec<_>>();
let after_update = table.buckets().map(|b| b.latest_update()).collect::<Vec<_>>();
for (offset, (bef, aft)) in before_update.iter().zip(after_update.iter()).enumerate() {
if other_ids.iter().any(|&(_, bucket)| bucket == offset) {
@ -403,10 +546,14 @@ mod tests {
let first_node = fill_ids[0].clone();
let second_node = fill_ids[1].clone();
let mut table = KBucketsTable::new(my_id.clone(), Duration::from_secs(1));
let mut table = KBucketsTable::<_, ()>::new(my_id.clone(), Duration::from_secs(1));
for (num, id) in fill_ids.drain(..MAX_NODES_PER_BUCKET).enumerate() {
assert_eq!(table.update(id, ()), UpdateOutcome::Added);
match table.set_connected(&id) {
Update::Added => (),
_ => panic!()
}
table.set_disconnected(&id);
assert_eq!(table.buckets().nth(255).unwrap().num_entries(), num + 1);
}
@ -415,27 +562,31 @@ mod tests {
MAX_NODES_PER_BUCKET
);
assert!(!table.buckets().nth(255).unwrap().has_pending());
assert_eq!(
table.update(fill_ids.remove(0), ()),
UpdateOutcome::NeedPing(first_node)
);
match table.set_connected(&fill_ids.remove(0)) {
Update::Pending(to_ping) => {
assert_eq!(*to_ping, first_node);
},
_ => panic!()
}
assert_eq!(
table.buckets().nth(255).unwrap().num_entries(),
MAX_NODES_PER_BUCKET
);
assert!(table.buckets().nth(255).unwrap().has_pending());
assert_eq!(
table.update(fill_ids.remove(0), ()),
UpdateOutcome::Discarded
);
match table.set_connected(&fill_ids.remove(0)) {
Update::Discarded => (),
_ => panic!()
}
thread::sleep(Duration::from_secs(2));
assert!(!table.buckets().nth(255).unwrap().has_pending());
assert_eq!(
table.update(fill_ids.remove(0), ()),
UpdateOutcome::NeedPing(second_node)
);
match table.set_connected(&fill_ids.remove(0)) {
Update::Pending(to_ping) => {
assert_eq!(*to_ping, second_node);
},
_ => panic!()
}
}
#[test]

View File

@ -85,7 +85,6 @@ extern crate tokio;
pub use self::behaviour::{Kademlia, KademliaOut};
pub use self::kbucket::KBucketsPeerId;
pub use self::protocol::KadConnectionType;
pub use self::topology::KademliaTopology;
pub mod handler;
pub mod kbucket;
@ -94,4 +93,3 @@ pub mod protocol;
mod behaviour;
mod protobuf_structs;
mod query;
mod topology;

View File

@ -1,87 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::kbucket::KBucketsPeerId;
use crate::protocol::KadConnectionType;
use libp2p_core::{Multiaddr, PeerId, topology::MemoryTopology, topology::Topology};
use multihash::Multihash;
use std::vec;
/// Trait allowing retrieval of information necessary for the Kademlia system to work.
pub trait KademliaTopology: Topology {
/// Iterator returned by `closest_peers`.
type ClosestPeersIter: Iterator<Item = PeerId>;
/// Iterator returned by `get_providers`.
type GetProvidersIter: Iterator<Item = PeerId>;
/// Adds an address discovered through Kademlia to the topology.
///
/// > **Note**: Keep in mind that `peer` can be the local peer.
fn add_kad_discovered_address(&mut self, peer: PeerId, addr: Multiaddr,
connection_ty: KadConnectionType);
/// Returns the known peers closest by XOR distance to the `target`.
///
/// The `max` parameter is the maximum number of results that we are going to use. If more
/// than `max` elements are returned, they will be ignored.
///
/// > **Note**: The results should include the local node.
fn closest_peers(&mut self, target: &Multihash, max: usize) -> Self::ClosestPeersIter;
/// Registers the given peer as provider of the resource with the given ID.
///
/// > **Note**: There is no `remove_provider` method. Implementations must include a
/// > time-to-live system so that entries disappear after a while.
// TODO: specify the TTL? it has to match the timeout in the behaviour somehow, but this could
// also be handled by the user
fn add_provider(&mut self, key: Multihash, peer_id: PeerId);
/// Returns the list of providers that have been registered with `add_provider`.
///
/// If the local node is a provider for `key`, our local peer ID should also be returned.
fn get_providers(&mut self, key: &Multihash) -> Self::GetProvidersIter;
}
// TODO: stupid idea to implement on `MemoryTopology`
impl KademliaTopology for MemoryTopology {
type ClosestPeersIter = vec::IntoIter<PeerId>;
type GetProvidersIter = vec::IntoIter<PeerId>;
fn add_kad_discovered_address(&mut self, peer: PeerId, addr: Multiaddr, _: KadConnectionType) {
if &peer != self.local_peer_id() {
self.add_address(peer, addr)
}
}
fn closest_peers(&mut self, target: &Multihash, _: usize) -> Self::ClosestPeersIter {
let mut list = self.peers().cloned().collect::<Vec<_>>();
list.sort_by(|a, b| target.distance_with(b.as_ref()).cmp(&target.distance_with(a.as_ref())));
list.into_iter()
}
fn add_provider(&mut self, _: Multihash, _: PeerId) {
unimplemented!()
}
fn get_providers(&mut self, _: &Multihash) -> Self::GetProvidersIter {
unimplemented!()
}
}

View File

@ -38,7 +38,7 @@ pub mod protocol;
use futures::prelude::*;
use libp2p_core::either::EitherOutput;
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p_core::{protocols_handler::ProtocolsHandler, protocols_handler::ProtocolsHandlerSelect, PeerId};
use libp2p_core::{protocols_handler::ProtocolsHandler, protocols_handler::ProtocolsHandlerSelect, Multiaddr, PeerId};
use std::{marker::PhantomData, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite};
@ -81,7 +81,7 @@ impl<TSubstream> Default for Ping<TSubstream> {
}
}
impl<TSubstream, TTopology> NetworkBehaviour<TTopology> for Ping<TSubstream>
impl<TSubstream> NetworkBehaviour for Ping<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
@ -93,6 +93,10 @@ where
.select(dial_handler::PeriodicPingHandler::new())
}
fn addresses_of_peer(&self, peer_id: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}
fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {}
fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {}
@ -112,7 +116,7 @@ where
fn poll(
&mut self,
_: &mut PollParameters<TTopology>,
_: &mut PollParameters,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,