Add support for yamux multiplexing. (#284)

* Add support for yamux multiplexing.

* Address review comments.
This commit is contained in:
Toralf Wittner
2018-07-10 14:59:24 +02:00
committed by Pierre Krieger
parent fd01b18e53
commit d5602e7687
8 changed files with 300 additions and 5 deletions

View File

@ -23,4 +23,5 @@ members = [
"transport-timeout",
"varint-rs",
"websocket",
"yamux"
]

View File

@ -22,6 +22,7 @@ libp2p-relay = { path = "../relay" }
libp2p-core = { path = "../core" }
libp2p-transport-timeout = { path = "../transport-timeout" }
libp2p-websocket = { path = "../websocket" }
libp2p-yamux = { path = "../yamux" }
tokio-codec = "0.1"
tokio-io = "0.1"

View File

@ -50,6 +50,7 @@ extern crate bytes;
extern crate env_logger;
extern crate futures;
extern crate libp2p;
extern crate libp2p_yamux;
extern crate rand;
#[macro_use]
extern crate structopt;
@ -128,7 +129,9 @@ fn run_dialer(opts: DialerOpts) -> Result<(), Box<Error>> {
}
let transport = {
let tcp = TcpConfig::new(core.handle());
let tcp = TcpConfig::new(core.handle())
.with_upgrade(libp2p_yamux::Config::default())
.into_connection_reuse();
RelayTransport::new(opts.me, tcp, store, iter::once(opts.relay)).with_dummy_muxing()
};
@ -161,7 +164,10 @@ fn run_listener(opts: ListenerOpts) -> Result<(), Box<Error>> {
store.peer_or_create(&p).add_addr(a, Duration::from_secs(600))
}
let transport = TcpConfig::new(core.handle()).with_dummy_muxing();
let transport = TcpConfig::new(core.handle())
.with_upgrade(libp2p_yamux::Config::default())
.into_connection_reuse();
let relay = RelayConfig::new(opts.me, transport.clone(), store);
let echo = SimpleProtocol::new("/echo/1.0.0", |socket| {

138
relay/src/copy.rs Normal file
View File

@ -0,0 +1,138 @@
// Copyright (c) 2018 Tokio Contributors
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
// Based on copy.rs from tokio-io in https://github.com/tokio-rs/tokio
use std::io;
use futures::{Future, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
/// A future which will copy all data from a reader into a writer.
///
/// Created by the [`copy`] function, this future will resolve to the number of
/// bytes copied or an error if one happens.
///
/// [`copy`]: fn.copy.html
#[derive(Debug)]
pub struct Copy<R, W> {
reader: Option<R>,
read_done: bool,
flush_done: bool,
writer: Option<W>,
pos: usize,
cap: usize,
amt: u64,
buf: Box<[u8]>,
}
/// Creates a future which represents copying all the bytes from one object to
/// another.
///
/// The returned future will copy all the bytes read from `reader` into the
/// `writer` specified. This future will only complete once the `reader` has hit
/// EOF and all bytes have been written to and flushed from the `writer`
/// provided.
///
/// On success the number of bytes is returned and the `reader` and `writer` are
/// consumed. On error the error is returned and the I/O objects are consumed as
/// well.
pub fn flushing_copy<R, W>(reader: R, writer: W) -> Copy<R, W>
where R: AsyncRead,
W: AsyncWrite,
{
Copy {
reader: Some(reader),
read_done: false,
flush_done: true,
writer: Some(writer),
amt: 0,
pos: 0,
cap: 0,
buf: Box::new([0; 2048]),
}
}
impl<R, W> Future for Copy<R, W>
where R: AsyncRead,
W: AsyncWrite,
{
type Item = (u64, R, W);
type Error = io::Error;
fn poll(&mut self) -> Poll<(u64, R, W), io::Error> {
debug_assert!(self.reader.is_some() && self.writer.is_some(),
"poll() has been called again after returning Ok");
loop {
// Still not finished flushing
if !self.flush_done {
try_ready!(self.writer.as_mut().unwrap().poll_flush());
self.flush_done = true
}
// If our buffer is empty, then we need to read some data to
// continue.
if self.pos == self.cap && !self.read_done {
let reader = self.reader.as_mut().unwrap();
let n = try_ready!(reader.poll_read(&mut self.buf));
if n == 0 {
self.read_done = true;
} else {
self.pos = 0;
self.cap = n;
}
}
// If our buffer has some data, let's write it out!
while self.pos < self.cap {
let writer = self.writer.as_mut().unwrap();
let i = try_ready!(writer.poll_write(&self.buf[self.pos..self.cap]));
if i == 0 {
return Err(io::Error::new(io::ErrorKind::WriteZero,
"write zero byte into writer"));
} else {
self.pos += i;
self.amt += i as u64;
}
}
// The buffered data has been written, let's flush it!
if self.pos == self.cap && !self.read_done {
self.flush_done = false;
continue
}
// Everything has been copied.
if self.pos == self.cap && self.read_done {
try_ready!(self.writer.as_mut().unwrap().poll_flush());
let reader = self.reader.take().unwrap();
let writer = self.writer.take().unwrap();
return Ok((self.amt, reader, writer).into())
}
}
}
}

View File

@ -19,6 +19,7 @@
// DEALINGS IN THE SOFTWARE.
extern crate bytes;
#[macro_use]
extern crate futures;
extern crate libp2p_peerstore as peerstore;
extern crate libp2p_core as core;
@ -31,6 +32,7 @@ extern crate tokio_codec;
extern crate tokio_io;
extern crate varint;
mod copy;
mod message;
mod protocol;
mod transport;

View File

@ -19,12 +19,13 @@
// DEALINGS IN THE SOFTWARE.
use bytes::Bytes;
use copy;
use core::{ConnectionUpgrade, Endpoint, Transport};
use futures::{stream, future::{self, Either::{A, B}, FutureResult}, prelude::*};
use message::{CircuitRelay, CircuitRelay_Peer, CircuitRelay_Status, CircuitRelay_Type};
use peerstore::{PeerAccess, PeerId, Peerstore};
use std::{io, iter, ops::Deref};
use tokio_io::{io as aio, AsyncRead, AsyncWrite};
use tokio_io::{AsyncRead, AsyncWrite};
use utility::{io_err, is_success, status, Io, Peer};
#[derive(Debug, Clone)]
@ -187,8 +188,8 @@ where
let future = {
let (src_r, src_w) = src.split();
let (dst_r, dst_w) = dst.split();
let a = aio::copy(src_r, dst_w).map(|_| ());
let b = aio::copy(dst_r, src_w).map(|_| ());
let a = copy::flushing_copy(src_r, dst_w).map(|_| ());
let b = copy::flushing_copy(dst_r, src_w).map(|_| ());
a.select(b).map(|_| ()).map_err(|(e, _)| e)
};
Ok(future)

12
yamux/Cargo.toml Normal file
View File

@ -0,0 +1,12 @@
[package]
name = "libp2p-yamux"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
bytes = "0.4"
futures = "0.1"
libp2p-core = { path = "../core" }
log = "0.4"
tokio-io = "0.1"
yamux = { git = "https://github.com/paritytech/yamux" }

134
yamux/src/lib.rs Normal file
View File

@ -0,0 +1,134 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
extern crate bytes;
extern crate futures;
#[macro_use]
extern crate log;
extern crate libp2p_core as core;
extern crate tokio_io;
extern crate yamux;
use bytes::Bytes;
use core::Endpoint;
use futures::{future::{self, FutureResult}, prelude::*};
use std::{io, iter};
use tokio_io::{AsyncRead, AsyncWrite};
pub struct Yamux<C>(yamux::Connection<C>);
impl<C> Clone for Yamux<C> {
fn clone(&self) -> Self {
Yamux(self.0.clone())
}
}
impl<C> Yamux<C>
where
C: AsyncRead + AsyncWrite + 'static
{
pub fn new(c: C, cfg: yamux::Config, mode: yamux::Mode) -> Self {
Yamux(yamux::Connection::new(c, cfg, mode))
}
}
impl<C> core::StreamMuxer for Yamux<C>
where
C: AsyncRead + AsyncWrite + 'static
{
type Substream = yamux::StreamHandle<C>;
type InboundSubstream = InboundFuture<C>;
type OutboundSubstream = FutureResult<Option<Self::Substream>, io::Error>;
fn inbound(self) -> Self::InboundSubstream {
InboundFuture(self.0)
}
fn outbound(self) -> Self::OutboundSubstream {
let stream = self.0.open_stream().map_err(|e| io::Error::new(io::ErrorKind::Other, e));
future::result(stream)
}
}
pub struct InboundFuture<C>(yamux::Connection<C>);
impl<C> Future for InboundFuture<C>
where
C: AsyncRead + AsyncWrite + 'static
{
type Item = Option<yamux::StreamHandle<C>>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.0.poll() {
Err(e) => {
error!("connection error: {}", e);
Err(io::Error::new(io::ErrorKind::Other, e))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::Ready(Some(stream))) => Ok(Async::Ready(Some(stream)))
}
}
}
#[derive(Clone)]
pub struct Config(yamux::Config);
impl Config {
pub fn new(cfg: yamux::Config) -> Self {
Config(cfg)
}
}
impl Default for Config {
fn default() -> Self {
Config(yamux::Config::default())
}
}
impl<C, M> core::ConnectionUpgrade<C, M> for Config
where
C: AsyncRead + AsyncWrite + 'static,
M: 'static
{
type UpgradeIdentifier = ();
type NamesIter = iter::Once<(Bytes, ())>;
fn protocol_names(&self) -> Self::NamesIter {
iter::once((Bytes::from("/yamux/1.0.0"), ()))
}
type Output = Yamux<C>;
type MultiaddrFuture = M;
type Future = FutureResult<(Yamux<C>, M), io::Error>;
fn upgrade(self, i: C, _: (), end: Endpoint, remote: M) -> Self::Future {
let mode = match end {
Endpoint::Dialer => yamux::Mode::Client,
Endpoint::Listener => yamux::Mode::Server
};
future::ok((Yamux::new(i, self.0, mode), remote))
}
}