From 8c4945ffb5f633ed77bd3da89af6426bb78a84ad Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 25 Jun 2018 15:12:39 +0200 Subject: [PATCH] Add a TransportTimeout wrapper (#234) * Add a TransportTimeout wrapper * Add outgoing/ingoing only timeouts --- Cargo.toml | 1 + libp2p/Cargo.toml | 1 + libp2p/src/lib.rs | 2 + transport-timeout/Cargo.toml | 10 ++ transport-timeout/src/lib.rs | 230 +++++++++++++++++++++++++++++++++++ 5 files changed, 244 insertions(+) create mode 100644 transport-timeout/Cargo.toml create mode 100644 transport-timeout/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 7e509df2..70f2ae76 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ members = [ "secio", "core", "tcp-transport", + "transport-timeout", "varint-rs", "websocket", ] diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index 544ac6b1..aa003158 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -20,6 +20,7 @@ libp2p-ping = { path = "../ping" } libp2p-ratelimit = { path = "../ratelimit" } libp2p-relay = { path = "../relay" } libp2p-core = { path = "../core" } +libp2p-transport-timeout = { path = "../transport-timeout" } libp2p-websocket = { path = "../websocket" } tokio-codec = "0.1" tokio-io = "0.1" diff --git a/libp2p/src/lib.rs b/libp2p/src/lib.rs index 6db48604..9baf3d72 100644 --- a/libp2p/src/lib.rs +++ b/libp2p/src/lib.rs @@ -41,6 +41,7 @@ pub extern crate libp2p_relay as relay; pub extern crate libp2p_secio as secio; #[cfg(not(target_os = "emscripten"))] pub extern crate libp2p_tcp_transport as tcp; +pub extern crate libp2p_transport_timeout as transport_timeout; pub extern crate libp2p_websocket as websocket; pub mod simple; @@ -48,6 +49,7 @@ pub mod simple; pub use self::core::{Transport, ConnectionUpgrade, PeerId, swarm}; pub use self::multiaddr::Multiaddr; pub use self::simple::SimpleProtocol; +pub use self::transport_timeout::TransportTimeout; /// Implementation of `Transport` that supports the most common protocols. /// diff --git a/transport-timeout/Cargo.toml b/transport-timeout/Cargo.toml new file mode 100644 index 00000000..bfd9d82c --- /dev/null +++ b/transport-timeout/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "libp2p-transport-timeout" +version = "0.1.0" +authors = ["Parity Technologies "] + +[dependencies] +futures = "0.1" +libp2p-core = { path = "../core" } +log = "0.4.1" +tokio-timer = "0.2.0" diff --git a/transport-timeout/src/lib.rs b/transport-timeout/src/lib.rs new file mode 100644 index 00000000..7f4b74a0 --- /dev/null +++ b/transport-timeout/src/lib.rs @@ -0,0 +1,230 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Wraps around a `Transport` and adds a timeout to all the incoming and outgoing connections. +//! +//! The timeout includes the upgrading process. +// TODO: add example + +#[macro_use] +extern crate futures; +extern crate libp2p_core; +#[macro_use] +extern crate log; +extern crate tokio_timer; + +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use std::time::{Duration, Instant}; +use futures::{Future, Poll, Async, Stream}; +use libp2p_core::{Transport, Multiaddr, MuxedTransport}; +use tokio_timer::{Deadline, DeadlineError}; + +/// Wraps around a `Transport` and adds a timeout to all the incoming and outgoing connections. +/// +/// The timeout includes the upgrade. There is no timeout on the listener or on stream of incoming +/// substreams. +#[derive(Debug, Copy, Clone)] +pub struct TransportTimeout { + inner: InnerTrans, + outgoing_timeout: Duration, + incoming_timeout: Duration, +} + +impl TransportTimeout { + /// Wraps around a `Transport` to add timeouts to all the sockets created by it. + #[inline] + pub fn new(trans: InnerTrans, timeout: Duration) -> Self { + TransportTimeout { + inner: trans, + outgoing_timeout: timeout, + incoming_timeout: timeout, + } + } + + /// Wraps around a `Transport` to add timeouts to the outgoing connections. + #[inline] + pub fn with_outgoing_timeout(trans: InnerTrans, timeout: Duration) -> Self { + TransportTimeout { + inner: trans, + outgoing_timeout: timeout, + incoming_timeout: Duration::from_secs(100 * 365 * 24 * 3600), // 100 years + } + } + + /// Wraps around a `Transport` to add timeouts to the ingoing connections. + #[inline] + pub fn with_ingoing_timeout(trans: InnerTrans, timeout: Duration) -> Self { + TransportTimeout { + inner: trans, + outgoing_timeout: Duration::from_secs(100 * 365 * 24 * 3600), // 100 years + incoming_timeout: timeout, + } + } +} + +impl Transport for TransportTimeout +where InnerTrans: Transport, +{ + type Output = InnerTrans::Output; + type MultiaddrFuture = InnerTrans::MultiaddrFuture; + type Listener = TimeoutListener; + type ListenerUpgrade = TokioTimerMapErr>; + type Dial = TokioTimerMapErr>; + + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { + match self.inner.listen_on(addr) { + Ok((listener, addr)) => { + let listener = TimeoutListener { + inner: listener, + timeout: self.incoming_timeout, + }; + + Ok((listener, addr)) + }, + Err((inner, addr)) => { + let transport = TransportTimeout { + inner, + outgoing_timeout: self.outgoing_timeout, + incoming_timeout: self.incoming_timeout, + }; + + Err((transport, addr)) + } + } + } + + fn dial(self, addr: Multiaddr) -> Result { + match self.inner.dial(addr) { + Ok(dial) => { + Ok(TokioTimerMapErr { + inner: Deadline::new(dial, Instant::now() + self.outgoing_timeout) + }) + }, + Err((inner, addr)) => { + let transport = TransportTimeout { + inner, + outgoing_timeout: self.outgoing_timeout, + incoming_timeout: self.incoming_timeout, + }; + + Err((transport, addr)) + } + } + } + + #[inline] + fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner.nat_traversal(server, observed) + } +} + +impl MuxedTransport for TransportTimeout +where InnerTrans: MuxedTransport +{ + type Incoming = TimeoutIncoming; + type IncomingUpgrade = TokioTimerMapErr>; + + #[inline] + fn next_incoming(self) -> Self::Incoming { + TimeoutIncoming { + inner: self.inner.next_incoming(), + timeout: self.incoming_timeout, + } + } +} + +// TODO: can be removed and replaced with an `impl Stream` once impl Trait is fully stable +// in Rust (https://github.com/rust-lang/rust/issues/34511) +pub struct TimeoutListener { + inner: InnerStream, + timeout: Duration, +} + +impl Stream for TimeoutListener +where InnerStream: Stream, +{ + type Item = TokioTimerMapErr>; + type Error = InnerStream::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + let inner_fut = try_ready!(self.inner.poll()); + if let Some(inner_fut) = inner_fut { + let fut = TokioTimerMapErr { + inner: Deadline::new(inner_fut, Instant::now() + self.timeout) + }; + Ok(Async::Ready(Some(fut))) + } else { + Ok(Async::Ready(None)) + } + } +} + +// TODO: can be removed and replaced with an `impl Future` once impl Trait is fully stable +// in Rust (https://github.com/rust-lang/rust/issues/34511) +pub struct TimeoutIncoming { + inner: InnerFut, + timeout: Duration, +} + +impl Future for TimeoutIncoming +where InnerFut: Future, +{ + type Item = TokioTimerMapErr>; + type Error = InnerFut::Error; + + fn poll(&mut self) -> Poll { + let inner_fut = try_ready!(self.inner.poll()); + let fut = TokioTimerMapErr { + inner: Deadline::new(inner_fut, Instant::now() + self.timeout) + }; + Ok(Async::Ready(fut)) + } +} + +/// Wraps around a `Future`. Turns the error type from `DeadlineError` to `IoError`. +// TODO: can be replaced with `impl Future` once `impl Trait` are fully stable in Rust +// (https://github.com/rust-lang/rust/issues/34511) +pub struct TokioTimerMapErr { + inner: InnerFut, +} + +impl Future for TokioTimerMapErr +where InnerFut: Future> +{ + type Item = InnerFut::Item; + type Error = IoError; + + fn poll(&mut self) -> Poll { + self.inner.poll() + .map_err(|err: DeadlineError| { + if err.is_inner() { + err.into_inner().expect("ensured by is_inner()") + } else if err.is_elapsed() { + debug!("timeout elapsed for connection"); + IoErrorKind::TimedOut.into() + } else { + assert!(err.is_timer()); + debug!("tokio timer error in timeout wrapper"); + let err = err.into_timer().expect("ensure by is_timer()"); + IoError::new(IoErrorKind::Other, err) + } + }) + } +}