diff --git a/core/src/protocols_handler/dummy.rs b/core/src/protocols_handler/dummy.rs index 5b354b59..777e6519 100644 --- a/core/src/protocols_handler/dummy.rs +++ b/core/src/protocols_handler/dummy.rs @@ -19,7 +19,13 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, + protocols_handler::{ + KeepAlive, + SubstreamProtocol, + ProtocolsHandler, + ProtocolsHandlerEvent, + ProtocolsHandlerUpgrErr + }, upgrade::{ InboundUpgrade, OutboundUpgrade, @@ -58,8 +64,8 @@ where type OutboundOpenInfo = Void; #[inline] - fn listen_protocol(&self) -> Self::InboundProtocol { - DeniedUpgrade + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(DeniedUpgrade) } #[inline] diff --git a/core/src/protocols_handler/map_in.rs b/core/src/protocols_handler/map_in.rs index d8510a8d..6c60719a 100644 --- a/core/src/protocols_handler/map_in.rs +++ b/core/src/protocols_handler/map_in.rs @@ -19,7 +19,13 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, + protocols_handler::{ + KeepAlive, + SubstreamProtocol, + ProtocolsHandler, + ProtocolsHandlerEvent, + ProtocolsHandlerUpgrErr + }, upgrade::{ InboundUpgrade, OutboundUpgrade, @@ -61,7 +67,7 @@ where type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo; #[inline] - fn listen_protocol(&self) -> Self::InboundProtocol { + fn listen_protocol(&self) -> SubstreamProtocol { self.inner.listen_protocol() } diff --git a/core/src/protocols_handler/map_out.rs b/core/src/protocols_handler/map_out.rs index 89f1b07b..0d54006c 100644 --- a/core/src/protocols_handler/map_out.rs +++ b/core/src/protocols_handler/map_out.rs @@ -19,7 +19,13 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, + protocols_handler::{ + KeepAlive, + SubstreamProtocol, + ProtocolsHandler, + ProtocolsHandlerEvent, + ProtocolsHandlerUpgrErr + }, upgrade::{ InboundUpgrade, OutboundUpgrade, @@ -58,7 +64,7 @@ where type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo; #[inline] - fn listen_protocol(&self) -> Self::InboundProtocol { + fn listen_protocol(&self) -> SubstreamProtocol { self.inner.listen_protocol() } @@ -104,8 +110,8 @@ where Ok(self.inner.poll()?.map(|ev| { match ev { ProtocolsHandlerEvent::Custom(ev) => ProtocolsHandlerEvent::Custom((self.map)(ev)), - ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } => { - ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info } => { + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info } } } })) diff --git a/core/src/protocols_handler/mod.rs b/core/src/protocols_handler/mod.rs index 4151d040..dee3c367 100644 --- a/core/src/protocols_handler/mod.rs +++ b/core/src/protocols_handler/mod.rs @@ -18,18 +18,22 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -//! Once we are connected to a node, a *protocols handler* handles one or more specific protocols -//! on this connection. +//! Once a connection to a remote peer is established, a `ProtocolsHandler` negotiates +//! and handles one or more specific protocols on the connection. //! -//! This includes: how to handle incoming substreams, which protocols are supported, when to open -//! a new outbound substream, and so on. +//! 
Protocols are negotiated and used on individual substreams of the connection. +//! Thus a `ProtocolsHandler` defines the inbound and outbound upgrades to apply +//! when creating a new inbound or outbound substream, respectively, and is notified +//! by a `Swarm` when these upgrades have been successfully applied, including the +//! final output of the upgrade. A `ProtocolsHandler` can then continue communicating +//! with the peer over the substream using the negotiated protocol(s). //! -//! Each implementation of the `ProtocolsHandler` trait handles one or more specific protocols. -//! Two `ProtocolsHandler`s can be combined together with the `select()` method in order to build -//! a `ProtocolsHandler` that combines both. This can be repeated multiple times in order to create -//! a handler that handles all the protocols that you wish. +//! Two `ProtocolsHandler`s can be composed with [`ProtocolsHandler::select()`] +//! in order to build a new handler supporting the combined set of protocols, +//! with methods being dispatched to the appropriate handler according to the +//! used protocol(s) determined by the associated types of the handlers. //! -//! > **Note**: A `ProtocolsHandler` handles one or more protocols in relation to a specific +//! > **Note**: A `ProtocolsHandler` handles one or more protocols in the context of a single //! > connection with a remote. In order to handle a protocol that requires knowledge of //! > the network as a whole, see the `NetworkBehaviour` trait. @@ -57,73 +61,73 @@ mod node_handler; mod one_shot; mod select; -/// Handler for a set of protocols for a specific connection with a remote. +/// A handler for a set of protocols used on a connection with a remote. /// -/// This trait should be implemented on a struct that holds the state for a specific protocol -/// behaviour with a specific remote. +/// This trait should be implemented for a type that maintains the state for +/// the execution of a specific protocol with a remote. /// /// # Handling a protocol /// -/// Communication with a remote over a set of protocols opened in two different ways: +/// Communication with a remote over a set of protocols is initiated in one of two ways: /// -/// - Dialing, which is a voluntary process. In order to do so, make `poll()` return an -/// `OutboundSubstreamRequest` variant containing the connection upgrade to use to start using a -/// protocol. -/// - Listening, which is used to determine which protocols are supported when the remote wants -/// to open a substream. The `listen_protocol()` method should return the upgrades supported when -/// listening. +/// 1. Dialing by initiating a new outbound substream. In order to do so, +/// [`ProtocolsHandler::poll()`] must return an [`OutboundSubstreamRequest`], providing an +/// instance of [`ProtocolsHandler::OutboundUpgrade`] that is used to negotiate the +/// protocol(s). Upon success, [`ProtocolsHandler::inject_fully_negotiated_outbound`] +/// is called with the final output of the upgrade. /// -/// The upgrade when dialing and the upgrade when listening have to be of the same type, but you -/// are free to return for example an `OrUpgrade` enum, or an enum of your own, containing the -/// upgrade you want depending on the situation. +/// 2. Listening by accepting a new inbound substream. When a new inbound substream +/// is created on a connection, [`ProtocolsHandler::listen_protocol`] is called +/// to obtain an instance of [`ProtocolsHandler::InboundUpgrade`] that is used to +/// negotiate the protocol(s). 
Upon success, +/// [`ProtocolsHandler::inject_fully_negotiated_inbound`] is called with the final +/// output of the upgrade. /// -/// # Shutting down +/// # Connection Keep-Alive +/// +/// A `ProtocolsHandler` can influence the lifetime of the underlying connection +/// through [`ProtocolsHandler::connection_keep_alive`]. That is, the protocol +/// implemented by the handler can include conditions for terminating the connection. +/// The lifetime of successfully negotiated substreams is fully controlled by the handler. /// /// Implementors of this trait should keep in mind that the connection can be closed at any time. -/// When a connection is closed (either by us or by the remote) `shutdown()` is called and the -/// handler continues to be processed until it produces `ProtocolsHandlerEvent::Shutdown`. Only -/// then the handler is destroyed. -/// -/// This makes it possible for the handler to finish delivering events even after knowing that it -/// is shutting down. -/// -/// Implementors of this trait should keep in mind that when `shutdown()` is called, the connection -/// might already be closed or unresponsive. They should therefore not rely on being able to -/// deliver messages. -/// +/// When a connection is closed gracefully, the substreams used by the handler may still +/// continue reading data until the remote closes its side of the connection. pub trait ProtocolsHandler { /// Custom event that can be received from the outside. type InEvent; /// Custom event that can be produced by the handler and that will be returned to the outside. type OutEvent; - /// Error that can happen when polling. + /// The type of errors returned by [`ProtocolsHandler::poll`]. type Error: error::Error; - /// The type of the substream that contains the raw data. + /// The type of substreams on which the protocol(s) are negotiated. type Substream: AsyncRead + AsyncWrite; - /// The upgrade for the protocol or protocols handled by this handler. + /// The inbound upgrade for the protocol(s) used by the handler. type InboundProtocol: InboundUpgrade; - /// The upgrade for the protocol or protocols handled by this handler. + /// The outbound upgrade for the protocol(s) used by the handler. type OutboundProtocol: OutboundUpgrade; - /// Information about a substream. Can be sent to the handler through a `NodeHandlerEndpoint`, - /// and will be passed back in `inject_substream` or `inject_outbound_closed`. + /// The type of additional information passed to an `OutboundSubstreamRequest`. type OutboundOpenInfo; - /// Produces a `ConnectionUpgrade` for the protocol or protocols to accept when listening. + /// The [`InboundUpgrade`] to apply on inbound substreams to negotiate the + /// desired protocols. /// - /// > **Note**: You should always accept all the protocols you support, even if in a specific - /// > context you wouldn't accept one in particular (eg. only allow one substream at - /// > a time for a given protocol). The reason is that remotes are allowed to put the - /// > list of supported protocols in a cache in order to avoid spurious queries. - fn listen_protocol(&self) -> Self::InboundProtocol; + /// > **Note**: The returned `InboundUpgrade` should always accept all the generally + /// > supported protocols, even if in a specific context a particular one is + /// > not supported, (eg. when only allowing one substream at a time for a protocol). + /// > This allows a remote to put the list of supported protocols in a cache. 
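To make the dialing flow above concrete, here is a minimal sketch of the kind of event a handler's `poll()` returns to request an outbound substream under the new API. It uses `DeniedUpgrade` purely as a stand-in upgrade and assumes the `libp2p_core` paths imported by the changed test files:

```rust
use libp2p_core::protocols_handler::{ProtocolsHandlerEvent, SubstreamProtocol};
use libp2p_core::upgrade::DeniedUpgrade;
use std::time::Duration;

// A handler asks the swarm for a new outbound substream by returning this
// event from `poll()`. The upgrade is now wrapped in a `SubstreamProtocol`
// (defined further down in this diff), which also carries the negotiation
// timeout; `()` stands in for the handler's `OutboundOpenInfo` and custom
// event types.
fn outbound_request() -> ProtocolsHandlerEvent<DeniedUpgrade, (), ()> {
    ProtocolsHandlerEvent::OutboundSubstreamRequest {
        protocol: SubstreamProtocol::new(DeniedUpgrade)
            .with_timeout(Duration::from_secs(30)),
        info: (),
    }
}
```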
+ fn listen_protocol(&self) -> SubstreamProtocol; - /// Injects a fully-negotiated substream in the handler. - /// - /// This method is called when a substream has been successfully opened and negotiated. + /// Injects the output of a successful upgrade on a new inbound substream. fn inject_fully_negotiated_inbound( &mut self, protocol: >::Output ); + /// Injects the output of a successful upgrade on a new outbound substream. + /// + /// The second argument is the information that was previously passed to + /// [`ProtocolsHandlerEvent::OutboundSubstreamRequest`]. fn inject_fully_negotiated_outbound( &mut self, protocol: >::Output, @@ -134,28 +138,43 @@ pub trait ProtocolsHandler { fn inject_event(&mut self, event: Self::InEvent); /// Indicates to the handler that upgrading a substream to the given protocol has failed. - fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<>::Error>); + fn inject_dial_upgrade_error( + &mut self, + info: Self::OutboundOpenInfo, + error: ProtocolsHandlerUpgrErr< + >::Error + > + ); /// Returns until when the connection should be kept alive. /// - /// If returns `Until`, that indicates that this connection may be closed and this handler - /// destroyed after the returned `Instant` has elapsed if they think that they will no longer - /// need the connection in the future. Returning `Forever` is equivalent to "infinite". - /// Returning `Now` is equivalent to `Until(Instant::now())`. + /// This method is called by the `Swarm` after each invocation of + /// [`ProtocolsHandler::poll`] to determine if the connection and the associated + /// `ProtocolsHandler`s should be kept alive and if so, for how long. /// - /// On the other hand, the return value is only an indication and doesn't mean that the user - /// will not close the connection. + /// Returning [`KeepAlive::Now`] indicates that the connection should be + /// closed and this handler destroyed immediately. /// - /// When multiple `ProtocolsHandler` are combined together, the largest `KeepAlive` should be - /// used. + /// Returning [`KeepAlive::Until`] indicates that the connection may be closed + /// and this handler destroyed after the specified `Instant`. + /// + /// Returning [`KeepAlive::Forever`] indicates that the connection should + /// be kept alive until the next call to this method. + /// + /// > **Note**: The connection is always closed and the handler destroyed + /// > when [`ProtocolsHandler::poll`] returns an error. Furthermore, the + /// > connection may be closed for reasons outside of the control + /// > of the handler. /// - /// The result of this method should be checked every time `poll()` is invoked. fn connection_keep_alive(&self) -> KeepAlive; /// Should behave like `Stream::poll()`. /// /// Returning an error will close the connection to the remote. - fn poll(&mut self) -> Poll, Self::Error>; + fn poll(&mut self) -> Poll< + ProtocolsHandlerEvent, + Self::Error + >; /// Adds a closure that turns the input event into something else. #[inline] @@ -177,8 +196,12 @@ pub trait ProtocolsHandler { MapOutEvent::new(self, map) } - /// Builds an implementation of `ProtocolsHandler` that handles both this protocol and the - /// other one together. + /// Creates a new `ProtocolsHandler` that selects either this handler or + /// `other` by delegating methods calls appropriately. + /// + /// > **Note**: The largest `KeepAlive` returned by the two handlers takes precedence, + /// > i.e. 
is returned from [`ProtocolsHandler::connection_keep_alive`] by the returned + /// > handler. #[inline] fn select(self, other: TProto2) -> ProtocolsHandlerSelect where @@ -187,8 +210,10 @@ pub trait ProtocolsHandler { ProtocolsHandlerSelect::new(self, other) } - /// Creates a builder that will allow creating a `NodeHandler` that handles this protocol + /// Creates a builder that allows creating a `NodeHandler` that handles this protocol /// exclusively. + /// + /// > **Note**: This method should not be redefined in a custom `ProtocolsHandler`. #[inline] fn into_node_handler_builder(self) -> NodeHandlerWrapperBuilder where @@ -211,13 +236,75 @@ pub trait ProtocolsHandler { } } +/// Configuration of inbound or outbound substream protocol(s) +/// for a [`ProtocolsHandler`]. +/// +/// The inbound substream protocol(s) are defined by [`ProtocolsHandler::listen_protocol`] +/// and the outbound substream protocol(s) by [`ProtocolsHandlerEvent::OutboundSubstreamRequest`]. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub struct SubstreamProtocol { + upgrade: TUpgrade, + timeout: Duration, +} + +impl SubstreamProtocol { + /// Create a new `ListenProtocol` from the given upgrade. + /// + /// The default timeout for applying the given upgrade on a substream is + /// 10 seconds. + pub fn new(upgrade: TUpgrade) -> SubstreamProtocol { + SubstreamProtocol { + upgrade, + timeout: Duration::from_secs(10), + } + } + + /// Maps a function over the protocol upgrade. + pub fn map_upgrade(self, f: F) -> SubstreamProtocol + where + F: FnOnce(TUpgrade) -> U, + { + SubstreamProtocol { + upgrade: f(self.upgrade), + timeout: self.timeout, + } + } + + /// Sets a new timeout for the protocol upgrade. + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } + + /// Borrows the contained protocol upgrade. + pub fn upgrade(&self) -> &TUpgrade { + &self.upgrade + } + + /// Borrows the timeout for the protocol upgrade. + pub fn timeout(&self) -> &Duration { + &self.timeout + } + + /// Converts the substream protocol configuration into the contained upgrade. + pub fn into_upgrade(self) -> TUpgrade { + self.upgrade + } +} + +impl From for SubstreamProtocol { + fn from(upgrade: TUpgrade) -> SubstreamProtocol { + SubstreamProtocol::new(upgrade) + } +} + /// Event produced by a handler. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum ProtocolsHandlerEvent { /// Request a new outbound substream to be opened with the remote. OutboundSubstreamRequest { - /// The upgrade to apply on the substream. - upgrade: TConnectionUpgrade, + /// The protocol(s) to apply on the substream. + protocol: SubstreamProtocol, /// User-defined information, passed back when the substream is open. info: TOutboundOpenInfo, }, @@ -230,7 +317,8 @@ pub enum ProtocolsHandlerEvent { impl ProtocolsHandlerEvent { - /// If this is an `OutboundSubstreamRequest`, maps the `info` member from a `TOutboundOpenInfo` to something else. + /// If this is an `OutboundSubstreamRequest`, maps the `info` member from a + /// `TOutboundOpenInfo` to something else. 
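As a quick illustration of the wrapper defined above, a minimal, self-contained sketch of constructing and inspecting a `SubstreamProtocol`, again with `DeniedUpgrade` as a placeholder upgrade:

```rust
use libp2p_core::protocols_handler::SubstreamProtocol;
use libp2p_core::upgrade::DeniedUpgrade;
use std::time::Duration;

fn main() {
    // Wrap an upgrade, overriding the default 10-second negotiation timeout.
    let proto = SubstreamProtocol::new(DeniedUpgrade)
        .with_timeout(Duration::from_secs(30));
    assert_eq!(*proto.timeout(), Duration::from_secs(30));

    // `map_upgrade` transforms only the contained upgrade; the timeout is kept.
    let proto = proto.map_upgrade(|upgrade| upgrade);

    // Recover the upgrade when it is time to apply it to a substream.
    let _upgrade: DeniedUpgrade = proto.into_upgrade();
}
```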
#[inline] pub fn map_outbound_open_info( self, @@ -240,9 +328,9 @@ impl F: FnOnce(TOutboundOpenInfo) -> I, { match self { - ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } => { + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info } => { ProtocolsHandlerEvent::OutboundSubstreamRequest { - upgrade, + protocol, info: map(info), } } @@ -250,7 +338,8 @@ impl } } - /// If this is an `OutboundSubstreamRequest`, maps the protocol (`TConnectionUpgrade`) to something else. + /// If this is an `OutboundSubstreamRequest`, maps the protocol (`TConnectionUpgrade`) + /// to something else. #[inline] pub fn map_protocol( self, @@ -260,9 +349,9 @@ impl F: FnOnce(TConnectionUpgrade) -> I, { match self { - ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } => { + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info } => { ProtocolsHandlerEvent::OutboundSubstreamRequest { - upgrade: map(upgrade), + protocol: protocol.map_upgrade(map), info, } } @@ -280,8 +369,8 @@ impl F: FnOnce(TCustom) -> I, { match self { - ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } => { - ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info } => { + ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info } } ProtocolsHandlerEvent::Custom(val) => ProtocolsHandlerEvent::Custom(map(val)), } @@ -356,7 +445,7 @@ pub trait IntoProtocolsHandler { where Self: Sized, { - NodeHandlerWrapperBuilder::new(self, Duration::from_secs(10), Duration::from_secs(10)) + NodeHandlerWrapperBuilder::new(self) } } diff --git a/core/src/protocols_handler/node_handler.rs b/core/src/protocols_handler/node_handler.rs index 9519c792..855c8c15 100644 --- a/core/src/protocols_handler/node_handler.rs +++ b/core/src/protocols_handler/node_handler.rs @@ -31,17 +31,13 @@ use crate::{ } }; use futures::prelude::*; -use std::{error, fmt, time::Duration, time::Instant}; +use std::{error, fmt, time::{Duration, Instant}}; use tokio_timer::{Delay, Timeout}; /// Prototype for a `NodeHandlerWrapper`. pub struct NodeHandlerWrapperBuilder { /// The underlying handler. handler: TIntoProtoHandler, - /// Timeout for incoming substreams negotiation. - in_timeout: Duration, - /// Timeout for outgoing substreams negotiation. - out_timeout: Duration, } impl NodeHandlerWrapperBuilder @@ -50,28 +46,12 @@ where { /// Builds a `NodeHandlerWrapperBuilder`. #[inline] - pub(crate) fn new(handler: TIntoProtoHandler, in_timeout: Duration, out_timeout: Duration) -> Self { + pub(crate) fn new(handler: TIntoProtoHandler) -> Self { NodeHandlerWrapperBuilder { handler, - in_timeout, - out_timeout, } } - /// Sets the timeout to use when negotiating a protocol on an ingoing substream. - #[inline] - pub fn with_in_negotiation_timeout(mut self, timeout: Duration) -> Self { - self.in_timeout = timeout; - self - } - - /// Sets the timeout to use when negotiating a protocol on an outgoing substream. - #[inline] - pub fn with_out_negotiation_timeout(mut self, timeout: Duration) -> Self { - self.out_timeout = timeout; - self - } - /// Builds the `NodeHandlerWrapper`. 
#[deprecated(note = "Pass the NodeHandlerWrapperBuilder directly")] #[inline] @@ -82,8 +62,6 @@ where handler: self.handler, negotiating_in: Vec::new(), negotiating_out: Vec::new(), - in_timeout: self.in_timeout, - out_timeout: self.out_timeout, queued_dial_upgrades: Vec::new(), unique_dial_upgrade_id: 0, connection_shutdown: None, @@ -105,8 +83,6 @@ where handler: self.handler.into_handler(remote_peer_id), negotiating_in: Vec::new(), negotiating_out: Vec::new(), - in_timeout: self.in_timeout, - out_timeout: self.out_timeout, queued_dial_upgrades: Vec::new(), unique_dial_upgrade_id: 0, connection_shutdown: None, @@ -131,10 +107,6 @@ where TProtoHandler::OutboundOpenInfo, Timeout>, )>, - /// Timeout for incoming substreams negotiation. - in_timeout: Duration, - /// Timeout for outgoing substreams negotiation. - out_timeout: Duration, /// For each outbound substream request, how to upgrade it. The first element of the tuple /// is the unique identifier (see `unique_dial_upgrade_id`). queued_dial_upgrades: Vec<(u64, TProtoHandler::OutboundProtocol)>, @@ -197,7 +169,7 @@ where type Substream = TProtoHandler::Substream; // The first element of the tuple is the unique upgrade identifier // (see `unique_dial_upgrade_id`). - type OutboundOpenInfo = (u64, TProtoHandler::OutboundOpenInfo); + type OutboundOpenInfo = (u64, TProtoHandler::OutboundOpenInfo, Duration); fn inject_substream( &mut self, @@ -207,11 +179,12 @@ where match endpoint { NodeHandlerEndpoint::Listener => { let protocol = self.handler.listen_protocol(); - let upgrade = upgrade::apply_inbound(substream, protocol); - let with_timeout = Timeout::new(upgrade, self.in_timeout); + let timeout = protocol.timeout().clone(); + let upgrade = upgrade::apply_inbound(substream, protocol.into_upgrade()); + let with_timeout = Timeout::new(upgrade, timeout); self.negotiating_in.push(with_timeout); } - NodeHandlerEndpoint::Dialer((upgrade_id, user_data)) => { + NodeHandlerEndpoint::Dialer((upgrade_id, user_data, timeout)) => { let pos = match self .queued_dial_upgrades .iter() @@ -226,7 +199,7 @@ where let (_, proto_upgrade) = self.queued_dial_upgrades.remove(pos); let upgrade = upgrade::apply_outbound(substream, proto_upgrade); - let with_timeout = Timeout::new(upgrade, self.out_timeout); + let with_timeout = Timeout::new(upgrade, timeout); self.negotiating_out.push((user_data, with_timeout)); } } @@ -270,7 +243,7 @@ where } else { debug_assert!(err.is_inner()); let err = err.into_inner().expect("Timeout error is one of {elapsed, \ - timer, inner}; is_inner and is_elapsed are both false; error is \ + timer, inner}; is_elapsed and is_timer are both false; error is \ inner; QED"); ProtocolsHandlerUpgrErr::Upgrade(err) }; @@ -280,8 +253,8 @@ where } } - // Poll the handler at the end so that we see the consequences of the method calls on - // `self.handler`. + // Poll the handler at the end so that we see the consequences of the method + // calls on `self.handler`. 
let poll_result = self.handler.poll()?; self.connection_shutdown = match self.handler.connection_keep_alive() { @@ -295,14 +268,15 @@ where return Ok(Async::Ready(NodeHandlerEvent::Custom(event))); } Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - upgrade, + protocol, info, }) => { let id = self.unique_dial_upgrade_id; + let timeout = protocol.timeout().clone(); self.unique_dial_upgrade_id += 1; - self.queued_dial_upgrades.push((id, upgrade)); + self.queued_dial_upgrades.push((id, protocol.into_upgrade())); return Ok(Async::Ready( - NodeHandlerEvent::OutboundSubstreamRequest((id, info)), + NodeHandlerEvent::OutboundSubstreamRequest((id, info, timeout)), )); } Async::NotReady => (), diff --git a/core/src/protocols_handler/one_shot.rs b/core/src/protocols_handler/one_shot.rs index 3f8ec501..b88cb197 100644 --- a/core/src/protocols_handler/one_shot.rs +++ b/core/src/protocols_handler/one_shot.rs @@ -20,6 +20,7 @@ use crate::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, + SubstreamProtocol }; use crate::upgrade::{InboundUpgrade, OutboundUpgrade}; use futures::prelude::*; @@ -36,7 +37,7 @@ where TOutProto: OutboundUpgrade, { /// The upgrade for inbound substreams. - listen_protocol: TInProto, + listen_protocol: SubstreamProtocol, /// If `Some`, something bad happened and we should shut down the handler with an error. pending_error: Option>::Error>>, @@ -63,7 +64,10 @@ where { /// Creates a `OneShotHandler`. #[inline] - pub fn new(listen_protocol: TInProto, inactive_timeout: Duration) -> Self { + pub fn new( + listen_protocol: SubstreamProtocol, + inactive_timeout: Duration + ) -> Self { OneShotHandler { listen_protocol, pending_error: None, @@ -88,7 +92,7 @@ where /// > **Note**: If you modify the protocol, modifications will only applies to future inbound /// > substreams, not the ones already being negotiated. #[inline] - pub fn listen_protocol_ref(&self) -> &TInProto { + pub fn listen_protocol_ref(&self) -> &SubstreamProtocol { &self.listen_protocol } @@ -97,7 +101,7 @@ where /// > **Note**: If you modify the protocol, modifications will only applies to future inbound /// > substreams, not the ones already being negotiated. 
#[inline] - pub fn listen_protocol_mut(&mut self) -> &mut TInProto { + pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol { &mut self.listen_protocol } @@ -113,11 +117,11 @@ impl Default for OneShotHandler where TOutProto: OutboundUpgrade, - TInProto: Default, + TInProto: InboundUpgrade + Default, { #[inline] fn default() -> Self { - OneShotHandler::new(Default::default(), Duration::from_secs(10)) + OneShotHandler::new(SubstreamProtocol::new(Default::default()), Duration::from_secs(10)) } } @@ -125,11 +129,12 @@ impl ProtocolsHandler for OneShotHandler where TSubstream: AsyncRead + AsyncWrite, - TInProto: InboundUpgrade + Clone, + TInProto: InboundUpgrade, TOutProto: OutboundUpgrade, TInProto::Output: Into, TOutProto::Output: Into, TOutProto::Error: error::Error + 'static, + SubstreamProtocol: Clone, { type InEvent = TOutProto; type OutEvent = TOutEvent; @@ -142,7 +147,7 @@ where type OutboundOpenInfo = (); #[inline] - fn listen_protocol(&self) -> Self::InboundProtocol { + fn listen_protocol(&self) -> SubstreamProtocol { self.listen_protocol.clone() } @@ -220,7 +225,7 @@ where self.dial_negotiated += 1; return Ok(Async::Ready( ProtocolsHandlerEvent::OutboundSubstreamRequest { - upgrade: self.dial_queue.remove(0), + protocol: SubstreamProtocol::new(self.dial_queue.remove(0)), info: (), }, )); diff --git a/core/src/protocols_handler/select.rs b/core/src/protocols_handler/select.rs index 2ffcc2a0..a88f9081 100644 --- a/core/src/protocols_handler/select.rs +++ b/core/src/protocols_handler/select.rs @@ -24,6 +24,7 @@ use crate::{ either::EitherOutput, protocols_handler::{ KeepAlive, + SubstreamProtocol, IntoProtocolsHandler, ProtocolsHandler, ProtocolsHandlerEvent, @@ -123,10 +124,12 @@ where type OutboundOpenInfo = EitherOutput; #[inline] - fn listen_protocol(&self) -> Self::InboundProtocol { + fn listen_protocol(&self) -> SubstreamProtocol { let proto1 = self.proto1.listen_protocol(); let proto2 = self.proto2.listen_protocol(); - SelectUpgrade::new(proto1, proto2) + let timeout = std::cmp::max(proto1.timeout(), proto2.timeout()).clone(); + SubstreamProtocol::new(SelectUpgrade::new(proto1.into_upgrade(), proto2.into_upgrade())) + .with_timeout(timeout) } fn inject_fully_negotiated_outbound(&mut self, protocol: >::Output, endpoint: Self::OutboundOpenInfo) { @@ -206,9 +209,12 @@ where Async::Ready(ProtocolsHandlerEvent::Custom(event)) => { return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(EitherOutput::First(event)))); }, - Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info}) => { + Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol, + info, + }) => { return Ok(Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - upgrade: EitherUpgrade::A(upgrade), + protocol: protocol.map_upgrade(EitherUpgrade::A), info: EitherOutput::First(info), })); }, @@ -219,9 +225,12 @@ where Async::Ready(ProtocolsHandlerEvent::Custom(event)) => { return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(EitherOutput::Second(event)))); }, - Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info }) => { + Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol, + info, + }) => { return Ok(Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - upgrade: EitherUpgrade::B(upgrade), + protocol: protocol.map_upgrade(EitherUpgrade::B), info: EitherOutput::Second(info), })); }, diff --git a/core/src/swarm/behaviour.rs b/core/src/swarm/behaviour.rs index 29407a4b..47af48c9 100644 --- 
a/core/src/swarm/behaviour.rs +++ b/core/src/swarm/behaviour.rs @@ -37,7 +37,7 @@ pub trait NetworkBehaviour { /// Event generated by the swarm. type OutEvent; - /// Builds a new `ProtocolsHandler`. + /// Creates a new `ProtocolsHandler` for a connection with a peer. fn new_handler(&mut self) -> Self::ProtocolsHandler; /// Addresses that this behaviour is aware of for this specific peer, and that may allow @@ -104,45 +104,48 @@ pub trait NetworkBehaviourEventProcess { fn inject_event(&mut self, event: TEvent); } -/// Action to perform. +/// An action that a [`NetworkBehaviour`] can trigger in the [`Swarm`] +/// in whose context it is executing. #[derive(Debug, Clone)] pub enum NetworkBehaviourAction { - /// Generate an event for the outside. + /// Instructs the `Swarm` to return an event when it is being polled. GenerateEvent(TOutEvent), // TODO: report new raw connection for usage after intercepting an address dial - /// Instructs the swarm to dial the given multiaddress without any expectation of a peer id. + /// Instructs the swarm to dial the given multiaddress, without a known `PeerId`. DialAddress { /// The address to dial. address: Multiaddr, }, - /// Instructs the swarm to try reach the given peer. + /// Instructs the swarm to dial a known `PeerId`. /// - /// In the future, a corresponding `inject_dial_failure` or `inject_connected` function call - /// must be performed. + /// On success, [`NetworkBehaviour::inject_connected`] is invoked. + /// On failure, [`NetworkBehaviour::inject_dial_failure`] is invoked. DialPeer { /// The peer to try reach. peer_id: PeerId, }, - /// If we're connected to the given peer, sends a message to the protocol handler. + /// Instructs the `Swarm` to send a message to a connected peer. /// - /// If we're not connected to this peer, does nothing. If necessary, the implementation of - /// `NetworkBehaviour` is supposed to track which peers we are connected to. + /// If the `Swarm` is connected to the peer, the message is delivered to the remote's + /// protocol handler. If there is no connection to the peer, the message is ignored. + /// To ensure delivery, the `NetworkBehaviour` must keep track of connected peers. SendEvent { - /// The peer which to send the message to. + /// The peer to which to send the message. peer_id: PeerId, - /// Event to send to the peer. + /// The message to send. event: TInEvent, }, - /// Reports that a remote observes us as this address. + /// Informs the `Swarm` about a multi-address observed by a remote for + /// the local node. /// /// The swarm will pass this address through the transport's NAT traversal. ReportObservedAddr { - /// The address we're being observed as. + /// The observed address of the local node. 
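The sketch below illustrates how a `NetworkBehaviour` would construct some of the actions documented above; the `String` and `()` event types are arbitrary placeholders:

```rust
use libp2p_core::swarm::NetworkBehaviourAction;
use libp2p_core::PeerId;

// Ask the swarm to dial a known peer.
fn dial(peer_id: PeerId) -> NetworkBehaviourAction<String, ()> {
    NetworkBehaviourAction::DialPeer { peer_id }
}

// Deliver a message to the protocol handler of a connected peer. If there is
// no connection to `peer_id`, the swarm silently drops the message.
fn send(peer_id: PeerId, event: String) -> NetworkBehaviourAction<String, ()> {
    NetworkBehaviourAction::SendEvent { peer_id, event }
}
```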
address: Multiaddr, }, } diff --git a/core/src/swarm/swarm.rs b/core/src/swarm/swarm.rs index 3fe03e37..31875666 100644 --- a/core/src/swarm/swarm.rs +++ b/core/src/swarm/swarm.rs @@ -438,6 +438,7 @@ where TBehaviour: NetworkBehaviour, .new_handler() .into_handler(&self.local_peer_id) .listen_protocol() + .into_upgrade() .protocol_info() .into_iter() .map(|info| info.protocol_name().to_vec()) diff --git a/core/src/swarm/toggle.rs b/core/src/swarm/toggle.rs index 4cf3cbf6..0d5aa499 100644 --- a/core/src/swarm/toggle.rs +++ b/core/src/swarm/toggle.rs @@ -20,7 +20,14 @@ use crate::{ either::EitherOutput, - protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, IntoProtocolsHandler}, + protocols_handler::{ + KeepAlive, + SubstreamProtocol, + ProtocolsHandler, + ProtocolsHandlerEvent, + ProtocolsHandlerUpgrErr, + IntoProtocolsHandler + }, swarm::{NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess}, upgrade::{InboundUpgrade, OutboundUpgrade, DeniedUpgrade, EitherUpgrade}, PeerId, Multiaddr, nodes::ConnectedPoint, swarm::PollParameters, @@ -168,11 +175,11 @@ where type OutboundProtocol = TInner::OutboundProtocol; type OutboundOpenInfo = TInner::OutboundOpenInfo; - fn listen_protocol(&self) -> Self::InboundProtocol { + fn listen_protocol(&self) -> SubstreamProtocol { if let Some(inner) = self.inner.as_ref() { - EitherUpgrade::A(inner.listen_protocol()) + inner.listen_protocol().map_upgrade(EitherUpgrade::A) } else { - EitherUpgrade::B(DeniedUpgrade) + SubstreamProtocol::new(EitherUpgrade::B(DeniedUpgrade)) } } diff --git a/core/tests/raw_swarm_dial_error.rs b/core/tests/raw_swarm_dial_error.rs index eb23aadc..8b1b909c 100644 --- a/core/tests/raw_swarm_dial_error.rs +++ b/core/tests/raw_swarm_dial_error.rs @@ -23,7 +23,14 @@ use libp2p_core::identity; use libp2p_core::multiaddr::multiaddr; use libp2p_core::nodes::raw_swarm::{RawSwarm, RawSwarmEvent, RawSwarmReachError, PeerState, UnknownPeerDialErr, IncomingError}; use libp2p_core::{PeerId, Transport, upgrade, upgrade::InboundUpgradeExt, upgrade::OutboundUpgradeExt}; -use libp2p_core::protocols_handler::{ProtocolsHandler, KeepAlive, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, NodeHandlerWrapperBuilder}; +use libp2p_core::protocols_handler::{ + ProtocolsHandler, + KeepAlive, + SubstreamProtocol, + ProtocolsHandlerEvent, + ProtocolsHandlerUpgrErr, + NodeHandlerWrapperBuilder +}; use rand::seq::SliceRandom; use std::io; @@ -48,8 +55,8 @@ where type OutboundProtocol = upgrade::DeniedUpgrade; type OutboundOpenInfo = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139) - fn listen_protocol(&self) -> Self::InboundProtocol { - upgrade::DeniedUpgrade + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(upgrade::DeniedUpgrade) } fn inject_fully_negotiated_inbound( diff --git a/core/tests/raw_swarm_simult.rs b/core/tests/raw_swarm_simult.rs index 2a60eb2a..6c0fd2bd 100644 --- a/core/tests/raw_swarm_simult.rs +++ b/core/tests/raw_swarm_simult.rs @@ -22,7 +22,13 @@ use futures::{future, prelude::*}; use libp2p_core::identity; use libp2p_core::nodes::raw_swarm::{RawSwarm, RawSwarmEvent, IncomingError}; use libp2p_core::{Transport, upgrade, upgrade::OutboundUpgradeExt, upgrade::InboundUpgradeExt}; -use libp2p_core::protocols_handler::{ProtocolsHandler, KeepAlive, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}; +use libp2p_core::protocols_handler::{ + ProtocolsHandler, + KeepAlive, + SubstreamProtocol, + ProtocolsHandlerEvent, + 
ProtocolsHandlerUpgrErr +}; use std::{io, time::Duration, time::Instant}; use tokio_timer::Delay; @@ -47,8 +53,8 @@ where type OutboundProtocol = upgrade::DeniedUpgrade; type OutboundOpenInfo = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139) - fn listen_protocol(&self) -> Self::InboundProtocol { - upgrade::DeniedUpgrade + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(upgrade::DeniedUpgrade) } fn inject_fully_negotiated_inbound( diff --git a/examples/ping.rs b/examples/ping.rs new file mode 100644 index 00000000..95a1d958 --- /dev/null +++ b/examples/ping.rs @@ -0,0 +1,91 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! A basic example demonstrating some core APIs and concepts of libp2p. +//! +//! In the first terminal window, run: +//! +//! ```sh +//! cargo run --example ping +//! ``` +//! +//! It will print the PeerId and the listening address, e.g. `Listening on +//! "/ip4/0.0.0.0/tcp/24915"` +//! +//! In the second terminal window, start a new instance of the example with: +//! +//! ```sh +//! cargo run --example ping -- /ip4/127.0.0.1/tcp/24915 +//! ``` +//! +//! The two nodes establish a connection, negotiate the ping protocol +//! and begin pinging each other. + +use futures::{prelude::*, future}; +use libp2p::{ identity, PeerId, ping::Ping, Swarm }; +use std::env; + +fn main() { + env_logger::init(); + + // Create a random PeerId. + let id_keys = identity::Keypair::generate_ed25519(); + let peer_id = PeerId::from(id_keys.public()); + println!("Local peer id: {:?}", peer_id); + + // Create a transport. + let transport = libp2p::build_development_transport(id_keys); + + // Create a ping network behaviour. + let behaviour = Ping::default(); + + // Create a Swarm that establishes connections through the given transport + // and applies the ping behaviour on each connection. + let mut swarm = Swarm::new(transport, behaviour, peer_id); + + // Listen on all interfaces and a random, OS-assigned port. + let listen_addr = Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); + println!("Listening on {:?}", listen_addr); + + // Dial the peer identified by the multi-address given as the second + // command-line argument, if any. 
+ if let Some(addr) = env::args().nth(1) { + let remote_addr = addr.clone(); + match addr.parse() { + Ok(remote) => { + match Swarm::dial_addr(&mut swarm, remote) { + Ok(()) => println!("Dialed {:?}", remote_addr), + Err(e) => println!("Dialing {:?} failed with: {:?}", remote_addr, e) + } + }, + Err(err) => println!("Failed to parse address to dial: {:?}", err), + } + } + + // Use tokio to drive the `Swarm`. + tokio::run(future::poll_fn(move || -> Result<_, ()> { + loop { + match swarm.poll().expect("Error while polling swarm") { + Async::Ready(Some(e)) => println!("{:?}", e), + Async::Ready(None) | Async::NotReady => return Ok(Async::NotReady), + } + } + })); +} diff --git a/protocols/identify/src/listen_handler.rs b/protocols/identify/src/listen_handler.rs index ca57b101..09f366fe 100644 --- a/protocols/identify/src/listen_handler.rs +++ b/protocols/identify/src/listen_handler.rs @@ -21,7 +21,13 @@ use crate::protocol::{IdentifySender, IdentifyProtocolConfig}; use futures::prelude::*; use libp2p_core::{ - protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, + protocols_handler::{ + KeepAlive, + SubstreamProtocol, + ProtocolsHandler, + ProtocolsHandlerEvent, + ProtocolsHandlerUpgrErr + }, upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade, Negotiated} }; use smallvec::SmallVec; @@ -61,8 +67,8 @@ where type OutboundOpenInfo = (); #[inline] - fn listen_protocol(&self) -> Self::InboundProtocol { - self.config.clone() + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(self.config.clone()) } fn inject_fully_negotiated_inbound( diff --git a/protocols/identify/src/periodic_id_handler.rs b/protocols/identify/src/periodic_id_handler.rs index 35e12b0e..1de93a27 100644 --- a/protocols/identify/src/periodic_id_handler.rs +++ b/protocols/identify/src/periodic_id_handler.rs @@ -21,7 +21,13 @@ use crate::protocol::{RemoteInfo, IdentifyProtocolConfig}; use futures::prelude::*; use libp2p_core::{ - protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, + protocols_handler::{ + KeepAlive, + SubstreamProtocol, + ProtocolsHandler, + ProtocolsHandlerEvent, + ProtocolsHandlerUpgrErr + }, upgrade::{DeniedUpgrade, OutboundUpgrade} }; use std::{io, marker::PhantomData, time::{Duration, Instant}}; @@ -91,8 +97,8 @@ where type OutboundOpenInfo = (); #[inline] - fn listen_protocol(&self) -> Self::InboundProtocol { - DeniedUpgrade + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(DeniedUpgrade) } fn inject_fully_negotiated_inbound(&mut self, protocol: Void) { @@ -148,8 +154,10 @@ where Async::NotReady => Ok(Async::NotReady), Async::Ready(()) => { self.next_id.reset(Instant::now() + DELAY_TO_NEXT_ID); - let upgrade = self.config.clone(); - let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info: () }; + let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(self.config.clone()), + info: (), + }; Ok(Async::Ready(ev)) } } diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 1d765830..c235fcc9 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -23,7 +23,13 @@ use crate::protocol::{ KademliaProtocolConfig, }; use futures::prelude::*; -use libp2p_core::protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}; +use libp2p_core::protocols_handler::{ + KeepAlive, + SubstreamProtocol, + ProtocolsHandler, + ProtocolsHandlerEvent, + 
ProtocolsHandlerUpgrErr +}; use libp2p_core::{upgrade, either::EitherOutput, InboundUpgrade, OutboundUpgrade, PeerId, upgrade::Negotiated}; use multihash::Multihash; use std::{error, fmt, io, time::Duration, time::Instant}; @@ -350,11 +356,11 @@ where type OutboundOpenInfo = (KadRequestMsg, Option); #[inline] - fn listen_protocol(&self) -> Self::InboundProtocol { + fn listen_protocol(&self) -> SubstreamProtocol { if self.allow_listening { - upgrade::EitherUpgrade::A(self.config) + SubstreamProtocol::new(self.config).map_upgrade(upgrade::EitherUpgrade::A) } else { - upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade) + SubstreamProtocol::new(upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade)) } } @@ -550,7 +556,7 @@ where match state { SubstreamState::OutPendingOpen(msg, user_data) => { let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { - upgrade, + protocol: SubstreamProtocol::new(upgrade), info: (msg, user_data), }; (None, Some(ev), false) diff --git a/protocols/ping/Cargo.toml b/protocols/ping/Cargo.toml index cde1bb2d..6bcb4113 100644 --- a/protocols/ping/Cargo.toml +++ b/protocols/ping/Cargo.toml @@ -25,5 +25,7 @@ void = "1.0" [dev-dependencies] libp2p-tcp = { version = "0.6.0", path = "../../transports/tcp" } +libp2p-secio = { version = "0.6.0", path = "../../protocols/secio" } +libp2p-yamux = { version = "0.6.0", path = "../../muxers/yamux" } tokio = "0.1" tokio-tcp = "0.1" diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index 87949605..0754e2d7 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -21,107 +21,262 @@ use crate::protocol; use futures::prelude::*; use libp2p_core::ProtocolsHandlerEvent; -use libp2p_core::protocols_handler::{KeepAlive, OneShotHandler, ProtocolsHandler, ProtocolsHandlerUpgrErr}; -use std::{io, time::Duration, time::Instant}; +use libp2p_core::protocols_handler::{ + KeepAlive, + SubstreamProtocol, + ProtocolsHandler, + ProtocolsHandlerUpgrErr, +}; +use std::{io, num::NonZeroU32, time::{Duration, Instant}}; +use std::collections::VecDeque; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::Delay; +use void::Void; -/// Protocol handler that handles pinging the remote at a regular period and answering ping -/// queries. +/// The configuration for outbound pings. +#[derive(Clone, Debug)] +pub struct PingConfig { + /// The timeout of an outbound ping. + timeout: Duration, + /// The duration between the last successful outbound or inbound ping + /// and the next outbound ping. + interval: Duration, + /// The maximum number of failed outbound pings before the associated + /// connection is deemed unhealthy, indicating to the `Swarm` that it + /// should be closed. + max_timeouts: NonZeroU32, + /// The policy for outbound pings. + policy: PingPolicy +} + +impl PingConfig { + /// Creates a new `PingConfig` with the following default settings: + /// + /// * [`PingConfig::with_interval`] 15s + /// * [`PingConfig::with_timeout`] 20s + /// * [`PingConfig::with_max_timeouts`] 1 + /// * [`PingConfig::with_policy`] [`PingPolicy::Always`] + /// + /// These settings have the following effect: + /// + /// * A ping is sent every 15 seconds on a healthy connection. + /// * Every ping sent must yield a response within 20 seconds in order to + /// be successful. + /// * The duration of a single ping timeout without sending or receiving + /// a pong is sufficient for the connection to be subject to being closed, + /// i.e. 
the connection timeout is reset to 20 seconds from the current + /// [`Instant`] upon a sent or received pong. + /// + /// In general, every successfully sent or received pong resets the connection + /// timeout, which is defined by + /// ```raw + /// max_timeouts * timeout + /// ```raw + /// relative to the current [`Instant`]. + /// + /// A sensible configuration should thus obey the invariant: + /// ```raw + /// max_timeouts * timeout > interval + /// ``` + pub fn new() -> Self { + Self { + timeout: Duration::from_secs(20), + interval: Duration::from_secs(15), + max_timeouts: NonZeroU32::new(1).expect("1 != 0"), + policy: PingPolicy::Always + } + } + + /// Sets the ping timeout. + pub fn with_timeout(mut self, d: Duration) -> Self { + self.timeout = d; + self + } + + /// Sets the ping interval. + pub fn with_interval(mut self, d: Duration) -> Self { + self.interval = d; + self + } + + /// Sets the number of successive ping timeouts upon which the remote + /// peer is considered unreachable and the connection closed. + /// + /// > **Note**: Successful inbound pings from the remote peer can keep + /// > the connection alive, even if outbound pings fail. I.e. + /// > the connection is closed after `ping_timeout * max_timeouts` + /// > only if in addition to failing outbound pings, no ping from + /// > the remote is received within that time window. + pub fn with_max_timeouts(mut self, n: NonZeroU32) -> Self { + self.max_timeouts = n; + self + } + + /// Sets the [`PingPolicy`] to use for outbound pings. + pub fn with_policy(mut self, p: PingPolicy) -> Self { + self.policy = p; + self + } +} + +/// The `PingPolicy` determines under what conditions an outbound ping +/// is sent w.r.t. inbound pings. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum PingPolicy { + /// Always send a ping in the configured interval, regardless + /// of received pings. + /// + /// This policy is appropriate e.g. if continuous measurement of + /// the RTT to the remote is desired. + Always, + /// Only sent a ping as necessary to keep the connection alive. + /// + /// This policy resets the local ping timer whenever an inbound ping + /// is received, effectively letting the peer with the lower ping + /// frequency drive the ping protocol. Hence, to avoid superfluous ping + /// traffic, the ping interval of the peers should ideally not be the + /// same when using this policy, e.g. through randomization. + /// + /// This policy is appropriate if the ping protocol is only used + /// as an application-layer connection keep-alive, without a need + /// for measuring the round-trip times on both peers, as it tries + /// to keep the ping traffic to a minimum. + KeepAlive +} + +/// The result of an inbound or outbound ping. +pub type PingResult = Result; + +/// The successful result of processing an inbound or outbound ping. +#[derive(Debug)] +pub enum PingSuccess { + /// Received a ping and sent back a pong. + Pong, + /// Sent a ping and received back a pong. + /// + /// Includes the round-trip time. + Ping { rtt: Duration }, +} + +/// An outbound ping failure. +#[derive(Debug)] +pub enum PingFailure { + /// The ping timed out, i.e. no response was received within the + /// configured ping timeout. + Timeout, + /// The ping failed for reasons other than a timeout. + Other { error: Box } +} + +/// Protocol handler that handles pinging the remote at a regular period +/// and answering ping queries. /// /// If the remote doesn't respond, produces an error that closes the connection. 
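A minimal sketch of building a configuration that respects the `max_timeouts * timeout > interval` invariant described above, assuming the re-exports added to the crate root in this diff:

```rust
use libp2p_ping::{PingConfig, PingPolicy};
use std::{num::NonZeroU32, time::Duration};

fn main() {
    // Ping every 10 seconds, give each ping 15 seconds to complete, and only
    // close the connection after two consecutive timeouts (30s > 10s). With
    // the `KeepAlive` policy, outbound pings are only sent as needed, letting
    // the remote's own pings reset the local ping timer.
    let _config = PingConfig::new()
        .with_interval(Duration::from_secs(10))
        .with_timeout(Duration::from_secs(15))
        .with_max_timeouts(NonZeroU32::new(2).expect("2 != 0"))
        .with_policy(PingPolicy::KeepAlive);
}
```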
-pub struct PingHandler -where - TSubstream: AsyncRead + AsyncWrite, -{ - /// The actual handler which we delegate the substreams handling to. - inner: OneShotHandler, - /// After a ping succeeds, how long before the next ping. - delay_to_next_ping: Duration, - /// When the next ping triggers. +pub struct PingHandler { + /// Configuration options. + config: PingConfig, + /// The timer for when to send the next ping. next_ping: Delay, + /// The connection timeout. + connection_timeout: Duration, + /// The current keep-alive, i.e. until when the connection that + /// the handler operates on should be kept alive. + connection_keepalive: KeepAlive, + /// The pending results from inbound or outbound pings, ready + /// to be `poll()`ed. + pending_results: VecDeque<(Instant, PingResult)>, + _marker: std::marker::PhantomData } impl PingHandler where TSubstream: AsyncRead + AsyncWrite, { - /// Builds a new `PingHandler`. - pub fn new() -> Self { - // TODO: allow customizing timeout; depends on https://github.com/libp2p/rust-libp2p/issues/864 + /// Builds a new `PingHandler` with the given configuration. + pub fn new(config: PingConfig) -> Self { + let now = Instant::now(); + let connection_timeout = config.timeout * config.max_timeouts.get(); + let connection_keepalive = KeepAlive::Until(now + connection_timeout); + let next_ping = Delay::new(now); PingHandler { - inner: OneShotHandler::default(), - next_ping: Delay::new(Instant::now()), - delay_to_next_ping: Duration::from_secs(15), + config, + next_ping, + connection_timeout, + connection_keepalive, + pending_results: VecDeque::with_capacity(2), + _marker: std::marker::PhantomData } } } -impl Default for PingHandler -where - TSubstream: AsyncRead + AsyncWrite, -{ - #[inline] - fn default() -> Self { - PingHandler::new() - } -} - impl ProtocolsHandler for PingHandler where TSubstream: AsyncRead + AsyncWrite, { - type InEvent = void::Void; - type OutEvent = protocol::PingOutput; + type InEvent = Void; + type OutEvent = PingResult; type Error = ProtocolsHandlerUpgrErr; type Substream = TSubstream; type InboundProtocol = protocol::Ping; type OutboundProtocol = protocol::Ping; type OutboundOpenInfo = (); - fn listen_protocol(&self) -> Self::InboundProtocol { - self.inner.listen_protocol() + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(protocol::Ping) } - fn inject_fully_negotiated_inbound(&mut self, protocol: ()) { - self.inner.inject_fully_negotiated_inbound(protocol) + fn inject_fully_negotiated_inbound(&mut self, _: ()) { + // A ping from a remote peer has been answered. + self.pending_results.push_front((Instant::now(), Ok(PingSuccess::Pong))); } - fn inject_fully_negotiated_outbound(&mut self, duration: Duration, info: Self::OutboundOpenInfo) { - self.inner.inject_fully_negotiated_outbound(duration, info) + fn inject_fully_negotiated_outbound(&mut self, rtt: Duration, _info: ()) { + // A ping initiated by the local peer was answered by the remote. 
+ self.pending_results.push_front((Instant::now(), Ok(PingSuccess::Ping { rtt }))); } - fn inject_event(&mut self, event: void::Void) { - void::unreachable(event) - } + fn inject_event(&mut self, _: Void) {} - fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr) { - self.inner.inject_dial_upgrade_error(info, error) + fn inject_dial_upgrade_error(&mut self, _info: (), error: Self::Error) { + self.pending_results.push_front( + (Instant::now(), Err(match error { + ProtocolsHandlerUpgrErr::Timeout => PingFailure::Timeout, + e => PingFailure::Other { error: Box::new(e) } + }))) } fn connection_keep_alive(&self) -> KeepAlive { - self.inner.connection_keep_alive() + self.connection_keepalive } - fn poll( - &mut self, - ) -> Poll< - ProtocolsHandlerEvent, - Self::Error, - > { + fn poll(&mut self) -> Poll, Self::Error> { + if let Some((instant, result)) = self.pending_results.pop_back() { + if result.is_ok() { + self.connection_keepalive = KeepAlive::Until(instant + self.connection_timeout); + let reset = match result { + Ok(PingSuccess::Ping { .. }) => true, + Ok(PingSuccess::Pong) => self.config.policy == PingPolicy::KeepAlive, + _ => false + }; + if reset { + self.next_ping.reset(instant + self.config.interval); + } + } + return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(result))) + } + match self.next_ping.poll() { Ok(Async::Ready(())) => { - self.inner.inject_event(protocol::Ping::default()); - self.next_ping.reset(Instant::now() + Duration::from_secs(3600)); + self.next_ping.reset(Instant::now() + self.config.timeout); + let protocol = SubstreamProtocol::new(protocol::Ping) + .with_timeout(self.config.timeout); + Ok(Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol, + info: (), + })) }, - Ok(Async::NotReady) => (), - Err(_) => (), - }; - - let event = self.inner.poll(); - if let Ok(Async::Ready(ProtocolsHandlerEvent::Custom(protocol::PingOutput::Ping(_)))) = event { - self.next_ping.reset(Instant::now() + self.delay_to_next_ping); + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_) => Err(ProtocolsHandlerUpgrErr::Timer) } - event } } diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index dd9eb952..ab90f762 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -18,65 +18,74 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -//! Handles the `/ipfs/ping/1.0.0` protocol. This allows pinging a remote node and waiting for an -//! answer. +//! This module implements the `/ipfs/ping/1.0.0` protocol. +//! +//! The ping protocol can be used as an application-layer keep-alive functionality +//! for connections of any [`Transport`] as well as to measure round-trip times. //! //! # Usage //! -//! The `Ping` struct implements the `NetworkBehaviour` trait. When used, it will automatically -//! send a periodic ping to nodes we are connected to. If a remote doesn't answer in time, it gets -//! automatically disconnected. +//! The [`Ping`] struct implements the [`NetworkBehaviour`] trait. When used with a [`Swarm`], +//! it will respond to inbound ping requests and as necessary periodically send outbound +//! ping requests on every established connection. If no pings are received or +//! successfully sent within a configurable time window, [`PingHandler::connection_keep_alive`] +//! eventually indicates to the `Swarm` that the connection should be closed. //! -//! The `Ping` struct is also what handles answering to the pings sent by remotes. 
+//! The `Ping` network behaviour produces [`PingEvent`]s, which may be consumed from the `Swarm` +//! by an application, e.g. to collect statistics. //! -//! When a ping succeeds, a `PingSuccess` event is generated, indicating the time the ping took. +//! [`Swarm`]: libp2p_core::Swarm +//! [`Transport`]: libp2p_core::Transport pub mod protocol; pub mod handler; +pub use handler::{PingConfig, PingPolicy, PingResult, PingSuccess, PingFailure}; +use handler::PingHandler; + use futures::prelude::*; use libp2p_core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; -use libp2p_core::protocols_handler::ProtocolsHandler; use libp2p_core::{Multiaddr, PeerId}; -use std::{marker::PhantomData, time::Duration}; +use std::collections::VecDeque; +use std::marker::PhantomData; use tokio_io::{AsyncRead, AsyncWrite}; +use void::Void; -/// Network behaviour that handles receiving pings sent by other nodes and periodically pings the -/// nodes we are connected to. +/// `Ping` is a [`NetworkBehaviour`] that responds to inbound pings and +/// periodically sends outbound pings on every established connection. /// /// See the crate root documentation for more information. pub struct Ping { - /// Marker to pin the generics. - marker: PhantomData, - /// Queue of events to report to the user. - events: Vec, + /// Configuration for outbound pings. + config: PingConfig, + /// Queue of events to yield to the swarm. + events: VecDeque, + _marker: PhantomData, } -/// Event generated by the `Ping` behaviour. -pub enum PingEvent { - /// We have successfully pinged a peer we are connected to. - PingSuccess { - /// Id of the peer that we pinged. - peer: PeerId, - /// Time elapsed between when we sent the ping and when we received the response. - time: Duration, - } +/// Event generated by the `Ping` network behaviour. +#[derive(Debug)] +pub struct PingEvent { + /// The peer ID of the remote. + pub peer: PeerId, + /// The result of an inbound or outbound ping. + pub result: PingResult, } impl Ping { - /// Creates a `Ping`. - pub fn new() -> Self { + /// Creates a new `Ping` network behaviour with the given configuration. 
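The sketch below shows how an application might consume these events from the swarm's poll loop; the field names follow the `PingEvent` definition introduced in this diff:

```rust
use libp2p_ping::{PingEvent, PingFailure, PingSuccess};

// Called for every `PingEvent` yielded by the swarm, e.g. in place of the
// `println!("{:?}", e)` in the poll loop of `examples/ping.rs` above.
fn on_ping_event(event: PingEvent) {
    match event.result {
        Ok(PingSuccess::Ping { rtt }) =>
            println!("{:?} answered a ping in {:?}", event.peer, rtt),
        Ok(PingSuccess::Pong) =>
            println!("answered a ping from {:?}", event.peer),
        Err(PingFailure::Timeout) =>
            println!("ping to {:?} timed out", event.peer),
        Err(PingFailure::Other { error }) =>
            println!("ping to {:?} failed: {}", event.peer, error),
    }
}
```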
 
 impl<TSubstream> Ping<TSubstream> {
-    /// Creates a `Ping`.
-    pub fn new() -> Self {
+    /// Creates a new `Ping` network behaviour with the given configuration.
+    pub fn new(config: PingConfig) -> Self {
         Ping {
-            marker: PhantomData,
-            events: Vec::new(),
+            config,
+            events: VecDeque::new(),
+            _marker: PhantomData,
         }
     }
 }
 
 impl<TSubstream> Default for Ping<TSubstream> {
-    #[inline]
     fn default() -> Self {
-        Ping::new()
+        Ping::new(PingConfig::new())
     }
 }
 
@@ -84,11 +93,11 @@ impl<TSubstream> NetworkBehaviour for Ping<TSubstream>
 where
     TSubstream: AsyncRead + AsyncWrite,
 {
-    type ProtocolsHandler = handler::PingHandler<TSubstream>;
+    type ProtocolsHandler = PingHandler<TSubstream>;
     type OutEvent = PingEvent;
 
     fn new_handler(&mut self) -> Self::ProtocolsHandler {
-        handler::PingHandler::default()
+        PingHandler::new(self.config.clone())
     }
 
     fn addresses_of_peer(&mut self, _peer_id: &PeerId) -> Vec<Multiaddr> {
@@ -99,32 +108,16 @@
     fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {}
 
-    fn inject_node_event(
-        &mut self,
-        source: PeerId,
-        event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
-    ) {
-        if let protocol::PingOutput::Ping(time) = event {
-            self.events.push(PingEvent::PingSuccess {
-                peer: source,
-                time,
-            })
-        }
+    fn inject_node_event(&mut self, peer: PeerId, result: PingResult) {
+        self.events.push_front(PingEvent { peer, result })
     }
 
-    fn poll(
-        &mut self,
-        _: &mut PollParameters<'_>,
-    ) -> Async<
-        NetworkBehaviourAction<
-            <Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
-            Self::OutEvent,
-        >,
-    > {
-        if !self.events.is_empty() {
-            return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)));
+    fn poll(&mut self, _: &mut PollParameters<'_>) -> Async<NetworkBehaviourAction<Void, PingEvent>>
+    {
+        if let Some(e) = self.events.pop_back() {
+            Async::Ready(NetworkBehaviourAction::GenerateEvent(e))
+        } else {
+            Async::NotReady
        }
-
-        Async::NotReady
     }
 }
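Editor's note: for reference, a condensed sketch of how an application might wire the new behaviour into a `Swarm` and consume `PingEvent`s. It is not part of the patch; it follows the crate docs above and the integration test added at the end of this diff (TCP + secio + yamux on a tokio 0.1 runtime), and all names are illustrative.

    use futures::{future, prelude::*};
    use libp2p_core::{identity, transport::Transport, upgrade::{self, InboundUpgradeExt, OutboundUpgradeExt}, Swarm};
    use libp2p_ping::{Ping, PingConfig, PingEvent};
    use libp2p_secio::SecioConfig;
    use libp2p_tcp::TcpConfig;
    use libp2p_yamux as yamux;

    fn main() {
        // Identity and transport (TCP + secio + yamux), mirroring the helper in
        // the integration test added by this change.
        let id_keys = identity::Keypair::generate_ed25519();
        let peer_id = id_keys.public().into_peer_id();
        let transport = TcpConfig::new()
            .nodelay(true)
            .with_upgrade(SecioConfig::new(id_keys))
            .and_then(move |out, endpoint| {
                let peer_id = out.remote_key.into_peer_id();
                let peer_id2 = peer_id.clone();
                let muxer = yamux::Config::default()
                    .map_outbound(move |m| (peer_id, m))
                    .map_inbound(move |m| (peer_id2, m));
                upgrade::apply(out.stream, muxer, endpoint)
            });

        // A swarm with the default ping configuration: respond to inbound pings
        // and periodically ping every connected peer.
        let mut swarm = Swarm::new(transport, Ping::new(PingConfig::new()), peer_id);
        Swarm::listen_on(&mut swarm, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();

        // Drive the swarm, logging every `PingEvent` it produces.
        tokio::run(future::poll_fn(move || -> Result<_, ()> {
            loop {
                match swarm.poll().expect("swarm error") {
                    Async::Ready(Some(PingEvent { peer, result })) =>
                        println!("ping event from {:?}: {:?}", peer, result),
                    Async::Ready(None) => return Ok(Async::Ready(())),
                    Async::NotReady => return Ok(Async::NotReady),
                }
            }
        }));
    }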
diff --git a/protocols/ping/src/protocol.rs b/protocols/ping/src/protocol.rs
index bb820e51..4ac0a3f0 100644
--- a/protocols/ping/src/protocol.rs
+++ b/protocols/ping/src/protocol.rs
@@ -21,9 +21,9 @@
 use futures::{prelude::*, future, try_ready};
 use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, upgrade::Negotiated};
 use log::debug;
-use rand::{distributions::Standard, prelude::*, rngs::EntropyRng};
+use rand::{distributions, prelude::*};
 use std::{io, iter, time::Duration, time::Instant};
-use tokio_io::{AsyncRead, AsyncWrite};
+use tokio_io::{io as nio, AsyncRead, AsyncWrite};
 
 /// Represents a prototype for an upgrade to handle the ping protocol.
 ///
@@ -33,8 +33,14 @@ use tokio_io::{AsyncRead, AsyncWrite};
 /// - Listener receives the data and sends it back.
 /// - Dialer receives the data and verifies that it matches what it sent.
 ///
-/// The dialer produces a `Duration`, which corresponds to the time between when we flushed the
-/// substream and when we received back the payload.
+/// The dialer produces a `Duration`, which corresponds to the round-trip time
+/// of the payload.
+///
+/// > **Note**: The round-trip time of a ping may be subject to delays induced
+/// > by the underlying transport, e.g. in the case of TCP there are
+/// > Nagle's algorithm, delayed ACKs and similar configuration options
+/// > which can affect latencies, especially on otherwise low-volume
+/// > connections.
 #[derive(Default, Debug, Copy, Clone)]
 pub struct Ping;
 
@@ -47,20 +53,33 @@ impl UpgradeInfo for Ping {
     }
 }
 
+type RecvPing<T> = nio::ReadExact<Negotiated<T>, [u8; 32]>;
+type SendPong<T> = nio::WriteAll<Negotiated<T>, [u8; 32]>;
+type Flush<T> = nio::Flush<Negotiated<T>>;
+type Shutdown<T> = nio::Shutdown<Negotiated<T>>;
+
 impl<TSocket> InboundUpgrade<TSocket> for Ping
 where
     TSocket: AsyncRead + AsyncWrite,
 {
     type Output = ();
     type Error = io::Error;
-    type Future = future::Map<future::AndThen<future::AndThen<future::AndThen<tokio_io::io::ReadExact<Negotiated<TSocket>, [u8; 32]>, tokio_io::io::WriteAll<Negotiated<TSocket>, [u8; 32]>, fn((Negotiated<TSocket>, [u8; 32])) -> tokio_io::io::WriteAll<Negotiated<TSocket>, [u8; 32]>>, tokio_io::io::Flush<Negotiated<TSocket>>, fn((Negotiated<TSocket>, [u8; 32])) -> tokio_io::io::Flush<Negotiated<TSocket>>>, tokio_io::io::Shutdown<Negotiated<TSocket>>, fn(Negotiated<TSocket>) -> tokio_io::io::Shutdown<Negotiated<TSocket>>>, fn(Negotiated<TSocket>) -> ()>;
+    type Future = future::Map<
+        future::AndThen<
+        future::AndThen<
+        future::AndThen<
+            RecvPing<TSocket>,
+            SendPong<TSocket>, fn((Negotiated<TSocket>, [u8; 32])) -> SendPong<TSocket>>,
+            Flush<TSocket>, fn((Negotiated<TSocket>, [u8; 32])) -> Flush<TSocket>>,
+            Shutdown<TSocket>, fn(Negotiated<TSocket>) -> Shutdown<TSocket>>,
+        fn(Negotiated<TSocket>) -> ()>;
 
     #[inline]
     fn upgrade_inbound(self, socket: Negotiated<TSocket>, _: Self::Info) -> Self::Future {
-        tokio_io::io::read_exact(socket, [0; 32])
-            .and_then::<fn(_) -> _, _>(|(socket, buffer)| tokio_io::io::write_all(socket, buffer))
-            .and_then::<fn(_) -> _, _>(|(socket, _)| tokio_io::io::flush(socket))
-            .and_then::<fn(_) -> _, _>(|socket| tokio_io::io::shutdown(socket))
+        nio::read_exact(socket, [0; 32])
+            .and_then::<fn(_) -> _, _>(|(sock, buf)| nio::write_all(sock, buf))
+            .and_then::<fn(_) -> _, _>(|(sock, _)| nio::flush(sock))
+            .and_then::<fn(_) -> _, _>(|sock| nio::shutdown(sock))
             .map(|_| ())
     }
 }
@@ -75,143 +94,135 @@ where
     #[inline]
     fn upgrade_outbound(self, socket: Negotiated<TSocket>, _: Self::Info) -> Self::Future {
-        let payload: [u8; 32] = EntropyRng::default().sample(Standard);
-        debug!("Preparing for ping with payload {:?}", payload);
+        let payload: [u8; 32] = thread_rng().sample(distributions::Standard);
+        debug!("Preparing ping payload {:?}", payload);
         PingDialer {
-            inner: PingDialerInner::Write {
-                inner: tokio_io::io::write_all(socket, payload),
+            state: PingDialerState::Write {
+                inner: nio::write_all(socket, payload),
             },
         }
     }
 }
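Editor's note: the outbound upgrade returns the hand-rolled `PingDialer` state machine defined below rather than a combinator chain, presumably so that the timestamp is taken exactly when the flush completes and the future has a nameable type. For comparison only, here is a purely illustrative combinator version of the same write, flush, read, verify and shutdown sequence; it is not the implementation used by this patch and `ping_round_trip` is a made-up name.

    use futures::{future, prelude::*};
    use std::{io, time::{Duration, Instant}};
    use tokio_io::{io as nio, AsyncRead, AsyncWrite};

    /// Illustrative only: the ping round trip expressed with future combinators.
    fn ping_round_trip<T>(socket: T, payload: [u8; 32]) -> impl Future<Item = Duration, Error = io::Error>
    where
        T: AsyncRead + AsyncWrite,
    {
        nio::write_all(socket, payload)
            // Flush so the payload actually leaves local buffers before timing starts.
            .and_then(|(socket, payload)| nio::flush(socket).map(move |s| (s, payload)))
            .and_then(|(socket, payload)| {
                let started = Instant::now();
                nio::read_exact(socket, [0u8; 32]).and_then(move |(socket, received)| {
                    if received != payload {
                        return future::Either::A(future::err(io::Error::new(
                            io::ErrorKind::InvalidData, "Ping payload mismatch")));
                    }
                    // Close the substream and report the measured round-trip time.
                    future::Either::B(nio::shutdown(socket).map(move |_| started.elapsed()))
                })
            })
    }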
 
-/// Sends a ping and receives a pong.
-///
-/// Implements `Future`. Finishes when the pong has arrived and has been verified.
+/// A `PingDialer` is a future that sends a ping and expects to receive a pong.
 pub struct PingDialer<TSocket> {
-    inner: PingDialerInner<TSocket>,
+    state: PingDialerState<TSocket>
 }
 
-enum PingDialerInner<TSocket> {
+enum PingDialerState<TSocket> {
     Write {
-        inner: tokio_io::io::WriteAll<TSocket, [u8; 32]>,
+        inner: nio::WriteAll<TSocket, [u8; 32]>,
     },
     Flush {
-        inner: tokio_io::io::Flush<TSocket>,
-        ping_payload: [u8; 32],
+        inner: nio::Flush<TSocket>,
+        payload: [u8; 32],
     },
     Read {
-        inner: tokio_io::io::ReadExact<TSocket, [u8; 32]>,
-        ping_payload: [u8; 32],
+        inner: nio::ReadExact<TSocket, [u8; 32]>,
+        payload: [u8; 32],
         started: Instant,
     },
     Shutdown {
-        inner: tokio_io::io::Shutdown<TSocket>,
-        ping_time: Duration,
+        inner: nio::Shutdown<TSocket>,
+        rtt: Duration,
     },
 }
 
 impl<TSocket> Future for PingDialer<TSocket>
-where TSocket: AsyncRead + AsyncWrite,
+where
+    TSocket: AsyncRead + AsyncWrite,
 {
     type Item = Duration;
     type Error = io::Error;
 
     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
         loop {
-            let new_state = match self.inner {
-                PingDialerInner::Write { ref mut inner } => {
-                    let (socket, ping_payload) = try_ready!(inner.poll());
-                    PingDialerInner::Flush {
-                        inner: tokio_io::io::flush(socket),
-                        ping_payload,
+            self.state = match self.state {
+                PingDialerState::Write { ref mut inner } => {
+                    let (socket, payload) = try_ready!(inner.poll());
+                    PingDialerState::Flush {
+                        inner: nio::flush(socket),
+                        payload,
                     }
                 },
-                PingDialerInner::Flush { ref mut inner, ping_payload } => {
+                PingDialerState::Flush { ref mut inner, payload } => {
                     let socket = try_ready!(inner.poll());
                     let started = Instant::now();
-                    PingDialerInner::Read {
-                        inner: tokio_io::io::read_exact(socket, [0; 32]),
-                        ping_payload,
+                    PingDialerState::Read {
+                        inner: nio::read_exact(socket, [0; 32]),
+                        payload,
                         started,
                     }
                 },
-                PingDialerInner::Read { ref mut inner, ping_payload, started } => {
-                    let (socket, obtained) = try_ready!(inner.poll());
-                    let ping_time = started.elapsed();
-                    if obtained != ping_payload {
-                        return Err(io::Error::new(io::ErrorKind::InvalidData,
-                            "Received ping payload doesn't match expected"));
+                PingDialerState::Read { ref mut inner, payload, started } => {
+                    let (socket, payload_received) = try_ready!(inner.poll());
+                    let rtt = started.elapsed();
+                    if payload_received != payload {
+                        return Err(io::Error::new(
+                            io::ErrorKind::InvalidData, "Ping payload mismatch"));
                     }
-                    PingDialerInner::Shutdown {
-                        inner: tokio_io::io::shutdown(socket),
-                        ping_time,
+                    PingDialerState::Shutdown {
+                        inner: nio::shutdown(socket),
+                        rtt,
                     }
                 },
-                PingDialerInner::Shutdown { ref mut inner, ping_time } => {
-                    let _ = try_ready!(inner.poll());
-                    return Ok(Async::Ready(ping_time));
+                PingDialerState::Shutdown { ref mut inner, rtt } => {
+                    try_ready!(inner.poll());
+                    return Ok(Async::Ready(rtt));
                 },
-            };
-
-            self.inner = new_state;
+            }
         }
     }
 }
 
-/// Enum to merge the output of `Ping` for the dialer and listener.
-#[derive(Debug, Copy, Clone)]
-pub enum PingOutput {
-    /// Received a ping and sent back a pong.
-    Pong,
-    /// Sent a ping and received back a pong. Contains the ping time.
-    Ping(Duration),
-}
-
-impl From<Duration> for PingOutput {
-    #[inline]
-    fn from(duration: Duration) -> PingOutput {
-        PingOutput::Ping(duration)
-    }
-}
-
-impl From<()> for PingOutput {
-    #[inline]
-    fn from(_: ()) -> PingOutput {
-        PingOutput::Pong
-    }
-}
-
 #[cfg(test)]
 mod tests {
-    use tokio_tcp::{TcpListener, TcpStream};
     use super::Ping;
-    use futures::{Future, Stream};
-    use libp2p_core::upgrade;
-
-    // TODO: rewrite tests with the MemoryTransport
+    use futures::prelude::*;
+    use libp2p_core::{
+        upgrade,
+        multiaddr::multiaddr,
+        transport::{
+            Transport,
+            ListenerEvent,
+            memory::MemoryTransport
+        }
+    };
+    use rand::{thread_rng, Rng};
+    use std::time::Duration;
 
     #[test]
     fn ping_pong() {
-        let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
-        let listener_addr = listener.local_addr().unwrap();
+        let mem_addr = multiaddr![Memory(thread_rng().gen::<u64>())];
+        let mut listener = MemoryTransport.listen_on(mem_addr).unwrap();
+
+        let listener_addr =
+            if let Ok(Async::Ready(Some(ListenerEvent::NewAddress(a)))) = listener.poll() {
+                a
+            } else {
+                panic!("MemoryTransport not listening on an address!");
+            };
 
         let server = listener
-            .incoming()
             .into_future()
             .map_err(|(e, _)| e)
-            .and_then(|(c, _)| {
-                upgrade::apply_inbound(c.unwrap(), Ping::default()).map_err(|_| panic!())
+            .and_then(|(listener_event, _)| {
+                let (listener_upgrade, _) = listener_event.unwrap().into_upgrade().unwrap();
+                let conn = listener_upgrade.wait().unwrap();
+                upgrade::apply_inbound(conn, Ping::default())
+                    .map_err(|e| panic!(e))
             });
 
-        let client = TcpStream::connect(&listener_addr)
+        let client = MemoryTransport.dial(listener_addr).unwrap()
             .and_then(|c| {
-                upgrade::apply_outbound(c, Ping::default()).map_err(|_| panic!())
-            })
-            .map(|_| ());
+                upgrade::apply_outbound(c, Ping::default())
+                    .map_err(|e| panic!(e))
+            });
 
         let mut runtime = tokio::runtime::Runtime::new().unwrap();
-        runtime.block_on(server.select(client).map_err(|_| panic!())).unwrap();
+        runtime.spawn(server.map_err(|e| panic!(e)));
+        let rtt = runtime.block_on(client).expect("RTT");
+        assert!(rtt > Duration::from_secs(0));
     }
 }
diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs
new file mode 100644
index 00000000..11a12d16
--- /dev/null
+++ b/protocols/ping/tests/ping.rs
@@ -0,0 +1,124 @@
+// Copyright 2019 Parity Technologies (UK) Ltd.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the "Software"),
+// to deal in the Software without restriction, including without limitation
+// the rights to use, copy, modify, merge, publish, distribute, sublicense,
+// and/or sell copies of the Software, and to permit persons to whom the
+// Software is furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+//! Integration tests for the `Ping` network behaviour.
+
+use libp2p_core::{
+    Multiaddr,
+    PeerId,
+    Swarm,
+    identity,
+    muxing::StreamMuxer,
+    upgrade::{self, OutboundUpgradeExt, InboundUpgradeExt},
+    transport::Transport
+};
+use libp2p_ping::*;
+use libp2p_yamux as yamux;
+use libp2p_secio::SecioConfig;
+use libp2p_tcp::TcpConfig;
+use futures::{future, prelude::*};
+use std::{fmt, time::Duration, sync::mpsc::sync_channel};
+use tokio::runtime::Runtime;
+
+#[test]
+fn ping() {
+    let (peer1_id, trans) = mk_transport();
+    let mut swarm1 = Swarm::new(trans, Ping::default(), peer1_id.clone());
+
+    let (peer2_id, trans) = mk_transport();
+    let mut swarm2 = Swarm::new(trans, Ping::default(), peer2_id.clone());
+
+    let (tx, rx) = sync_channel::<Multiaddr>(1);
+
+    let pid1 = peer1_id.clone();
+    let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
+    let mut listening = false;
+    Swarm::listen_on(&mut swarm1, addr).unwrap();
+    let peer1 = future::poll_fn(move || -> Result<_, ()> {
+        loop {
+            match swarm1.poll().expect("Error while polling swarm") {
+                Async::Ready(Some(PingEvent { peer, result })) => match result {
+                    Ok(PingSuccess::Ping { rtt }) =>
+                        return Ok(Async::Ready((pid1.clone(), peer, rtt))),
+                    _ => {}
+                },
+                _ => {
+                    if !listening {
+                        for l in Swarm::listeners(&swarm1) {
+                            tx.send(l.clone()).unwrap();
+                            listening = true;
+                        }
+                    }
+                    return Ok(Async::NotReady)
+                }
+            }
+        }
+    });
+
+    let pid2 = peer2_id.clone();
+    let mut dialing = false;
+    let peer2 = future::poll_fn(move || -> Result<_, ()> {
+        loop {
+            match swarm2.poll().expect("Error while polling swarm") {
+                Async::Ready(Some(PingEvent { peer, result })) => match result {
+                    Ok(PingSuccess::Ping { rtt }) =>
+                        return Ok(Async::Ready((pid2.clone(), peer, rtt))),
+                    _ => {}
+                },
+                _ => {
+                    if !dialing {
+                        Swarm::dial_addr(&mut swarm2, rx.recv().unwrap()).unwrap();
+                        dialing = true;
+                    }
+                    return Ok(Async::NotReady)
+                }
+            }
+        }
+    });
+
+    let result = peer1.select(peer2).map_err(|e| panic!(e));
+    let ((p1, p2, rtt), _) = Runtime::new().unwrap().block_on(result).unwrap();
+    assert!(p1 == peer1_id && p2 == peer2_id || p1 == peer2_id && p2 == peer1_id);
+    assert!(rtt < Duration::from_millis(50));
+}
+
+fn mk_transport() -> (PeerId, impl Transport<
+    Output = (PeerId, impl StreamMuxer),
+    Listener = impl Send,
+    ListenerUpgrade = impl Send,
+    Dial = impl Send,
+    Error = impl fmt::Debug
+> + Clone) {
+    let id_keys = identity::Keypair::generate_ed25519();
+    let peer_id = id_keys.public().into_peer_id();
+    let transport = TcpConfig::new()
+        .nodelay(true)
+        .with_upgrade(SecioConfig::new(id_keys))
+        .and_then(move |out, endpoint| {
+            let peer_id = out.remote_key.into_peer_id();
+            let peer_id2 = peer_id.clone();
+            let upgrade = yamux::Config::default()
+                .map_outbound(move |muxer| (peer_id, muxer))
+                .map_inbound(move |muxer| (peer_id2, muxer));
+            upgrade::apply(out.stream, upgrade, endpoint)
+        });
+    (peer_id, transport)
+}
+
diff --git a/src/lib.rs b/src/lib.rs
index f06caadb..5c7f3d34 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -283,8 +283,8 @@ impl CommonTransport {
     #[inline]
     #[cfg(not(any(target_os = "emscripten", target_os = "unknown")))]
     pub fn new() -> CommonTransport {
-        let transport = tcp::TcpConfig::new();
-        let transport = dns::DnsConfig::new(transport);
+        let tcp = tcp::TcpConfig::new().nodelay(true);
+        let transport = dns::DnsConfig::new(tcp);
         #[cfg(feature = "libp2p-websocket")]
         let transport = {
            let trans_clone = transport.clone();
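Editor's note: the final hunk enables TCP_NODELAY on the default `CommonTransport`, which matches the note on round-trip time accuracy in `protocol.rs`: without it, Nagle's algorithm can delay the small 32-byte ping payloads and inflate measured RTTs. A minimal sketch of the same setting applied to a standalone TCP transport; the function name is illustrative only.

    use libp2p_tcp::TcpConfig;

    /// Illustrative only: a TCP transport with Nagle's algorithm disabled,
    /// mirroring what `CommonTransport::new` now does in the hunk above.
    fn low_latency_tcp() -> TcpConfig {
        TcpConfig::new().nodelay(true)
    }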