diff --git a/core/src/lib.rs b/core/src/lib.rs index 8bc09c7d..9ce17dac 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -131,22 +131,23 @@ //! extern crate libp2p_tcp_transport; //! extern crate tokio_current_thread; //! -//! use futures::Future; +//! use futures::{Future, Stream}; //! use libp2p_ping::{Ping, PingOutput}; //! use libp2p_core::Transport; //! //! # fn main() { //! let ping_finished_future = libp2p_tcp_transport::TcpConfig::new() //! // We have a `TcpConfig` struct that implements `Transport`, and apply a `Ping` upgrade on it. -//! .with_upgrade(Ping) +//! .with_upgrade(Ping::default()) //! // TODO: right now the only available protocol is ping, but we want to replace it with //! // something that is more simple to use //! .dial("127.0.0.1:12345".parse::().unwrap()).unwrap_or_else(|_| panic!()) //! .and_then(|(out, _)| { //! match out { //! PingOutput::Ponger(processing) => Box::new(processing) as Box>, -//! PingOutput::Pinger { mut pinger, processing } => { -//! let f = pinger.ping().map_err(|_| panic!()).select(processing).map(|_| ()).map_err(|(err, _)| err); +//! PingOutput::Pinger(mut pinger) => { +//! pinger.ping(()); +//! let f = pinger.into_future().map(|_| ()).map_err(|(err, _)| err); //! Box::new(f) as Box> //! }, //! } @@ -185,12 +186,13 @@ //! let transport = libp2p_tcp_transport::TcpConfig::new() //! .with_dummy_muxing(); //! -//! let (swarm_controller, swarm_future) = libp2p_core::swarm(transport.with_upgrade(Ping), +//! let (swarm_controller, swarm_future) = libp2p_core::swarm(transport.with_upgrade(Ping::default()), //! |out, client_addr| { //! match out { //! PingOutput::Ponger(processing) => Box::new(processing) as Box>, -//! PingOutput::Pinger { mut pinger, processing } => { -//! let f = pinger.ping().map_err(|_| panic!()).select(processing).map(|_| ()).map_err(|(err, _)| err); +//! PingOutput::Pinger(mut pinger) => { +//! pinger.ping(()); +//! let f = pinger.into_future().map(|_| ()).map_err(|(err, _)| err); //! Box::new(f) as Box> //! }, //! } diff --git a/examples/ping-client.rs b/examples/ping-client.rs index 6317811c..5ebe5c33 100644 --- a/examples/ping-client.rs +++ b/examples/ping-client.rs @@ -79,17 +79,21 @@ fn main() { let (tx, rx) = oneshot::channel(); let mut tx = Some(tx); let (swarm_controller, swarm_future) = libp2p::core::swarm( - transport.clone().with_upgrade(libp2p::ping::Ping), + transport.clone().with_upgrade(libp2p::ping::Ping::default()), |out, _client_addr| { - if let libp2p::ping::PingOutput::Pinger { mut pinger, processing } = out { + if let libp2p::ping::PingOutput::Pinger(mut pinger) = out { let tx = tx.take(); - let ping = pinger.ping().map_err(|_| unreachable!()).inspect(move |_| { - println!("Received pong from the remote"); - if let Some(tx) = tx { - let _ = tx.send(()); - } - }); - ping.select(processing).map(|_| ()).map_err(|(e, _)| e) + pinger.ping(()); + pinger + .into_future() + .map(move |_| { + println!("Received pong from the remote"); + if let Some(tx) = tx { + let _ = tx.send(()); + } + () + }) + .map_err(|(e, _)| e) } else { unreachable!() } @@ -99,7 +103,7 @@ fn main() { // We now use the controller to dial to the address. swarm_controller .dial(target_addr.parse().expect("invalid multiaddr"), - transport.with_upgrade(libp2p::ping::Ping)) + transport.with_upgrade(libp2p::ping::Ping::default())) // If the multiaddr protocol exists but is not supported, then we get an error containing // the original multiaddress. .expect("unsupported multiaddr"); diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index a9cec286..e3f9f3c2 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -55,19 +55,20 @@ //! extern crate libp2p_tcp_transport; //! extern crate tokio_current_thread; //! -//! use futures::Future; +//! use futures::{Future, Stream}; //! use libp2p_ping::{Ping, PingOutput}; //! use libp2p_core::Transport; //! //! # fn main() { //! let ping_finished_future = libp2p_tcp_transport::TcpConfig::new() -//! .with_upgrade(Ping) +//! .with_upgrade(Ping::default()) //! .dial("127.0.0.1:12345".parse::().unwrap()).unwrap_or_else(|_| panic!()) //! .and_then(|(out, _)| { //! match out { //! PingOutput::Ponger(processing) => Box::new(processing) as Box + Send>, -//! PingOutput::Pinger { mut pinger, processing } => { -//! let f = pinger.ping().map_err(|_| panic!()).select(processing).map(|_| ()).map_err(|(err, _)| err); +//! PingOutput::Pinger(mut pinger) => { +//! pinger.ping(()); +//! let f = pinger.into_future().map(|_| ()).map_err(|(err, _)| err); //! Box::new(f) as Box + Send> //! }, //! } @@ -91,17 +92,12 @@ extern crate tokio_codec; extern crate tokio_io; use bytes::{BufMut, Bytes, BytesMut}; -use futures::future::{loop_fn, FutureResult, IntoFuture, Loop}; -use futures::sync::{mpsc, oneshot}; -use futures::{Future, Sink, Stream}; +use futures::{prelude::*, future::{FutureResult, IntoFuture}, task}; use libp2p_core::{ConnectionUpgrade, Endpoint}; -use parking_lot::Mutex; use rand::{distributions::Standard, prelude::*, rngs::EntropyRng}; -use std::collections::HashMap; -use std::error::Error; +use std::collections::VecDeque; use std::io::Error as IoError; -use std::iter; -use std::sync::Arc; +use std::{iter, marker::PhantomData, mem}; use tokio_codec::{Decoder, Encoder, Framed}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -110,23 +106,19 @@ use tokio_io::{AsyncRead, AsyncWrite}; /// According to the design of libp2p, this struct would normally contain the configuration options /// for the protocol, but in the case of `Ping` no configuration is required. #[derive(Debug, Copy, Clone, Default)] -pub struct Ping; +pub struct Ping(PhantomData); -pub enum PingOutput { - /// We are on the dialer side. - Pinger { - /// Object to use in order to ping the remote. - pinger: Pinger, - /// Future that drives the processing of the pings. - processing: Box + Send>, - }, +/// Output of a `Ping` upgrade. +pub enum PingOutput { + /// We are on the dialing side. + Pinger(PingDialer), /// We are on the listening side. - Ponger(Box + Send>), + Ponger(PingListener), } -impl ConnectionUpgrade for Ping +impl ConnectionUpgrade for Ping where - C: AsyncRead + AsyncWrite + Send + 'static, + TSocket: AsyncRead + AsyncWrite, { type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>; type UpgradeIdentifier = (); @@ -136,14 +128,14 @@ where iter::once(("/ipfs/ping/1.0.0".into(), ())) } - type Output = PingOutput; + type Output = PingOutput; type MultiaddrFuture = Maf; type Future = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>; #[inline] fn upgrade( self, - socket: C, + socket: TSocket, _: Self::UpgradeIdentifier, endpoint: Endpoint, remote_addr: Maf, @@ -158,146 +150,205 @@ where } /// Upgrades a connection from the dialer side. -fn upgrade_as_dialer(socket: impl AsyncRead + AsyncWrite + Send + 'static) -> PingOutput { - // # How does it work? - // - // All the actual processing is performed by the *ponger*. - // We use a channel in order to send ping requests from the pinger to the ponger. - - let (tx, rx) = mpsc::channel(8); - // Ignore the errors if `tx` closed. - let rx = rx.then(|r| Ok(r.ok())).filter_map(|a| a); - - let pinger = Pinger { - send: tx, +fn upgrade_as_dialer(socket: TSocket) -> PingOutput +where TSocket: AsyncRead + AsyncWrite, +{ + let dialer = PingDialer { + inner: Framed::new(socket, Codec), + need_writer_flush: false, + needs_close: false, + sent_pings: VecDeque::with_capacity(4), rng: EntropyRng::default(), + pings_to_send: VecDeque::with_capacity(4), + to_notify: None, }; - // Hashmap that associates outgoing payloads to one-shot senders. - // TODO: can't figure out how to make it work without using an Arc/Mutex - let expected_pongs = Arc::new(Mutex::new(HashMap::with_capacity(4))); - - let sink_stream = Framed::new(socket, Codec).map(|msg| Message::Received(msg.freeze())); - let (sink, stream) = sink_stream.split(); - - let future = loop_fn((sink, stream.select(rx)), move |(sink, stream)| { - let expected_pongs = expected_pongs.clone(); - - stream - .into_future() - .map_err(|(err, _)| err) - .and_then(move |(message, stream)| { - let mut expected_pongs = expected_pongs.lock(); - - if let Some(message) = message { - match message { - Message::Ping(payload, finished) => { - // Ping requested by the user through the `Pinger`. - debug!("Sending ping with payload {:?}", payload); - - expected_pongs.insert(payload.clone(), finished); - Box::new( - sink.send(payload) - .map(|sink| Loop::Continue((sink, stream))), - ) as Box + Send> - } - Message::Received(payload) => { - // Received a payload from the remote. - if let Some(fut) = expected_pongs.remove(&payload) { - // Payload was ours. Signalling future. - // Errors can happen if the user closed the receiving end of - // the future, which is fine to ignore. - debug!("Received pong (payload={:?}) ; ping fufilled", payload); - let _ = fut.send(()); - Box::new(Ok(Loop::Continue((sink, stream))).into_future()) - as Box + Send> - } else { - // Payload was unexpected. Closing connection. - debug!("Received invalid payload ({:?}) ; closing", payload); - Box::new(Ok(Loop::Break(())).into_future()) - as Box + Send> - } - } - } - } else { - Box::new(Ok(Loop::Break(())).into_future()) as Box + Send> - } - }) - }); - - PingOutput::Pinger { - pinger, - processing: Box::new(future) as Box<_>, - } + PingOutput::Pinger(dialer) } /// Upgrades a connection from the listener side. -fn upgrade_as_listener(socket: impl AsyncRead + AsyncWrite + Send + 'static) -> PingOutput { - let sink_stream = Framed::new(socket, Codec); - let (sink, stream) = sink_stream.split(); +fn upgrade_as_listener(socket: TSocket) -> PingOutput +where TSocket: AsyncRead + AsyncWrite, +{ + let listener = PingListener { + inner: Framed::new(socket, Codec), + state: PingListenerState::Listening, + }; - let future = loop_fn((sink, stream), move |(sink, stream)| { - stream - .into_future() - .map_err(|(err, _)| err) - .and_then(move |(payload, stream)| { - if let Some(payload) = payload { - // Received a payload from the remote. - debug!("Received ping (payload={:?}) ; sending back", payload); - Box::new( - sink.send(payload.freeze()) - .map(|sink| Loop::Continue((sink, stream))), - ) as Box + Send> - } else { - // Connection was closed - Box::new(Ok(Loop::Break(())).into_future()) as Box + Send> - } - }) - }); - - PingOutput::Ponger(Box::new(future) as Box<_>) + PingOutput::Ponger(listener) } -/// Controller for the ping service. Makes it possible to send pings to the remote. -pub struct Pinger { - send: mpsc::Sender, +/// Sends pings and receives the pongs. +/// +/// Implements `Stream`. The stream indicates when we receive a pong. +pub struct PingDialer { + /// The underlying socket. + inner: Framed, + /// If true, need to flush the sink. + need_writer_flush: bool, + /// If true, need to close the sink. + needs_close: bool, + /// List of pings that have been sent to the remote and that are waiting for an answer. + sent_pings: VecDeque<(Bytes, TUserData)>, + /// Random number generator for the ping payload. rng: EntropyRng, + /// List of pings to send to the remote. + pings_to_send: VecDeque<(Bytes, TUserData)>, + /// Task to notify when we add an element to `pings_to_send`. + to_notify: Option, } -impl Pinger { - /// Sends a ping. Returns a future that is signaled when a pong is received. +impl PingDialer { + /// Sends a ping to the remote. /// - /// **Note**: Please be aware that there is no timeout on the ping. You should handle the - /// timeout yourself when you call this function. - pub fn ping(&mut self) -> Box> + Send> { - let (tx, rx) = oneshot::channel(); - + /// The stream will produce an event containing the user data when we receive the pong. + pub fn ping(&mut self, user_data: TUserData) { let payload: [u8; 32] = self.rng.sample(Standard); debug!("Preparing for ping with payload {:?}", payload); - // Ignore errors if the ponger has been already destroyed. The returned future will never - // be signalled. - let fut = self - .send - .clone() - .send(Message::Ping(Bytes::from(payload.to_vec()), tx)) - .from_err() - .and_then(|_| rx.from_err()); - Box::new(fut) as Box<_> - } -} - -impl Clone for Pinger { - fn clone(&self) -> Pinger { - Pinger { - send: self.send.clone(), - rng: EntropyRng::default(), + self.pings_to_send.push_back((Bytes::from(payload.to_vec()), user_data)); + if let Some(to_notify) = self.to_notify.take() { + to_notify.notify(); } } } -enum Message { - Ping(Bytes, oneshot::Sender<()>), - Received(Bytes), +impl Stream for PingDialer +where TSocket: AsyncRead + AsyncWrite, +{ + type Item = TUserData; + type Error = IoError; + + fn poll(&mut self) -> Poll, Self::Error> { + self.to_notify = Some(task::current()); + + while let Some((ping, user_data)) = self.pings_to_send.pop_front() { + match self.inner.start_send(ping.clone()) { + Ok(AsyncSink::Ready) => self.need_writer_flush = true, + Ok(AsyncSink::NotReady(_)) => { + self.pings_to_send.push_front((ping, user_data)); + break; + }, + Err(err) => return Err(err), + } + + self.sent_pings.push_back((ping, user_data)); + } + + if self.need_writer_flush { + match self.inner.poll_complete() { + Ok(Async::Ready(())) => self.need_writer_flush = false, + Ok(Async::NotReady) => (), + Err(err) => return Err(err), + } + } + + loop { + match self.inner.poll() { + Ok(Async::Ready(Some(pong))) => { + if let Some(pos) = self.sent_pings.iter().position(|&(ref p, _)| p == &pong) { + let (_, user_data) = self.sent_pings.remove(pos) + .expect("Grabbed a valid position just above"); + return Ok(Async::Ready(Some(user_data))); + } else { + debug!("Received pong that doesn't match what we went: {:?}", pong); + } + }, + Ok(Async::NotReady) => break, + Ok(Async::Ready(None)) => { + // Notify the current task so that we poll again. + self.needs_close = true; + task::current().notify(); + return Ok(Async::NotReady); + } + Err(err) => return Err(err), + } + } + + Ok(Async::NotReady) + } +} + +/// Listens to incoming pings and answers them. +/// +/// Implements `Future`. The future terminates when the underlying socket closes. +pub struct PingListener { + /// The underlying socket. + inner: Framed, + /// State of the listener. + state: PingListenerState, +} + +#[derive(Debug)] +enum PingListenerState { + /// We are waiting for the next ping on the socket. + Listening, + /// We are trying to send a pong. + Sending(Bytes), + /// We are flusing the underlying sink. + Flushing, + /// We are shutting down everything. + Closing, + /// A panic happened during the processing. + Poisoned, +} + +impl Future for PingListener +where TSocket: AsyncRead + AsyncWrite +{ + type Item = (); + type Error = IoError; + + fn poll(&mut self) -> Poll { + loop { + match mem::replace(&mut self.state, PingListenerState::Poisoned) { + PingListenerState::Listening => { + match self.inner.poll() { + Ok(Async::Ready(Some(payload))) => { + debug!("Received ping (payload={:?}) ; sending back", payload); + self.state = PingListenerState::Sending(payload.freeze()) + }, + Ok(Async::Ready(None)) => self.state = PingListenerState::Closing, + Ok(Async::NotReady) => { + self.state = PingListenerState::Listening; + return Ok(Async::NotReady); + }, + Err(err) => return Err(err), + } + }, + PingListenerState::Sending(data) => { + match self.inner.start_send(data) { + Ok(AsyncSink::Ready) => self.state = PingListenerState::Flushing, + Ok(AsyncSink::NotReady(data)) => { + self.state = PingListenerState::Sending(data); + return Ok(Async::NotReady); + }, + Err(err) => return Err(err), + } + }, + PingListenerState::Flushing => { + match self.inner.poll_complete() { + Ok(Async::Ready(())) => self.state = PingListenerState::Listening, + Ok(Async::NotReady) => { + self.state = PingListenerState::Flushing; + return Ok(Async::NotReady); + }, + Err(err) => return Err(err), + } + }, + PingListenerState::Closing => { + match self.inner.close() { + Ok(Async::Ready(())) => return Ok(Async::Ready(())), + Ok(Async::NotReady) => { + self.state = PingListenerState::Closing; + return Ok(Async::NotReady); + }, + Err(err) => return Err(err), + } + }, + PingListenerState::Poisoned => panic!("Poisoned or errored PingListener"), + } + } + } } // Implementation of the `Codec` trait of tokio-io. Splits frames into groups of 32 bytes. @@ -324,7 +375,7 @@ impl Encoder for Codec { #[inline] fn encode(&mut self, mut data: Bytes, buf: &mut BytesMut) -> Result<(), IoError> { - if data.len() != 0 { + if !data.is_empty() { let split = 32 * (1 + ((data.len() - 1) / 32)); buf.reserve(split); buf.put(data.split_to(split)); @@ -341,9 +392,7 @@ mod tests { use self::tokio_tcp::TcpListener; use self::tokio_tcp::TcpStream; use super::{Ping, PingOutput}; - use futures::future::{self, join_all}; - use futures::Future; - use futures::Stream; + use futures::{future, Future, Stream}; use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr}; use std::io::Error as IoError; @@ -359,7 +408,7 @@ mod tests { .into_future() .map_err(|(e, _)| e.into()) .and_then(|(c, _)| { - Ping.upgrade( + Ping::<()>::default().upgrade( c.unwrap(), (), Endpoint::Listener, @@ -374,7 +423,7 @@ mod tests { let client = TcpStream::connect(&listener_addr) .map_err(|e| e.into()) .and_then(|c| { - Ping.upgrade( + Ping::<()>::default().upgrade( c, (), Endpoint::Dialer, @@ -382,14 +431,10 @@ mod tests { ) }) .and_then(|(out, _)| match out { - PingOutput::Pinger { - mut pinger, - processing, - } => pinger - .ping() - .map_err(|_| panic!()) - .select(processing) - .map_err(|_| panic!()), + PingOutput::Pinger(mut pinger) => { + pinger.ping(()); + pinger.into_future().map(|_| ()).map_err(|_| panic!()) + }, _ => unreachable!(), }) .map(|_| ()); @@ -408,7 +453,7 @@ mod tests { .into_future() .map_err(|(e, _)| e.into()) .and_then(|(c, _)| { - Ping.upgrade( + Ping::::default().upgrade( c.unwrap(), (), Endpoint::Listener, @@ -423,7 +468,7 @@ mod tests { let client = TcpStream::connect(&listener_addr) .map_err(|e| e.into()) .and_then(|c| { - Ping.upgrade( + Ping::::default().upgrade( c, (), Endpoint::Dialer, @@ -431,19 +476,17 @@ mod tests { ) }) .and_then(|(out, _)| match out { - PingOutput::Pinger { - mut pinger, - processing, - } => { - let pings = (0..20).map(move |_| pinger.ping().map_err(|_| ())); + PingOutput::Pinger(mut pinger) => { + for n in 0..20 { + pinger.ping(n); + } - join_all(pings) - .map(|_| ()) + pinger + .take(20) + .collect() + .map(|val| { assert_eq!(val, (0..20).collect::>()); }) .map_err(|_| panic!()) - .select(processing) - .map(|_| ()) - .map_err(|_| panic!()) - } + }, _ => unreachable!(), });