diff --git a/core/src/lib.rs b/core/src/lib.rs index 552d08ff..a164dc67 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -135,7 +135,7 @@ //! extern crate tokio_current_thread; //! //! use futures::Future; -//! use libp2p_ping::Ping; +//! use libp2p_ping::{Ping, PingOutput}; //! use libp2p_core::Transport; //! //! # fn main() { @@ -145,8 +145,14 @@ //! // TODO: right now the only available protocol is ping, but we want to replace it with //! // something that is more simple to use //! .dial("127.0.0.1:12345".parse::().unwrap()).unwrap_or_else(|_| panic!()) -//! .and_then(|((mut pinger, service), _)| { -//! pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) +//! .and_then(|(out, _)| { +//! match out { +//! PingOutput::Ponger(processing) => Box::new(processing) as Box>, +//! PingOutput::Pinger { mut pinger, processing } => { +//! let f = pinger.ping().map_err(|_| panic!()).select(processing).map(|_| ()).map_err(|(err, _)| err); +//! Box::new(f) as Box> +//! }, +//! } //! }); //! //! // Runs until the ping arrives. @@ -175,7 +181,7 @@ //! extern crate tokio_current_thread; //! //! use futures::Future; -//! use libp2p_ping::Ping; +//! use libp2p_ping::{Ping, PingOutput}; //! use libp2p_core::Transport; //! //! # fn main() { @@ -183,10 +189,14 @@ //! .with_dummy_muxing(); //! //! let (swarm_controller, swarm_future) = libp2p_core::swarm(transport.with_upgrade(Ping), -//! |(mut pinger, service), client_addr| { -//! pinger.ping().map_err(|_| panic!()) -//! .select(service).map_err(|_| panic!()) -//! .map(|_| ()) +//! |out, client_addr| { +//! match out { +//! PingOutput::Ponger(processing) => Box::new(processing) as Box>, +//! PingOutput::Pinger { mut pinger, processing } => { +//! let f = pinger.ping().map_err(|_| panic!()).select(processing).map(|_| ()).map_err(|(err, _)| err); +//! Box::new(f) as Box> +//! }, +//! } //! }); //! //! // The `swarm_controller` can then be used to do some operations. diff --git a/libp2p/examples/ping-client.rs b/libp2p/examples/ping-client.rs index 60c448ee..8ca38c91 100644 --- a/libp2p/examples/ping-client.rs +++ b/libp2p/examples/ping-client.rs @@ -78,15 +78,19 @@ fn main() { let mut tx = Some(tx); let (swarm_controller, swarm_future) = libp2p::core::swarm( transport.clone().with_upgrade(libp2p::ping::Ping), - |(mut pinger, future), _client_addr| { - let tx = tx.take(); - let ping = pinger.ping().map_err(|_| unreachable!()).inspect(move |_| { - println!("Received pong from the remote"); - if let Some(tx) = tx { - let _ = tx.send(()); - } - }); - ping.select(future).map(|_| ()).map_err(|(e, _)| e) + |out, _client_addr| { + if let libp2p::ping::PingOutput::Pinger { mut pinger, processing } = out { + let tx = tx.take(); + let ping = pinger.ping().map_err(|_| unreachable!()).inspect(move |_| { + println!("Received pong from the remote"); + if let Some(tx) = tx { + let _ = tx.send(()); + } + }); + ping.select(processing).map(|_| ()).map_err(|(e, _)| e) + } else { + unreachable!() + } }, ); diff --git a/ping/src/lib.rs b/ping/src/lib.rs index 2a03d1e3..2ca484ec 100644 --- a/ping/src/lib.rs +++ b/ping/src/lib.rs @@ -59,15 +59,21 @@ //! extern crate tokio_current_thread; //! //! use futures::Future; -//! use libp2p_ping::Ping; +//! use libp2p_ping::{Ping, PingOutput}; //! use libp2p_core::Transport; //! //! # fn main() { //! let ping_finished_future = libp2p_tcp_transport::TcpConfig::new() //! .with_upgrade(Ping) //! .dial("127.0.0.1:12345".parse::().unwrap()).unwrap_or_else(|_| panic!()) -//! .and_then(|((mut pinger, service), _)| { -//! pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) +//! .and_then(|(out, _)| { +//! match out { +//! PingOutput::Ponger(processing) => Box::new(processing) as Box>, +//! PingOutput::Pinger { mut pinger, processing } => { +//! let f = pinger.ping().map_err(|_| panic!()).select(processing).map(|_| ()).map_err(|(err, _)| err); +//! Box::new(f) as Box> +//! }, +//! } //! }); //! //! // Runs until the ping arrives. @@ -93,7 +99,7 @@ use futures::sync::{mpsc, oneshot}; use futures::{Future, Sink, Stream}; use libp2p_core::{ConnectionUpgrade, Endpoint}; use parking_lot::Mutex; -use rand::{prelude::*, rngs::EntropyRng, distributions::Standard}; +use rand::{distributions::Standard, prelude::*, rngs::EntropyRng}; use std::collections::HashMap; use std::error::Error; use std::io::Error as IoError; @@ -109,6 +115,18 @@ use tokio_io::{AsyncRead, AsyncWrite}; #[derive(Debug, Copy, Clone, Default)] pub struct Ping; +pub enum PingOutput { + /// We are on the dialer side. + Pinger { + /// Object to use in order to ping the remote. + pinger: Pinger, + /// Future that drives the processing of the pings. + processing: Box>, + }, + /// We are on the listening side. + Ponger(Box>), +} + impl ConnectionUpgrade for Ping where C: AsyncRead + AsyncWrite + 'static, @@ -121,7 +139,7 @@ where iter::once(("/ipfs/ping/1.0.0".into(), ())) } - type Output = (Pinger, Box>); + type Output = PingOutput; type MultiaddrFuture = Maf; type Future = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>; @@ -130,86 +148,119 @@ where self, socket: C, _: Self::UpgradeIdentifier, - _: Endpoint, + endpoint: Endpoint, remote_addr: Maf, ) -> Self::Future { - // # How does it work? - // - // All the actual processing is performed by the *ponger*. - // We use a channel in order to send ping requests from the pinger to the ponger. - - let (tx, rx) = mpsc::channel(8); - // Ignore the errors if `tx` closed. `tx` is only ever closed if the ponger is closed, - // which means that the connection to the remote is closed. Therefore we make the `rx` - // never produce anything. - let rx = rx.then(|r| Ok(r.ok())).filter_map(|a| a); - - let pinger = Pinger { - send: tx, - rng: EntropyRng::default(), + let out = match endpoint { + Endpoint::Dialer => upgrade_as_dialer(socket), + Endpoint::Listener => upgrade_as_listener(socket), }; - // Hashmap that associates outgoing payloads to one-shot senders. - // TODO: can't figure out how to make it work without using an Arc/Mutex - let expected_pongs = Arc::new(Mutex::new(HashMap::with_capacity(4))); + Ok((out, remote_addr)).into_future() + } +} - let sink_stream = Framed::new(socket, Codec).map(|msg| Message::Received(msg.freeze())); - let (sink, stream) = sink_stream.split(); +/// Upgrades a connection from the dialer side. +fn upgrade_as_dialer(socket: impl AsyncRead + AsyncWrite + 'static) -> PingOutput { + // # How does it work? + // + // All the actual processing is performed by the *ponger*. + // We use a channel in order to send ping requests from the pinger to the ponger. - let future = loop_fn((sink, stream.select(rx)), move |(sink, stream)| { - let expected_pongs = expected_pongs.clone(); + let (tx, rx) = mpsc::channel(8); + // Ignore the errors if `tx` closed. + let rx = rx.then(|r| Ok(r.ok())).filter_map(|a| a); - stream - .into_future() - .map_err(|(err, _)| err) - .and_then(move |(message, stream)| { - let mut expected_pongs = expected_pongs.lock(); + let pinger = Pinger { + send: tx, + rng: EntropyRng::default(), + }; - if let Some(message) = message { - match message { - Message::Ping(payload, finished) => { - // Ping requested by the user through the `Pinger`. - debug!("Sending ping with payload {:?}", payload); + // Hashmap that associates outgoing payloads to one-shot senders. + // TODO: can't figure out how to make it work without using an Arc/Mutex + let expected_pongs = Arc::new(Mutex::new(HashMap::with_capacity(4))); - expected_pongs.insert(payload.clone(), finished); - Box::new( - sink.send(payload) - .map(|sink| Loop::Continue((sink, stream))), - ) + let sink_stream = Framed::new(socket, Codec).map(|msg| Message::Received(msg.freeze())); + let (sink, stream) = sink_stream.split(); + + let future = loop_fn((sink, stream.select(rx)), move |(sink, stream)| { + let expected_pongs = expected_pongs.clone(); + + stream + .into_future() + .map_err(|(err, _)| err) + .and_then(move |(message, stream)| { + let mut expected_pongs = expected_pongs.lock(); + + if let Some(message) = message { + match message { + Message::Ping(payload, finished) => { + // Ping requested by the user through the `Pinger`. + debug!("Sending ping with payload {:?}", payload); + + expected_pongs.insert(payload.clone(), finished); + Box::new( + sink.send(payload) + .map(|sink| Loop::Continue((sink, stream))), + ) as Box> + } + Message::Received(payload) => { + // Received a payload from the remote. + if let Some(fut) = expected_pongs.remove(&payload) { + // Payload was ours. Signalling future. + // Errors can happen if the user closed the receiving end of + // the future, which is fine to ignore. + debug!("Received pong (payload={:?}) ; ping fufilled", payload); + let _ = fut.send(()); + Box::new(Ok(Loop::Continue((sink, stream))).into_future()) + as Box> + } else { + // Payload was unexpected. Closing connection. + debug!("Received invalid payload ({:?}) ; closing", payload); + Box::new(Ok(Loop::Break(())).into_future()) as Box> } - Message::Received(payload) => { - // Received a payload from the remote. - if let Some(fut) = expected_pongs.remove(&payload) { - // Payload was ours. Signalling future. - // Errors can happen if the user closed the receiving end of - // the future, which is fine to ignore. - debug!("Received pong (payload={:?}) ; ping fufilled", payload); - let _ = fut.send(()); - Box::new(Ok(Loop::Continue((sink, stream))).into_future()) - as Box> - } else { - // Payload was not ours. Sending it back. - debug!("Received ping (payload={:?}) ; sending back", payload); - Box::new( - sink.send(payload) - .map(|sink| Loop::Continue((sink, stream))), - ) - as Box> - } - } } - } else { - Box::new(Ok(Loop::Break(())).into_future()) - as Box> } - }) - }); + } else { + Box::new(Ok(Loop::Break(())).into_future()) as Box> + } + }) + }); - Ok(((pinger, Box::new(future) as Box<_>), remote_addr)).into_future() + PingOutput::Pinger { + pinger, + processing: Box::new(future) as Box<_>, } } +/// Upgrades a connection from the listener side. +fn upgrade_as_listener(socket: impl AsyncRead + AsyncWrite + 'static) -> PingOutput { + let sink_stream = Framed::new(socket, Codec); + let (sink, stream) = sink_stream.split(); + + let future = loop_fn((sink, stream), move |(sink, stream)| { + stream + .into_future() + .map_err(|(err, _)| err) + .and_then(move |(payload, stream)| { + if let Some(payload) = payload { + // Received a payload from the remote. + debug!("Received ping (payload={:?}) ; sending back", payload); + Box::new( + sink.send(payload.freeze()) + .map(|sink| Loop::Continue((sink, stream))), + ) as Box> + } else { + // Connection was closed + Box::new(Ok(Loop::Break(())).into_future()) as Box> + } + }) + }); + + PingOutput::Ponger(Box::new(future) as Box<_>) +} + /// Controller for the ping service. Makes it possible to send pings to the remote. pub struct Pinger { send: mpsc::Sender, @@ -228,7 +279,8 @@ impl Pinger { debug!("Preparing for ping with payload {:?}", payload); // Ignore errors if the ponger has been already destroyed. The returned future will never // be signalled. - let fut = self.send + let fut = self + .send .clone() .send(Message::Ping(Bytes::from(payload.to_vec()), tx)) .from_err() @@ -291,13 +343,15 @@ mod tests { use self::tokio_tcp::TcpListener; use self::tokio_tcp::TcpStream; - use super::Ping; + use super::{Ping, PingOutput}; use futures::future::{self, join_all}; use futures::Future; use futures::Stream; use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr}; use std::io::Error as IoError; + // TODO: rewrite tests with the MemoryTransport + #[test] fn ping_pong() { let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); @@ -315,12 +369,9 @@ mod tests { future::ok::("/ip4/127.0.0.1/tcp/10000".parse().unwrap()), ) }) - .and_then(|((mut pinger, service), _)| { - pinger - .ping() - .map_err(|_| panic!()) - .select(service) - .map_err(|_| panic!()) + .and_then(|(out, _)| match out { + PingOutput::Ponger(service) => service, + _ => unreachable!(), }); let client = TcpStream::connect(&listener_addr) @@ -333,15 +384,20 @@ mod tests { future::ok::("/ip4/127.0.0.1/tcp/10000".parse().unwrap()), ) }) - .and_then(|((mut pinger, service), _)| { - pinger + .and_then(|(out, _)| match out { + PingOutput::Pinger { + mut pinger, + processing, + } => pinger .ping() .map_err(|_| panic!()) - .select(service) - .map_err(|_| panic!()) - }); + .select(processing) + .map_err(|_| panic!()), + _ => unreachable!(), + }) + .map(|_| ()); - tokio_current_thread::block_on_all(server.join(client)).unwrap(); + tokio_current_thread::block_on_all(server.select(client).map_err(|_| panic!())).unwrap(); } #[test] @@ -362,7 +418,10 @@ mod tests { future::ok::("/ip4/127.0.0.1/tcp/10000".parse().unwrap()), ) }) - .and_then(|((_, service), _)| service.map_err(|_| panic!())); + .and_then(|(out, _)| match out { + PingOutput::Ponger(service) => service, + _ => unreachable!(), + }); let client = TcpStream::connect(&listener_addr) .map_err(|e| e.into()) @@ -374,15 +433,21 @@ mod tests { future::ok::("/ip4/127.0.0.1/tcp/10000".parse().unwrap()), ) }) - .and_then(|((mut pinger, service), _)| { - let pings = (0..20).map(move |_| pinger.ping().map_err(|_| ())); + .and_then(|(out, _)| match out { + PingOutput::Pinger { + mut pinger, + processing, + } => { + let pings = (0..20).map(move |_| pinger.ping().map_err(|_| ())); - join_all(pings) - .map(|_| ()) - .map_err(|_| panic!()) - .select(service) - .map(|_| ()) - .map_err(|_| panic!()) + join_all(pings) + .map(|_| ()) + .map_err(|_| panic!()) + .select(processing) + .map(|_| ()) + .map_err(|_| panic!()) + } + _ => unreachable!(), }); tokio_current_thread::block_on_all(server.select(client)).unwrap_or_else(|_| panic!());