Refactor the Identify protocol. (#1231)

* Refactor the Identify protocol.

Thereby updating the documentation. The low-level protocol
and handler modules are no longer exposed and some constructors
of the IdentifyEvent renamed.

* Update protocols/identify/src/protocol.rs

Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com>
This commit is contained in:
Roman Borschel
2019-09-02 11:16:52 +02:00
committed by GitHub
parent c0b379b908
commit 663c6e4e64
5 changed files with 212 additions and 313 deletions

View File

@ -18,9 +18,13 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::protocol::{RemoteInfo, IdentifyProtocolConfig}; use crate::protocol::{RemoteInfo, IdentifyProtocolConfig, ReplySubstream};
use futures::prelude::*; use futures::prelude::*;
use libp2p_core::upgrade::{DeniedUpgrade, OutboundUpgrade}; use libp2p_core::upgrade::{
InboundUpgrade,
OutboundUpgrade,
Negotiated
};
use libp2p_swarm::{ use libp2p_swarm::{
KeepAlive, KeepAlive,
SubstreamProtocol, SubstreamProtocol,
@ -28,10 +32,11 @@ use libp2p_swarm::{
ProtocolsHandlerEvent, ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr ProtocolsHandlerUpgrErr
}; };
use smallvec::SmallVec;
use std::{io, marker::PhantomData, time::Duration}; use std::{io, marker::PhantomData, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use wasm_timer::{Delay, Instant}; use wasm_timer::{Delay, Instant};
use void::{Void, unreachable}; use void::Void;
/// Delay between the moment we connect and the first time we identify. /// Delay between the moment we connect and the first time we identify.
const DELAY_TO_FIRST_ID: Duration = Duration::from_millis(500); const DELAY_TO_FIRST_ID: Duration = Duration::from_millis(500);
@ -40,67 +45,73 @@ const DELAY_TO_NEXT_ID: Duration = Duration::from_secs(5 * 60);
/// After we failed to identify the remote, try again after the given delay. /// After we failed to identify the remote, try again after the given delay.
const TRY_AGAIN_ON_ERR: Duration = Duration::from_secs(60 * 60); const TRY_AGAIN_ON_ERR: Duration = Duration::from_secs(60 * 60);
/// Protocol handler that identifies the remote at a regular period. /// Protocol handler for sending and receiving identification requests.
pub struct PeriodicIdHandler<TSubstream> { ///
/// Outbound requests are sent periodically. The handler performs expects
/// at least one identification request to be answered by the remote before
/// permitting the underlying connection to be closed.
pub struct IdentifyHandler<TSubstream> {
/// Configuration for the protocol. /// Configuration for the protocol.
config: IdentifyProtocolConfig, config: IdentifyProtocolConfig,
/// If `Some`, we successfully generated an `PeriodicIdHandlerEvent` and we will produce /// Pending events to yield.
/// it the next time `poll()` is invoked. events: SmallVec<[IdentifyHandlerEvent<TSubstream>; 4]>,
pending_result: Option<PeriodicIdHandlerEvent>,
/// Future that fires when we need to identify the node again. /// Future that fires when we need to identify the node again.
next_id: Delay, next_id: Delay,
/// If `true`, we have started an identification of the remote at least once in the past. /// Whether the handler should keep the connection alive.
first_id_happened: bool, keep_alive: KeepAlive,
/// Marker for strong typing. /// Marker for strong typing.
marker: PhantomData<TSubstream>, marker: PhantomData<TSubstream>,
} }
/// Event produced by the periodic identifier. /// Event produced by the `IdentifyHandler`.
#[derive(Debug)] #[derive(Debug)]
pub enum PeriodicIdHandlerEvent { pub enum IdentifyHandlerEvent<TSubstream> {
/// We obtained identification information from the remote /// We obtained identification information from the remote
Identified(RemoteInfo), Identified(RemoteInfo),
/// We received a request for identification.
Identify(ReplySubstream<Negotiated<TSubstream>>),
/// Failed to identify the remote. /// Failed to identify the remote.
IdentificationError(ProtocolsHandlerUpgrErr<io::Error>), IdentificationError(ProtocolsHandlerUpgrErr<io::Error>),
} }
impl<TSubstream> PeriodicIdHandler<TSubstream> { impl<TSubstream> IdentifyHandler<TSubstream> {
/// Builds a new `PeriodicIdHandler`. /// Creates a new `IdentifyHandler`.
#[inline]
pub fn new() -> Self { pub fn new() -> Self {
PeriodicIdHandler { IdentifyHandler {
config: IdentifyProtocolConfig, config: IdentifyProtocolConfig,
pending_result: None, events: SmallVec::new(),
next_id: Delay::new(Instant::now() + DELAY_TO_FIRST_ID), next_id: Delay::new(Instant::now() + DELAY_TO_FIRST_ID),
first_id_happened: false, keep_alive: KeepAlive::Yes,
marker: PhantomData, marker: PhantomData,
} }
} }
} }
impl<TSubstream> ProtocolsHandler for PeriodicIdHandler<TSubstream> impl<TSubstream> ProtocolsHandler for IdentifyHandler<TSubstream>
where where
TSubstream: AsyncRead + AsyncWrite, TSubstream: AsyncRead + AsyncWrite,
{ {
type InEvent = Void; type InEvent = Void;
type OutEvent = PeriodicIdHandlerEvent; type OutEvent = IdentifyHandlerEvent<TSubstream>;
type Error = wasm_timer::Error; type Error = wasm_timer::Error;
type Substream = TSubstream; type Substream = TSubstream;
type InboundProtocol = DeniedUpgrade; type InboundProtocol = IdentifyProtocolConfig;
type OutboundProtocol = IdentifyProtocolConfig; type OutboundProtocol = IdentifyProtocolConfig;
type OutboundOpenInfo = (); type OutboundOpenInfo = ();
#[inline]
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> { fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
SubstreamProtocol::new(DeniedUpgrade) SubstreamProtocol::new(self.config.clone())
} }
fn inject_fully_negotiated_inbound(&mut self, protocol: Void) { fn inject_fully_negotiated_inbound(
unreachable(protocol) &mut self,
protocol: <Self::InboundProtocol as InboundUpgrade<TSubstream>>::Output
) {
self.events.push(IdentifyHandlerEvent::Identify(protocol))
} }
fn inject_fully_negotiated_outbound( fn inject_fully_negotiated_outbound(
@ -108,42 +119,39 @@ where
protocol: <Self::OutboundProtocol as OutboundUpgrade<TSubstream>>::Output, protocol: <Self::OutboundProtocol as OutboundUpgrade<TSubstream>>::Output,
_info: Self::OutboundOpenInfo, _info: Self::OutboundOpenInfo,
) { ) {
self.pending_result = Some(PeriodicIdHandlerEvent::Identified(protocol)); self.events.push(IdentifyHandlerEvent::Identified(protocol));
self.first_id_happened = true; self.keep_alive = KeepAlive::No;
} }
#[inline]
fn inject_event(&mut self, _: Self::InEvent) {} fn inject_event(&mut self, _: Self::InEvent) {}
#[inline] fn inject_dial_upgrade_error(
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, err: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) { &mut self,
self.pending_result = Some(PeriodicIdHandlerEvent::IdentificationError(err)); _info: Self::OutboundOpenInfo,
self.first_id_happened = true; err: ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error
>
) {
self.events.push(IdentifyHandlerEvent::IdentificationError(err));
self.keep_alive = KeepAlive::No;
self.next_id.reset(Instant::now() + TRY_AGAIN_ON_ERR); self.next_id.reset(Instant::now() + TRY_AGAIN_ON_ERR);
} }
#[inline]
fn connection_keep_alive(&self) -> KeepAlive { fn connection_keep_alive(&self) -> KeepAlive {
if self.first_id_happened { self.keep_alive
KeepAlive::No
} else {
KeepAlive::Yes
}
} }
fn poll( fn poll(&mut self) -> Poll<
&mut self,
) -> Poll<
ProtocolsHandlerEvent< ProtocolsHandlerEvent<
Self::OutboundProtocol, Self::OutboundProtocol,
Self::OutboundOpenInfo, Self::OutboundOpenInfo,
PeriodicIdHandlerEvent, IdentifyHandlerEvent<TSubstream>,
>, >,
Self::Error, Self::Error,
> { > {
if let Some(pending_result) = self.pending_result.take() { if !self.events.is_empty() {
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
pending_result, self.events.remove(0),
))); )));
} }

