Reimplement the websocket transport. (#1150)

* Begin reimplementing the websocket transport.

* Add TLS support.

* Add support for redirects during handshake.

* Cosmetics.

* Remove unused error cases in tls module.

Left-overs from a previous implementation.

* No libp2p-websocket for wasm targets.

* Change tls::Config to make the server optional.

* Update transports/websocket/src/lib.rs

Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com>

* Duplicate config methods.

As per PR review feedback.
This commit is contained in:
Toralf Wittner
2019-06-04 11:47:20 +02:00
committed by Pierre Krieger
parent 34e7e35310
commit e56c4c10ed
11 changed files with 889 additions and 799 deletions

View File

@ -32,7 +32,6 @@ libp2p-core-derive = { version = "0.8.0", path = "./misc/core-derive" }
libp2p-secio = { version = "0.8.0", path = "./protocols/secio", default-features = false } libp2p-secio = { version = "0.8.0", path = "./protocols/secio", default-features = false }
libp2p-uds = { version = "0.8.0", path = "./transports/uds" } libp2p-uds = { version = "0.8.0", path = "./transports/uds" }
libp2p-wasm-ext = { version = "0.1.0", path = "./transports/wasm-ext" } libp2p-wasm-ext = { version = "0.1.0", path = "./transports/wasm-ext" }
libp2p-websocket = { version = "0.8.0", path = "./transports/websocket", optional = true }
libp2p-yamux = { version = "0.8.0", path = "./muxers/yamux" } libp2p-yamux = { version = "0.8.0", path = "./muxers/yamux" }
parking_lot = "0.8" parking_lot = "0.8"
smallvec = "0.6" smallvec = "0.6"
@ -46,6 +45,7 @@ libp2p-dns = { version = "0.8.0", path = "./transports/dns" }
libp2p-mdns = { version = "0.8.0", path = "./misc/mdns" } libp2p-mdns = { version = "0.8.0", path = "./misc/mdns" }
libp2p-noise = { version = "0.6.0", path = "./protocols/noise" } libp2p-noise = { version = "0.6.0", path = "./protocols/noise" }
libp2p-tcp = { version = "0.8.0", path = "./transports/tcp" } libp2p-tcp = { version = "0.8.0", path = "./transports/tcp" }
libp2p-websocket = { version = "0.8.0", path = "./transports/websocket", optional = true }
[dev-dependencies] [dev-dependencies]
env_logger = "0.6.0" env_logger = "0.6.0"

View File

@ -88,7 +88,6 @@ where
A: Read, A: Read,
B: Read, B: Read,
{ {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> { fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
match self { match self {
EitherOutput::First(a) => a.read(buf), EitherOutput::First(a) => a.read(buf),
@ -102,7 +101,6 @@ where
A: AsyncWrite, A: AsyncWrite,
B: AsyncWrite, B: AsyncWrite,
{ {
#[inline]
fn shutdown(&mut self) -> Poll<(), IoError> { fn shutdown(&mut self) -> Poll<(), IoError> {
match self { match self {
EitherOutput::First(a) => a.shutdown(), EitherOutput::First(a) => a.shutdown(),
@ -116,7 +114,6 @@ where
A: Write, A: Write,
B: Write, B: Write,
{ {
#[inline]
fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> { fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> {
match self { match self {
EitherOutput::First(a) => a.write(buf), EitherOutput::First(a) => a.write(buf),
@ -124,7 +121,6 @@ where
} }
} }
#[inline]
fn flush(&mut self) -> Result<(), IoError> { fn flush(&mut self) -> Result<(), IoError> {
match self { match self {
EitherOutput::First(a) => a.flush(), EitherOutput::First(a) => a.flush(),
@ -302,7 +298,6 @@ where
type Item = ListenerEvent<EitherFuture<AInner, BInner>>; type Item = ListenerEvent<EitherFuture<AInner, BInner>>;
type Error = EitherError<AStream::Error, BStream::Error>; type Error = EitherError<AStream::Error, BStream::Error>;
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self { match self {
EitherListenStream::First(a) => a.poll() EitherListenStream::First(a) => a.poll()
@ -331,7 +326,6 @@ where
type Item = EitherOutput<AInner, BInner>; type Item = EitherOutput<AInner, BInner>;
type Error = EitherError<AFuture::Error, BFuture::Error>; type Error = EitherError<AFuture::Error, BFuture::Error>;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self { match self {
EitherFuture::First(a) => a.poll().map(|v| v.map(EitherOutput::First)).map_err(EitherError::A), EitherFuture::First(a) => a.poll().map(|v| v.map(EitherOutput::First)).map_err(EitherError::A),

View File

@ -51,7 +51,6 @@ where
S::Item: IntoBuf, S::Item: IntoBuf,
{ {
/// Wraps around `inner`. /// Wraps around `inner`.
#[inline]
pub fn new(inner: S) -> RwStreamSink<S> { pub fn new(inner: S) -> RwStreamSink<S> {
RwStreamSink { inner, current_item: None } RwStreamSink { inner, current_item: None }
} }
@ -102,7 +101,6 @@ where
S::SinkItem: for<'r> From<&'r [u8]>, S::SinkItem: for<'r> From<&'r [u8]>,
S::Item: IntoBuf, S::Item: IntoBuf,
{ {
#[inline]
fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> { fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> {
let len = buf.len(); let len = buf.len();
match self.inner.start_send(buf.into())? { match self.inner.start_send(buf.into())? {
@ -111,7 +109,6 @@ where
} }
} }
#[inline]
fn flush(&mut self) -> Result<(), IoError> { fn flush(&mut self) -> Result<(), IoError> {
match self.inner.poll_complete()? { match self.inner.poll_complete()? {
Async::Ready(()) => Ok(()), Async::Ready(()) => Ok(()),
@ -126,7 +123,6 @@ where
S::SinkItem: for<'r> From<&'r [u8]>, S::SinkItem: for<'r> From<&'r [u8]>,
S::Item: IntoBuf, S::Item: IntoBuf,
{ {
#[inline]
fn shutdown(&mut self) -> Poll<(), IoError> { fn shutdown(&mut self) -> Poll<(), IoError> {
self.inner.close() self.inner.close()
} }

View File

@ -197,7 +197,7 @@ pub use libp2p_tcp as tcp;
pub use libp2p_uds as uds; pub use libp2p_uds as uds;
#[doc(inline)] #[doc(inline)]
pub use libp2p_wasm_ext as wasm_ext; pub use libp2p_wasm_ext as wasm_ext;
#[cfg(feature = "libp2p-websocket")] #[cfg(all(feature = "libp2p-websocket", not(any(target_os = "emscripten", target_os = "unknown"))))]
#[doc(inline)] #[doc(inline)]
pub use libp2p_websocket as websocket; pub use libp2p_websocket as websocket;
#[doc(inline)] #[doc(inline)]
@ -270,9 +270,7 @@ struct CommonTransport {
type InnerImplementation = core::transport::OrTransport<dns::DnsConfig<tcp::TcpConfig>, websocket::WsConfig<dns::DnsConfig<tcp::TcpConfig>>>; type InnerImplementation = core::transport::OrTransport<dns::DnsConfig<tcp::TcpConfig>, websocket::WsConfig<dns::DnsConfig<tcp::TcpConfig>>>;
#[cfg(all(not(any(target_os = "emscripten", target_os = "unknown")), not(feature = "libp2p-websocket")))] #[cfg(all(not(any(target_os = "emscripten", target_os = "unknown")), not(feature = "libp2p-websocket")))]
type InnerImplementation = dns::DnsConfig<tcp::TcpConfig>; type InnerImplementation = dns::DnsConfig<tcp::TcpConfig>;
#[cfg(all(any(target_os = "emscripten", target_os = "unknown"), feature = "libp2p-websocket"))] #[cfg(any(target_os = "emscripten", target_os = "unknown"))]
type InnerImplementation = websocket::BrowserWsConfig;
#[cfg(all(any(target_os = "emscripten", target_os = "unknown"), not(feature = "libp2p-websocket")))]
type InnerImplementation = core::transport::dummy::DummyTransport; type InnerImplementation = core::transport::dummy::DummyTransport;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -298,16 +296,7 @@ impl CommonTransport {
} }
/// Initializes the `CommonTransport`. /// Initializes the `CommonTransport`.
#[cfg(all(any(target_os = "emscripten", target_os = "unknown"), feature = "libp2p-websocket"))] #[cfg(any(target_os = "emscripten", target_os = "unknown"))]
pub fn new() -> CommonTransport {
let inner = websocket::BrowserWsConfig::new();
CommonTransport {
inner: CommonTransportInner { inner }
}
}
/// Initializes the `CommonTransport`.
#[cfg(all(any(target_os = "emscripten", target_os = "unknown"), not(feature = "libp2p-websocket")))]
pub fn new() -> CommonTransport { pub fn new() -> CommonTransport {
let inner = core::transport::dummy::DummyTransport::new(); let inner = core::transport::dummy::DummyTransport::new();
CommonTransport { CommonTransport {

View File

@ -10,20 +10,18 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
[dependencies] [dependencies]
libp2p-core = { version = "0.8.0", path = "../../core" } bytes = "0.4.6"
futures = "0.1" futures = "0.1"
libp2p-core = { version = "0.8.0", path = "../../core" }
log = "0.4.1" log = "0.4.1"
rw-stream-sink = { version = "0.1.1", path = "../../misc/rw-stream-sink" } rw-stream-sink = { version = "0.1.1", path = "../../misc/rw-stream-sink" }
tokio-io = "0.1" tokio-codec = "0.1.1"
tokio-io = "0.1.12"
tokio-rustls = "0.10.0-alpha.3"
soketto = { git = "https://github.com/paritytech/soketto.git" }
url = "1.7.2"
webpki-roots = "0.16.0"
[target.'cfg(not(any(target_os = "emscripten", target_os = "unknown")))'.dependencies] [dev-dependencies]
websocket = { version = "0.22.2", default-features = false, features = ["async", "async-ssl"] }
[target.'cfg(any(target_os = "emscripten", target_os = "unknown"))'.dependencies]
bytes = "0.4"
stdweb = { version = "0.4", default-features = false }
wasm-bindgen = "0.2.42"
[target.'cfg(not(any(target_os = "emscripten", target_os = "unknown")))'.dev-dependencies]
libp2p-tcp = { version = "0.8.0", path = "../tcp" } libp2p-tcp = { version = "0.8.0", path = "../tcp" }
tokio = "0.1" tokio = "0.1.20"

View File

@ -1,345 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use log::debug;
use futures::{future, stream};
use futures::stream::Then as StreamThen;
use futures::sync::{mpsc, oneshot};
use futures::{Async, Future, Poll, Stream};
use rw_stream_sink::RwStreamSink;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::io::{Read, Write};
use std::sync::{Arc, Mutex};
use stdweb::web::TypedArray;
use stdweb::{self, Reference};
use libp2p_core::{
Transport,
multiaddr::{Protocol, Multiaddr},
transport::{ListenerEvent, TransportError}
};
use tokio_io::{AsyncRead, AsyncWrite};
/// Represents the configuration for a websocket transport capability for libp2p.
///
/// This implementation of `Transport` accepts any address that looks like
/// `/ip4/.../tcp/.../ws`, `/ip6/.../tcp/.../ws`, `/dns4/.../ws` or `/dns6/.../ws`, and connect to
/// the corresponding IP and port.
///
/// If the underlying multiaddress uses `/dns4` or `/dns6`, then the domain name will be passed in
/// the headers of the request. This is important is the listener is behind an HTTP proxy.
#[derive(Debug, Clone)]
pub struct BrowserWsConfig;
impl BrowserWsConfig {
/// Creates a new configuration object for websocket.
#[inline]
pub fn new() -> BrowserWsConfig {
BrowserWsConfig
}
}
impl Transport for BrowserWsConfig {
type Output = BrowserWsConn;
type Error = IoError; // TODO: better error type?
type Listener = stream::Empty<ListenerEvent<Self::ListenerUpgrade>, IoError>;
type ListenerUpgrade = future::Empty<Self::Output, IoError>;
type Dial = Box<Future<Item = Self::Output, Error = IoError> + Send>;
fn listen_on(self, a: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
// Listening is never supported.
Err(TransportError::MultiaddrNotSupported(a))
}
fn dial(self, original_addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
// Making sure we are initialized before we dial. Initialization is protected by a simple
// boolean static variable, so it's not a problem to call it multiple times and the cost
// is negligible.
stdweb::initialize();
// Tries to interpret the multiaddr, and returns a corresponding `ws://x.x.x.x/` URL (as
// a string) on success.
let inner_addr = match multiaddr_to_target(&original_addr) {
Ok(a) => a,
Err(_) => return Err(TransportError::MultiaddrNotSupported(original_addr)),
};
debug!("Dialing {}", original_addr);
// Create the JS `WebSocket` object.
let websocket = {
let val = js! {
try {
return new WebSocket(@{inner_addr});
} catch(e) {
return false;
}
};
match val.into_reference() {
Some(ws) => ws,
// TODO: more descriptive error
None => return Err(TransportError::Other(IoErrorKind::Other.into())), // `false` was returned by `js!`
}
};
// Create a `message` channel that will be used for both bytes messages and errors, and a
// `message_cb` used for the `message` event on the WebSocket.
// `message_tx` is grabbed by `message_cb` and `close_cb`, and `message_rx` is grabbed
// by `open_cb`.
let (message_tx, message_rx) = mpsc::unbounded::<Result<Vec<u8>, IoError>>();
let message_tx = Arc::new(message_tx);
let mut message_rx = Some(message_rx);
let message_cb = {
let message_tx = message_tx.clone();
move |message_data: Reference| {
if let Some(buffer) = message_data.downcast::<TypedArray<u8>>() {
let _ = message_tx.unbounded_send(Ok(buffer.to_vec()));
} else {
let _ = message_tx.unbounded_send(Err(IoError::new(
IoErrorKind::InvalidData,
"received ws message of unknown type",
)));
}
}
};
// Create a `open` channel that will be used to communicate the `BrowserWsConn` that represents
// the open dialing websocket. Also create a `open_cb` callback that will be used for the
// `open` message of the websocket.
let (open_tx, open_rx) = oneshot::channel::<Result<BrowserWsConn, IoError>>();
let open_tx = Arc::new(Mutex::new(Some(open_tx)));
let websocket_clone = websocket.clone();
let open_cb = {
let open_tx = open_tx.clone();
move || {
// Note that `open_tx` can be empty (and a panic happens) if the `open` event
// is triggered twice, or is triggered after the `close` event. We never reuse the
// same websocket twice, so this is not supposed to happen.
let tx = open_tx
.lock()
.unwrap()
.take()
.expect("the websocket can only open once");
// `message_rx` can be empty if the `open` event is triggered twice, which again
// is not supposed to happen.
let message_rx = message_rx.take().expect("the websocket can only open once");
// Send a `BrowserWsConn` to the future that was returned by `dial`. Ignoring errors that
// would happen the future has been dropped by the user.
let _ = tx.send(Ok(BrowserWsConn {
websocket: websocket_clone.clone(),
incoming_data: RwStreamSink::new(message_rx.then(|result| {
// An `Err` happens here if `message_tx` has been dropped. However
// `message_tx` is grabbed by the websocket, which stays alive for as
// long as the `BrowserWsConn` is alive.
match result {
Ok(r) => r,
Err(_) => {
unreachable!("the message channel outlives the BrowserWsConn")
}
}
})),
}));
}
};
// Used for the `close` message of the websocket.
// The websocket can be closed either before or after being opened, so we send an error
// to both the `open` and `message` channels if that happens.
let close_cb = move || {
if let Some(tx) = open_tx.lock().unwrap().take() {
let _ = tx.send(Err(IoError::new(
IoErrorKind::ConnectionRefused,
"close event on the websocket",
)));
}
let _ = message_tx.unbounded_send(Err(IoError::new(
IoErrorKind::ConnectionRefused,
"close event on the websocket",
)));
};
js! {
var socket = @{websocket};
var open_cb = @{open_cb};
var message_cb = @{message_cb};
var close_cb = @{close_cb};
socket.addEventListener("open", function(event) {
open_cb();
});
socket.addEventListener("message", function(event) {
var reader = new FileReader();
reader.addEventListener("loadend", function() {
var typed = new Uint8Array(reader.result);
message_cb(typed);
});
reader.readAsArrayBuffer(event.data);
});
socket.addEventListener("close", function(event) {
close_cb();
});
};
Ok(Box::new(open_rx.then(|result| {
match result {
Ok(Ok(r)) => Ok(r),
Ok(Err(e)) => Err(e),
// `Err` would happen here if `open_tx` is destroyed. `open_tx` is captured by
// the `WebSocket`, and the `WebSocket` is captured by `open_cb`, which is itself
// captured by the `WebSocket`. Due to this cyclic dependency, `open_tx` should
// never be destroyed.
// TODO: how do we break this cyclic dependency? difficult question
Err(_) => unreachable!("the sending side will only close when we drop the future"),
}
})) as Box<_>)
}
}
pub struct BrowserWsConn {
websocket: Reference,
// Stream of messages that goes through a `RwStreamSink` in order to become a `AsyncRead`.
incoming_data: RwStreamSink<
StreamThen<
mpsc::UnboundedReceiver<Result<Vec<u8>, IoError>>,
fn(Result<Result<Vec<u8>, IoError>, ()>) -> Result<Vec<u8>, IoError>,
Result<Vec<u8>, IoError>,
>,
>,
}
impl Drop for BrowserWsConn {
#[inline]
fn drop(&mut self) {
// TODO: apparently there's a memory leak related to callbacks?
js! { @{&self.websocket}.close(); }
}
}
impl AsyncRead for BrowserWsConn {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.incoming_data.prepare_uninitialized_buffer(buf)
}
fn read_buf<B: bytes::BufMut>(&mut self, buf: &mut B) -> Poll<usize, IoError> {
self.incoming_data.read_buf(buf)
}
}
impl Read for BrowserWsConn {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
self.incoming_data.read(buf)
}
}
impl AsyncWrite for BrowserWsConn {
#[inline]
fn shutdown(&mut self) -> Poll<(), IoError> {
Ok(Async::Ready(()))
}
}
impl Write for BrowserWsConn {
fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> {
let typed_array = TypedArray::from(buf);
// `send` can throw if the websocket isn't open (which can happen if it was closed by the
// remote).
let returned = js! {
try {
@{&self.websocket}.send(@{typed_array.buffer()});
return true;
} catch(e) {
return false;
}
};
match returned {
stdweb::Value::Bool(true) => Ok(buf.len()),
stdweb::Value::Bool(false) => Err(IoError::new(
IoErrorKind::BrokenPipe,
"websocket has been closed by the remote",
)),
_ => unreachable!(),
}
}
#[inline]
fn flush(&mut self) -> Result<(), IoError> {
// Everything is always considered flushed.
Ok(())
}
}
// Tries to interpret the `Multiaddr` as a `/ipN/.../tcp/.../ws` multiaddress, and if so returns
// the corresponding `ws://.../` URL.
fn multiaddr_to_target(addr: &Multiaddr) -> Result<String, ()> {
let protocols: Vec<_> = addr.iter().collect();
if protocols.len() != 3 {
return Err(());
}
match (&protocols[0], &protocols[1], &protocols[2]) {
(&Protocol::Ip4(ref ip), &Protocol::Tcp(port), &Protocol::Ws(ref ws_path)) => {
if ip.is_unspecified() || port == 0 {
return Err(());
}
Ok(format!("ws://{}:{}{}", ip, port, ws_path))
}
(&Protocol::Ip6(ref ip), &Protocol::Tcp(port), &Protocol::Ws(ref ws_path)) => {
if ip.is_unspecified() || port == 0 {
return Err(());
}
Ok(format!("ws://[{}]:{}{}", ip, port, ws_path))
}
(&Protocol::Ip4(ref ip), &Protocol::Tcp(port), &Protocol::Wss(ref ws_path)) => {
if ip.is_unspecified() || port == 0 {
return Err(());
}
Ok(format!("wss://{}:{}{}", ip, port, ws_path))
}
(&Protocol::Ip6(ref ip), &Protocol::Tcp(port), &Protocol::Wss(ref ws_path)) => {
if ip.is_unspecified() || port == 0 {
return Err(());
}
Ok(format!("wss://[{}]:{}{}", ip, port, ws_path))
}
(&Protocol::Dns4(ref ns), &Protocol::Tcp(port), &Protocol::Ws(ref ws_path)) => {
Ok(format!("ws://{}:{}{}", ns, port, ws_path))
}
(&Protocol::Dns6(ref ns), &Protocol::Tcp(port), &Protocol::Ws(ref ws_path)) => {
Ok(format!("ws://{}:{}{}", ns, port, ws_path))
}
(&Protocol::Dns4(ref ns), &Protocol::Tcp(port), &Protocol::Wss(ref ws_path)) => {
Ok(format!("wss://{}:{}{}", ns, port, ws_path))
}
(&Protocol::Dns6(ref ns), &Protocol::Tcp(port), &Protocol::Wss(ref ws_path)) => {
Ok(format!("wss://{}:{}{}", ns, port, ws_path))
}
_ => Err(()),
}
}
// TODO: write tests (tests are very difficult to write with emscripten)
// - remote refuses connection
// - remote closes connection before we receive
// - remote closes connection before we send
// - remote sends text data instead of binary

View File

@ -1,357 +0,0 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::{Future, IntoFuture, Sink, Stream};
use libp2p_core::{
Transport,
multiaddr::{Protocol, Multiaddr},
transport::{ListenerEvent, TransportError}
};
use log::{debug, trace};
use rw_stream_sink::RwStreamSink;
use std::{error, fmt};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use tokio_io::{AsyncRead, AsyncWrite};
use websocket::client::builder::ClientBuilder;
use websocket::message::OwnedMessage;
use websocket::server::upgrade::r#async::IntoWs;
use websocket::stream::r#async::Stream as AsyncStream;
/// Represents the configuration for a websocket transport capability for libp2p. Must be put on
/// top of another `Transport`.
///
/// This implementation of `Transport` accepts any address that ends with `/ws` or `/wss`, and will
/// try to pass the underlying multiaddress to the underlying `Transport`.
///
/// Note that the underlying multiaddr is `/dns4/...` or `/dns6/...`, then this library will
/// pass the domain name in the headers of the request. This is important is the listener is behind
/// an HTTP proxy.
///
/// > **Note**: `/wss` is only supported for dialing and not listening.
#[derive(Debug, Clone)]
pub struct WsConfig<T> {
transport: T,
}
impl<T> WsConfig<T> {
/// Creates a new configuration object for websocket.
///
/// The websockets will run on top of the `Transport` you pass as parameter.
#[inline]
pub fn new(inner: T) -> WsConfig<T> {
WsConfig { transport: inner }
}
}
impl<T> Transport for WsConfig<T>
where
// TODO: this 'static is pretty arbitrary and is necessary because of the websocket library
T: Transport + 'static,
T::Error: Send,
T::Dial: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
// TODO: this Send is pretty arbitrary and is necessary because of the websocket library
T::Output: AsyncRead + AsyncWrite + Send,
{
type Output = Box<dyn AsyncStream + Send>;
type Error = WsError<T::Error>;
type Listener = Box<dyn Stream<Item = ListenerEvent<Self::ListenerUpgrade>, Error = Self::Error> + Send>;
type ListenerUpgrade = Box<dyn Future<Item = Self::Output, Error = Self::Error> + Send>;
type Dial = Box<dyn Future<Item = Self::Output, Error = Self::Error> + Send>;
fn listen_on(self, original_addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
let mut inner_addr = original_addr.clone();
match inner_addr.pop() {
Some(Protocol::Ws(ref path)) if path == "/" || path.is_empty() => {},
_ => return Err(TransportError::MultiaddrNotSupported(original_addr)),
};
let inner_listen = self.transport.listen_on(inner_addr)
.map_err(|err| err.map(WsError::Underlying))?;
let listen = inner_listen.map_err(WsError::Underlying).map(move |event| {
match event {
ListenerEvent::NewAddress(mut a) => {
a = a.with(Protocol::Ws(From::from("/")));
debug!("Listening on {}", a);
ListenerEvent::NewAddress(a)
}
ListenerEvent::AddressExpired(mut a) => {
a = a.with(Protocol::Ws(From::from("/")));
ListenerEvent::AddressExpired(a)
}
ListenerEvent::Upgrade { upgrade, mut listen_addr, mut remote_addr } => {
listen_addr = listen_addr.with(Protocol::Ws(From::from("/")));
remote_addr = remote_addr.with(Protocol::Ws(From::from("/")));
// Upgrade the listener to websockets like the websockets library requires us to do.
let upgraded = upgrade.map_err(WsError::Underlying).and_then(move |stream| {
debug!("Incoming connection");
stream.into_ws()
.map_err(|e| WsError::WebSocket(Box::new(e.3)))
.and_then(|stream| {
// Accept the next incoming connection.
stream
.accept()
.map_err(|e| WsError::WebSocket(Box::new(e)))
.map(|(client, _http_headers)| {
debug!("Upgraded incoming connection to websockets");
// Plug our own API on top of the `websockets` API.
let framed_data = client
.map_err(|err| IoError::new(IoErrorKind::Other, err))
.sink_map_err(|err| IoError::new(IoErrorKind::Other, err))
.with(|data| Ok(OwnedMessage::Binary(data)))
.and_then(|recv| {
match recv {
OwnedMessage::Binary(data) => Ok(Some(data)),
OwnedMessage::Text(data) => Ok(Some(data.into_bytes())),
OwnedMessage::Close(_) => Ok(None),
// TODO: handle pings and pongs, which is freaking hard
// for now we close the socket when that happens
_ => Ok(None)
}
})
// TODO: is there a way to merge both lines into one?
.take_while(|v| Ok(v.is_some()))
.map(|v| v.expect("we only take while this is Some"));
let read_write = RwStreamSink::new(framed_data);
Box::new(read_write) as Box<dyn AsyncStream + Send>
})
})
.map(|s| Box::new(Ok(s).into_future()) as Box<dyn Future<Item = _, Error = _> + Send>)
.into_future()
.flatten()
});
ListenerEvent::Upgrade {
upgrade: Box::new(upgraded) as Box<dyn Future<Item = _, Error = _> + Send>,
listen_addr,
remote_addr
}
}
}
});
Ok(Box::new(listen) as Box<_>)
}
fn dial(self, original_addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let mut inner_addr = original_addr.clone();
let (ws_path, is_wss) = match inner_addr.pop() {
Some(Protocol::Ws(path)) => (path.into_owned(), false),
Some(Protocol::Wss(path)) => (path.into_owned(), true),
_ => {
trace!(
"Ignoring dial attempt for {} because it is not a websocket multiaddr",
original_addr
);
return Err(TransportError::MultiaddrNotSupported(original_addr));
}
};
debug!("Dialing {} through inner transport", inner_addr);
let ws_addr = client_addr_to_ws(&inner_addr, &ws_path, is_wss);
let inner_dial = self.transport.dial(inner_addr)
.map_err(|err| err.map(WsError::Underlying))?;
let dial = inner_dial
.map_err(WsError::Underlying)
.into_future()
.and_then(move |connec| {
ClientBuilder::new(&ws_addr)
.expect("generated ws address is always valid")
.async_connect_on(connec)
.map_err(|e| WsError::WebSocket(Box::new(e)))
.map(|(client, _)| {
debug!("Upgraded outgoing connection to websockets");
// Plug our own API on top of the API of the websockets library.
let framed_data = client
.map_err(|err| IoError::new(IoErrorKind::Other, err))
.sink_map_err(|err| IoError::new(IoErrorKind::Other, err))
.with(|data| Ok(OwnedMessage::Binary(data)))
.and_then(|recv| {
match recv {
OwnedMessage::Binary(data) => Ok(data),
OwnedMessage::Text(data) => Ok(data.into_bytes()),
// TODO: pings and pongs and close messages need to be
// answered; and this is really hard; for now we produce
// an error when that happens
_ => Err(IoError::new(IoErrorKind::Other, "unimplemented")),
}
});
let read_write = RwStreamSink::new(framed_data);
Box::new(read_write) as Box<dyn AsyncStream + Send>
})
});
Ok(Box::new(dial) as Box<_>)
}
}
/// Error in WebSockets.
#[derive(Debug)]
pub enum WsError<TErr> {
/// Error in the WebSocket layer.
WebSocket(Box<dyn error::Error + Send + Sync>),
/// Error in the transport layer underneath.
Underlying(TErr),
}
impl<TErr> fmt::Display for WsError<TErr>
where TErr: fmt::Display
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
WsError::WebSocket(err) => write!(f, "{}", err),
WsError::Underlying(err) => write!(f, "{}", err),
}
}
}
impl<TErr> error::Error for WsError<TErr>
where TErr: error::Error + 'static
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
WsError::WebSocket(err) => Some(&**err),
WsError::Underlying(err) => Some(err),
}
}
}
fn client_addr_to_ws(client_addr: &Multiaddr, ws_path: &str, is_wss: bool) -> String {
let inner = {
let protocols: Vec<_> = client_addr.iter().collect();
if protocols.len() != 2 {
"127.0.0.1".to_owned()
} else {
match (&protocols[0], &protocols[1]) {
(&Protocol::Ip4(ref ip), &Protocol::Tcp(port)) => {
format!("{}:{}", ip, port)
}
(&Protocol::Ip6(ref ip), &Protocol::Tcp(port)) => {
format!("[{}]:{}", ip, port)
}
(&Protocol::Dns4(ref ns), &Protocol::Tcp(port)) => {
format!("{}:{}", ns, port)
}
(&Protocol::Dns6(ref ns), &Protocol::Tcp(port)) => {
format!("{}:{}", ns, port)
}
_ => "127.0.0.1".to_owned(),
}
}
};
if is_wss {
format!("wss://{}{}", inner, ws_path)
} else {
format!("ws://{}{}", inner, ws_path)
}
}
#[cfg(test)]
mod tests {
use libp2p_tcp as tcp;
use tokio::runtime::current_thread::Runtime;
use futures::{Future, Stream};
use libp2p_core::{
Transport,
multiaddr::Protocol,
transport::ListenerEvent
};
use super::WsConfig;
#[test]
fn dialer_connects_to_listener_ipv4() {
let ws_config = WsConfig::new(tcp::TcpConfig::new());
let mut listener = ws_config.clone()
.listen_on("/ip4/127.0.0.1/tcp/0/ws".parse().unwrap())
.unwrap();
let addr = listener.by_ref().wait()
.next()
.expect("some event")
.expect("no error")
.into_new_address()
.expect("listen address");
assert_eq!(Some(Protocol::Ws("/".into())), addr.iter().nth(2));
assert_ne!(Some(Protocol::Tcp(0)), addr.iter().nth(1));
let listener = listener
.filter_map(ListenerEvent::into_upgrade)
.into_future()
.map_err(|(e, _)| e)
.and_then(|(c, _)| c.unwrap().0);
let dialer = ws_config.clone().dial(addr.clone()).unwrap();
let future = listener
.select(dialer)
.map_err(|(e, _)| e)
.and_then(|(_, n)| n);
let mut rt = Runtime::new().unwrap();
let _ = rt.block_on(future).unwrap();
}
#[test]
fn dialer_connects_to_listener_ipv6() {
let ws_config = WsConfig::new(tcp::TcpConfig::new());
let mut listener = ws_config.clone()
.listen_on("/ip6/::1/tcp/0/ws".parse().unwrap())
.unwrap();
let addr = listener.by_ref().wait()
.next()
.expect("some event")
.expect("no error")
.into_new_address()
.expect("listen address");
assert_eq!(Some(Protocol::Ws("/".into())), addr.iter().nth(2));
assert_ne!(Some(Protocol::Tcp(0)), addr.iter().nth(1));
let listener = listener
.filter_map(ListenerEvent::into_upgrade)
.into_future()
.map_err(|(e, _)| e)
.and_then(|(c, _)| c.unwrap().0);
let dialer = ws_config.clone().dial(addr.clone()).unwrap();
let future = listener
.select(dialer)
.map_err(|(e, _)| e)
.and_then(|(_, n)| n);
let mut rt = Runtime::new().unwrap();
let _ = rt.block_on(future).unwrap();
}
}

View File

@ -0,0 +1,76 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use libp2p_core::Multiaddr;
use crate::tls;
use std::{error, fmt};
/// Error in WebSockets.
#[derive(Debug)]
pub enum Error<E> {
/// Error in the transport layer underneath.
Transport(E),
/// A TLS related error.
Tls(tls::Error),
/// Websocket handshake error.
Handshake(Box<dyn error::Error + Send>),
/// The configured maximum of redirects have been made.
TooManyRedirects,
/// A multi-address is not supported.
InvalidMultiaddr(Multiaddr),
/// The location header URL was invalid.
InvalidRedirectLocation,
/// Websocket base framing error.
Base(Box<dyn error::Error + Send>)
}
impl<E: fmt::Display> fmt::Display for Error<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::Transport(err) => write!(f, "{}", err),
Error::Tls(err) => write!(f, "{}", err),
Error::Handshake(err) => write!(f, "{}", err),
Error::InvalidMultiaddr(ma) => write!(f, "invalid multi-address: {}", ma),
Error::TooManyRedirects => f.write_str("too many redirects"),
Error::InvalidRedirectLocation => f.write_str("invalid redirect location"),
Error::Base(err) => write!(f, "{}", err)
}
}
}
impl<E: error::Error + 'static> error::Error for Error<E> {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
Error::Transport(err) => Some(err),
Error::Tls(err) => Some(err),
Error::Handshake(err) => Some(&**err),
Error::Base(err) => Some(&**err),
Error::InvalidMultiaddr(_)
| Error::TooManyRedirects
| Error::InvalidRedirectLocation => None
}
}
}
impl<E> From<tls::Error> for Error<E> {
fn from(e: tls::Error) -> Self {
Error::Tls(e)
}
}

View File

@ -0,0 +1,431 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use bytes::BytesMut;
use crate::{error::Error, tls};
use futures::{future::{self, Either, Loop}, prelude::*, try_ready};
use libp2p_core::{
Transport,
either::EitherOutput,
multiaddr::{Protocol, Multiaddr},
transport::{ListenerEvent, TransportError}
};
use log::{debug, trace};
use tokio_rustls::{client, server};
use soketto::{base, connection::{Connection, Mode}, handshake::{self, Redirect, Response}};
use std::io;
use tokio_codec::{Framed, FramedParts};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_rustls::webpki;
use url::Url;
/// Max. number of payload bytes of a single frame.
const MAX_DATA_SIZE: u64 = 256 * 1024 * 1024;
/// A Websocket transport whose output type is a [`Stream`] and [`Sink`] of
/// frame payloads which does not implement [`AsyncRead`] or
/// [`AsyncWrite`]. See [`crate::WsConfig`] if you require the latter.
#[derive(Debug, Clone)]
pub struct WsConfig<T> {
transport: T,
max_data_size: u64,
tls_config: tls::Config,
max_redirects: u8
}
impl<T> WsConfig<T> {
/// Create a new websocket transport based on another transport.
pub fn new(transport: T) -> Self {
WsConfig {
transport,
max_data_size: MAX_DATA_SIZE,
tls_config: tls::Config::client(),
max_redirects: 0
}
}
/// Return the configured maximum number of redirects.
pub fn max_redirects(&self) -> u8 {
self.max_redirects
}
/// Set max. number of redirects to follow.
pub fn set_max_redirects(&mut self, max: u8) -> &mut Self {
self.max_redirects = max;
self
}
/// Get the max. frame data size we support.
pub fn max_data_size(&self) -> u64 {
self.max_data_size
}
/// Set the max. frame data size we support.
pub fn set_max_data_size(&mut self, size: u64) -> &mut Self {
self.max_data_size = size;
self
}
/// Set the TLS configuration if TLS support is desired.
pub fn set_tls_config(&mut self, c: tls::Config) -> &mut Self {
self.tls_config = c;
self
}
}
impl<T> Transport for WsConfig<T>
where
T: Transport + Send + Clone + 'static,
T::Error: Send + 'static,
T::Dial: Send + 'static,
T::Listener: Send + 'static,
T::ListenerUpgrade: Send + 'static,
T::Output: AsyncRead + AsyncWrite + Send + 'static
{
type Output = BytesConnection<T::Output>;
type Error = Error<T::Error>;
type Listener = Box<dyn Stream<Item = ListenerEvent<Self::ListenerUpgrade>, Error = Self::Error> + Send>;
type ListenerUpgrade = Box<dyn Future<Item = Self::Output, Error = Self::Error> + Send>;
type Dial = Box<dyn Future<Item = Self::Output, Error = Self::Error> + Send>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
let mut inner_addr = addr.clone();
let (use_tls, proto) = match inner_addr.pop() {
Some(p@Protocol::Wss(_)) =>
if self.tls_config.server.is_some() {
(true, p)
} else {
debug!("/wss address but TLS server support is not configured");
return Err(TransportError::MultiaddrNotSupported(addr))
}
Some(p@Protocol::Ws(_)) => (false, p),
_ => {
debug!("{} is not a websocket multiaddr", addr);
return Err(TransportError::MultiaddrNotSupported(addr))
}
};
let tls_config = self.tls_config;
let max_size = self.max_data_size;
let listen = self.transport.listen_on(inner_addr)
.map_err(|e| e.map(Error::Transport))?
.map_err(Error::Transport)
.map(move |event| match event {
ListenerEvent::NewAddress(mut a) => {
a = a.with(proto.clone());
debug!("Listening on {}", a);
ListenerEvent::NewAddress(a)
}
ListenerEvent::AddressExpired(mut a) => {
a = a.with(proto.clone());
ListenerEvent::AddressExpired(a)
}
ListenerEvent::Upgrade { upgrade, mut listen_addr, mut remote_addr } => {
listen_addr = listen_addr.with(proto.clone());
remote_addr = remote_addr.with(proto.clone());
let remote1 = remote_addr.clone(); // used for logging
let remote2 = remote_addr.clone(); // used for logging
let tls_config = tls_config.clone();
let upgraded = upgrade.map_err(Error::Transport)
.and_then(move |stream| {
trace!("incoming connection from {}", remote1);
if use_tls { // begin TLS session
let server = tls_config.server.expect("for use_tls we checked server");
trace!("awaiting TLS handshake with {}", remote1);
let future = server.accept(stream)
.map_err(move |e| {
debug!("TLS handshake with {} failed: {}", remote1, e);
Error::Tls(tls::Error::from(e))
})
.map(|s| EitherOutput::First(EitherOutput::Second(s)));
Either::A(future)
} else { // continue with plain stream
Either::B(future::ok(EitherOutput::Second(stream)))
}
})
.and_then(move |stream| {
trace!("receiving websocket handshake request from {}", remote2);
Framed::new(stream, handshake::Server::new())
.into_future()
.map_err(|(e, _framed)| Error::Handshake(Box::new(e)))
.and_then(move |(request, framed)| {
if let Some(r) = request {
trace!("accepting websocket handshake request from {}", remote2);
let key = Vec::from(r.key());
Either::A(framed.send(Ok(handshake::Accept::new(key)))
.map_err(|e| Error::Base(Box::new(e)))
.map(move |f| {
trace!("websocket handshake with {} successful", remote2);
let c = new_connection(f, max_size, Mode::Server);
BytesConnection { inner: c }
}))
} else {
debug!("connection to {} terminated during handshake", remote2);
let e: io::Error = io::ErrorKind::ConnectionAborted.into();
Either::B(future::err(Error::Handshake(Box::new(e))))
}
})
});
ListenerEvent::Upgrade {
upgrade: Box::new(upgraded) as Box<dyn Future<Item = _, Error = _> + Send>,
listen_addr,
remote_addr
}
}
});
Ok(Box::new(listen) as Box<_>)
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
// Quick sanity check of the provided Multiaddr.
if let Some(Protocol::Ws(_)) | Some(Protocol::Wss(_)) = addr.iter().last() {
// ok
} else {
debug!("{} is not a websocket multiaddr", addr);
return Err(TransportError::MultiaddrNotSupported(addr))
}
// We are looping here in order to follow redirects (if any):
let max_redirects = self.max_redirects;
let future = future::loop_fn((addr, self, max_redirects), |(addr, cfg, remaining)| {
dial(addr, cfg.clone()).and_then(move |result| match result {
Either::A(redirect) => {
if remaining == 0 {
debug!("too many redirects");
return Err(Error::TooManyRedirects)
}
let a = location_to_multiaddr(redirect.location())?;
Ok(Loop::Continue((a, cfg, remaining - 1)))
}
Either::B(conn) => Ok(Loop::Break(conn))
})
});
Ok(Box::new(future) as Box<_>)
}
}
/// Attempty to dial the given address and perform a websocket handshake.
fn dial<T>(address: Multiaddr, config: WsConfig<T>)
-> impl Future<Item = Either<Redirect, BytesConnection<T::Output>>, Error = Error<T::Error>>
where
T: Transport,
T::Output: AsyncRead + AsyncWrite
{
trace!("dial address: {}", address);
let WsConfig { transport, max_data_size, tls_config, .. } = config;
let (host_port, dns_name) = match host_and_dnsname(&address) {
Ok(x) => x,
Err(e) => return Either::A(future::err(e))
};
let mut inner_addr = address.clone();
let (use_tls, path) = match inner_addr.pop() {
Some(Protocol::Ws(path)) => (false, path),
Some(Protocol::Wss(path)) => {
if dns_name.is_none() {
debug!("no DNS name in {}", address);
return Either::A(future::err(Error::InvalidMultiaddr(address)))
}
(true, path)
}
_ => {
debug!("{} is not a websocket multiaddr", address);
return Either::A(future::err(Error::InvalidMultiaddr(address)))
}
};
let dial = match transport.dial(inner_addr) {
Ok(dial) => dial,
Err(TransportError::MultiaddrNotSupported(a)) =>
return Either::A(future::err(Error::InvalidMultiaddr(a))),
Err(TransportError::Other(e)) =>
return Either::A(future::err(Error::Transport(e)))
};
let address1 = address.clone(); // used for logging
let address2 = address.clone(); // used for logging
let future = dial.map_err(Error::Transport)
.and_then(move |stream| {
trace!("connected to {}", address);
if use_tls { // begin TLS session
let dns_name = dns_name.expect("for use_tls we have checked that dns_name is some");
trace!("starting TLS handshake with {}", address);
let future = tls_config.client.connect(dns_name.as_ref(), stream)
.map_err(move |e| {
debug!("TLS handshake with {} failed: {}", address, e);
Error::Tls(tls::Error::from(e))
})
.map(|s| EitherOutput::First(EitherOutput::First(s)));
return Either::A(future)
}
// continue with plain stream
Either::B(future::ok(EitherOutput::Second(stream)))
})
.and_then(move |stream| {
trace!("sending websocket handshake request to {}", address1);
let client = handshake::Client::new(host_port, path);
Framed::new(stream, client)
.send(())
.map_err(|e| Error::Handshake(Box::new(e)))
.and_then(move |framed| {
trace!("awaiting websocket handshake response form {}", address2);
framed.into_future().map_err(|(e, _)| Error::Base(Box::new(e)))
})
.and_then(move |(response, framed)| {
match response {
None => {
debug!("connection to {} terminated during handshake", address1);
let e: io::Error = io::ErrorKind::ConnectionAborted.into();
return Err(Error::Handshake(Box::new(e)))
}
Some(Response::Redirect(r)) => {
debug!("received {}", r);
return Ok(Either::A(r))
}
Some(Response::Accepted(_)) => {
trace!("websocket handshake with {} successful", address1)
}
}
let c = new_connection(framed, max_data_size, Mode::Client);
Ok(Either::B(BytesConnection { inner: c }))
})
});
Either::B(future)
}
// Extract host, port and optionally the DNS name from the given [`Multiaddr`].
fn host_and_dnsname<T>(addr: &Multiaddr) -> Result<(String, Option<webpki::DNSName>), Error<T>> {
let mut iter = addr.iter();
match (iter.next(), iter.next()) {
(Some(Protocol::Ip4(ip)), Some(Protocol::Tcp(port))) =>
Ok((format!("{}:{}", ip, port), None)),
(Some(Protocol::Ip6(ip)), Some(Protocol::Tcp(port))) =>
Ok((format!("{}:{}", ip, port), None)),
(Some(Protocol::Dns4(h)), Some(Protocol::Tcp(port))) =>
Ok((format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned()))),
(Some(Protocol::Dns6(h)), Some(Protocol::Tcp(port))) =>
Ok((format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned()))),
_ => {
debug!("multi-address format not supported: {}", addr);
Err(Error::InvalidMultiaddr(addr.clone()))
}
}
}
// Given a location URL, build a new websocket [`Multiaddr`].
fn location_to_multiaddr<T>(location: &str) -> Result<Multiaddr, Error<T>> {
match Url::parse(location) {
Ok(url) => {
let mut a = Multiaddr::empty();
match url.host() {
Some(url::Host::Domain(h)) => {
a.push(Protocol::Dns4(h.into()))
}
Some(url::Host::Ipv4(ip)) => {
a.push(Protocol::Ip4(ip))
}
Some(url::Host::Ipv6(ip)) => {
a.push(Protocol::Ip6(ip))
}
None => return Err(Error::InvalidRedirectLocation)
}
if let Some(p) = url.port() {
a.push(Protocol::Tcp(p))
}
let s = url.scheme();
if s.eq_ignore_ascii_case("https") | s.eq_ignore_ascii_case("wss") {
a.push(Protocol::Wss(url.path().into()))
} else if s.eq_ignore_ascii_case("http") | s.eq_ignore_ascii_case("ws") {
a.push(Protocol::Ws(url.path().into()))
} else {
debug!("unsupported scheme: {}", s);
return Err(Error::InvalidRedirectLocation)
}
Ok(a)
}
Err(e) => {
debug!("failed to parse url as multi-address: {:?}", e);
Err(Error::InvalidRedirectLocation)
}
}
}
/// Create a `Connection` from an existing `Framed` value.
fn new_connection<T, C>(framed: Framed<T, C>, max_size: u64, mode: Mode) -> Connection<T>
where
T: AsyncRead + AsyncWrite
{
let mut codec = base::Codec::new();
codec.set_max_data_size(max_size);
let old = framed.into_parts();
let mut new = FramedParts::new(old.io, codec);
new.read_buf = old.read_buf;
new.write_buf = old.write_buf;
let framed = Framed::from_parts(new);
Connection::from_framed(framed, mode)
}
// BytesConnection ////////////////////////////////////////////////////////////////////////////////
/// A [`Stream`] and [`Sink`] that produces and consumes [`BytesMut`] values
/// which correspond to the payload data of websocket frames.
#[derive(Debug)]
pub struct BytesConnection<T> {
inner: Connection<EitherOutput<EitherOutput<client::TlsStream<T>, server::TlsStream<T>>, T>>
}
impl<T: AsyncRead + AsyncWrite> Stream for BytesConnection<T> {
type Item = BytesMut;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let data = try_ready!(self.inner.poll().map_err(|e| io::Error::new(io::ErrorKind::Other, e)));
Ok(Async::Ready(data.map(base::Data::into_bytes)))
}
}
impl<T: AsyncRead + AsyncWrite> Sink for BytesConnection<T> {
type SinkItem = BytesMut;
type SinkError = io::Error;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
let result = self.inner.start_send(base::Data::Binary(item))
.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
if let AsyncSink::NotReady(data) = result? {
Ok(AsyncSink::NotReady(data.into_bytes()))
} else {
Ok(AsyncSink::Ready)
}
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
self.inner.poll_complete().map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}
fn close(&mut self) -> Poll<(), Self::SinkError> {
self.inner.close().map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}
}

View File

@ -1,4 +1,4 @@
// Copyright 2017 Parity Technologies (UK) Ltd. // Copyright 2017-2019 Parity Technologies (UK) Ltd.
// //
// Permission is hereby granted, free of charge, to any person obtaining a // Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"), // copy of this software and associated documentation files (the "Software"),
@ -18,66 +18,198 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
#![recursion_limit = "512"]
//! Implementation of the libp2p `Transport` trait for Websockets. //! Implementation of the libp2p `Transport` trait for Websockets.
//!
//! See the documentation of `swarm` and of libp2p in general to learn how to use the `Transport`
//! trait.
//!
//! This library is used in a different way depending on whether you are compiling for emscripten
//! or for a different operating system.
//!
//! # Emscripten
//!
//! On emscripten, you can create a `BrowserWsConfig` object with `BrowserWsConfig::new()`. It can
//! then be used as a transport.
//!
//! Listening on a websockets multiaddress isn't supported on emscripten. Dialing a multiaddress
//! which uses `ws` on top of TCP/IP will automatically use the `XMLHttpRequest` Javascript object.
//!
//! ```ignore
//! use libp2p_websocket::BrowserWsConfig;
//!
//! let ws_config = BrowserWsConfig::new();
//! // let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap());
//! ```
//!
//! # Other operating systems
//!
//! On other operating systems, this library doesn't open any socket by itself. Instead it must be
//! plugged on top of another implementation of `Transport` such as TCP/IP.
//!
//! This underlying transport must be put inside a `WsConfig` object through the
//! `WsConfig::new()` function.
//!
//! ```
//! extern crate libp2p_core;
//! extern crate libp2p_tcp;
//! extern crate libp2p_websocket;
//!
//! use libp2p_core::{Multiaddr, Transport};
//! use libp2p_tcp::TcpConfig;
//! use libp2p_websocket::WsConfig;
//!
//! # fn main() {
//! let ws_config = WsConfig::new(TcpConfig::new());
//! # return;
//! let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap());
//! # }
//! ```
//!
#[cfg(any(target_os = "emscripten", target_os = "unknown"))] pub mod error;
#[macro_use] pub mod framed;
extern crate stdweb; pub mod tls;
#[cfg(any(target_os = "emscripten", target_os = "unknown"))] use error::Error;
mod browser; use framed::BytesConnection;
#[cfg(not(any(target_os = "emscripten", target_os = "unknown")))] use futures::prelude::*;
mod desktop; use libp2p_core::{
ConnectedPoint,
Transport,
multiaddr::Multiaddr,
transport::{map::{MapFuture, MapStream}, ListenerEvent, TransportError}
};
use rw_stream_sink::RwStreamSink;
use tokio_io::{AsyncRead, AsyncWrite};
#[cfg(any(target_os = "emscripten", target_os = "unknown"))] /// A Websocket transport.
pub use self::browser::{BrowserWsConfig, BrowserWsConn}; #[derive(Debug, Clone)]
#[cfg(not(any(target_os = "emscripten", target_os = "unknown")))] pub struct WsConfig<T> {
pub use self::desktop::WsConfig; transport: framed::WsConfig<T>
}
impl<T> WsConfig<T> {
/// Create a new websocket transport based on the given transport.
pub fn new(transport: T) -> Self {
framed::WsConfig::new(transport).into()
}
/// Return the configured maximum number of redirects.
pub fn max_redirects(&self) -> u8 {
self.transport.max_redirects()
}
/// Set max. number of redirects to follow.
pub fn set_max_redirects(&mut self, max: u8) -> &mut Self {
self.transport.set_max_redirects(max);
self
}
/// Get the max. frame data size we support.
pub fn max_data_size(&self) -> u64 {
self.transport.max_data_size()
}
/// Set the max. frame data size we support.
pub fn set_max_data_size(&mut self, size: u64) -> &mut Self {
self.transport.set_max_data_size(size);
self
}
/// Set the TLS configuration if TLS support is desired.
pub fn set_tls_config(&mut self, c: tls::Config) -> &mut Self {
self.transport.set_tls_config(c);
self
}
}
impl<T> From<framed::WsConfig<T>> for WsConfig<T> {
fn from(framed: framed::WsConfig<T>) -> Self {
WsConfig {
transport: framed
}
}
}
impl<T> Transport for WsConfig<T>
where
T: Transport + Send + Clone + 'static,
T::Error: Send + 'static,
T::Dial: Send + 'static,
T::Listener: Send + 'static,
T::ListenerUpgrade: Send + 'static,
T::Output: AsyncRead + AsyncWrite + Send + 'static
{
type Output = RwStreamSink<BytesConnection<T::Output>>;
type Error = Error<T::Error>;
type Listener = MapStream<InnerStream<T::Output, T::Error>, WrapperFn<T::Output>>;
type ListenerUpgrade = MapFuture<InnerFuture<T::Output, T::Error>, WrapperFn<T::Output>>;
type Dial = MapFuture<InnerFuture<T::Output, T::Error>, WrapperFn<T::Output>>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
self.transport.map(wrap_connection as WrapperFn<T::Output>).listen_on(addr)
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.transport.map(wrap_connection as WrapperFn<T::Output>).dial(addr)
}
}
/// Type alias corresponding to `framed::WsConfig::Listener`.
pub type InnerStream<T, E> =
Box<(dyn Stream<Error = Error<E>, Item = ListenerEvent<InnerFuture<T, E>>> + Send)>;
/// Type alias corresponding to `framed::WsConfig::Dial` and `framed::WsConfig::ListenerUpgrade`.
pub type InnerFuture<T, E> =
Box<(dyn Future<Item = BytesConnection<T>, Error = Error<E>> + Send)>;
/// Function type that wraps a websocket connection (see. `wrap_connection`).
pub type WrapperFn<T> =
fn(BytesConnection<T>, ConnectedPoint) -> RwStreamSink<BytesConnection<T>>;
/// Wrap a websocket connection producing data frames into a `RwStreamSink`
/// implementing `AsyncRead` + `AsyncWrite`.
fn wrap_connection<T>(c: BytesConnection<T>, _: ConnectedPoint) -> RwStreamSink<BytesConnection<T>>
where
T: AsyncRead + AsyncWrite
{
RwStreamSink::new(c)
}
// Tests //////////////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
use libp2p_tcp as tcp;
use tokio::runtime::current_thread::Runtime;
use futures::{Future, Stream};
use libp2p_core::{
Transport,
multiaddr::Protocol,
transport::ListenerEvent
};
use super::WsConfig;
#[test]
fn dialer_connects_to_listener_ipv4() {
let ws_config = WsConfig::new(tcp::TcpConfig::new());
let mut listener = ws_config.clone()
.listen_on("/ip4/127.0.0.1/tcp/0/ws".parse().unwrap())
.unwrap();
let addr = listener.by_ref().wait()
.next()
.expect("some event")
.expect("no error")
.into_new_address()
.expect("listen address");
assert_eq!(Some(Protocol::Ws("/".into())), addr.iter().nth(2));
assert_ne!(Some(Protocol::Tcp(0)), addr.iter().nth(1));
let listener = listener
.filter_map(ListenerEvent::into_upgrade)
.into_future()
.map_err(|(e, _)| e)
.and_then(|(c, _)| c.unwrap().0);
let dialer = ws_config.clone().dial(addr.clone()).unwrap();
let future = listener
.select(dialer)
.map_err(|(e, _)| e)
.and_then(|(_, n)| n);
let mut rt = Runtime::new().unwrap();
let _ = rt.block_on(future).unwrap();
}
#[test]
fn dialer_connects_to_listener_ipv6() {
let ws_config = WsConfig::new(tcp::TcpConfig::new());
let mut listener = ws_config.clone()
.listen_on("/ip6/::1/tcp/0/ws".parse().unwrap())
.unwrap();
let addr = listener.by_ref().wait()
.next()
.expect("some event")
.expect("no error")
.into_new_address()
.expect("listen address");
assert_eq!(Some(Protocol::Ws("/".into())), addr.iter().nth(2));
assert_ne!(Some(Protocol::Tcp(0)), addr.iter().nth(1));
let listener = listener
.filter_map(ListenerEvent::into_upgrade)
.into_future()
.map_err(|(e, _)| e)
.and_then(|(c, _)| c.unwrap().0);
let dialer = ws_config.clone().dial(addr.clone()).unwrap();
let future = listener
.select(dialer)
.map_err(|(e, _)| e)
.and_then(|(_, n)| n);
let mut rt = Runtime::new().unwrap();
let _ = rt.block_on(future).unwrap();
}
}

View File

@ -0,0 +1,176 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use std::{fmt, io, sync::Arc};
use tokio_rustls::{
TlsConnector,
TlsAcceptor,
rustls,
webpki
};
/// TLS configuration.
#[derive(Clone)]
pub struct Config {
pub(crate) client: TlsConnector,
pub(crate) server: Option<TlsAcceptor>
}
impl fmt::Debug for Config {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("Config")
}
}
/// Private key, DER-encoded ASN.1 in either PKCS#8 or PKCS#1 format.
#[derive(Clone)]
pub struct PrivateKey(rustls::PrivateKey);
impl PrivateKey {
/// Assert the given bytes are DER-encoded ASN.1 in either PKCS#8 or PKCS#1 format.
pub fn new(bytes: Vec<u8>) -> Self {
PrivateKey(rustls::PrivateKey(bytes))
}
}
/// Certificate, DER-encoded X.509 format.
#[derive(Debug, Clone)]
pub struct Certificate(rustls::Certificate);
impl Certificate {
/// Assert the given bytes are in DER-encoded X.509 format.
pub fn new(bytes: Vec<u8>) -> Self {
Certificate(rustls::Certificate(bytes))
}
}
impl Config {
/// Create a new TLS configuration with the given server key and certificate chain.
pub fn new<I>(key: PrivateKey, certs: I) -> Result<Self, Error>
where
I: IntoIterator<Item = Certificate>
{
let mut builder = Config::builder();
builder.server(key, certs)?;
Ok(builder.finish())
}
/// Create a client-only configuration.
pub fn client() -> Self {
Config {
client: Arc::new(client_config()).into(),
server: None
}
}
/// Create a new TLS configuration builder.
pub fn builder() -> Builder {
Builder { client: client_config(), server: None }
}
}
/// Setup the rustls client configuration.
fn client_config() -> rustls::ClientConfig {
let mut client = rustls::ClientConfig::new();
client.root_store.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
client
}
/// TLS configuration builder.
pub struct Builder {
client: rustls::ClientConfig,
server: Option<rustls::ServerConfig>
}
impl Builder {
/// Set server key and certificate chain.
pub fn server<I>(&mut self, key: PrivateKey, certs: I) -> Result<&mut Self, Error>
where
I: IntoIterator<Item = Certificate>
{
let mut server = rustls::ServerConfig::new(rustls::NoClientAuth::new());
let certs = certs.into_iter().map(|c| c.0).collect();
server.set_single_cert(certs, key.0).map_err(|e| Error::Tls(Box::new(e)))?;
self.server = Some(server);
Ok(self)
}
/// Add an additional trust anchor.
pub fn add_trust(&mut self, cert: &Certificate) -> Result<&mut Self, Error> {
self.client.root_store.add(&cert.0).map_err(|e| Error::Tls(Box::new(e)))?;
Ok(self)
}
/// Finish configuration.
pub fn finish(self) -> Config {
Config {
client: Arc::new(self.client).into(),
server: self.server.map(|s| Arc::new(s).into())
}
}
}
pub(crate) fn dns_name_ref(name: &str) -> Result<webpki::DNSNameRef<'_>, Error> {
webpki::DNSNameRef::try_from_ascii_str(name).map_err(|()| Error::InvalidDnsName(name.into()))
}
// Error //////////////////////////////////////////////////////////////////////////////////////////
/// TLS related errors.
#[derive(Debug)]
pub enum Error {
/// An underlying I/O error.
Io(io::Error),
/// Actual TLS error.
Tls(Box<dyn std::error::Error + Send>),
/// The DNS name was invalid.
InvalidDnsName(String),
#[doc(hidden)]
__Nonexhaustive
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::Io(e) => write!(f, "i/o error: {}", e),
Error::Tls(e) => write!(f, "tls error: {}", e),
Error::InvalidDnsName(n) => write!(f, "invalid DNS name: {}", n),
Error::__Nonexhaustive => f.write_str("__Nonexhaustive")
}
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Error::Io(e) => Some(e),
Error::Tls(e) => Some(&**e),
Error::InvalidDnsName(_) | Error::__Nonexhaustive => None
}
}
}
impl From<io::Error> for Error {
fn from(e: io::Error) -> Self {
Error::Io(e)
}
}