diff --git a/Cargo.toml b/Cargo.toml index 812d12c8..c6617bf4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,6 @@ libp2p-kad = { version = "0.12.0", path = "protocols/kad" } libp2p-floodsub = { version = "0.12.0", path = "protocols/floodsub" } libp2p-ping = { version = "0.12.0", path = "protocols/ping" } libp2p-plaintext = { version = "0.12.0", path = "protocols/plaintext" } -libp2p-ratelimit = { version = "0.12.0", path = "transports/ratelimit" } libp2p-core = { version = "0.12.0", path = "core" } libp2p-core-derive = { version = "0.12.0", path = "misc/core-derive" } libp2p-secio = { version = "0.12.0", path = "protocols/secio", default-features = false } @@ -75,7 +74,6 @@ members = [ "protocols/secio", "swarm", "transports/dns", - "transports/ratelimit", "transports/tcp", "transports/uds", "transports/websocket", diff --git a/src/lib.rs b/src/lib.rs index 62496cfb..c6795abb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -188,8 +188,6 @@ pub use libp2p_ping as ping; #[doc(inline)] pub use libp2p_plaintext as plaintext; #[doc(inline)] -pub use libp2p_ratelimit as ratelimit; -#[doc(inline)] pub use libp2p_secio as secio; #[doc(inline)] pub use libp2p_swarm as swarm; diff --git a/src/transport_ext.rs b/src/transport_ext.rs index ca4cc5b2..ab35b31e 100644 --- a/src/transport_ext.rs +++ b/src/transport_ext.rs @@ -20,43 +20,12 @@ //! Provides the `TransportExt` trait. -use crate::{bandwidth::BandwidthLogging, bandwidth::BandwidthSinks, ratelimit::RateLimited, Transport}; -use std::{io, sync::Arc, time::Duration}; -use tokio_executor::DefaultExecutor; +use crate::{bandwidth::BandwidthLogging, bandwidth::BandwidthSinks, Transport}; +use std::{sync::Arc, time::Duration}; /// Trait automatically implemented on all objects that implement `Transport`. Provides some /// additional utilities. -/// -/// # Example -/// -/// ``` -/// use libp2p::TransportExt; -/// use libp2p::tcp::TcpConfig; -/// use std::time::Duration; -/// -/// let _transport = TcpConfig::new() -/// .with_rate_limit(1024 * 1024, 1024 * 1024); -/// ``` -/// pub trait TransportExt: Transport { - /// Adds a maximum transfer rate to the sockets created with the transport. - #[inline] - fn with_rate_limit( - self, - max_read_bytes_per_sec: usize, - max_write_bytes_per_sec: usize, - ) -> io::Result> - where - Self: Sized, - { - RateLimited::new( - &mut DefaultExecutor::current(), - self, - max_read_bytes_per_sec, - max_write_bytes_per_sec, - ) - } - /// Adds a layer on the `Transport` that logs all trafic that passes through the sockets /// created by it. /// diff --git a/transports/ratelimit/Cargo.toml b/transports/ratelimit/Cargo.toml deleted file mode 100644 index 2fae88fa..00000000 --- a/transports/ratelimit/Cargo.toml +++ /dev/null @@ -1,19 +0,0 @@ -[package] -name = "libp2p-ratelimit" -edition = "2018" -description = "Transfer rate limiting transport adapter for libp2p" -version = "0.12.0" -authors = ["Parity Technologies "] -license = "MIT" -repository = "https://github.com/libp2p/rust-libp2p" -keywords = ["peer-to-peer", "libp2p", "networking"] -categories = ["network-programming", "asynchronous"] - -[dependencies] -aio-limited = "0.1" -bytes = "0.4" -futures = "0.1" -libp2p-core = { version = "0.12.0", path = "../../core" } -log = "0.4" -tokio-executor = "0.1" -tokio-io = "0.1" diff --git a/transports/ratelimit/src/lib.rs b/transports/ratelimit/src/lib.rs deleted file mode 100644 index aecaca77..00000000 --- a/transports/ratelimit/src/lib.rs +++ /dev/null @@ -1,242 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use aio_limited::{Limited, Limiter}; -use futures::prelude::*; -use futures::try_ready; -use libp2p_core::{Multiaddr, Transport, transport::{ListenerEvent, TransportError}}; -use log::error; -use std::{error, fmt, io}; -use tokio_executor::Executor; -use tokio_io::{AsyncRead, AsyncWrite, io::{ReadHalf, WriteHalf}}; - -#[derive(Clone)] -pub struct RateLimited { - value: T, - rlimiter: Limiter, - wlimiter: Limiter, -} - -impl RateLimited { - pub fn new( - e: &mut E, - value: T, - max_read: usize, - max_write: usize, - ) -> io::Result> { - Ok(RateLimited { - value, - rlimiter: Limiter::new(e, max_read).map_err(|e| { - error!("failed to create read limiter: {}", e); - io::Error::new(io::ErrorKind::Other, e) - })?, - wlimiter: Limiter::new(e, max_write).map_err(|e| { - error!("failed to create write limiter: {}", e); - io::Error::new(io::ErrorKind::Other, e) - })?, - }) - } - - fn from_parts(value: T, r: Limiter, w: Limiter) -> RateLimited { - RateLimited { - value, - rlimiter: r, - wlimiter: w, - } - } -} - -/// Error that can be generated by the rate limiter. -#[derive(Debug)] -pub enum RateLimitedErr { - /// Error in the underlying transport layer. - Underlying(TErr), - /// Error while creating a rate limiter. - LimiterError(io::Error), -} - -impl fmt::Display for RateLimitedErr -where TErr: fmt::Display -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - RateLimitedErr::LimiterError(err) => write!(f, "Limiter initialization error: {}", err), - RateLimitedErr::Underlying(err) => write!(f, "{}", err), - } - } -} - -impl error::Error for RateLimitedErr -where TErr: error::Error + 'static -{ - fn source(&self) -> Option<&(dyn error::Error + 'static)> { - match self { - RateLimitedErr::LimiterError(err) => Some(err), - RateLimitedErr::Underlying(err) => Some(err), - } - } -} - -/// A rate-limited connection. -pub struct Connection { - reader: Limited>, - writer: Limited>, -} - -impl Connection { - pub fn new(c: C, rlimiter: Limiter, wlimiter: Limiter) -> io::Result> { - let (r, w) = c.split(); - Ok(Connection { - reader: Limited::new(r, rlimiter).map_err(|e| { - error!("failed to create limited reader: {}", e); - io::Error::new(io::ErrorKind::Other, e) - })?, - writer: Limited::new(w, wlimiter).map_err(|e| { - error!("failed to create limited writer: {}", e); - io::Error::new(io::ErrorKind::Other, e) - })?, - }) - } -} - -impl io::Read for Connection { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.reader.read(buf) - } -} - -impl io::Write for Connection { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.writer.write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - self.writer.flush() - } -} - -impl AsyncRead for Connection { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.reader.prepare_uninitialized_buffer(buf) - } - - fn read_buf(&mut self, buf: &mut B) -> Poll { - self.reader.read_buf(buf) - } -} - -impl AsyncWrite for Connection { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.writer.shutdown() - } -} - -pub struct Listener(RateLimited); - -impl Stream for Listener { - type Item = ListenerEvent>; - type Error = RateLimitedErr; - - fn poll(&mut self) -> Poll, Self::Error> { - match try_ready!(self.0.value.poll().map_err(RateLimitedErr::Underlying)) { - Some(event) => { - let event = event.map(|upgrade| { - let r = self.0.rlimiter.clone(); - let w = self.0.wlimiter.clone(); - ListenerUpgrade(RateLimited::from_parts(upgrade, r, w)) - }); - Ok(Async::Ready(Some(event))) - } - None => Ok(Async::Ready(None)), - } - } -} - -#[must_use = "futures do nothing unless polled"] -pub struct ListenerUpgrade(RateLimited); - -impl Future for ListenerUpgrade -where - T: Transport, - T::Output: AsyncRead + AsyncWrite -{ - type Item = Connection; - type Error = RateLimitedErr; - - fn poll(&mut self) -> Poll { - let conn = try_ready!(self.0.value.poll().map_err(RateLimitedErr::Underlying)); - let r = self.0.rlimiter.clone(); - let w = self.0.wlimiter.clone(); - Ok(Async::Ready(Connection::new(conn, r, w).map_err(RateLimitedErr::LimiterError)?)) - } -} - -impl Transport for RateLimited -where - T: Transport, - T::Output: AsyncRead + AsyncWrite, - T::Error: 'static, -{ - type Output = Connection; - type Error = RateLimitedErr; - type Listener = Listener; - type ListenerUpgrade = ListenerUpgrade; - type Dial = DialFuture; - - fn listen_on(self, addr: Multiaddr) -> Result> { - let r = self.rlimiter; - let w = self.wlimiter; - self.value - .listen_on(addr) - .map_err(|err| err.map(RateLimitedErr::Underlying)) - .map(|listener| { - Listener(RateLimited::from_parts(listener, r.clone(), w.clone())) - }) - } - - fn dial(self, addr: Multiaddr) -> Result> { - let r = self.rlimiter; - let w = self.wlimiter; - let dial = self.value.dial(addr).map_err(|err| err.map(RateLimitedErr::Underlying))?; - Ok(DialFuture { r, w, f: dial }) - } -} - -/// Future to avoid boxing. -pub struct DialFuture { - r: Limiter, - w: Limiter, - f: T -} - -impl Future for DialFuture -where - T: Future, - T::Item: AsyncRead + AsyncWrite, -{ - type Item = Connection; - type Error = RateLimitedErr; - - fn poll(&mut self) -> Poll { - let item = try_ready!(self.f.poll().map_err(RateLimitedErr::Underlying)); - Ok(Async::Ready(Connection::new(item, self.r.clone(), self.w.clone()) - .map_err(RateLimitedErr::LimiterError)?)) - } -}