diff --git a/Cargo.toml b/Cargo.toml index 4374a7b7..ff1d82e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ bytes = "0.4" futures = "0.1" multiaddr = { package = "parity-multiaddr", version = "0.1.0", path = "./misc/multiaddr" } multihash = { package = "parity-multihash", version = "0.1.0", path = "./misc/multihash" } +lazy_static = "1.2" libp2p-mplex = { version = "0.3.0", path = "./muxers/mplex" } libp2p-identify = { version = "0.3.0", path = "./protocols/identify" } libp2p-kad = { version = "0.3.1", path = "./protocols/kad" } @@ -31,6 +32,8 @@ libp2p-secio = { version = "0.3.0", path = "./protocols/secio", default-features libp2p-uds = { version = "0.3.0", path = "./transports/uds" } libp2p-websocket = { version = "0.3.0", path = "./transports/websocket", optional = true } libp2p-yamux = { version = "0.3.0", path = "./muxers/yamux" } +parking_lot = "0.7" +smallvec = "0.6" tokio-codec = "0.1" tokio-executor = "0.1" tokio-io = "0.1" diff --git a/src/bandwidth.rs b/src/bandwidth.rs new file mode 100644 index 00000000..5d6b3692 --- /dev/null +++ b/src/bandwidth.rs @@ -0,0 +1,300 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::{Multiaddr, core::Transport, core::transport::TransportError}; +use futures::{prelude::*, try_ready}; +use lazy_static::lazy_static; +use parking_lot::Mutex; +use smallvec::{smallvec, SmallVec}; +use std::{cmp, io, io::Read, io::Write, sync::Arc, time::Duration, time::Instant}; + +/// Wraps around a `Transport` and logs the bandwidth that goes through all the opened connections. +#[derive(Clone)] +pub struct BandwidthLogging { + inner: TInner, + sinks: Arc, +} + +impl BandwidthLogging { + /// Creates a new `BandwidthLogging` around the transport. + #[inline] + pub fn new(inner: TInner, period: Duration) -> (Self, Arc) { + let mut period_seconds = cmp::min(period.as_secs(), 86400) as u32; + if period.subsec_nanos() > 0 { + period_seconds += 1; + } + + let sink = Arc::new(BandwidthSinks { + download: Mutex::new(BandwidthSink::new(period_seconds)), + upload: Mutex::new(BandwidthSink::new(period_seconds)), + }); + + let trans = BandwidthLogging { + inner, + sinks: sink.clone(), + }; + + (trans, sink) + } +} + +impl Transport for BandwidthLogging +where + TInner: Transport, +{ + type Output = BandwidthConnecLogging; + type Error = TInner::Error; + type Listener = BandwidthListener; + type ListenerUpgrade = BandwidthFuture; + type Dial = BandwidthFuture; + + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError> { + let sinks = self.sinks; + self.inner + .listen_on(addr) + .map(|(inner, new_addr)| (BandwidthListener { inner, sinks }, new_addr)) + } + + fn dial(self, addr: Multiaddr) -> Result> { + let sinks = self.sinks; + self.inner + .dial(addr) + .map(move |fut| BandwidthFuture { + inner: fut, + sinks, + }) + } + + fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner.nat_traversal(server, observed) + } +} + +/// Wraps around a `Stream` that produces connections. Wraps each connection around a bandwidth +/// counter. +pub struct BandwidthListener { + inner: TInner, + sinks: Arc, +} + +impl Stream for BandwidthListener +where TInner: Stream, +{ + type Item = (BandwidthFuture, Multiaddr); + type Error = TInner::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + let (inner, addr) = match try_ready!(self.inner.poll()) { + Some(v) => v, + None => return Ok(Async::Ready(None)) + }; + + let fut = BandwidthFuture { + inner, + sinks: self.sinks.clone(), + }; + + Ok(Async::Ready(Some((fut, addr)))) + } +} + +/// Wraps around a `Future` that produces a connection. Wraps the connection around a bandwidth +/// counter. +pub struct BandwidthFuture { + inner: TInner, + sinks: Arc, +} + +impl Future for BandwidthFuture + where TInner: Future, +{ + type Item = BandwidthConnecLogging; + type Error = TInner::Error; + + fn poll(&mut self) -> Poll { + let inner = try_ready!(self.inner.poll()); + Ok(Async::Ready(BandwidthConnecLogging { + inner, + sinks: self.sinks.clone(), + })) + } +} + +/// Allows obtaining the average bandwidth of the connections created from a `BandwidthLogging`. +pub struct BandwidthSinks { + download: Mutex, + upload: Mutex, +} + +impl BandwidthSinks { + /// Returns the average number of bytes that have been downloaded in the period. + #[inline] + pub fn average_download_per_sec(&self) -> u64 { + self.download.lock().get() + } + + /// Returns the average number of bytes that have been uploaded in the period. + #[inline] + pub fn average_upload_per_sec(&self) -> u64 { + self.upload.lock().get() + } +} + +/// Wraps around an `AsyncRead + AsyncWrite` and logs the bandwidth that goes through it. +pub struct BandwidthConnecLogging { + inner: TInner, + sinks: Arc, +} + +impl Read for BandwidthConnecLogging + where TInner: Read +{ + #[inline] + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let num_bytes = self.inner.read(buf)?; + self.sinks.download.lock().inject(num_bytes); + Ok(num_bytes) + } +} + +impl tokio_io::AsyncRead for BandwidthConnecLogging + where TInner: tokio_io::AsyncRead +{ +} + +impl Write for BandwidthConnecLogging + where TInner: Write +{ + #[inline] + fn write(&mut self, buf: &[u8]) -> io::Result { + let num_bytes = self.inner.write(buf)?; + self.sinks.upload.lock().inject(num_bytes); + Ok(num_bytes) + } + + #[inline] + fn flush(&mut self) -> io::Result<()> { + self.inner.flush() + } +} + +impl tokio_io::AsyncWrite for BandwidthConnecLogging + where TInner: tokio_io::AsyncWrite +{ + #[inline] + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.inner.shutdown() + } +} + +/// Returns the number of seconds that have elapsed between an arbitrary EPOCH and now. +#[inline] +fn current_second() -> u32 { + lazy_static! { + static ref EPOCH: Instant = Instant::now(); + } + + EPOCH.elapsed().as_secs() as u32 +} + +/// Structure that calculates the average bandwidth over the last few seconds. +/// +/// If you want to calculate for example both download and upload bandwidths, create two different +/// objects. +struct BandwidthSink { + /// Bytes sent over the past seconds. Contains `rolling_seconds + 1` elements, where + /// `rolling_seconds` is the value passed to `new`. Only the first `rolling_seconds` elements + /// are taken into account for the average, while the last element is the element to be + /// inserted later. + bytes: SmallVec<[u64; 8]>, + /// Number of seconds between `EPOCH` and the moment we have last updated `bytes`. + latest_update: u32, +} + +impl BandwidthSink { + /// Initializes a `BandwidthSink`. + fn new(seconds: u32) -> Self { + BandwidthSink { + bytes: smallvec![0; seconds as usize + 1], + latest_update: current_second(), + } + } + + /// Returns the number of bytes over the last few seconds. The number of seconds is the value + /// configured at initialization. + fn get(&mut self) -> u64 { + self.update(); + let seconds = self.bytes.len() - 1; + self.bytes.iter() + .take(seconds) + .fold(0u64, |a, &b| a.saturating_add(b)) / seconds as u64 + } + + /// Notifies the `BandwidthSink` that a certain number of bytes have been transmitted at this + /// moment. + fn inject(&mut self, bytes: usize) { + self.update(); + if let Some(last) = self.bytes.last_mut() { + *last = last.saturating_add(bytes as u64); + } + } + + /// Updates the state of the `BandwidthSink` so that the last element of `bytes` contains the + /// current second. + fn update(&mut self) { + let current_second = current_second(); + debug_assert!(current_second >= self.latest_update); + let num_iter = cmp::min(current_second - self.latest_update, self.bytes.len() as u32); + for _ in 0..num_iter { + self.bytes.remove(0); + self.bytes.push(0); + } + + self.latest_update = current_second; + } +} + +#[cfg(test)] +mod tests { + use std::{thread, time::Duration}; + use super::*; + + #[test] + fn sink_works() { + let mut sink = BandwidthSink::new(5); + sink.inject(100); + thread::sleep(Duration::from_millis(1000)); + assert_eq!(sink.get(), 20); + sink.inject(100); + thread::sleep(Duration::from_millis(1000)); + assert_eq!(sink.get(), 40); + sink.inject(100); + thread::sleep(Duration::from_millis(1000)); + assert_eq!(sink.get(), 60); + sink.inject(100); + thread::sleep(Duration::from_millis(1000)); + assert_eq!(sink.get(), 80); + sink.inject(100); + thread::sleep(Duration::from_millis(1000)); + assert_eq!(sink.get(), 100); + thread::sleep(Duration::from_millis(1000)); + assert_eq!(sink.get(), 80); + } +} diff --git a/src/lib.rs b/src/lib.rs index c917d18b..24c7fc38 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -179,6 +179,7 @@ pub use libp2p_yamux as yamux; mod transport_ext; +pub mod bandwidth; pub mod simple; pub use self::core::{ diff --git a/src/transport_ext.rs b/src/transport_ext.rs index d2208fa4..ca4cc5b2 100644 --- a/src/transport_ext.rs +++ b/src/transport_ext.rs @@ -20,8 +20,8 @@ //! Provides the `TransportExt` trait. -use crate::{ratelimit::RateLimited, Transport}; -use std::io; +use crate::{bandwidth::BandwidthLogging, bandwidth::BandwidthSinks, ratelimit::RateLimited, Transport}; +use std::{io, sync::Arc, time::Duration}; use tokio_executor::DefaultExecutor; /// Trait automatically implemented on all objects that implement `Transport`. Provides some @@ -57,6 +57,18 @@ pub trait TransportExt: Transport { ) } + /// Adds a layer on the `Transport` that logs all trafic that passes through the sockets + /// created by it. + /// + /// This method returns an `Arc` that can be used to retreive the bandwidth + /// values. + fn with_bandwidth_logging(self, period: Duration) -> (BandwidthLogging, Arc) + where + Self: Sized + { + BandwidthLogging::new(self, period) + } + // TODO: add methods to easily upgrade for secio/mplex/yamux }