From e54a8e716e1d8b1be6a023b27c9e19fcf2e229aa Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 22 Nov 2017 10:58:06 +0100 Subject: [PATCH] Implement ping --- Cargo.toml | 1 + README.md | 2 + libp2p-ping/Cargo.toml | 17 +++ libp2p-ping/src/lib.rs | 290 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 310 insertions(+) create mode 100644 libp2p-ping/Cargo.toml create mode 100644 libp2p-ping/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 507ae53f..800cca3d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "datastore", "example", "libp2p-peerstore", + "libp2p-ping", "libp2p-secio", "libp2p-swarm", "libp2p-tcp-transport", diff --git a/README.md b/README.md index c7de3c20..eb0d0316 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,8 @@ Architecture of the crates of this repository: - `libp2p-peerstore`: Generic storage for information about remote peers (their multiaddresses and their public key), with multiple possible backends. Each multiaddress also has a time-to-live. Used by `libp2p-swarm`. +- `libp2p-ping`: Implementation of the `ping` protocol (the exact protocol is specific to libp2p). + Implements the `ConnectionUpgrade` trait of `libp2p-swarm`. - `libp2p-secio`: Implementation of the `secio` protocol. Encrypts communications. Implements the `ConnectionUpgrade` trait of `libp2p-swarm`. - `libp2p-swarm`: Core library that contains all the traits of *libp2p* and plugs things together. diff --git a/libp2p-ping/Cargo.toml b/libp2p-ping/Cargo.toml new file mode 100644 index 00000000..dd8c831a --- /dev/null +++ b/libp2p-ping/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "libp2p-ping" +version = "0.1.0" +authors = ["pierre "] + +[dependencies] +bytes = "0.4" +libp2p-swarm = { path = "../libp2p-swarm" } +multiaddr = "0.2.0" +multistream-select = { path = "../multistream-select" } +futures = "0.1" +parking_lot = "0.5" +rand = "0.3" +tokio-io = "0.1" + +[dev-dependencies] +tokio-core = "0.1" diff --git a/libp2p-ping/src/lib.rs b/libp2p-ping/src/lib.rs new file mode 100644 index 00000000..8d87a7b2 --- /dev/null +++ b/libp2p-ping/src/lib.rs @@ -0,0 +1,290 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Handles the `/ipfs/ping/1.0.0` protocol. This allows pinging a remote node and waiting for an +//! answer. +//! +//! # Usage +//! +//! Create a `Ping` struct, which implements the `ConnectionUpgrade` trait. When used as a +//! connection upgrade, it will produce a tuple of type `(Pinger, impl Future)` which +//! are named the *pinger* and the *ponger*. +//! +//! The *pinger* has a method named `ping` which will send a ping to the remote, while the *ponger* +//! is a future that will process the data received on the socket and will be signalled only when +//! the connection closes. +//! +//! # About timeouts +//! +//! For technical reasons, this crate doesn't handle timeouts. The action of pinging returns a +//! future that is signalled only when the remote answers. If the remote is not responsive, the +//! future will never be signalled. +//! +//! For implementation reasons, resources allocated for a ping are only ever fully reclaimed after +//! a pong has been received by the remote. Therefore if you repeatidely ping a non-responsive +//! remote you will end up using more and memory memory (albeit the amount is very very small every +//! time), even if you destroy the future returned by `ping`. +//! +//! This is probably not a problem in practice, because the nature of the ping protocol is to +//! determine whether a remote is still alive, and any reasonable user of this crate will close +//! connections to non-responsive remotes. +//! + +extern crate bytes; +extern crate futures; +extern crate libp2p_swarm; +extern crate multistream_select; +extern crate parking_lot; +extern crate rand; +extern crate tokio_io; + +use bytes::{Bytes, BytesMut, BufMut}; +use futures::{Future, Sink, Stream}; +use futures::future::{FutureResult, IntoFuture, loop_fn, Loop}; +use futures::sync::{mpsc, oneshot}; +use libp2p_swarm::transport::ConnectionUpgrade; +use parking_lot::Mutex; +use rand::Rand; +use rand::os::OsRng; +use std::collections::HashMap; +use std::error::Error; +use std::io::Error as IoError; +use std::iter; +use std::sync::Arc; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_io::codec::{Encoder, Decoder}; + +/// Represents a prototype for an upgrade to handle the ping protocol. +/// +/// According to the design of libp2p, this struct would normally contain the configuration options +/// for the protocol, but in the case of `Ping` no configuration is required. +#[derive(Debug, Copy, Clone, Default)] +pub struct Ping; + +impl ConnectionUpgrade for Ping + where C: AsyncRead + AsyncWrite + 'static +{ + type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>; + type UpgradeIdentifier = (); + + #[inline] + fn protocol_names(&self) -> Self::NamesIter { + iter::once(("/ipfs/ping/1.0.0".into(), ())) + } + + type Output = (Pinger, Box>); + type Future = FutureResult; + + #[inline] + fn upgrade(self, socket: C, _: Self::UpgradeIdentifier) -> Self::Future { + // # How does it work? + // + // All the actual processing is performed by the *ponger*. + // We use a channel in order to send ping requests from the pinger to the ponger. + + let (tx, rx) = mpsc::channel(8); + // Ignore the errors if `tx` closed. `tx` is only ever closed if the ponger is closed, + // which means that the connection to the remote is closed. Therefore we make the `rx` + // never produce anything. + let rx = rx.then(|r| Ok(r.ok())).filter_map(|a| a); + + let os_rng = match OsRng::new() { + Ok(r) => r, + Err(err) => return Err(err).into_future(), + }; + + let pinger = Pinger { + send: tx, + os_rng: os_rng, + }; + + // Hashmap that associates outgoing payloads to one-shot senders. + // TODO: can't figure out how to make it work without using an Arc/Mutex + let expected_pongs = Arc::new(Mutex::new(HashMap::with_capacity(4))); + + let sink_stream = socket.framed(Codec).map(|msg| Message::Received(msg.freeze())); + let (sink, stream) = sink_stream.split(); + + let future = loop_fn((sink, stream.select(rx)), move |(sink, stream)| { + let expected_pongs = expected_pongs.clone(); + + stream.into_future().map_err(|(err, _)| err).and_then(move |(message, stream)| { + let mut expected_pongs = expected_pongs.lock(); + + if let Some(message) = message { + match message { + Message::Ping(payload, finished) => { + // Ping requested by the user through the `Pinger`. + expected_pongs.insert(payload.clone(), finished); + Box::new( + sink.send(payload).map(|sink| Loop::Continue((sink, stream))), + ) as Box> + } + Message::Received(payload) => { + // Received a payload from the remote. + if let Some(fut) = expected_pongs.remove(&payload) { + // Payload was ours. Signalling future. + // Errors can happen if the user closed the receiving end of + // the future, which is fine to ignore. + let _ = fut.send(()); + Box::new(Ok(Loop::Continue((sink, stream))).into_future()) as + Box> + } else { + // Payload was not ours. Sending it back. + Box::new( + sink.send(payload).map(|sink| Loop::Continue((sink, stream))), + ) as Box> + } + } + } + + } else { + Box::new(Ok(Loop::Break(())).into_future()) as Box> + } + }) + }); + + Ok((pinger, Box::new(future) as Box<_>)).into_future() + } +} + +/// Controller for the ping service. Makes it possible to send pings to the remote. +pub struct Pinger { + send: mpsc::Sender, + os_rng: OsRng, +} + +impl Pinger { + /// Sends a ping. Returns a future that is signaled when a pong is received. + /// + /// **Note**: Please be aware that there is no timeout on the ping. You should handle the + /// timeout yourself when you call this function. + pub fn ping(&mut self) -> Box>> { + let (tx, rx) = oneshot::channel(); + let payload: [u8; 32] = Rand::rand(&mut self.os_rng); + // Ignore errors if the ponger has been already destroyed. The returned future will never + // be signalled. + let fut = self.send.clone().send(Message::Ping(Bytes::from(payload.to_vec()), tx)) + .from_err() + .and_then(|_| rx.from_err()); + Box::new(fut) as Box<_> + } +} + +enum Message { + Ping(Bytes, oneshot::Sender<()>), + Received(Bytes), +} + +// Implementation of the `Codec` trait of tokio-io. Splits frames into groups of 32 bytes. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +struct Codec; + +impl Decoder for Codec { + type Item = BytesMut; + type Error = IoError; + + #[inline] + fn decode(&mut self, buf: &mut BytesMut) -> Result, IoError> { + if buf.len() >= 32 { Ok(Some(buf.split_to(32))) } else { Ok(None) } + } +} + +impl Encoder for Codec { + type Item = Bytes; + type Error = IoError; + + #[inline] + fn encode(&mut self, mut data: Bytes, buf: &mut BytesMut) -> Result<(), IoError> { + if data.len() != 0 { + let split = 32 * (1 + ((data.len() - 1) / 32)); + buf.put(data.split_to(split)); + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + extern crate tokio_core; + + use self::tokio_core::net::TcpListener; + use self::tokio_core::net::TcpStream; + use self::tokio_core::reactor::Core; + use super::Ping; + use futures::future::join_all; + use futures::Future; + use futures::Stream; + use libp2p_swarm::transport::ConnectionUpgrade; + + #[test] + fn ping_pong() { + let mut core = Core::new().unwrap(); + + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let listener_addr = listener.local_addr().unwrap(); + + let server = listener.incoming() + .into_future() + .map_err(|(e, _)| e.into()) + .and_then(|(c, _)| Ping.upgrade(c.unwrap().0, ())) + .and_then(|(mut pinger, service)| { + pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) + }); + + let client = TcpStream::connect(&listener_addr, &core.handle()) + .map_err(|e| e.into()) + .and_then(|c| Ping.upgrade(c, ())) + .and_then(|(mut pinger, service)| { + pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) + }); + + core.run(server.join(client)).unwrap(); + } + + #[test] + fn multipings() { + // Check that we can send multiple pings in a row and it will still work. + let mut core = Core::new().unwrap(); + + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let listener_addr = listener.local_addr().unwrap(); + + let server = listener.incoming() + .into_future() + .map_err(|(e, _)| e.into()) + .and_then(|(c, _)| Ping.upgrade(c.unwrap().0, ())) + .and_then(|(_, service)| service.map_err(|_| panic!())); + + let client = TcpStream::connect(&listener_addr, &core.handle()) + .map_err(|e| e.into()) + .and_then(|c| Ping.upgrade(c, ())) + .and_then(|(mut pinger, service)| { + let pings = (0 .. 20).map(move |_| { + pinger.ping().map_err(|_| ()) + }); + + join_all(pings).map(|_| ()).map_err(|_| panic!()) + .select(service).map(|_| ()).map_err(|_| panic!()) + }); + + core.run(server.select(client)).unwrap_or_else(|_| panic!()); + } +}