Enhance the swarm a bit (#711)

* Replace the &mut TTopology with a &mut PollParameters

* Add supported_protocols

* Add external_addresses

* Report out addresses in Kademlia

* Fix the custom derive

* Some comments

* Fix compilation on stable
This commit is contained in:
Pierre Krieger 2018-12-01 13:34:57 +01:00 committed by GitHub
parent 69eafd9869
commit c5d08ab48c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 113 additions and 26 deletions

View File

@ -30,6 +30,7 @@ use crate::{
topology::Topology
};
use futures::prelude::*;
use smallvec::SmallVec;
use std::{fmt, io, ops::{Deref, DerefMut}};
pub use crate::nodes::raw_swarm::ConnectedPoint;
@ -53,6 +54,12 @@ where TTransport: Transport,
/// Holds the topology of the network. In other words all the nodes that we think exist, even
/// if we're not connected to them.
topology: TTopology,
/// List of protocols that the behaviour says it supports.
supported_protocols: SmallVec<[Vec<u8>; 16]>,
/// List of multiaddresses we're listening on after NAT traversal.
external_addresses: SmallVec<[Multiaddr; 8]>,
}
impl<TTransport, TBehaviour, TTopology> Deref for Swarm<TTransport, TBehaviour, TTopology>
@ -105,12 +112,21 @@ where TBehaviour: NetworkBehaviour<TTopology>,
{
/// Builds a new `Swarm`.
#[inline]
pub fn new(transport: TTransport, behaviour: TBehaviour, topology: TTopology) -> Self {
pub fn new(transport: TTransport, mut behaviour: TBehaviour, topology: TTopology) -> Self {
let supported_protocols = behaviour
.new_handler()
.listen_protocol()
.protocol_names()
.map(|(name, _)| name.to_vec())
.collect();
let raw_swarm = RawSwarm::new(transport);
Swarm {
raw_swarm,
behaviour,
topology,
supported_protocols,
external_addresses: SmallVec::new(),
}
}
@ -230,7 +246,16 @@ where TBehaviour: NetworkBehaviour<TTopology>,
Async::Ready(RawSwarmEvent::UnknownPeerDialError { .. }) => {},
}
match self.behaviour.poll(&mut self.topology) {
let behaviour_poll = {
let mut parameters = PollParameters {
topology: &mut self.topology,
supported_protocols: &self.supported_protocols,
external_addresses: &self.external_addresses,
};
self.behaviour.poll(&mut parameters)
};
match behaviour_poll {
Async::NotReady if raw_swarm_not_ready => return Ok(Async::NotReady),
Async::NotReady => (),
Async::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
@ -247,6 +272,13 @@ where TBehaviour: NetworkBehaviour<TTopology>,
peer.send_event(event);
}
},
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => {
for addr in self.raw_swarm.nat_traversal(&address) {
// TODO: is it a good idea to add these addresses permanently? what about
// a TTL instead?
self.external_addresses.push(addr);
}
},
}
}
}
@ -286,7 +318,42 @@ pub trait NetworkBehaviour<TTopology> {
/// Polls for things that swarm should do.
///
/// This API mimics the API of the `Stream` trait.
fn poll(&mut self, topology: &mut TTopology) -> Async<NetworkBehaviourAction<<Self::ProtocolsHandler as ProtocolsHandler>::InEvent, Self::OutEvent>>;
fn poll(&mut self, topology: &mut PollParameters<TTopology>) -> Async<NetworkBehaviourAction<<Self::ProtocolsHandler as ProtocolsHandler>::InEvent, Self::OutEvent>>;
}
/// Parameters passed to `poll()` that the `NetworkBehaviour` has access to.
#[derive(Debug)]
pub struct PollParameters<'a, TTopology: 'a> {
topology: &'a mut TTopology,
supported_protocols: &'a [Vec<u8>],
external_addresses: &'a [Multiaddr],
}
impl<'a, TTopology> PollParameters<'a, TTopology> {
/// Returns a reference to the topology of the network.
#[inline]
pub fn topology(&mut self) -> &mut TTopology {
&mut self.topology
}
/// Returns the list of protocol the behaviour supports when a remote negotiates a protocol on
/// an inbound substream.
///
/// The iterator's elements are the ASCII names as reported on the wire.
///
/// Note that the list is computed once at initialization and never refreshed.
#[inline]
pub fn supported_protocols(&self) -> impl ExactSizeIterator<Item = &[u8]> {
self.supported_protocols.iter().map(AsRef::as_ref)
}
/// Returns the list of the addresses we're listening on, after accounting for NAT traversal.
///
/// This corresponds to the elements produced with `ReportObservedAddr`.
#[inline]
pub fn external_addresses(&self) -> impl ExactSizeIterator<Item = &Multiaddr> {
self.external_addresses.iter()
}
}
/// Action to perform.
@ -319,4 +386,12 @@ pub enum NetworkBehaviourAction<TInEvent, TOutEvent> {
/// Event to send to the peer.
event: TInEvent,
},
/// Reports that a remote observes us as this address.
///
/// The swarm will pass this address through the transport's NAT traversal.
ReportObservedAddr {
/// The address we're being observed as.
address: Multiaddr,
},
}

