From c5d08ab48c31971e7e63fd9c17833c90de46e3be Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Sat, 1 Dec 2018 13:34:57 +0100 Subject: [PATCH] Enhance the swarm a bit (#711) * Replace the &mut TTopology with a &mut PollParameters * Add supported_protocols * Add external_addresses * Report out addresses in Kademlia * Fix the custom derive * Some comments * Fix compilation on stable --- core/src/swarm.rs | 81 ++++++++++++++++++++- misc/core-derive/src/lib.rs | 9 ++- protocols/floodsub/src/layer.rs | 4 +- protocols/identify/src/listen_layer.rs | 4 +- protocols/identify/src/periodic_id_layer.rs | 4 +- protocols/kad/src/behaviour.rs | 29 +++++--- protocols/ping/src/dial_layer.rs | 4 +- protocols/ping/src/listen_layer.rs | 4 +- 8 files changed, 113 insertions(+), 26 deletions(-) diff --git a/core/src/swarm.rs b/core/src/swarm.rs index 5f687167..55c535df 100644 --- a/core/src/swarm.rs +++ b/core/src/swarm.rs @@ -30,6 +30,7 @@ use crate::{ topology::Topology }; use futures::prelude::*; +use smallvec::SmallVec; use std::{fmt, io, ops::{Deref, DerefMut}}; pub use crate::nodes::raw_swarm::ConnectedPoint; @@ -53,6 +54,12 @@ where TTransport: Transport, /// Holds the topology of the network. In other words all the nodes that we think exist, even /// if we're not connected to them. topology: TTopology, + + /// List of protocols that the behaviour says it supports. + supported_protocols: SmallVec<[Vec; 16]>, + + /// List of multiaddresses we're listening on after NAT traversal. + external_addresses: SmallVec<[Multiaddr; 8]>, } impl Deref for Swarm @@ -105,12 +112,21 @@ where TBehaviour: NetworkBehaviour, { /// Builds a new `Swarm`. #[inline] - pub fn new(transport: TTransport, behaviour: TBehaviour, topology: TTopology) -> Self { + pub fn new(transport: TTransport, mut behaviour: TBehaviour, topology: TTopology) -> Self { + let supported_protocols = behaviour + .new_handler() + .listen_protocol() + .protocol_names() + .map(|(name, _)| name.to_vec()) + .collect(); + let raw_swarm = RawSwarm::new(transport); Swarm { raw_swarm, behaviour, topology, + supported_protocols, + external_addresses: SmallVec::new(), } } @@ -230,7 +246,16 @@ where TBehaviour: NetworkBehaviour, Async::Ready(RawSwarmEvent::UnknownPeerDialError { .. }) => {}, } - match self.behaviour.poll(&mut self.topology) { + let behaviour_poll = { + let mut parameters = PollParameters { + topology: &mut self.topology, + supported_protocols: &self.supported_protocols, + external_addresses: &self.external_addresses, + }; + self.behaviour.poll(&mut parameters) + }; + + match behaviour_poll { Async::NotReady if raw_swarm_not_ready => return Ok(Async::NotReady), Async::NotReady => (), Async::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { @@ -247,6 +272,13 @@ where TBehaviour: NetworkBehaviour, peer.send_event(event); } }, + Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => { + for addr in self.raw_swarm.nat_traversal(&address) { + // TODO: is it a good idea to add these addresses permanently? what about + // a TTL instead? + self.external_addresses.push(addr); + } + }, } } } @@ -286,7 +318,42 @@ pub trait NetworkBehaviour { /// Polls for things that swarm should do. /// /// This API mimics the API of the `Stream` trait. - fn poll(&mut self, topology: &mut TTopology) -> Async::InEvent, Self::OutEvent>>; + fn poll(&mut self, topology: &mut PollParameters) -> Async::InEvent, Self::OutEvent>>; +} + +/// Parameters passed to `poll()` that the `NetworkBehaviour` has access to. +#[derive(Debug)] +pub struct PollParameters<'a, TTopology: 'a> { + topology: &'a mut TTopology, + supported_protocols: &'a [Vec], + external_addresses: &'a [Multiaddr], +} + +impl<'a, TTopology> PollParameters<'a, TTopology> { + /// Returns a reference to the topology of the network. + #[inline] + pub fn topology(&mut self) -> &mut TTopology { + &mut self.topology + } + + /// Returns the list of protocol the behaviour supports when a remote negotiates a protocol on + /// an inbound substream. + /// + /// The iterator's elements are the ASCII names as reported on the wire. + /// + /// Note that the list is computed once at initialization and never refreshed. + #[inline] + pub fn supported_protocols(&self) -> impl ExactSizeIterator { + self.supported_protocols.iter().map(AsRef::as_ref) + } + + /// Returns the list of the addresses we're listening on, after accounting for NAT traversal. + /// + /// This corresponds to the elements produced with `ReportObservedAddr`. + #[inline] + pub fn external_addresses(&self) -> impl ExactSizeIterator { + self.external_addresses.iter() + } } /// Action to perform. @@ -319,4 +386,12 @@ pub enum NetworkBehaviourAction { /// Event to send to the peer. event: TInEvent, }, + + /// Reports that a remote observes us as this address. + /// + /// The swarm will pass this address through the transport's NAT traversal. + ReportObservedAddr { + /// The address we're being observed as. + address: Multiaddr, + }, } diff --git a/misc/core-derive/src/lib.rs b/misc/core-derive/src/lib.rs index 078ffcec..ed3ea742 100644 --- a/misc/core-derive/src/lib.rs +++ b/misc/core-derive/src/lib.rs @@ -79,6 +79,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { quote!{#n} }; + let poll_parameters = quote!{::libp2p::core::swarm::PollParameters<#topology_generic>}; + // Build the generics. let impl_generics = { let tp = ast.generics.type_params(); @@ -306,7 +308,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { Some(quote!{ loop { - match #field_name.poll(topology) { + match #field_name.poll(poll_params) { Async::Ready(#network_behaviour_action::GenerateEvent(event)) => { #handling } @@ -322,6 +324,9 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { event: #wrapped_event, }); } + Async::Ready(#network_behaviour_action::ReportObservedAddr { address }) => { + return Async::Ready(#network_behaviour_action::ReportObservedAddr { address }); + } Async::NotReady => break, } } @@ -363,7 +368,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { } } - fn poll(&mut self, topology: &mut #topology_generic) -> ::libp2p::futures::Async<#network_behaviour_action<::InEvent, Self::OutEvent>> { + fn poll(&mut self, poll_params: &mut #poll_parameters) -> ::libp2p::futures::Async<#network_behaviour_action<::InEvent, Self::OutEvent>> { use libp2p::futures::prelude::*; #(#poll_stmts)* let f: ::libp2p::futures::Async<#network_behaviour_action<::InEvent, Self::OutEvent>> = #poll_method; diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 70c81f95..ed7fd694 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -21,7 +21,7 @@ use cuckoofilter::CuckooFilter; use futures::prelude::*; use handler::FloodsubHandler; -use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction}; +use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use libp2p_core::{protocols_handler::ProtocolsHandler, PeerId}; use protocol::{FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction}; use rand; @@ -276,7 +276,7 @@ where fn poll( &mut self, - _: &mut TTopology, + _: &mut PollParameters, ) -> Async< NetworkBehaviourAction< ::InEvent, diff --git a/protocols/identify/src/listen_layer.rs b/protocols/identify/src/listen_layer.rs index 6e4d7b22..ed8e5fbe 100644 --- a/protocols/identify/src/listen_layer.rs +++ b/protocols/identify/src/listen_layer.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use futures::prelude::*; -use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction}; +use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId}; use smallvec::SmallVec; use std::collections::HashMap; @@ -99,7 +99,7 @@ where fn poll( &mut self, - _: &mut TTopology, + _: &mut PollParameters, ) -> Async< NetworkBehaviourAction< ::InEvent, diff --git a/protocols/identify/src/periodic_id_layer.rs b/protocols/identify/src/periodic_id_layer.rs index 034c0640..b3bf898f 100644 --- a/protocols/identify/src/periodic_id_layer.rs +++ b/protocols/identify/src/periodic_id_layer.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use futures::prelude::*; -use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction}; +use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId}; use std::{collections::VecDeque, marker::PhantomData}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -79,7 +79,7 @@ where fn poll( &mut self, - _: &mut TTopology, + _: &mut PollParameters, ) -> Async< NetworkBehaviourAction< ::InEvent, diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 248d8f93..9972ce69 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -21,7 +21,7 @@ use fnv::{FnvHashMap, FnvHashSet}; use futures::{prelude::*, stream}; use handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn, KademliaRequestId}; -use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction}; +use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use libp2p_core::{protocols_handler::ProtocolsHandler, topology::Topology, Multiaddr, PeerId}; use multihash::Multihash; use protocol::{KadConnectionType, KadPeer}; @@ -161,21 +161,25 @@ impl Kademlia { } /// Builds a `KadPeer` structure corresponding to the local node. - fn build_local_kad_peer(&self) -> KadPeer { + fn build_local_kad_peer<'a>(&self, local_addrs: impl IntoIterator) -> KadPeer { KadPeer { node_id: self.local_peer_id.clone(), - multiaddrs: Vec::new(), // FIXME: return the addresses we're listening on + multiaddrs: local_addrs.into_iter().cloned().collect(), connection_ty: KadConnectionType::Connected, } } /// Builds the answer to a request. - fn build_result(&self, query: QueryTarget, request_id: KademliaRequestId, topology: &mut TTopology) + fn build_result(&self, query: QueryTarget, request_id: KademliaRequestId, parameters: &mut PollParameters) -> KademliaHandlerIn where TTopology: KademliaTopology { + let local_kad_peer = self.build_local_kad_peer(parameters.external_addresses()); + match query { QueryTarget::FindPeer(key) => { + let mut topology = parameters.topology(); + // TODO: insert local_kad_peer somewhere? let closer_peers = topology .closest_peers(key.as_ref(), self.num_results) .map(|peer_id| build_kad_peer(peer_id, topology, &self.connected_peers)) @@ -187,6 +191,8 @@ impl Kademlia { } }, QueryTarget::GetProviders(key) => { + let mut topology = parameters.topology(); + // TODO: insert local_kad_peer somewhere? let closer_peers = topology .closest_peers(&key, self.num_results) .map(|peer_id| build_kad_peer(peer_id, topology, &self.connected_peers)) @@ -198,7 +204,7 @@ impl Kademlia { .get_providers(&key) .map(|peer_id| build_kad_peer(peer_id, topology, &self.connected_peers)) .chain(if local_node_is_providing { - Some(self.build_local_kad_peer()) + Some(local_kad_peer) } else { None }.into_iter()) @@ -364,7 +370,7 @@ where fn poll( &mut self, - topology: &mut TTopology, + parameters: &mut PollParameters, ) -> Async< NetworkBehaviourAction< ::InEvent, @@ -373,11 +379,11 @@ where > { // Flush the changes to the topology that we want to make. for (peer_id, addr, connection_ty) in self.add_to_topology.drain() { - topology.add_kad_discovered_address(peer_id, addr, connection_ty); + parameters.topology().add_kad_discovered_address(peer_id, addr, connection_ty); } self.add_to_topology.shrink_to_fit(); for (key, provider) in self.add_provider.drain() { - topology.add_provider(key, provider); + parameters.topology().add_provider(key, provider); } self.add_provider.shrink_to_fit(); @@ -396,7 +402,8 @@ where // Start queries that are waiting to start. for (query_id, query_target, query_purpose) in self.queries_to_starts.drain() { - let known_closest_peers = topology + let known_closest_peers = parameters + .topology() .closest_peers(query_target.as_hash(), self.num_results); self.active_queries.insert( query_id, @@ -418,7 +425,7 @@ where // Handle remote queries. if !self.remote_requests.is_empty() { let (peer_id, request_id, query) = self.remote_requests.remove(0); - let result = self.build_result(query, request_id, topology); + let result = self.build_result(query, request_id, parameters); return Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event: result, @@ -501,7 +508,7 @@ where peer_id: closest, event: KademliaHandlerIn::AddProvider { key: key.clone(), - provider_peer: self.build_local_kad_peer(), + provider_peer: self.build_local_kad_peer(parameters.external_addresses()), }, }; diff --git a/protocols/ping/src/dial_layer.rs b/protocols/ping/src/dial_layer.rs index ea022bf8..f922009f 100644 --- a/protocols/ping/src/dial_layer.rs +++ b/protocols/ping/src/dial_layer.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use futures::prelude::*; -use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction}; +use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use libp2p_core::{protocols_handler::ProtocolsHandler, PeerId}; use std::marker::PhantomData; use tokio_io::{AsyncRead, AsyncWrite}; @@ -72,7 +72,7 @@ where fn poll( &mut self, - _: &mut TTopology, + _: &mut PollParameters, ) -> Async< NetworkBehaviourAction< ::InEvent, diff --git a/protocols/ping/src/listen_layer.rs b/protocols/ping/src/listen_layer.rs index 8027e4d8..32546cb7 100644 --- a/protocols/ping/src/listen_layer.rs +++ b/protocols/ping/src/listen_layer.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use futures::prelude::*; -use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction}; +use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use libp2p_core::{protocols_handler::ProtocolsHandler, PeerId}; use std::marker::PhantomData; use tokio_io::{AsyncRead, AsyncWrite}; @@ -72,7 +72,7 @@ where fn poll( &mut self, - _: &mut TTopology, + _: &mut PollParameters, ) -> Async< NetworkBehaviourAction< ::InEvent,