diff --git a/core/src/lib.rs b/core/src/lib.rs index dbe2426a..11d279b4 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -212,6 +212,7 @@ mod tests; pub mod either; pub mod muxing; pub mod nodes; +pub mod topology; pub mod transport; pub mod upgrade; diff --git a/core/src/nodes/mod.rs b/core/src/nodes/mod.rs index 2636de35..3db430bb 100644 --- a/core/src/nodes/mod.rs +++ b/core/src/nodes/mod.rs @@ -25,8 +25,10 @@ pub mod listeners; pub mod node; pub mod protocols_handler; pub mod raw_swarm; +pub mod swarm; pub use self::node::Substream; pub use self::handled_node::{NodeHandlerEvent, NodeHandlerEndpoint}; pub use self::protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent}; pub use self::raw_swarm::{ConnectedPoint, Peer, RawSwarm, RawSwarmEvent}; +pub use self::swarm::{Swarm, NetworkBehavior, NetworkBehaviorAction}; diff --git a/core/src/nodes/raw_swarm.rs b/core/src/nodes/raw_swarm.rs index 3e2261a3..f2fb5148 100644 --- a/core/src/nodes/raw_swarm.rs +++ b/core/src/nodes/raw_swarm.rs @@ -1130,7 +1130,10 @@ where TOutEvent: Send + 'static, { let mut addrs = addrs.into_iter(); - let first = addrs.next().unwrap(); // TODO: bad + let first = match addrs.next() { + Some(f) => f, + None => return Err(self) + }; let rest = addrs.collect(); self.connect_inner(handler, first, rest) } diff --git a/core/src/nodes/swarm.rs b/core/src/nodes/swarm.rs new file mode 100644 index 00000000..7cc92b5c --- /dev/null +++ b/core/src/nodes/swarm.rs @@ -0,0 +1,304 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::prelude::*; +use muxing::StreamMuxer; +use nodes::handled_node::NodeHandler; +use nodes::node::Substream; +use nodes::protocols_handler::{NodeHandlerWrapper, ProtocolsHandler}; +use nodes::raw_swarm::{RawSwarm, RawSwarmEvent, ConnectedPoint}; +use std::{io, ops::{Deref, DerefMut}}; +use topology::Topology; +use {ConnectionUpgrade, Multiaddr, PeerId, Transport}; + +/// Contains the state of the network, plus the way it should behave. +pub struct Swarm +where TTransport: Transport, + TBehaviour: NetworkBehavior, +{ + raw_swarm: RawSwarm< + TTransport, + <::ProtocolsHandler as ProtocolsHandler>::InEvent, + <::ProtocolsHandler as ProtocolsHandler>::OutEvent, + NodeHandlerWrapper, + >, + + /// Handles which nodes to connect to and how to handle the events sent back by the protocol + /// handlers. + behaviour: TBehaviour, + + /// Holds the topology of the network. In other words all the nodes that we think exist, even + /// if we're not connected to them. + topology: TTopology, +} + +impl Deref for Swarm +where TTransport: Transport, + TBehaviour: NetworkBehavior, +{ + type Target = TBehaviour; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.behaviour + } +} + +impl DerefMut for Swarm +where TTransport: Transport, + TBehaviour: NetworkBehavior, +{ + #[inline] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.behaviour + } +} + +impl Swarm +where TBehaviour: NetworkBehavior, + TMuxer: StreamMuxer + Send + Sync + 'static, + ::OutboundSubstream: Send + 'static, + ::Substream: Send + 'static, + TTransport: Transport + Clone, + TTransport::Dial: Send + 'static, + TTransport::Listener: Send + 'static, + TTransport::ListenerUpgrade: Send + 'static, + TBehaviour::ProtocolsHandler: ProtocolsHandler> + Send + 'static, + ::InEvent: Send + 'static, + ::OutEvent: Send + 'static, + ::Protocol: ConnectionUpgrade> + Send + 'static, + <::Protocol as ConnectionUpgrade>>::Future: Send + 'static, + <::Protocol as ConnectionUpgrade>>::NamesIter: Clone + Send + 'static, + <::Protocol as ConnectionUpgrade>>::UpgradeIdentifier: Send + 'static, + ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary + as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary + TTopology: Topology, +{ + /// Builds a new `Swarm`. + #[inline] + pub fn new(transport: TTransport, behaviour: TBehaviour, topology: TTopology) -> Self { + let raw_swarm = RawSwarm::new(transport); + Swarm { + raw_swarm, + behaviour, + topology, + } + } + + /// Returns the transport passed when building this object. + #[inline] + pub fn transport(me: &Self) -> &TTransport { + me.raw_swarm.transport() + } + + /// Starts listening on the given address. + /// + /// Returns an error if the address is not supported. + /// On success, returns an alternative version of the address. + #[inline] + pub fn listen_on(me: &mut Self, addr: Multiaddr) -> Result { + me.raw_swarm.listen_on(addr) + } + + /// Tries to dial the given address. + /// + /// Returns an error if the address is not supported. + #[inline] + pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), Multiaddr> { + let handler = me.behaviour.new_handler(); + me.raw_swarm.dial(addr, handler.into_node_handler()) + } + + /// Tries to reach the given peer using the elements in the topology. + /// + /// Has no effect if we are already connected to that peer, or if no address is known for the + /// peer. + #[inline] + pub fn dial(me: &mut Self, peer_id: PeerId) { + let addrs = me.topology.addresses_of_peer(&peer_id); + let handler = me.behaviour.new_handler().into_node_handler(); + if let Some(peer) = me.raw_swarm.peer(peer_id).as_not_connected() { + let _ = peer.connect_iter(addrs, handler); + } + } + + /// Returns an iterator that produces the list of addresses we're listening on. + #[inline] + pub fn listeners(me: &Self) -> impl Iterator { + RawSwarm::listeners(&me.raw_swarm) + } + + /// Returns the topology of the swarm. + #[inline] + pub fn topology(me: &Self) -> &TTopology { + &me.topology + } + + /// Returns the topology of the swarm. + #[inline] + pub fn topology_mut(me: &mut Self) -> &mut TTopology { + &mut me.topology + } +} + +impl Stream for Swarm +where TBehaviour: NetworkBehavior, + TMuxer: StreamMuxer + Send + Sync + 'static, + ::OutboundSubstream: Send + 'static, + ::Substream: Send + 'static, + TTransport: Transport + Clone, + TTransport::Dial: Send + 'static, + TTransport::Listener: Send + 'static, + TTransport::ListenerUpgrade: Send + 'static, + TBehaviour::ProtocolsHandler: ProtocolsHandler> + Send + 'static, + ::InEvent: Send + 'static, + ::OutEvent: Send + 'static, + ::Protocol: ConnectionUpgrade> + Send + 'static, + <::Protocol as ConnectionUpgrade>>::Future: Send + 'static, + <::Protocol as ConnectionUpgrade>>::NamesIter: Clone + Send + 'static, + <::Protocol as ConnectionUpgrade>>::UpgradeIdentifier: Send + 'static, + ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary + as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary + TTopology: Topology, +{ + type Item = TBehaviour::OutEvent; + type Error = io::Error; + + #[inline] + fn poll(&mut self) -> Poll, io::Error> { + loop { + let mut raw_swarm_not_ready = false; + + match self.raw_swarm.poll() { + Async::NotReady => raw_swarm_not_ready = true, + Async::Ready(RawSwarmEvent::NodeEvent { peer_id, event }) => { + self.behaviour.inject_node_event(peer_id, event); + }, + Async::Ready(RawSwarmEvent::Connected { peer_id, endpoint }) => { + self.behaviour.inject_connected(peer_id, endpoint); + }, + Async::Ready(RawSwarmEvent::NodeClosed { peer_id, endpoint }) | + Async::Ready(RawSwarmEvent::NodeError { peer_id, endpoint, .. }) => { + self.behaviour.inject_disconnected(&peer_id, endpoint); + }, + Async::Ready(RawSwarmEvent::Replaced { peer_id, closed_endpoint, endpoint }) => { + self.behaviour.inject_disconnected(&peer_id, closed_endpoint); + self.behaviour.inject_connected(peer_id, endpoint); + }, + Async::Ready(RawSwarmEvent::IncomingConnection(incoming)) => { + let handler = self.behaviour.new_handler(); + incoming.accept(handler.into_node_handler()); + }, + Async::Ready(RawSwarmEvent::ListenerClosed { .. }) => {}, + Async::Ready(RawSwarmEvent::IncomingConnectionError { .. }) => {}, + Async::Ready(RawSwarmEvent::DialError { .. }) => {}, + Async::Ready(RawSwarmEvent::UnknownPeerDialError { .. }) => {}, + } + + match self.behaviour.poll() { + Async::NotReady if raw_swarm_not_ready => return Ok(Async::NotReady), + Async::NotReady => (), + Async::Ready(NetworkBehaviorAction::GenerateEvent(event)) => { + return Ok(Async::Ready(Some(event))); + }, + Async::Ready(NetworkBehaviorAction::DialAddress { address }) => { + let _ = Swarm::dial_addr(self, address); + }, + Async::Ready(NetworkBehaviorAction::DialPeer { peer_id }) => { + Swarm::dial(self, peer_id) + }, + Async::Ready(NetworkBehaviorAction::SendEvent { peer_id, event }) => { + if let Some(mut peer) = self.raw_swarm.peer(peer_id).as_connected() { + peer.send_event(event); + } + }, + } + } + } +} + +/// A behaviour for the network. Allows customizing the swarm. +/// +/// This trait has been designed to be composable. Multiple implementations can be combined into +/// one that handles all the behaviours at once. +pub trait NetworkBehavior { + /// Handler for all the protocols the network supports. + type ProtocolsHandler: ProtocolsHandler; + /// Event generated by the swarm. + type OutEvent; + + /// Builds a new `ProtocolsHandler`. + fn new_handler(&mut self) -> Self::ProtocolsHandler; + + /// Indicates the behaviour that we connected to the node with the given peer id through the + /// given endpoint. + fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint); + + /// Indicates the behaviour that we disconnected from the node with the given peer id. The + /// endpoint is the one we used to be connected to. + fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint); + + /// Indicates the behaviour that the node with the given peer id has generated an event for + /// us. + /// + /// > **Note**: This method is only called for events generated by the protocols handler. + fn inject_node_event( + &mut self, + peer_id: PeerId, + event: ::OutEvent + ); + + /// Polls for things that swarm should do. + /// + /// This API mimics the API of the `Stream` trait. + fn poll(&mut self) -> Async::InEvent, Self::OutEvent>>; +} + +/// Action to perform. +#[derive(Debug, Clone)] +pub enum NetworkBehaviorAction { + /// Generate an event for the outside. + GenerateEvent(TOutEvent), + + // TODO: report new raw connection for usage after intercepting an address dial + + /// Instructs the swarm to dial the given multiaddress without any expectation of a peer id. + DialAddress { + /// The address to dial. + address: Multiaddr, + }, + + /// Instructs the swarm to try reach the given peer. + DialPeer { + /// The peer to try reach. + peer_id: PeerId, + }, + + /// If we're connected to the given peer, sends a message to the protocol handler. + /// + /// If we're not connected to this peer, does nothing. If necessary, the implementation of + /// `NetworkBehaviour` is supposed to track which peers we are connected to. + SendEvent { + /// The peer which to send the message to. + peer_id: PeerId, + /// Event to send to the peer. + event: TInEvent, + }, +} diff --git a/core/src/topology/mod.rs b/core/src/topology/mod.rs new file mode 100644 index 00000000..f27a6512 --- /dev/null +++ b/core/src/topology/mod.rs @@ -0,0 +1,62 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::collections::HashMap; +use {Multiaddr, PeerId}; + +/// Storage for the network topology. +pub trait Topology { + /// Adds a discovered address to the topology. + fn add_discovered_address(&mut self, peer: &PeerId, addr: Multiaddr); + /// Returns the addresses to try use to reach the given peer. + fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec; +} + +/// Topology of the network stored in memory. +pub struct MemoryTopology { + list: HashMap>, +} + +impl MemoryTopology { + /// Creates an empty topology. + #[inline] + pub fn empty() -> MemoryTopology { + MemoryTopology { + list: Default::default() + } + } +} + +impl Default for MemoryTopology { + #[inline] + fn default() -> MemoryTopology { + MemoryTopology::empty() + } +} + +impl Topology for MemoryTopology { + fn add_discovered_address(&mut self, peer: &PeerId, addr: Multiaddr) { + self.list.entry(peer.clone()).or_insert_with(|| Vec::new()).push(addr); + } + + fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { + self.list.get(peer).map(|v| v.clone()).unwrap_or(Vec::new()) + } +}