mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-14 10:31:21 +00:00
Remove libp2p-ratelimit 💀 (#1233)
* Remove libp2p-ratelimit 💀
* Fix more
This commit is contained in:
@ -25,7 +25,6 @@ libp2p-kad = { version = "0.12.0", path = "protocols/kad" }
|
||||
libp2p-floodsub = { version = "0.12.0", path = "protocols/floodsub" }
|
||||
libp2p-ping = { version = "0.12.0", path = "protocols/ping" }
|
||||
libp2p-plaintext = { version = "0.12.0", path = "protocols/plaintext" }
|
||||
libp2p-ratelimit = { version = "0.12.0", path = "transports/ratelimit" }
|
||||
libp2p-core = { version = "0.12.0", path = "core" }
|
||||
libp2p-core-derive = { version = "0.12.0", path = "misc/core-derive" }
|
||||
libp2p-secio = { version = "0.12.0", path = "protocols/secio", default-features = false }
|
||||
@ -75,7 +74,6 @@ members = [
|
||||
"protocols/secio",
|
||||
"swarm",
|
||||
"transports/dns",
|
||||
"transports/ratelimit",
|
||||
"transports/tcp",
|
||||
"transports/uds",
|
||||
"transports/websocket",
|
||||
|
@ -188,8 +188,6 @@ pub use libp2p_ping as ping;
|
||||
#[doc(inline)]
|
||||
pub use libp2p_plaintext as plaintext;
|
||||
#[doc(inline)]
|
||||
pub use libp2p_ratelimit as ratelimit;
|
||||
#[doc(inline)]
|
||||
pub use libp2p_secio as secio;
|
||||
#[doc(inline)]
|
||||
pub use libp2p_swarm as swarm;
|
||||
|
@ -20,43 +20,12 @@
|
||||
|
||||
//! Provides the `TransportExt` trait.
|
||||
|
||||
use crate::{bandwidth::BandwidthLogging, bandwidth::BandwidthSinks, ratelimit::RateLimited, Transport};
|
||||
use std::{io, sync::Arc, time::Duration};
|
||||
use tokio_executor::DefaultExecutor;
|
||||
use crate::{bandwidth::BandwidthLogging, bandwidth::BandwidthSinks, Transport};
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
/// Trait automatically implemented on all objects that implement `Transport`. Provides some
|
||||
/// additional utilities.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// use libp2p::TransportExt;
|
||||
/// use libp2p::tcp::TcpConfig;
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// let _transport = TcpConfig::new()
|
||||
/// .with_rate_limit(1024 * 1024, 1024 * 1024);
|
||||
/// ```
|
||||
///
|
||||
pub trait TransportExt: Transport {
|
||||
/// Adds a maximum transfer rate to the sockets created with the transport.
|
||||
#[inline]
|
||||
fn with_rate_limit(
|
||||
self,
|
||||
max_read_bytes_per_sec: usize,
|
||||
max_write_bytes_per_sec: usize,
|
||||
) -> io::Result<RateLimited<Self>>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
RateLimited::new(
|
||||
&mut DefaultExecutor::current(),
|
||||
self,
|
||||
max_read_bytes_per_sec,
|
||||
max_write_bytes_per_sec,
|
||||
)
|
||||
}
|
||||
|
||||
/// Adds a layer on the `Transport` that logs all trafic that passes through the sockets
|
||||
/// created by it.
|
||||
///
|
||||
|
@ -1,19 +0,0 @@
|
||||
[package]
|
||||
name = "libp2p-ratelimit"
|
||||
edition = "2018"
|
||||
description = "Transfer rate limiting transport adapter for libp2p"
|
||||
version = "0.12.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
license = "MIT"
|
||||
repository = "https://github.com/libp2p/rust-libp2p"
|
||||
keywords = ["peer-to-peer", "libp2p", "networking"]
|
||||
categories = ["network-programming", "asynchronous"]
|
||||
|
||||
[dependencies]
|
||||
aio-limited = "0.1"
|
||||
bytes = "0.4"
|
||||
futures = "0.1"
|
||||
libp2p-core = { version = "0.12.0", path = "../../core" }
|
||||
log = "0.4"
|
||||
tokio-executor = "0.1"
|
||||
tokio-io = "0.1"
|
@ -1,242 +0,0 @@
|
||||
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use aio_limited::{Limited, Limiter};
|
||||
use futures::prelude::*;
|
||||
use futures::try_ready;
|
||||
use libp2p_core::{Multiaddr, Transport, transport::{ListenerEvent, TransportError}};
|
||||
use log::error;
|
||||
use std::{error, fmt, io};
|
||||
use tokio_executor::Executor;
|
||||
use tokio_io::{AsyncRead, AsyncWrite, io::{ReadHalf, WriteHalf}};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RateLimited<T> {
|
||||
value: T,
|
||||
rlimiter: Limiter,
|
||||
wlimiter: Limiter,
|
||||
}
|
||||
|
||||
impl<T> RateLimited<T> {
|
||||
pub fn new<E: Executor>(
|
||||
e: &mut E,
|
||||
value: T,
|
||||
max_read: usize,
|
||||
max_write: usize,
|
||||
) -> io::Result<RateLimited<T>> {
|
||||
Ok(RateLimited {
|
||||
value,
|
||||
rlimiter: Limiter::new(e, max_read).map_err(|e| {
|
||||
error!("failed to create read limiter: {}", e);
|
||||
io::Error::new(io::ErrorKind::Other, e)
|
||||
})?,
|
||||
wlimiter: Limiter::new(e, max_write).map_err(|e| {
|
||||
error!("failed to create write limiter: {}", e);
|
||||
io::Error::new(io::ErrorKind::Other, e)
|
||||
})?,
|
||||
})
|
||||
}
|
||||
|
||||
fn from_parts(value: T, r: Limiter, w: Limiter) -> RateLimited<T> {
|
||||
RateLimited {
|
||||
value,
|
||||
rlimiter: r,
|
||||
wlimiter: w,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Error that can be generated by the rate limiter.
|
||||
#[derive(Debug)]
|
||||
pub enum RateLimitedErr<TErr> {
|
||||
/// Error in the underlying transport layer.
|
||||
Underlying(TErr),
|
||||
/// Error while creating a rate limiter.
|
||||
LimiterError(io::Error),
|
||||
}
|
||||
|
||||
impl<TErr> fmt::Display for RateLimitedErr<TErr>
|
||||
where TErr: fmt::Display
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
RateLimitedErr::LimiterError(err) => write!(f, "Limiter initialization error: {}", err),
|
||||
RateLimitedErr::Underlying(err) => write!(f, "{}", err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TErr> error::Error for RateLimitedErr<TErr>
|
||||
where TErr: error::Error + 'static
|
||||
{
|
||||
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
|
||||
match self {
|
||||
RateLimitedErr::LimiterError(err) => Some(err),
|
||||
RateLimitedErr::Underlying(err) => Some(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A rate-limited connection.
|
||||
pub struct Connection<C: AsyncRead + AsyncWrite> {
|
||||
reader: Limited<ReadHalf<C>>,
|
||||
writer: Limited<WriteHalf<C>>,
|
||||
}
|
||||
|
||||
impl<C: AsyncRead + AsyncWrite> Connection<C> {
|
||||
pub fn new(c: C, rlimiter: Limiter, wlimiter: Limiter) -> io::Result<Connection<C>> {
|
||||
let (r, w) = c.split();
|
||||
Ok(Connection {
|
||||
reader: Limited::new(r, rlimiter).map_err(|e| {
|
||||
error!("failed to create limited reader: {}", e);
|
||||
io::Error::new(io::ErrorKind::Other, e)
|
||||
})?,
|
||||
writer: Limited::new(w, wlimiter).map_err(|e| {
|
||||
error!("failed to create limited writer: {}", e);
|
||||
io::Error::new(io::ErrorKind::Other, e)
|
||||
})?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: AsyncRead + AsyncWrite> io::Read for Connection<C> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
self.reader.read(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: AsyncRead + AsyncWrite> io::Write for Connection<C> {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.writer.write(buf)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
self.writer.flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: AsyncRead + AsyncWrite> AsyncRead for Connection<C> {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
self.reader.prepare_uninitialized_buffer(buf)
|
||||
}
|
||||
|
||||
fn read_buf<B: bytes::BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
|
||||
self.reader.read_buf(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: AsyncRead + AsyncWrite> AsyncWrite for Connection<C> {
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
self.writer.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Listener<T: Transport>(RateLimited<T::Listener>);
|
||||
|
||||
impl<T: Transport> Stream for Listener<T> {
|
||||
type Item = ListenerEvent<ListenerUpgrade<T>>;
|
||||
type Error = RateLimitedErr<T::Error>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
match try_ready!(self.0.value.poll().map_err(RateLimitedErr::Underlying)) {
|
||||
Some(event) => {
|
||||
let event = event.map(|upgrade| {
|
||||
let r = self.0.rlimiter.clone();
|
||||
let w = self.0.wlimiter.clone();
|
||||
ListenerUpgrade(RateLimited::from_parts(upgrade, r, w))
|
||||
});
|
||||
Ok(Async::Ready(Some(event)))
|
||||
}
|
||||
None => Ok(Async::Ready(None)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
pub struct ListenerUpgrade<T: Transport>(RateLimited<T::ListenerUpgrade>);
|
||||
|
||||
impl<T> Future for ListenerUpgrade<T>
|
||||
where
|
||||
T: Transport,
|
||||
T::Output: AsyncRead + AsyncWrite
|
||||
{
|
||||
type Item = Connection<T::Output>;
|
||||
type Error = RateLimitedErr<T::Error>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
let conn = try_ready!(self.0.value.poll().map_err(RateLimitedErr::Underlying));
|
||||
let r = self.0.rlimiter.clone();
|
||||
let w = self.0.wlimiter.clone();
|
||||
Ok(Async::Ready(Connection::new(conn, r, w).map_err(RateLimitedErr::LimiterError)?))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Transport for RateLimited<T>
|
||||
where
|
||||
T: Transport,
|
||||
T::Output: AsyncRead + AsyncWrite,
|
||||
T::Error: 'static,
|
||||
{
|
||||
type Output = Connection<T::Output>;
|
||||
type Error = RateLimitedErr<T::Error>;
|
||||
type Listener = Listener<T>;
|
||||
type ListenerUpgrade = ListenerUpgrade<T>;
|
||||
type Dial = DialFuture<T::Dial>;
|
||||
|
||||
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
|
||||
let r = self.rlimiter;
|
||||
let w = self.wlimiter;
|
||||
self.value
|
||||
.listen_on(addr)
|
||||
.map_err(|err| err.map(RateLimitedErr::Underlying))
|
||||
.map(|listener| {
|
||||
Listener(RateLimited::from_parts(listener, r.clone(), w.clone()))
|
||||
})
|
||||
}
|
||||
|
||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
|
||||
let r = self.rlimiter;
|
||||
let w = self.wlimiter;
|
||||
let dial = self.value.dial(addr).map_err(|err| err.map(RateLimitedErr::Underlying))?;
|
||||
Ok(DialFuture { r, w, f: dial })
|
||||
}
|
||||
}
|
||||
|
||||
/// Future to avoid boxing.
|
||||
pub struct DialFuture<T> {
|
||||
r: Limiter,
|
||||
w: Limiter,
|
||||
f: T
|
||||
}
|
||||
|
||||
impl<T> Future for DialFuture<T>
|
||||
where
|
||||
T: Future,
|
||||
T::Item: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Item = Connection<T::Item>;
|
||||
type Error = RateLimitedErr<T::Error>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
let item = try_ready!(self.f.poll().map_err(RateLimitedErr::Underlying));
|
||||
Ok(Async::Ready(Connection::new(item, self.r.clone(), self.w.clone())
|
||||
.map_err(RateLimitedErr::LimiterError)?))
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user