diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs new file mode 100644 index 00000000..48b7802e --- /dev/null +++ b/protocols/identify/src/identify.rs @@ -0,0 +1,189 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::listen_handler::IdentifyListenHandler; +use crate::periodic_id_handler::{PeriodicIdentification, PeriodicIdentificationEvent}; +use crate::protocol::{IdentifyInfo, IdentifySender, IdentifySenderFuture}; +use futures::prelude::*; +use libp2p_core::protocols_handler::{ProtocolsHandler, ProtocolsHandlerSelect}; +use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use libp2p_core::{Multiaddr, PeerId, either::EitherOutput}; +use smallvec::SmallVec; +use std::{collections::HashMap, collections::VecDeque, io}; +use tokio_io::{AsyncRead, AsyncWrite}; +use void::Void; + +/// Network behaviour that automatically identifies nodes periodically, returns information +/// about them, and answers identify queries from other nodes. +pub struct Identify { + /// Protocol version to send back to remotes. + protocol_version: String, + /// Agent version to send back to remotes. + agent_version: String, + /// For each peer we're connected to, the observed address to send back to it. + observed_addresses: HashMap, + /// List of senders to answer, with the observed multiaddr. + to_answer: SmallVec<[(IdentifySender, Multiaddr); 4]>, + /// List of futures that send back information back to remotes. + futures: SmallVec<[IdentifySenderFuture; 4]>, + /// Events that need to be produced outside when polling.. + events: VecDeque, IdentifyEvent>>, +} + +impl Identify { + /// Creates a `Identify`. + pub fn new(protocol_version: String, agent_version: String) -> Self { + Identify { + protocol_version, + agent_version, + observed_addresses: HashMap::new(), + to_answer: SmallVec::new(), + futures: SmallVec::new(), + events: VecDeque::new(), + } + } +} + +impl NetworkBehaviour for Identify +where + TSubstream: AsyncRead + AsyncWrite, +{ + type ProtocolsHandler = ProtocolsHandlerSelect, PeriodicIdentification>; + type OutEvent = IdentifyEvent; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + IdentifyListenHandler::new().select(PeriodicIdentification::new()) + } + + fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { + let observed = match endpoint { + ConnectedPoint::Dialer { address } => address, + ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, + }; + + self.observed_addresses.insert(peer_id, observed); + } + + fn inject_disconnected(&mut self, peer_id: &PeerId, _: ConnectedPoint) { + self.observed_addresses.remove(peer_id); + } + + fn inject_node_event( + &mut self, + peer_id: PeerId, + event: ::OutEvent, + ) { + match event { + EitherOutput::Second(PeriodicIdentificationEvent::Identified(remote)) => { + self.events + .push_back(NetworkBehaviourAction::GenerateEvent(IdentifyEvent::Identified { + peer_id, + info: remote.info, + observed_addr: remote.observed_addr.clone(), + })); + self.events + .push_back(NetworkBehaviourAction::ReportObservedAddr { + address: remote.observed_addr, + }); + } + EitherOutput::First(sender) => { + let observed = self.observed_addresses.get(&peer_id) + .expect("We only receive events from nodes we're connected to ; we insert \ + into the hashmap when we connect to a node and remove only when we \ + disconnect; QED"); + self.to_answer.push((sender, observed.clone())); + } + EitherOutput::Second(PeriodicIdentificationEvent::IdentificationError(err)) => { + self.events + .push_back(NetworkBehaviourAction::GenerateEvent(IdentifyEvent::Error { + peer_id, + error: err, + })); + } + } + } + + fn poll( + &mut self, + params: &mut PollParameters, + ) -> Async< + NetworkBehaviourAction< + ::InEvent, + Self::OutEvent, + >, + > { + if let Some(event) = self.events.pop_front() { + return Async::Ready(event); + } + + for (sender, observed) in self.to_answer.drain() { + // The protocol names can be bytes, but the identify protocol except UTF-8 strings. + // There's not much we can do to solve this conflict except strip non-UTF-8 characters. + let protocols = params + .supported_protocols() + .map(|p| String::from_utf8_lossy(p).to_string()) + .collect(); + + let send_back_info = IdentifyInfo { + public_key: params.local_public_key().clone(), + protocol_version: self.protocol_version.clone(), + agent_version: self.agent_version.clone(), + listen_addrs: params.listened_addresses().cloned().collect(), + protocols, + }; + + let future = sender.send(send_back_info, &observed); + self.futures.push(future); + } + + // Removes each future one by one, and pushes them back if they're not ready. + for n in (0..self.futures.len()).rev() { + let mut future = self.futures.swap_remove(n); + match future.poll() { + Ok(Async::Ready(())) => {} + Ok(Async::NotReady) => self.futures.push(future), + Err(_) => {}, + } + } + + Async::NotReady + } +} + +/// Event generated by the `Identify`. +#[derive(Debug)] +pub enum IdentifyEvent { + /// We obtained identification information from the remote + Identified { + /// Peer that has been successfully identified. + peer_id: PeerId, + /// Information of the remote. + info: IdentifyInfo, + /// Address the remote observes us as. + observed_addr: Multiaddr, + }, + /// Error while attempting to identify the remote. + Error { + /// Peer that we fail to identify. + peer_id: PeerId, + /// The error that happened. + error: io::Error, + }, +} diff --git a/protocols/identify/src/lib.rs b/protocols/identify/src/lib.rs index f2fdfd84..3dc50a64 100644 --- a/protocols/identify/src/lib.rs +++ b/protocols/identify/src/lib.rs @@ -81,6 +81,7 @@ extern crate tokio_timer; extern crate unsigned_varint; extern crate void; +pub use self::identify::{Identify, IdentifyEvent}; pub use self::id_transport::IdentifyTransport; pub use self::listen_layer::IdentifyListen; pub use self::periodic_id_layer::{PeriodicIdentify, PeriodicIdentifyEvent}; @@ -89,6 +90,7 @@ pub mod listen_handler; pub mod periodic_id_handler; pub mod protocol; +mod identify; mod id_transport; mod listen_layer; mod periodic_id_layer; diff --git a/protocols/identify/src/periodic_id_layer.rs b/protocols/identify/src/periodic_id_layer.rs index a7f20fff..07f91625 100644 --- a/protocols/identify/src/periodic_id_layer.rs +++ b/protocols/identify/src/periodic_id_layer.rs @@ -25,12 +25,13 @@ use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourActio use libp2p_core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId}; use std::{collections::VecDeque, marker::PhantomData}; use tokio_io::{AsyncRead, AsyncWrite}; +use void::Void; /// Network behaviour that automatically identifies nodes periodically, and returns information /// about them. pub struct PeriodicIdentify { /// Events that need to be produced outside when polling.. - events: VecDeque, + events: VecDeque>, /// Marker to pin the generics. marker: PhantomData, } @@ -68,11 +69,15 @@ where match event { PeriodicIdentificationEvent::Identified(remote) => { self.events - .push_back(PeriodicIdentifyEvent::Identified { + .push_back(NetworkBehaviourAction::ReportObservedAddr { + address: remote.observed_addr.clone(), + }); + self.events + .push_back(NetworkBehaviourAction::GenerateEvent(PeriodicIdentifyEvent::Identified { peer_id: peer_id, info: remote.info, observed_addr: remote.observed_addr, - }); + })); } _ => (), // TODO: exhaustive pattern } @@ -88,7 +93,7 @@ where >, > { if let Some(event) = self.events.pop_front() { - return Async::Ready(NetworkBehaviourAction::GenerateEvent(event)); + return Async::Ready(event); } Async::NotReady