View File

@ -18,27 +18,23 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::listen_handler::IdentifyListenHandler; use crate::handler::{IdentifyHandler, IdentifyHandlerEvent};
use crate::periodic_id_handler::{PeriodicIdHandler, PeriodicIdHandlerEvent}; use crate::protocol::{IdentifyInfo, ReplySubstream, ReplyFuture};
use crate::protocol::{IdentifyInfo, IdentifySender, IdentifySenderFuture};
use futures::prelude::*; use futures::prelude::*;
use libp2p_core::{ use libp2p_core::{
ConnectedPoint, ConnectedPoint,
Multiaddr, Multiaddr,
PeerId, PeerId,
PublicKey, PublicKey,
either::EitherOutput, upgrade::{Negotiated, UpgradeError}
upgrade::Negotiated
}; };
use libp2p_swarm::{ use libp2p_swarm::{
NetworkBehaviour, NetworkBehaviour,
NetworkBehaviourAction, NetworkBehaviourAction,
PollParameters, PollParameters,
ProtocolsHandler, ProtocolsHandler,
ProtocolsHandlerSelect,
ProtocolsHandlerUpgrErr ProtocolsHandlerUpgrErr
}; };
use smallvec::SmallVec;
use std::{collections::HashMap, collections::VecDeque, io}; use std::{collections::HashMap, collections::VecDeque, io};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use void::Void; use void::Void;
@ -54,24 +50,36 @@ pub struct Identify<TSubstream> {
local_public_key: PublicKey, local_public_key: PublicKey,
/// For each peer we're connected to, the observed address to send back to it. /// For each peer we're connected to, the observed address to send back to it.
observed_addresses: HashMap<PeerId, Multiaddr>, observed_addresses: HashMap<PeerId, Multiaddr>,
/// List of senders to answer, with the observed multiaddr. /// Pending replies to send.
to_answer: SmallVec<[(PeerId, IdentifySender<Negotiated<TSubstream>>, Multiaddr); 4]>, pending_replies: VecDeque<Reply<TSubstream>>,
/// List of futures that send back information back to remotes. /// Pending events to be emitted when polled.
futures: SmallVec<[(PeerId, IdentifySenderFuture<Negotiated<TSubstream>>); 4]>, events: VecDeque<NetworkBehaviourAction<Void, IdentifyEvent>>,
/// Events that need to be produced outside when polling.. }
events: VecDeque<NetworkBehaviourAction<EitherOutput<Void, Void>, IdentifyEvent>>,
/// A pending reply to an inbound identification request.
enum Reply<TSubstream> {
/// The reply is queued for sending.
Queued {
peer: PeerId,
io: ReplySubstream<Negotiated<TSubstream>>,
observed: Multiaddr
},
/// The reply is being sent.
Sending {
peer: PeerId,
io: ReplyFuture<Negotiated<TSubstream>>
}
} }
impl<TSubstream> Identify<TSubstream> { impl<TSubstream> Identify<TSubstream> {
/// Creates a `Identify`. /// Creates a new `Identify` network behaviour.
pub fn new(protocol_version: String, agent_version: String, local_public_key: PublicKey) -> Self { pub fn new(protocol_version: String, agent_version: String, local_public_key: PublicKey) -> Self {
Identify { Identify {
protocol_version, protocol_version,
agent_version, agent_version,
local_public_key, local_public_key,
observed_addresses: HashMap::new(), observed_addresses: HashMap::new(),
to_answer: SmallVec::new(), pending_replies: VecDeque::new(),
futures: SmallVec::new(),
events: VecDeque::new(), events: VecDeque::new(),
} }
} }
@ -81,11 +89,11 @@ impl<TSubstream> NetworkBehaviour for Identify<TSubstream>
where where
TSubstream: AsyncRead + AsyncWrite, TSubstream: AsyncRead + AsyncWrite,
{ {
type ProtocolsHandler = ProtocolsHandlerSelect<IdentifyListenHandler<TSubstream>, PeriodicIdHandler<TSubstream>>; type ProtocolsHandler = IdentifyHandler<TSubstream>;
type OutEvent = IdentifyEvent; type OutEvent = IdentifyEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler { fn new_handler(&mut self) -> Self::ProtocolsHandler {
IdentifyListenHandler::new().select(PeriodicIdHandler::new()) IdentifyHandler::new()
} }
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> { fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
@ -111,31 +119,35 @@ where
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent, event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) { ) {
match event { match event {
EitherOutput::Second(PeriodicIdHandlerEvent::Identified(remote)) => { IdentifyHandlerEvent::Identified(remote) => {
self.events self.events.push_back(
.push_back(NetworkBehaviourAction::GenerateEvent(IdentifyEvent::Identified { NetworkBehaviourAction::GenerateEvent(
IdentifyEvent::Received {
peer_id, peer_id,
info: remote.info, info: remote.info,
observed_addr: remote.observed_addr.clone(), observed_addr: remote.observed_addr.clone(),
})); }));
self.events self.events.push_back(
.push_back(NetworkBehaviourAction::ReportObservedAddr { NetworkBehaviourAction::ReportObservedAddr {
address: remote.observed_addr, address: remote.observed_addr,
}); });
} }
EitherOutput::First(sender) => { IdentifyHandlerEvent::Identify(sender) => {
let observed = self.observed_addresses.get(&peer_id) let observed = self.observed_addresses.get(&peer_id)
.expect("We only receive events from nodes we're connected to. We insert \ .expect("We only receive events from nodes we're connected to. We insert \
into the hashmap when we connect to a node and remove only when we \ into the hashmap when we connect to a node and remove only when we \
disconnect; QED"); disconnect; QED");
self.to_answer.push((peer_id, sender, observed.clone())); self.pending_replies.push_back(
Reply::Queued {
peer: peer_id,
io: sender,
observed: observed.clone()
});
} }
EitherOutput::Second(PeriodicIdHandlerEvent::IdentificationError(err)) => { IdentifyHandlerEvent::IdentificationError(error) => {
self.events self.events.push_back(
.push_back(NetworkBehaviourAction::GenerateEvent(IdentifyEvent::Error { NetworkBehaviourAction::GenerateEvent(
peer_id, IdentifyEvent::Error { peer_id, error }));
error: err,
}));
} }
} }
} }
@ -153,10 +165,10 @@ where
return Async::Ready(event); return Async::Ready(event);
} }
for (peer_id, sender, observed) in self.to_answer.drain() { if let Some(r) = self.pending_replies.pop_front() {
// The protocol names can be bytes, but the identify protocol except UTF-8 strings. // The protocol names can be bytes, but the identify protocol except UTF-8 strings.
// There's not much we can do to solve this conflict except strip non-UTF-8 characters. // There's not much we can do to solve this conflict except strip non-UTF-8 characters.
let protocols = params let protocols: Vec<_> = params
.supported_protocols() .supported_protocols()
.map(|p| String::from_utf8_lossy(&p).to_string()) .map(|p| String::from_utf8_lossy(&p).to_string())
.collect(); .collect();
@ -164,70 +176,80 @@ where
let mut listen_addrs: Vec<_> = params.external_addresses().collect(); let mut listen_addrs: Vec<_> = params.external_addresses().collect();
listen_addrs.extend(params.listened_addresses()); listen_addrs.extend(params.listened_addresses());
let send_back_info = IdentifyInfo { let mut sending = 0;
let to_send = self.pending_replies.len() + 1;
let mut reply = Some(r);
loop {
match reply {
Some(Reply::Queued { peer, io, observed }) => {
let info = IdentifyInfo {
public_key: self.local_public_key.clone(), public_key: self.local_public_key.clone(),
protocol_version: self.protocol_version.clone(), protocol_version: self.protocol_version.clone(),
agent_version: self.agent_version.clone(), agent_version: self.agent_version.clone(),
listen_addrs, listen_addrs: listen_addrs.clone(),
protocols, protocols: protocols.clone(),
}; };
let io = io.send(info, &observed);
let future = sender.send(send_back_info, &observed); reply = Some(Reply::Sending { peer, io });
self.futures.push((peer_id, future));
} }
Some(Reply::Sending { peer, mut io }) => {
// Removes each future one by one, and pushes them back if they're not ready. sending += 1;
for n in (0..self.futures.len()).rev() { match io.poll() {
let (peer_id, mut future) = self.futures.swap_remove(n);
match future.poll() {
Ok(Async::Ready(())) => { Ok(Async::Ready(())) => {
let event = IdentifyEvent::SendBack { let event = IdentifyEvent::Sent { peer_id: peer };
peer_id,
result: Ok(()),
};
return Async::Ready(NetworkBehaviourAction::GenerateEvent(event)); return Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
}, },
Ok(Async::NotReady) => self.futures.push((peer_id, future)), Ok(Async::NotReady) => {
self.pending_replies.push_back(Reply::Sending { peer, io });
if sending == to_send {
// All remaining futures are NotReady
break
} else {
reply = self.pending_replies.pop_front();
}
}
Err(err) => { Err(err) => {
let event = IdentifyEvent::SendBack { let event = IdentifyEvent::Error {
peer_id, peer_id: peer,
result: Err(err), error: ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err))
}; };
return Async::Ready(NetworkBehaviourAction::GenerateEvent(event)); return Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
}, },
} }
} }
None => unreachable!()
}
}
}
Async::NotReady Async::NotReady
} }
} }
/// Event generated by the `Identify`. /// Event emitted by the `Identify` behaviour.
#[derive(Debug)] #[derive(Debug)]
pub enum IdentifyEvent { pub enum IdentifyEvent {
/// We obtained identification information from the remote /// Identifying information has been received from a peer.
Identified { Received {
/// Peer that has been successfully identified. /// The peer that has been identified.
peer_id: PeerId, peer_id: PeerId,
/// Information of the remote. /// The information provided by the peer.
info: IdentifyInfo, info: IdentifyInfo,
/// Address the remote observes us as. /// The address observed by the peer for the local node.
observed_addr: Multiaddr, observed_addr: Multiaddr,
}, },
/// Identifying information of the local node has been sent to a peer.
Sent {
/// The peer that the information has been sent to.
peer_id: PeerId,
},
/// Error while attempting to identify the remote. /// Error while attempting to identify the remote.
Error { Error {
/// Peer that we fail to identify. /// The peer with whom the error originated.
peer_id: PeerId, peer_id: PeerId,
/// The error that happened. /// The error that occurred.
error: ProtocolsHandlerUpgrErr<io::Error>, error: ProtocolsHandlerUpgrErr<io::Error>,
}, },
/// Finished sending back our identification information to a remote.
SendBack {
/// Peer that we sent our identification info to.
peer_id: PeerId,
/// Contains the error that potentially happened when sending back.
result: Result<(), io::Error>,
},
} }
#[cfg(test)] #[cfg(test)]
@ -246,7 +268,7 @@ mod tests {
use libp2p_secio::SecioConfig; use libp2p_secio::SecioConfig;
use libp2p_swarm::Swarm; use libp2p_swarm::Swarm;
use libp2p_mplex::MplexConfig; use libp2p_mplex::MplexConfig;
use rand::Rng; use rand::{Rng, thread_rng};
use std::{fmt, io}; use std::{fmt, io};
use tokio::runtime::current_thread; use tokio::runtime::current_thread;
@ -290,7 +312,7 @@ mod tests {
}; };
let addr: Multiaddr = { let addr: Multiaddr = {
let port = rand::thread_rng().gen_range(49152, std::u16::MAX); let port = thread_rng().gen_range(49152, std::u16::MAX);
format!("/ip4/127.0.0.1/tcp/{}", port).parse().unwrap() format!("/ip4/127.0.0.1/tcp/{}", port).parse().unwrap()
}; };
@ -299,13 +321,13 @@ mod tests {
// nb. Either swarm may receive the `Identified` event first, upon which // nb. Either swarm may receive the `Identified` event first, upon which
// it will permit the connection to be closed, as defined by // it will permit the connection to be closed, as defined by
// `PeriodicIdHandler::connection_keep_alive`. Hence the test succeeds if // `IdentifyHandler::connection_keep_alive`. Hence the test succeeds if
// either `Identified` event arrives correctly. // either `Identified` event arrives correctly.
current_thread::Runtime::new().unwrap().block_on( current_thread::Runtime::new().unwrap().block_on(
future::poll_fn(move || -> Result<_, io::Error> { future::poll_fn(move || -> Result<_, io::Error> {
loop { loop {
match swarm1.poll().unwrap() { match swarm1.poll().unwrap() {
Async::Ready(Some(IdentifyEvent::Identified { info, .. })) => { Async::Ready(Some(IdentifyEvent::Received { info, .. })) => {
assert_eq!(info.public_key, pubkey2); assert_eq!(info.public_key, pubkey2);
assert_eq!(info.protocol_version, "c"); assert_eq!(info.protocol_version, "c");
assert_eq!(info.agent_version, "d"); assert_eq!(info.agent_version, "d");
@ -313,13 +335,13 @@ mod tests {
assert!(info.listen_addrs.is_empty()); assert!(info.listen_addrs.is_empty());
return Ok(Async::Ready(())) return Ok(Async::Ready(()))
}, },
Async::Ready(Some(IdentifyEvent::SendBack { result: Ok(()), .. })) => (), Async::Ready(Some(IdentifyEvent::Sent { .. })) => (),
Async::Ready(e) => panic!("{:?}", e), Async::Ready(e) => panic!("{:?}", e),
Async::NotReady => {} Async::NotReady => {}
} }
match swarm2.poll().unwrap() { match swarm2.poll().unwrap() {
Async::Ready(Some(IdentifyEvent::Identified { info, .. })) => { Async::Ready(Some(IdentifyEvent::Received { info, .. })) => {
assert_eq!(info.public_key, pubkey1); assert_eq!(info.public_key, pubkey1);
assert_eq!(info.protocol_version, "a"); assert_eq!(info.protocol_version, "a");
assert_eq!(info.agent_version, "b"); assert_eq!(info.agent_version, "b");
@ -327,7 +349,7 @@ mod tests {
assert_eq!(info.listen_addrs.len(), 1); assert_eq!(info.listen_addrs.len(), 1);
return Ok(Async::Ready(())) return Ok(Async::Ready(()))
}, },
Async::Ready(Some(IdentifyEvent::SendBack { result: Ok(()), .. })) => (), Async::Ready(Some(IdentifyEvent::Sent { .. })) => (),
Async::Ready(e) => panic!("{:?}", e), Async::Ready(e) => panic!("{:?}", e),
Async::NotReady => break Async::NotReady => break
} }

View File

@ -18,59 +18,29 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
//! Implementation of the `/ipfs/id/1.0.0` protocol. Allows a node A to query another node B which //! Implementation of the [Identify] protocol.
//! information B knows about A. Also includes the addresses B is listening on.
//! //!
//! When two nodes connect to each other, the listening half sends a message to the dialing half, //! This implementation of the protocol periodically exchanges
//! indicating the information, and then the protocol stops. //! [`IdentifyInfo`] messages between the peers on an established connection.
//!
//! At least one identification request is sent on a newly established
//! connection, beyond which the behaviour does not keep connections alive.
//! //!
//! # Usage //! # Usage
//! //!
//! Both low-level and high-level usages are available. //! The [`Identify`] struct implements a `NetworkBehaviour` that negotiates
//! and executes the protocol on every established connection, emitting
//! [`IdentifyEvent`]s.
//! //!
//! ## High-level usage through the `IdentifyTransport` struct //! [Identify]: https://github.com/libp2p/specs/tree/master/identify
//! //! [`Identify`]: self::Identify
//! This crate provides the `IdentifyTransport` struct, which wraps around a `Transport` and an //! [`IdentifyEvent`]: self::IdentifyEvent
//! implementation of `Peerstore`. `IdentifyTransport` is itself a transport that accepts //! [`IdentifyInfo`]: self::IdentifyEvent
//! multiaddresses of the form `/p2p/...` or `/ipfs/...`.
//!
//! > **Note**: All the documentation refers to `/p2p/...`, however `/ipfs/...` is also supported.
//!
//! If you dial a multiaddr of the form `/p2p/...`, then the `IdentifyTransport` will look into
//! the `Peerstore` for any known multiaddress for this peer and try to dial them using the
//! underlying transport. If you dial any other multiaddr, then it will dial this multiaddr using
//! the underlying transport, then negotiate the *identify* protocol with the remote in order to
//! obtain its ID, then add it to the peerstore, and finally dial the same multiaddr again and
//! return the connection.
//!
//! Listening doesn't support multiaddresses of the form `/p2p/...` (because that wouldn't make
//! sense). Any address passed to `listen_on` will be passed directly to the underlying transport.
//!
//! Whenever a remote connects to us, either through listening or through `next_incoming`, the
//! `IdentifyTransport` dials back the remote, upgrades the connection to the *identify* protocol
//! in order to obtain the ID of the remote, stores the information in the peerstore, and finally
//! only returns the connection. From the exterior, the multiaddress of the remote is of the form
//! `/p2p/...`. If the remote doesn't support the *identify* protocol, then the socket is closed.
//!
//! Because of the behaviour of `IdentifyProtocol`, it is recommended to build it on top of a
//! `ConnectionReuse`.
//!
//! ## Low-level usage through the `IdentifyProtocolConfig` struct
//!
//! The `IdentifyProtocolConfig` struct implements the `ConnectionUpgrade` trait. Using it will
//! negotiate the *identify* protocol.
//!
//! The output of the upgrade is a `IdentifyOutput`. If we are the dialer, then `IdentifyOutput`
//! will contain the information sent by the remote. If we are the listener, then it will contain
//! a `IdentifySender` struct that can be used to transmit back to the remote the information about
//! it.
pub use self::identify::{Identify, IdentifyEvent}; pub use self::identify::{Identify, IdentifyEvent};
pub use self::protocol::IdentifyInfo; pub use self::protocol::IdentifyInfo;
pub mod listen_handler; mod handler;
pub mod periodic_id_handler;
pub mod protocol;
mod identify; mod identify;
mod protocol;
mod structs_proto; mod structs_proto;

View File

@ -1,112 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::protocol::{IdentifySender, IdentifyProtocolConfig};
use futures::prelude::*;
use libp2p_core::upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade, Negotiated};
use libp2p_swarm::{
KeepAlive,
SubstreamProtocol,
ProtocolsHandler,
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr
};
use smallvec::SmallVec;
use tokio_io::{AsyncRead, AsyncWrite};
use void::{Void, unreachable};
/// Protocol handler that identifies the remote at a regular period.
pub struct IdentifyListenHandler<TSubstream> {
/// Configuration for the protocol.
config: IdentifyProtocolConfig,
/// List of senders to yield to the user.
pending_result: SmallVec<[IdentifySender<Negotiated<TSubstream>>; 4]>,
}
impl<TSubstream> IdentifyListenHandler<TSubstream> {
/// Builds a new `IdentifyListenHandler`.
#[inline]
pub fn new() -> Self {
IdentifyListenHandler {
config: IdentifyProtocolConfig,
pending_result: SmallVec::new(),
}
}
}
impl<TSubstream> ProtocolsHandler for IdentifyListenHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
type InEvent = Void;
type OutEvent = IdentifySender<Negotiated<TSubstream>>;
type Error = Void;
type Substream = TSubstream;
type InboundProtocol = IdentifyProtocolConfig;
type OutboundProtocol = DeniedUpgrade;
type OutboundOpenInfo = ();
#[inline]
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
SubstreamProtocol::new(self.config.clone())
}
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgrade<TSubstream>>::Output
) {
self.pending_result.push(protocol)
}
fn inject_fully_negotiated_outbound(&mut self, protocol: Void, _: Self::OutboundOpenInfo) {
unreachable(protocol)
}
#[inline]
fn inject_event(&mut self, _: Self::InEvent) {}
#[inline]
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error>) {}
#[inline]
fn connection_keep_alive(&self) -> KeepAlive {
KeepAlive::No
}
fn poll(
&mut self,
) -> Poll<
ProtocolsHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
>,
Self::Error,
> {
if !self.pending_result.is_empty() {
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
self.pending_result.remove(0),
)));
}
Ok(Async::NotReady)
}
}

View File

@ -33,12 +33,12 @@ use protobuf::parse_from_bytes as protobuf_parse_from_bytes;
use protobuf::RepeatedField; use protobuf::RepeatedField;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter; use std::{fmt, iter};
use tokio_codec::Framed; use tokio_codec::Framed;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use unsigned_varint::codec; use unsigned_varint::codec;
/// Configuration for an upgrade to the identity protocol. /// Configuration for an upgrade to the `Identify` protocol.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct IdentifyProtocolConfig; pub struct IdentifyProtocolConfig;
@ -52,15 +52,26 @@ pub struct RemoteInfo {
_priv: () _priv: ()
} }
/// Object used to send back information to the client. /// The substream on which a reply is expected to be sent.
pub struct IdentifySender<T> { pub struct ReplySubstream<T> {
inner: Framed<T, codec::UviBytes<Vec<u8>>>, inner: Framed<T, codec::UviBytes<Vec<u8>>>,
} }
impl<T> IdentifySender<T> where T: AsyncWrite { impl<T> fmt::Debug for ReplySubstream<T> {
/// Sends back information to the client. Returns a future that is signalled whenever the fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
/// info have been sent. f.debug_tuple("ReplySubstream").finish()
pub fn send(self, info: IdentifyInfo, observed_addr: &Multiaddr) -> IdentifySenderFuture<T> { }
}
impl<T> ReplySubstream<T>
where
T: AsyncWrite
{
/// Sends back the requested information on the substream.
///
/// Consumes the substream, returning a `ReplyFuture` that resolves
/// when the reply has been sent on the underlying connection.
pub fn send(self, info: IdentifyInfo, observed_addr: &Multiaddr) -> ReplyFuture<T> {
debug!("Sending identify info to client"); debug!("Sending identify info to client");
trace!("Sending: {:?}", info); trace!("Sending: {:?}", info);
@ -83,7 +94,7 @@ impl<T> IdentifySender<T> where T: AsyncWrite {
.write_to_bytes() .write_to_bytes()
.expect("writing protobuf failed; should never happen"); .expect("writing protobuf failed; should never happen");
IdentifySenderFuture { ReplyFuture {
inner: self.inner, inner: self.inner,
item: Some(bytes), item: Some(bytes),
} }
@ -96,14 +107,14 @@ impl<T> IdentifySender<T> where T: AsyncWrite {
// means that we would require `T: AsyncWrite` in this struct definition. This requirement // means that we would require `T: AsyncWrite` in this struct definition. This requirement
// would then propagate everywhere. // would then propagate everywhere.
#[must_use = "futures do nothing unless polled"] #[must_use = "futures do nothing unless polled"]
pub struct IdentifySenderFuture<T> { pub struct ReplyFuture<T> {
/// The Sink where to send the data. /// The Sink where to send the data.
inner: Framed<T, codec::UviBytes<Vec<u8>>>, inner: Framed<T, codec::UviBytes<Vec<u8>>>,
/// Bytes to send, or `None` if we've already sent them. /// Bytes to send, or `None` if we've already sent them.
item: Option<Vec<u8>>, item: Option<Vec<u8>>,
} }
impl<T> Future for IdentifySenderFuture<T> impl<T> Future for ReplyFuture<T>
where T: AsyncWrite where T: AsyncWrite
{ {
type Item = (); type Item = ();
@ -123,19 +134,20 @@ where T: AsyncWrite
} }
} }
/// Information sent from the listener to the dialer. /// Information of a peer sent in `Identify` protocol responses.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct IdentifyInfo { pub struct IdentifyInfo {
/// Public key of the node. /// The public key underlying the peer's `PeerId`.
pub public_key: PublicKey, pub public_key: PublicKey,
/// Version of the "global" protocol, e.g. `ipfs/1.0.0` or `polkadot/1.0.0`. /// Version of the protocol family used by the peer, e.g. `ipfs/1.0.0`
/// or `polkadot/1.0.0`.
pub protocol_version: String, pub protocol_version: String,
/// Name and version of the client. Can be thought as similar to the `User-Agent` header /// Name and version of the peer, similar to the `User-Agent` header in
/// of HTTP. /// the HTTP protocol.
pub agent_version: String, pub agent_version: String,
/// Addresses that the node is listening on. /// The addresses that the peer is listening on.
pub listen_addrs: Vec<Multiaddr>, pub listen_addrs: Vec<Multiaddr>,
/// Protocols supported by the node, e.g. `/ipfs/ping/1.0.0`. /// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`.
pub protocols: Vec<String>, pub protocols: Vec<String>,
} }
@ -152,15 +164,14 @@ impl<C> InboundUpgrade<C> for IdentifyProtocolConfig
where where
C: AsyncRead + AsyncWrite, C: AsyncRead + AsyncWrite,
{ {
type Output = IdentifySender<Negotiated<C>>; type Output = ReplySubstream<Negotiated<C>>;
type Error = IoError; type Error = IoError;
type Future = FutureResult<Self::Output, IoError>; type Future = FutureResult<Self::Output, IoError>;
fn upgrade_inbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future { fn upgrade_inbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future {
trace!("Upgrading inbound connection"); trace!("Upgrading inbound connection");
let socket = Framed::new(socket, codec::UviBytes::default()); let inner = Framed::new(socket, codec::UviBytes::default());
let sender = IdentifySender { inner: socket }; future::ok(ReplySubstream { inner })
future::ok(sender)
} }
} }