From d5602e76875ea56c3e60a7fd144586d591ba519e Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Tue, 10 Jul 2018 14:59:24 +0200 Subject: [PATCH] Add support for yamux multiplexing. (#284) * Add support for yamux multiplexing. * Address review comments. --- Cargo.toml | 1 + libp2p/Cargo.toml | 1 + libp2p/examples/relay.rs | 10 ++- relay/src/copy.rs | 138 +++++++++++++++++++++++++++++++++++++++ relay/src/lib.rs | 2 + relay/src/protocol.rs | 7 +- yamux/Cargo.toml | 12 ++++ yamux/src/lib.rs | 134 +++++++++++++++++++++++++++++++++++++ 8 files changed, 300 insertions(+), 5 deletions(-) create mode 100644 relay/src/copy.rs create mode 100644 yamux/Cargo.toml create mode 100644 yamux/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 70f2ae76..68382b27 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,4 +23,5 @@ members = [ "transport-timeout", "varint-rs", "websocket", + "yamux" ] diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index aa003158..15ff507c 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -22,6 +22,7 @@ libp2p-relay = { path = "../relay" } libp2p-core = { path = "../core" } libp2p-transport-timeout = { path = "../transport-timeout" } libp2p-websocket = { path = "../websocket" } +libp2p-yamux = { path = "../yamux" } tokio-codec = "0.1" tokio-io = "0.1" diff --git a/libp2p/examples/relay.rs b/libp2p/examples/relay.rs index 90a91a9a..90332556 100644 --- a/libp2p/examples/relay.rs +++ b/libp2p/examples/relay.rs @@ -50,6 +50,7 @@ extern crate bytes; extern crate env_logger; extern crate futures; extern crate libp2p; +extern crate libp2p_yamux; extern crate rand; #[macro_use] extern crate structopt; @@ -128,7 +129,9 @@ fn run_dialer(opts: DialerOpts) -> Result<(), Box> { } let transport = { - let tcp = TcpConfig::new(core.handle()); + let tcp = TcpConfig::new(core.handle()) + .with_upgrade(libp2p_yamux::Config::default()) + .into_connection_reuse(); RelayTransport::new(opts.me, tcp, store, iter::once(opts.relay)).with_dummy_muxing() }; @@ -161,7 +164,10 @@ fn run_listener(opts: ListenerOpts) -> Result<(), Box> { store.peer_or_create(&p).add_addr(a, Duration::from_secs(600)) } - let transport = TcpConfig::new(core.handle()).with_dummy_muxing(); + let transport = TcpConfig::new(core.handle()) + .with_upgrade(libp2p_yamux::Config::default()) + .into_connection_reuse(); + let relay = RelayConfig::new(opts.me, transport.clone(), store); let echo = SimpleProtocol::new("/echo/1.0.0", |socket| { diff --git a/relay/src/copy.rs b/relay/src/copy.rs new file mode 100644 index 00000000..cfa19427 --- /dev/null +++ b/relay/src/copy.rs @@ -0,0 +1,138 @@ +// Copyright (c) 2018 Tokio Contributors +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +// Based on copy.rs from tokio-io in https://github.com/tokio-rs/tokio + +use std::io; +use futures::{Future, Poll}; +use tokio_io::{AsyncRead, AsyncWrite}; + +/// A future which will copy all data from a reader into a writer. +/// +/// Created by the [`copy`] function, this future will resolve to the number of +/// bytes copied or an error if one happens. +/// +/// [`copy`]: fn.copy.html +#[derive(Debug)] +pub struct Copy { + reader: Option, + read_done: bool, + flush_done: bool, + writer: Option, + pos: usize, + cap: usize, + amt: u64, + buf: Box<[u8]>, +} + +/// Creates a future which represents copying all the bytes from one object to +/// another. +/// +/// The returned future will copy all the bytes read from `reader` into the +/// `writer` specified. This future will only complete once the `reader` has hit +/// EOF and all bytes have been written to and flushed from the `writer` +/// provided. +/// +/// On success the number of bytes is returned and the `reader` and `writer` are +/// consumed. On error the error is returned and the I/O objects are consumed as +/// well. +pub fn flushing_copy(reader: R, writer: W) -> Copy + where R: AsyncRead, + W: AsyncWrite, +{ + Copy { + reader: Some(reader), + read_done: false, + flush_done: true, + writer: Some(writer), + amt: 0, + pos: 0, + cap: 0, + buf: Box::new([0; 2048]), + } +} + +impl Future for Copy + where R: AsyncRead, + W: AsyncWrite, +{ + type Item = (u64, R, W); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(u64, R, W), io::Error> { + debug_assert!(self.reader.is_some() && self.writer.is_some(), + "poll() has been called again after returning Ok"); + + loop { + // Still not finished flushing + if !self.flush_done { + try_ready!(self.writer.as_mut().unwrap().poll_flush()); + self.flush_done = true + } + + // If our buffer is empty, then we need to read some data to + // continue. + if self.pos == self.cap && !self.read_done { + let reader = self.reader.as_mut().unwrap(); + let n = try_ready!(reader.poll_read(&mut self.buf)); + if n == 0 { + self.read_done = true; + } else { + self.pos = 0; + self.cap = n; + } + } + + // If our buffer has some data, let's write it out! + while self.pos < self.cap { + let writer = self.writer.as_mut().unwrap(); + let i = try_ready!(writer.poll_write(&self.buf[self.pos..self.cap])); + if i == 0 { + return Err(io::Error::new(io::ErrorKind::WriteZero, + "write zero byte into writer")); + } else { + self.pos += i; + self.amt += i as u64; + } + } + + // The buffered data has been written, let's flush it! + if self.pos == self.cap && !self.read_done { + self.flush_done = false; + continue + } + + // Everything has been copied. + if self.pos == self.cap && self.read_done { + try_ready!(self.writer.as_mut().unwrap().poll_flush()); + let reader = self.reader.take().unwrap(); + let writer = self.writer.take().unwrap(); + return Ok((self.amt, reader, writer).into()) + } + } + } +} + diff --git a/relay/src/lib.rs b/relay/src/lib.rs index f77c59de..9beaf7ff 100644 --- a/relay/src/lib.rs +++ b/relay/src/lib.rs @@ -19,6 +19,7 @@ // DEALINGS IN THE SOFTWARE. extern crate bytes; +#[macro_use] extern crate futures; extern crate libp2p_peerstore as peerstore; extern crate libp2p_core as core; @@ -31,6 +32,7 @@ extern crate tokio_codec; extern crate tokio_io; extern crate varint; +mod copy; mod message; mod protocol; mod transport; diff --git a/relay/src/protocol.rs b/relay/src/protocol.rs index eedf9767..fc8c5779 100644 --- a/relay/src/protocol.rs +++ b/relay/src/protocol.rs @@ -19,12 +19,13 @@ // DEALINGS IN THE SOFTWARE. use bytes::Bytes; +use copy; use core::{ConnectionUpgrade, Endpoint, Transport}; use futures::{stream, future::{self, Either::{A, B}, FutureResult}, prelude::*}; use message::{CircuitRelay, CircuitRelay_Peer, CircuitRelay_Status, CircuitRelay_Type}; use peerstore::{PeerAccess, PeerId, Peerstore}; use std::{io, iter, ops::Deref}; -use tokio_io::{io as aio, AsyncRead, AsyncWrite}; +use tokio_io::{AsyncRead, AsyncWrite}; use utility::{io_err, is_success, status, Io, Peer}; #[derive(Debug, Clone)] @@ -187,8 +188,8 @@ where let future = { let (src_r, src_w) = src.split(); let (dst_r, dst_w) = dst.split(); - let a = aio::copy(src_r, dst_w).map(|_| ()); - let b = aio::copy(dst_r, src_w).map(|_| ()); + let a = copy::flushing_copy(src_r, dst_w).map(|_| ()); + let b = copy::flushing_copy(dst_r, src_w).map(|_| ()); a.select(b).map(|_| ()).map_err(|(e, _)| e) }; Ok(future) diff --git a/yamux/Cargo.toml b/yamux/Cargo.toml new file mode 100644 index 00000000..b100c4fc --- /dev/null +++ b/yamux/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "libp2p-yamux" +version = "0.1.0" +authors = ["Parity Technologies "] + +[dependencies] +bytes = "0.4" +futures = "0.1" +libp2p-core = { path = "../core" } +log = "0.4" +tokio-io = "0.1" +yamux = { git = "https://github.com/paritytech/yamux" } diff --git a/yamux/src/lib.rs b/yamux/src/lib.rs new file mode 100644 index 00000000..2c3cc2ee --- /dev/null +++ b/yamux/src/lib.rs @@ -0,0 +1,134 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +extern crate bytes; +extern crate futures; +#[macro_use] +extern crate log; +extern crate libp2p_core as core; +extern crate tokio_io; +extern crate yamux; + +use bytes::Bytes; +use core::Endpoint; +use futures::{future::{self, FutureResult}, prelude::*}; +use std::{io, iter}; +use tokio_io::{AsyncRead, AsyncWrite}; + + +pub struct Yamux(yamux::Connection); + +impl Clone for Yamux { + fn clone(&self) -> Self { + Yamux(self.0.clone()) + } +} + +impl Yamux +where + C: AsyncRead + AsyncWrite + 'static +{ + pub fn new(c: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { + Yamux(yamux::Connection::new(c, cfg, mode)) + } +} + +impl core::StreamMuxer for Yamux +where + C: AsyncRead + AsyncWrite + 'static +{ + type Substream = yamux::StreamHandle; + type InboundSubstream = InboundFuture; + type OutboundSubstream = FutureResult, io::Error>; + + fn inbound(self) -> Self::InboundSubstream { + InboundFuture(self.0) + } + + fn outbound(self) -> Self::OutboundSubstream { + let stream = self.0.open_stream().map_err(|e| io::Error::new(io::ErrorKind::Other, e)); + future::result(stream) + } +} + + +pub struct InboundFuture(yamux::Connection); + +impl Future for InboundFuture +where + C: AsyncRead + AsyncWrite + 'static +{ + type Item = Option>; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + match self.0.poll() { + Err(e) => { + error!("connection error: {}", e); + Err(io::Error::new(io::ErrorKind::Other, e)) + } + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(None)) => Ok(Async::Ready(None)), + Ok(Async::Ready(Some(stream))) => Ok(Async::Ready(Some(stream))) + } + } +} + + +#[derive(Clone)] +pub struct Config(yamux::Config); + +impl Config { + pub fn new(cfg: yamux::Config) -> Self { + Config(cfg) + } +} + +impl Default for Config { + fn default() -> Self { + Config(yamux::Config::default()) + } +} + +impl core::ConnectionUpgrade for Config +where + C: AsyncRead + AsyncWrite + 'static, + M: 'static +{ + type UpgradeIdentifier = (); + type NamesIter = iter::Once<(Bytes, ())>; + + fn protocol_names(&self) -> Self::NamesIter { + iter::once((Bytes::from("/yamux/1.0.0"), ())) + } + + type Output = Yamux; + type MultiaddrFuture = M; + type Future = FutureResult<(Yamux, M), io::Error>; + + fn upgrade(self, i: C, _: (), end: Endpoint, remote: M) -> Self::Future { + let mode = match end { + Endpoint::Dialer => yamux::Mode::Client, + Endpoint::Listener => yamux::Mode::Server + }; + future::ok((Yamux::new(i, self.0, mode), remote)) + } +} +