View File

@ -79,6 +79,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
quote!{#n}
};
let poll_parameters = quote!{::libp2p::core::swarm::PollParameters<#topology_generic>};
// Build the generics.
let impl_generics = {
let tp = ast.generics.type_params();
@ -306,7 +308,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
Some(quote!{
loop {
match #field_name.poll(topology) {
match #field_name.poll(poll_params) {
Async::Ready(#network_behaviour_action::GenerateEvent(event)) => {
#handling
}
@ -322,6 +324,9 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
event: #wrapped_event,
});
}
Async::Ready(#network_behaviour_action::ReportObservedAddr { address }) => {
return Async::Ready(#network_behaviour_action::ReportObservedAddr { address });
}
Async::NotReady => break,
}
}
@ -363,7 +368,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
}
}
fn poll(&mut self, topology: &mut #topology_generic) -> ::libp2p::futures::Async<#network_behaviour_action<<Self::ProtocolsHandler as #protocols_handler>::InEvent, Self::OutEvent>> {
fn poll(&mut self, poll_params: &mut #poll_parameters) -> ::libp2p::futures::Async<#network_behaviour_action<<Self::ProtocolsHandler as #protocols_handler>::InEvent, Self::OutEvent>> {
use libp2p::futures::prelude::*;
#(#poll_stmts)*
let f: ::libp2p::futures::Async<#network_behaviour_action<<Self::ProtocolsHandler as #protocols_handler>::InEvent, Self::OutEvent>> = #poll_method;

View File

@ -21,7 +21,7 @@
use cuckoofilter::CuckooFilter;
use futures::prelude::*;
use handler::FloodsubHandler;
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p_core::{protocols_handler::ProtocolsHandler, PeerId};
use protocol::{FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction};
use rand;
@ -276,7 +276,7 @@ where
fn poll(
&mut self,
_: &mut TTopology,
_: &mut PollParameters<TTopology>,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.
use futures::prelude::*;
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId};
use smallvec::SmallVec;
use std::collections::HashMap;
@ -99,7 +99,7 @@ where
fn poll(
&mut self,
_: &mut TTopology,
_: &mut PollParameters<TTopology>,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.
use futures::prelude::*;
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId};
use std::{collections::VecDeque, marker::PhantomData};
use tokio_io::{AsyncRead, AsyncWrite};
@ -79,7 +79,7 @@ where
fn poll(
&mut self,
_: &mut TTopology,
_: &mut PollParameters<TTopology>,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,

View File

@ -21,7 +21,7 @@
use fnv::{FnvHashMap, FnvHashSet};
use futures::{prelude::*, stream};
use handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn, KademliaRequestId};
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p_core::{protocols_handler::ProtocolsHandler, topology::Topology, Multiaddr, PeerId};
use multihash::Multihash;
use protocol::{KadConnectionType, KadPeer};
@ -161,21 +161,25 @@ impl<TSubstream> Kademlia<TSubstream> {
}
/// Builds a `KadPeer` structure corresponding to the local node.
fn build_local_kad_peer(&self) -> KadPeer {
fn build_local_kad_peer<'a>(&self, local_addrs: impl IntoIterator<Item = &'a Multiaddr>) -> KadPeer {
KadPeer {
node_id: self.local_peer_id.clone(),
multiaddrs: Vec::new(), // FIXME: return the addresses we're listening on
multiaddrs: local_addrs.into_iter().cloned().collect(),
connection_ty: KadConnectionType::Connected,
}
}
/// Builds the answer to a request.
fn build_result<TUserData, TTopology>(&self, query: QueryTarget, request_id: KademliaRequestId, topology: &mut TTopology)
fn build_result<TUserData, TTopology>(&self, query: QueryTarget, request_id: KademliaRequestId, parameters: &mut PollParameters<TTopology>)
-> KademliaHandlerIn<TUserData>
where TTopology: KademliaTopology
{
let local_kad_peer = self.build_local_kad_peer(parameters.external_addresses());
match query {
QueryTarget::FindPeer(key) => {
let mut topology = parameters.topology();
// TODO: insert local_kad_peer somewhere?
let closer_peers = topology
.closest_peers(key.as_ref(), self.num_results)
.map(|peer_id| build_kad_peer(peer_id, topology, &self.connected_peers))
@ -187,6 +191,8 @@ impl<TSubstream> Kademlia<TSubstream> {
}
},
QueryTarget::GetProviders(key) => {
let mut topology = parameters.topology();
// TODO: insert local_kad_peer somewhere?
let closer_peers = topology
.closest_peers(&key, self.num_results)
.map(|peer_id| build_kad_peer(peer_id, topology, &self.connected_peers))
@ -198,7 +204,7 @@ impl<TSubstream> Kademlia<TSubstream> {
.get_providers(&key)
.map(|peer_id| build_kad_peer(peer_id, topology, &self.connected_peers))
.chain(if local_node_is_providing {
Some(self.build_local_kad_peer())
Some(local_kad_peer)
} else {
None
}.into_iter())
@ -364,7 +370,7 @@ where
fn poll(
&mut self,
topology: &mut TTopology,
parameters: &mut PollParameters<TTopology>,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
@ -373,11 +379,11 @@ where
> {
// Flush the changes to the topology that we want to make.
for (peer_id, addr, connection_ty) in self.add_to_topology.drain() {
topology.add_kad_discovered_address(peer_id, addr, connection_ty);
parameters.topology().add_kad_discovered_address(peer_id, addr, connection_ty);
}
self.add_to_topology.shrink_to_fit();
for (key, provider) in self.add_provider.drain() {
topology.add_provider(key, provider);
parameters.topology().add_provider(key, provider);
}
self.add_provider.shrink_to_fit();
@ -396,7 +402,8 @@ where
// Start queries that are waiting to start.
for (query_id, query_target, query_purpose) in self.queries_to_starts.drain() {
let known_closest_peers = topology
let known_closest_peers = parameters
.topology()
.closest_peers(query_target.as_hash(), self.num_results);
self.active_queries.insert(
query_id,
@ -418,7 +425,7 @@ where
// Handle remote queries.
if !self.remote_requests.is_empty() {
let (peer_id, request_id, query) = self.remote_requests.remove(0);
let result = self.build_result(query, request_id, topology);
let result = self.build_result(query, request_id, parameters);
return Async::Ready(NetworkBehaviourAction::SendEvent {
peer_id,
event: result,
@ -501,7 +508,7 @@ where
peer_id: closest,
event: KademliaHandlerIn::AddProvider {
key: key.clone(),
provider_peer: self.build_local_kad_peer(),
provider_peer: self.build_local_kad_peer(parameters.external_addresses()),
},
};

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.
use futures::prelude::*;
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p_core::{protocols_handler::ProtocolsHandler, PeerId};
use std::marker::PhantomData;
use tokio_io::{AsyncRead, AsyncWrite};
@ -72,7 +72,7 @@ where
fn poll(
&mut self,
_: &mut TTopology,
_: &mut PollParameters<TTopology>,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.
use futures::prelude::*;
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p_core::{protocols_handler::ProtocolsHandler, PeerId};
use std::marker::PhantomData;
use tokio_io::{AsyncRead, AsyncWrite};
@ -72,7 +72,7 @@ where
fn poll(
&mut self,
_: &mut TTopology,
_: &mut PollParameters<TTopology>,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,