diff --git a/Cargo.toml b/Cargo.toml index 19617f49..be275e6b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,6 @@ libp2p-core-derive = { version = "0.8.0", path = "./misc/core-derive" } libp2p-secio = { version = "0.8.0", path = "./protocols/secio", default-features = false } libp2p-uds = { version = "0.8.0", path = "./transports/uds" } libp2p-wasm-ext = { version = "0.1.0", path = "./transports/wasm-ext" } -libp2p-websocket = { version = "0.8.0", path = "./transports/websocket", optional = true } libp2p-yamux = { version = "0.8.0", path = "./muxers/yamux" } parking_lot = "0.8" smallvec = "0.6" @@ -46,6 +45,7 @@ libp2p-dns = { version = "0.8.0", path = "./transports/dns" } libp2p-mdns = { version = "0.8.0", path = "./misc/mdns" } libp2p-noise = { version = "0.6.0", path = "./protocols/noise" } libp2p-tcp = { version = "0.8.0", path = "./transports/tcp" } +libp2p-websocket = { version = "0.8.0", path = "./transports/websocket", optional = true } [dev-dependencies] env_logger = "0.6.0" diff --git a/core/src/either.rs b/core/src/either.rs index 5a40c624..df992f92 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -88,7 +88,6 @@ where A: Read, B: Read, { - #[inline] fn read(&mut self, buf: &mut [u8]) -> Result { match self { EitherOutput::First(a) => a.read(buf), @@ -102,7 +101,6 @@ where A: AsyncWrite, B: AsyncWrite, { - #[inline] fn shutdown(&mut self) -> Poll<(), IoError> { match self { EitherOutput::First(a) => a.shutdown(), @@ -116,7 +114,6 @@ where A: Write, B: Write, { - #[inline] fn write(&mut self, buf: &[u8]) -> Result { match self { EitherOutput::First(a) => a.write(buf), @@ -124,7 +121,6 @@ where } } - #[inline] fn flush(&mut self) -> Result<(), IoError> { match self { EitherOutput::First(a) => a.flush(), @@ -302,7 +298,6 @@ where type Item = ListenerEvent>; type Error = EitherError; - #[inline] fn poll(&mut self) -> Poll, Self::Error> { match self { EitherListenStream::First(a) => a.poll() @@ -331,7 +326,6 @@ where type Item = EitherOutput; type Error = EitherError; - #[inline] fn poll(&mut self) -> Poll { match self { EitherFuture::First(a) => a.poll().map(|v| v.map(EitherOutput::First)).map_err(EitherError::A), diff --git a/misc/rw-stream-sink/src/lib.rs b/misc/rw-stream-sink/src/lib.rs index 1d06458c..d73cb5d6 100644 --- a/misc/rw-stream-sink/src/lib.rs +++ b/misc/rw-stream-sink/src/lib.rs @@ -51,7 +51,6 @@ where S::Item: IntoBuf, { /// Wraps around `inner`. - #[inline] pub fn new(inner: S) -> RwStreamSink { RwStreamSink { inner, current_item: None } } @@ -102,7 +101,6 @@ where S::SinkItem: for<'r> From<&'r [u8]>, S::Item: IntoBuf, { - #[inline] fn write(&mut self, buf: &[u8]) -> Result { let len = buf.len(); match self.inner.start_send(buf.into())? { @@ -111,7 +109,6 @@ where } } - #[inline] fn flush(&mut self) -> Result<(), IoError> { match self.inner.poll_complete()? { Async::Ready(()) => Ok(()), @@ -126,7 +123,6 @@ where S::SinkItem: for<'r> From<&'r [u8]>, S::Item: IntoBuf, { - #[inline] fn shutdown(&mut self) -> Poll<(), IoError> { self.inner.close() } diff --git a/src/lib.rs b/src/lib.rs index a2075e99..ca00d2f3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -197,7 +197,7 @@ pub use libp2p_tcp as tcp; pub use libp2p_uds as uds; #[doc(inline)] pub use libp2p_wasm_ext as wasm_ext; -#[cfg(feature = "libp2p-websocket")] +#[cfg(all(feature = "libp2p-websocket", not(any(target_os = "emscripten", target_os = "unknown"))))] #[doc(inline)] pub use libp2p_websocket as websocket; #[doc(inline)] @@ -270,9 +270,7 @@ struct CommonTransport { type InnerImplementation = core::transport::OrTransport, websocket::WsConfig>>; #[cfg(all(not(any(target_os = "emscripten", target_os = "unknown")), not(feature = "libp2p-websocket")))] type InnerImplementation = dns::DnsConfig; -#[cfg(all(any(target_os = "emscripten", target_os = "unknown"), feature = "libp2p-websocket"))] -type InnerImplementation = websocket::BrowserWsConfig; -#[cfg(all(any(target_os = "emscripten", target_os = "unknown"), not(feature = "libp2p-websocket")))] +#[cfg(any(target_os = "emscripten", target_os = "unknown"))] type InnerImplementation = core::transport::dummy::DummyTransport; #[derive(Debug, Clone)] @@ -298,16 +296,7 @@ impl CommonTransport { } /// Initializes the `CommonTransport`. - #[cfg(all(any(target_os = "emscripten", target_os = "unknown"), feature = "libp2p-websocket"))] - pub fn new() -> CommonTransport { - let inner = websocket::BrowserWsConfig::new(); - CommonTransport { - inner: CommonTransportInner { inner } - } - } - - /// Initializes the `CommonTransport`. - #[cfg(all(any(target_os = "emscripten", target_os = "unknown"), not(feature = "libp2p-websocket")))] + #[cfg(any(target_os = "emscripten", target_os = "unknown"))] pub fn new() -> CommonTransport { let inner = core::transport::dummy::DummyTransport::new(); CommonTransport { diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index b6ff575c..5a4213a9 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -10,20 +10,18 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -libp2p-core = { version = "0.8.0", path = "../../core" } +bytes = "0.4.6" futures = "0.1" +libp2p-core = { version = "0.8.0", path = "../../core" } log = "0.4.1" rw-stream-sink = { version = "0.1.1", path = "../../misc/rw-stream-sink" } -tokio-io = "0.1" +tokio-codec = "0.1.1" +tokio-io = "0.1.12" +tokio-rustls = "0.10.0-alpha.3" +soketto = { git = "https://github.com/paritytech/soketto.git" } +url = "1.7.2" +webpki-roots = "0.16.0" -[target.'cfg(not(any(target_os = "emscripten", target_os = "unknown")))'.dependencies] -websocket = { version = "0.22.2", default-features = false, features = ["async", "async-ssl"] } - -[target.'cfg(any(target_os = "emscripten", target_os = "unknown"))'.dependencies] -bytes = "0.4" -stdweb = { version = "0.4", default-features = false } -wasm-bindgen = "0.2.42" - -[target.'cfg(not(any(target_os = "emscripten", target_os = "unknown")))'.dev-dependencies] +[dev-dependencies] libp2p-tcp = { version = "0.8.0", path = "../tcp" } -tokio = "0.1" +tokio = "0.1.20" diff --git a/transports/websocket/src/browser.rs b/transports/websocket/src/browser.rs deleted file mode 100644 index e9998c96..00000000 --- a/transports/websocket/src/browser.rs +++ /dev/null @@ -1,345 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use log::debug; -use futures::{future, stream}; -use futures::stream::Then as StreamThen; -use futures::sync::{mpsc, oneshot}; -use futures::{Async, Future, Poll, Stream}; -use rw_stream_sink::RwStreamSink; -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use std::io::{Read, Write}; -use std::sync::{Arc, Mutex}; -use stdweb::web::TypedArray; -use stdweb::{self, Reference}; -use libp2p_core::{ - Transport, - multiaddr::{Protocol, Multiaddr}, - transport::{ListenerEvent, TransportError} -}; -use tokio_io::{AsyncRead, AsyncWrite}; - -/// Represents the configuration for a websocket transport capability for libp2p. -/// -/// This implementation of `Transport` accepts any address that looks like -/// `/ip4/.../tcp/.../ws`, `/ip6/.../tcp/.../ws`, `/dns4/.../ws` or `/dns6/.../ws`, and connect to -/// the corresponding IP and port. -/// -/// If the underlying multiaddress uses `/dns4` or `/dns6`, then the domain name will be passed in -/// the headers of the request. This is important is the listener is behind an HTTP proxy. -#[derive(Debug, Clone)] -pub struct BrowserWsConfig; - -impl BrowserWsConfig { - /// Creates a new configuration object for websocket. - #[inline] - pub fn new() -> BrowserWsConfig { - BrowserWsConfig - } -} - -impl Transport for BrowserWsConfig { - type Output = BrowserWsConn; - type Error = IoError; // TODO: better error type? - type Listener = stream::Empty, IoError>; - type ListenerUpgrade = future::Empty; - type Dial = Box + Send>; - - fn listen_on(self, a: Multiaddr) -> Result> { - // Listening is never supported. - Err(TransportError::MultiaddrNotSupported(a)) - } - - fn dial(self, original_addr: Multiaddr) -> Result> { - // Making sure we are initialized before we dial. Initialization is protected by a simple - // boolean static variable, so it's not a problem to call it multiple times and the cost - // is negligible. - stdweb::initialize(); - - // Tries to interpret the multiaddr, and returns a corresponding `ws://x.x.x.x/` URL (as - // a string) on success. - let inner_addr = match multiaddr_to_target(&original_addr) { - Ok(a) => a, - Err(_) => return Err(TransportError::MultiaddrNotSupported(original_addr)), - }; - - debug!("Dialing {}", original_addr); - - // Create the JS `WebSocket` object. - let websocket = { - let val = js! { - try { - return new WebSocket(@{inner_addr}); - } catch(e) { - return false; - } - }; - match val.into_reference() { - Some(ws) => ws, - // TODO: more descriptive error - None => return Err(TransportError::Other(IoErrorKind::Other.into())), // `false` was returned by `js!` - } - }; - - // Create a `message` channel that will be used for both bytes messages and errors, and a - // `message_cb` used for the `message` event on the WebSocket. - // `message_tx` is grabbed by `message_cb` and `close_cb`, and `message_rx` is grabbed - // by `open_cb`. - let (message_tx, message_rx) = mpsc::unbounded::, IoError>>(); - let message_tx = Arc::new(message_tx); - let mut message_rx = Some(message_rx); - let message_cb = { - let message_tx = message_tx.clone(); - move |message_data: Reference| { - if let Some(buffer) = message_data.downcast::>() { - let _ = message_tx.unbounded_send(Ok(buffer.to_vec())); - } else { - let _ = message_tx.unbounded_send(Err(IoError::new( - IoErrorKind::InvalidData, - "received ws message of unknown type", - ))); - } - } - }; - - // Create a `open` channel that will be used to communicate the `BrowserWsConn` that represents - // the open dialing websocket. Also create a `open_cb` callback that will be used for the - // `open` message of the websocket. - let (open_tx, open_rx) = oneshot::channel::>(); - let open_tx = Arc::new(Mutex::new(Some(open_tx))); - let websocket_clone = websocket.clone(); - let open_cb = { - let open_tx = open_tx.clone(); - move || { - // Note that `open_tx` can be empty (and a panic happens) if the `open` event - // is triggered twice, or is triggered after the `close` event. We never reuse the - // same websocket twice, so this is not supposed to happen. - let tx = open_tx - .lock() - .unwrap() - .take() - .expect("the websocket can only open once"); - // `message_rx` can be empty if the `open` event is triggered twice, which again - // is not supposed to happen. - let message_rx = message_rx.take().expect("the websocket can only open once"); - - // Send a `BrowserWsConn` to the future that was returned by `dial`. Ignoring errors that - // would happen the future has been dropped by the user. - let _ = tx.send(Ok(BrowserWsConn { - websocket: websocket_clone.clone(), - incoming_data: RwStreamSink::new(message_rx.then(|result| { - // An `Err` happens here if `message_tx` has been dropped. However - // `message_tx` is grabbed by the websocket, which stays alive for as - // long as the `BrowserWsConn` is alive. - match result { - Ok(r) => r, - Err(_) => { - unreachable!("the message channel outlives the BrowserWsConn") - } - } - })), - })); - } - }; - - // Used for the `close` message of the websocket. - // The websocket can be closed either before or after being opened, so we send an error - // to both the `open` and `message` channels if that happens. - let close_cb = move || { - if let Some(tx) = open_tx.lock().unwrap().take() { - let _ = tx.send(Err(IoError::new( - IoErrorKind::ConnectionRefused, - "close event on the websocket", - ))); - } - - let _ = message_tx.unbounded_send(Err(IoError::new( - IoErrorKind::ConnectionRefused, - "close event on the websocket", - ))); - }; - - js! { - var socket = @{websocket}; - var open_cb = @{open_cb}; - var message_cb = @{message_cb}; - var close_cb = @{close_cb}; - socket.addEventListener("open", function(event) { - open_cb(); - }); - socket.addEventListener("message", function(event) { - var reader = new FileReader(); - reader.addEventListener("loadend", function() { - var typed = new Uint8Array(reader.result); - message_cb(typed); - }); - reader.readAsArrayBuffer(event.data); - }); - socket.addEventListener("close", function(event) { - close_cb(); - }); - }; - - Ok(Box::new(open_rx.then(|result| { - match result { - Ok(Ok(r)) => Ok(r), - Ok(Err(e)) => Err(e), - // `Err` would happen here if `open_tx` is destroyed. `open_tx` is captured by - // the `WebSocket`, and the `WebSocket` is captured by `open_cb`, which is itself - // captured by the `WebSocket`. Due to this cyclic dependency, `open_tx` should - // never be destroyed. - // TODO: how do we break this cyclic dependency? difficult question - Err(_) => unreachable!("the sending side will only close when we drop the future"), - } - })) as Box<_>) - } -} - -pub struct BrowserWsConn { - websocket: Reference, - // Stream of messages that goes through a `RwStreamSink` in order to become a `AsyncRead`. - incoming_data: RwStreamSink< - StreamThen< - mpsc::UnboundedReceiver, IoError>>, - fn(Result, IoError>, ()>) -> Result, IoError>, - Result, IoError>, - >, - >, -} - -impl Drop for BrowserWsConn { - #[inline] - fn drop(&mut self) { - // TODO: apparently there's a memory leak related to callbacks? - js! { @{&self.websocket}.close(); } - } -} - -impl AsyncRead for BrowserWsConn { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.incoming_data.prepare_uninitialized_buffer(buf) - } - - fn read_buf(&mut self, buf: &mut B) -> Poll { - self.incoming_data.read_buf(buf) - } -} - -impl Read for BrowserWsConn { - #[inline] - fn read(&mut self, buf: &mut [u8]) -> Result { - self.incoming_data.read(buf) - } -} - -impl AsyncWrite for BrowserWsConn { - #[inline] - fn shutdown(&mut self) -> Poll<(), IoError> { - Ok(Async::Ready(())) - } -} - -impl Write for BrowserWsConn { - fn write(&mut self, buf: &[u8]) -> Result { - let typed_array = TypedArray::from(buf); - - // `send` can throw if the websocket isn't open (which can happen if it was closed by the - // remote). - let returned = js! { - try { - @{&self.websocket}.send(@{typed_array.buffer()}); - return true; - } catch(e) { - return false; - } - }; - - match returned { - stdweb::Value::Bool(true) => Ok(buf.len()), - stdweb::Value::Bool(false) => Err(IoError::new( - IoErrorKind::BrokenPipe, - "websocket has been closed by the remote", - )), - _ => unreachable!(), - } - } - - #[inline] - fn flush(&mut self) -> Result<(), IoError> { - // Everything is always considered flushed. - Ok(()) - } -} - -// Tries to interpret the `Multiaddr` as a `/ipN/.../tcp/.../ws` multiaddress, and if so returns -// the corresponding `ws://.../` URL. -fn multiaddr_to_target(addr: &Multiaddr) -> Result { - let protocols: Vec<_> = addr.iter().collect(); - - if protocols.len() != 3 { - return Err(()); - } - - match (&protocols[0], &protocols[1], &protocols[2]) { - (&Protocol::Ip4(ref ip), &Protocol::Tcp(port), &Protocol::Ws(ref ws_path)) => { - if ip.is_unspecified() || port == 0 { - return Err(()); - } - Ok(format!("ws://{}:{}{}", ip, port, ws_path)) - } - (&Protocol::Ip6(ref ip), &Protocol::Tcp(port), &Protocol::Ws(ref ws_path)) => { - if ip.is_unspecified() || port == 0 { - return Err(()); - } - Ok(format!("ws://[{}]:{}{}", ip, port, ws_path)) - } - (&Protocol::Ip4(ref ip), &Protocol::Tcp(port), &Protocol::Wss(ref ws_path)) => { - if ip.is_unspecified() || port == 0 { - return Err(()); - } - Ok(format!("wss://{}:{}{}", ip, port, ws_path)) - } - (&Protocol::Ip6(ref ip), &Protocol::Tcp(port), &Protocol::Wss(ref ws_path)) => { - if ip.is_unspecified() || port == 0 { - return Err(()); - } - Ok(format!("wss://[{}]:{}{}", ip, port, ws_path)) - } - (&Protocol::Dns4(ref ns), &Protocol::Tcp(port), &Protocol::Ws(ref ws_path)) => { - Ok(format!("ws://{}:{}{}", ns, port, ws_path)) - } - (&Protocol::Dns6(ref ns), &Protocol::Tcp(port), &Protocol::Ws(ref ws_path)) => { - Ok(format!("ws://{}:{}{}", ns, port, ws_path)) - } - (&Protocol::Dns4(ref ns), &Protocol::Tcp(port), &Protocol::Wss(ref ws_path)) => { - Ok(format!("wss://{}:{}{}", ns, port, ws_path)) - } - (&Protocol::Dns6(ref ns), &Protocol::Tcp(port), &Protocol::Wss(ref ws_path)) => { - Ok(format!("wss://{}:{}{}", ns, port, ws_path)) - } - _ => Err(()), - } -} - -// TODO: write tests (tests are very difficult to write with emscripten) -// - remote refuses connection -// - remote closes connection before we receive -// - remote closes connection before we send -// - remote sends text data instead of binary diff --git a/transports/websocket/src/desktop.rs b/transports/websocket/src/desktop.rs deleted file mode 100644 index 2714761a..00000000 --- a/transports/websocket/src/desktop.rs +++ /dev/null @@ -1,357 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use futures::{Future, IntoFuture, Sink, Stream}; -use libp2p_core::{ - Transport, - multiaddr::{Protocol, Multiaddr}, - transport::{ListenerEvent, TransportError} -}; -use log::{debug, trace}; -use rw_stream_sink::RwStreamSink; -use std::{error, fmt}; -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use tokio_io::{AsyncRead, AsyncWrite}; -use websocket::client::builder::ClientBuilder; -use websocket::message::OwnedMessage; -use websocket::server::upgrade::r#async::IntoWs; -use websocket::stream::r#async::Stream as AsyncStream; - -/// Represents the configuration for a websocket transport capability for libp2p. Must be put on -/// top of another `Transport`. -/// -/// This implementation of `Transport` accepts any address that ends with `/ws` or `/wss`, and will -/// try to pass the underlying multiaddress to the underlying `Transport`. -/// -/// Note that the underlying multiaddr is `/dns4/...` or `/dns6/...`, then this library will -/// pass the domain name in the headers of the request. This is important is the listener is behind -/// an HTTP proxy. -/// -/// > **Note**: `/wss` is only supported for dialing and not listening. -#[derive(Debug, Clone)] -pub struct WsConfig { - transport: T, -} - -impl WsConfig { - /// Creates a new configuration object for websocket. - /// - /// The websockets will run on top of the `Transport` you pass as parameter. - #[inline] - pub fn new(inner: T) -> WsConfig { - WsConfig { transport: inner } - } -} - -impl Transport for WsConfig -where - // TODO: this 'static is pretty arbitrary and is necessary because of the websocket library - T: Transport + 'static, - T::Error: Send, - T::Dial: Send, - T::Listener: Send, - T::ListenerUpgrade: Send, - // TODO: this Send is pretty arbitrary and is necessary because of the websocket library - T::Output: AsyncRead + AsyncWrite + Send, -{ - type Output = Box; - type Error = WsError; - type Listener = Box, Error = Self::Error> + Send>; - type ListenerUpgrade = Box + Send>; - type Dial = Box + Send>; - - fn listen_on(self, original_addr: Multiaddr) -> Result> { - let mut inner_addr = original_addr.clone(); - match inner_addr.pop() { - Some(Protocol::Ws(ref path)) if path == "/" || path.is_empty() => {}, - _ => return Err(TransportError::MultiaddrNotSupported(original_addr)), - }; - - let inner_listen = self.transport.listen_on(inner_addr) - .map_err(|err| err.map(WsError::Underlying))?; - - let listen = inner_listen.map_err(WsError::Underlying).map(move |event| { - match event { - ListenerEvent::NewAddress(mut a) => { - a = a.with(Protocol::Ws(From::from("/"))); - debug!("Listening on {}", a); - ListenerEvent::NewAddress(a) - } - ListenerEvent::AddressExpired(mut a) => { - a = a.with(Protocol::Ws(From::from("/"))); - ListenerEvent::AddressExpired(a) - } - ListenerEvent::Upgrade { upgrade, mut listen_addr, mut remote_addr } => { - listen_addr = listen_addr.with(Protocol::Ws(From::from("/"))); - remote_addr = remote_addr.with(Protocol::Ws(From::from("/"))); - - // Upgrade the listener to websockets like the websockets library requires us to do. - let upgraded = upgrade.map_err(WsError::Underlying).and_then(move |stream| { - debug!("Incoming connection"); - stream.into_ws() - .map_err(|e| WsError::WebSocket(Box::new(e.3))) - .and_then(|stream| { - // Accept the next incoming connection. - stream - .accept() - .map_err(|e| WsError::WebSocket(Box::new(e))) - .map(|(client, _http_headers)| { - debug!("Upgraded incoming connection to websockets"); - - // Plug our own API on top of the `websockets` API. - let framed_data = client - .map_err(|err| IoError::new(IoErrorKind::Other, err)) - .sink_map_err(|err| IoError::new(IoErrorKind::Other, err)) - .with(|data| Ok(OwnedMessage::Binary(data))) - .and_then(|recv| { - match recv { - OwnedMessage::Binary(data) => Ok(Some(data)), - OwnedMessage::Text(data) => Ok(Some(data.into_bytes())), - OwnedMessage::Close(_) => Ok(None), - // TODO: handle pings and pongs, which is freaking hard - // for now we close the socket when that happens - _ => Ok(None) - } - }) - // TODO: is there a way to merge both lines into one? - .take_while(|v| Ok(v.is_some())) - .map(|v| v.expect("we only take while this is Some")); - - let read_write = RwStreamSink::new(framed_data); - Box::new(read_write) as Box - }) - }) - .map(|s| Box::new(Ok(s).into_future()) as Box + Send>) - .into_future() - .flatten() - }); - - ListenerEvent::Upgrade { - upgrade: Box::new(upgraded) as Box + Send>, - listen_addr, - remote_addr - } - } - } - }); - - Ok(Box::new(listen) as Box<_>) - } - - fn dial(self, original_addr: Multiaddr) -> Result> { - let mut inner_addr = original_addr.clone(); - let (ws_path, is_wss) = match inner_addr.pop() { - Some(Protocol::Ws(path)) => (path.into_owned(), false), - Some(Protocol::Wss(path)) => (path.into_owned(), true), - _ => { - trace!( - "Ignoring dial attempt for {} because it is not a websocket multiaddr", - original_addr - ); - return Err(TransportError::MultiaddrNotSupported(original_addr)); - } - }; - - debug!("Dialing {} through inner transport", inner_addr); - - let ws_addr = client_addr_to_ws(&inner_addr, &ws_path, is_wss); - - let inner_dial = self.transport.dial(inner_addr) - .map_err(|err| err.map(WsError::Underlying))?; - - let dial = inner_dial - .map_err(WsError::Underlying) - .into_future() - .and_then(move |connec| { - ClientBuilder::new(&ws_addr) - .expect("generated ws address is always valid") - .async_connect_on(connec) - .map_err(|e| WsError::WebSocket(Box::new(e))) - .map(|(client, _)| { - debug!("Upgraded outgoing connection to websockets"); - - // Plug our own API on top of the API of the websockets library. - let framed_data = client - .map_err(|err| IoError::new(IoErrorKind::Other, err)) - .sink_map_err(|err| IoError::new(IoErrorKind::Other, err)) - .with(|data| Ok(OwnedMessage::Binary(data))) - .and_then(|recv| { - match recv { - OwnedMessage::Binary(data) => Ok(data), - OwnedMessage::Text(data) => Ok(data.into_bytes()), - // TODO: pings and pongs and close messages need to be - // answered; and this is really hard; for now we produce - // an error when that happens - _ => Err(IoError::new(IoErrorKind::Other, "unimplemented")), - } - }); - let read_write = RwStreamSink::new(framed_data); - Box::new(read_write) as Box - }) - }); - - Ok(Box::new(dial) as Box<_>) - } -} - -/// Error in WebSockets. -#[derive(Debug)] -pub enum WsError { - /// Error in the WebSocket layer. - WebSocket(Box), - /// Error in the transport layer underneath. - Underlying(TErr), -} - -impl fmt::Display for WsError -where TErr: fmt::Display -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - WsError::WebSocket(err) => write!(f, "{}", err), - WsError::Underlying(err) => write!(f, "{}", err), - } - } -} - -impl error::Error for WsError -where TErr: error::Error + 'static -{ - fn source(&self) -> Option<&(dyn error::Error + 'static)> { - match self { - WsError::WebSocket(err) => Some(&**err), - WsError::Underlying(err) => Some(err), - } - } -} - -fn client_addr_to_ws(client_addr: &Multiaddr, ws_path: &str, is_wss: bool) -> String { - let inner = { - let protocols: Vec<_> = client_addr.iter().collect(); - - if protocols.len() != 2 { - "127.0.0.1".to_owned() - } else { - match (&protocols[0], &protocols[1]) { - (&Protocol::Ip4(ref ip), &Protocol::Tcp(port)) => { - format!("{}:{}", ip, port) - } - (&Protocol::Ip6(ref ip), &Protocol::Tcp(port)) => { - format!("[{}]:{}", ip, port) - } - (&Protocol::Dns4(ref ns), &Protocol::Tcp(port)) => { - format!("{}:{}", ns, port) - } - (&Protocol::Dns6(ref ns), &Protocol::Tcp(port)) => { - format!("{}:{}", ns, port) - } - _ => "127.0.0.1".to_owned(), - } - } - }; - - if is_wss { - format!("wss://{}{}", inner, ws_path) - } else { - format!("ws://{}{}", inner, ws_path) - } -} - -#[cfg(test)] -mod tests { - use libp2p_tcp as tcp; - use tokio::runtime::current_thread::Runtime; - use futures::{Future, Stream}; - use libp2p_core::{ - Transport, - multiaddr::Protocol, - transport::ListenerEvent - }; - use super::WsConfig; - - #[test] - fn dialer_connects_to_listener_ipv4() { - let ws_config = WsConfig::new(tcp::TcpConfig::new()); - - let mut listener = ws_config.clone() - .listen_on("/ip4/127.0.0.1/tcp/0/ws".parse().unwrap()) - .unwrap(); - - let addr = listener.by_ref().wait() - .next() - .expect("some event") - .expect("no error") - .into_new_address() - .expect("listen address"); - - assert_eq!(Some(Protocol::Ws("/".into())), addr.iter().nth(2)); - assert_ne!(Some(Protocol::Tcp(0)), addr.iter().nth(1)); - - let listener = listener - .filter_map(ListenerEvent::into_upgrade) - .into_future() - .map_err(|(e, _)| e) - .and_then(|(c, _)| c.unwrap().0); - - let dialer = ws_config.clone().dial(addr.clone()).unwrap(); - - let future = listener - .select(dialer) - .map_err(|(e, _)| e) - .and_then(|(_, n)| n); - let mut rt = Runtime::new().unwrap(); - let _ = rt.block_on(future).unwrap(); - } - - #[test] - fn dialer_connects_to_listener_ipv6() { - let ws_config = WsConfig::new(tcp::TcpConfig::new()); - - let mut listener = ws_config.clone() - .listen_on("/ip6/::1/tcp/0/ws".parse().unwrap()) - .unwrap(); - - let addr = listener.by_ref().wait() - .next() - .expect("some event") - .expect("no error") - .into_new_address() - .expect("listen address"); - - assert_eq!(Some(Protocol::Ws("/".into())), addr.iter().nth(2)); - assert_ne!(Some(Protocol::Tcp(0)), addr.iter().nth(1)); - - let listener = listener - .filter_map(ListenerEvent::into_upgrade) - .into_future() - .map_err(|(e, _)| e) - .and_then(|(c, _)| c.unwrap().0); - - let dialer = ws_config.clone().dial(addr.clone()).unwrap(); - - let future = listener - .select(dialer) - .map_err(|(e, _)| e) - .and_then(|(_, n)| n); - - let mut rt = Runtime::new().unwrap(); - let _ = rt.block_on(future).unwrap(); - } -} diff --git a/transports/websocket/src/error.rs b/transports/websocket/src/error.rs new file mode 100644 index 00000000..10bea425 --- /dev/null +++ b/transports/websocket/src/error.rs @@ -0,0 +1,76 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use libp2p_core::Multiaddr; +use crate::tls; +use std::{error, fmt}; + +/// Error in WebSockets. +#[derive(Debug)] +pub enum Error { + /// Error in the transport layer underneath. + Transport(E), + /// A TLS related error. + Tls(tls::Error), + /// Websocket handshake error. + Handshake(Box), + /// The configured maximum of redirects have been made. + TooManyRedirects, + /// A multi-address is not supported. + InvalidMultiaddr(Multiaddr), + /// The location header URL was invalid. + InvalidRedirectLocation, + /// Websocket base framing error. + Base(Box) +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Error::Transport(err) => write!(f, "{}", err), + Error::Tls(err) => write!(f, "{}", err), + Error::Handshake(err) => write!(f, "{}", err), + Error::InvalidMultiaddr(ma) => write!(f, "invalid multi-address: {}", ma), + Error::TooManyRedirects => f.write_str("too many redirects"), + Error::InvalidRedirectLocation => f.write_str("invalid redirect location"), + Error::Base(err) => write!(f, "{}", err) + } + } +} + +impl error::Error for Error { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + match self { + Error::Transport(err) => Some(err), + Error::Tls(err) => Some(err), + Error::Handshake(err) => Some(&**err), + Error::Base(err) => Some(&**err), + Error::InvalidMultiaddr(_) + | Error::TooManyRedirects + | Error::InvalidRedirectLocation => None + } + } +} + +impl From for Error { + fn from(e: tls::Error) -> Self { + Error::Tls(e) + } +} diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs new file mode 100644 index 00000000..4a5a3960 --- /dev/null +++ b/transports/websocket/src/framed.rs @@ -0,0 +1,431 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use bytes::BytesMut; +use crate::{error::Error, tls}; +use futures::{future::{self, Either, Loop}, prelude::*, try_ready}; +use libp2p_core::{ + Transport, + either::EitherOutput, + multiaddr::{Protocol, Multiaddr}, + transport::{ListenerEvent, TransportError} +}; +use log::{debug, trace}; +use tokio_rustls::{client, server}; +use soketto::{base, connection::{Connection, Mode}, handshake::{self, Redirect, Response}}; +use std::io; +use tokio_codec::{Framed, FramedParts}; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_rustls::webpki; +use url::Url; + +/// Max. number of payload bytes of a single frame. +const MAX_DATA_SIZE: u64 = 256 * 1024 * 1024; + +/// A Websocket transport whose output type is a [`Stream`] and [`Sink`] of +/// frame payloads which does not implement [`AsyncRead`] or +/// [`AsyncWrite`]. See [`crate::WsConfig`] if you require the latter. +#[derive(Debug, Clone)] +pub struct WsConfig { + transport: T, + max_data_size: u64, + tls_config: tls::Config, + max_redirects: u8 +} + +impl WsConfig { + /// Create a new websocket transport based on another transport. + pub fn new(transport: T) -> Self { + WsConfig { + transport, + max_data_size: MAX_DATA_SIZE, + tls_config: tls::Config::client(), + max_redirects: 0 + } + } + + /// Return the configured maximum number of redirects. + pub fn max_redirects(&self) -> u8 { + self.max_redirects + } + + /// Set max. number of redirects to follow. + pub fn set_max_redirects(&mut self, max: u8) -> &mut Self { + self.max_redirects = max; + self + } + + /// Get the max. frame data size we support. + pub fn max_data_size(&self) -> u64 { + self.max_data_size + } + + /// Set the max. frame data size we support. + pub fn set_max_data_size(&mut self, size: u64) -> &mut Self { + self.max_data_size = size; + self + } + + /// Set the TLS configuration if TLS support is desired. + pub fn set_tls_config(&mut self, c: tls::Config) -> &mut Self { + self.tls_config = c; + self + } +} + +impl Transport for WsConfig +where + T: Transport + Send + Clone + 'static, + T::Error: Send + 'static, + T::Dial: Send + 'static, + T::Listener: Send + 'static, + T::ListenerUpgrade: Send + 'static, + T::Output: AsyncRead + AsyncWrite + Send + 'static +{ + type Output = BytesConnection; + type Error = Error; + type Listener = Box, Error = Self::Error> + Send>; + type ListenerUpgrade = Box + Send>; + type Dial = Box + Send>; + + fn listen_on(self, addr: Multiaddr) -> Result> { + let mut inner_addr = addr.clone(); + + let (use_tls, proto) = match inner_addr.pop() { + Some(p@Protocol::Wss(_)) => + if self.tls_config.server.is_some() { + (true, p) + } else { + debug!("/wss address but TLS server support is not configured"); + return Err(TransportError::MultiaddrNotSupported(addr)) + } + Some(p@Protocol::Ws(_)) => (false, p), + _ => { + debug!("{} is not a websocket multiaddr", addr); + return Err(TransportError::MultiaddrNotSupported(addr)) + } + }; + + let tls_config = self.tls_config; + let max_size = self.max_data_size; + let listen = self.transport.listen_on(inner_addr) + .map_err(|e| e.map(Error::Transport))? + .map_err(Error::Transport) + .map(move |event| match event { + ListenerEvent::NewAddress(mut a) => { + a = a.with(proto.clone()); + debug!("Listening on {}", a); + ListenerEvent::NewAddress(a) + } + ListenerEvent::AddressExpired(mut a) => { + a = a.with(proto.clone()); + ListenerEvent::AddressExpired(a) + } + ListenerEvent::Upgrade { upgrade, mut listen_addr, mut remote_addr } => { + listen_addr = listen_addr.with(proto.clone()); + remote_addr = remote_addr.with(proto.clone()); + let remote1 = remote_addr.clone(); // used for logging + let remote2 = remote_addr.clone(); // used for logging + let tls_config = tls_config.clone(); + let upgraded = upgrade.map_err(Error::Transport) + .and_then(move |stream| { + trace!("incoming connection from {}", remote1); + if use_tls { // begin TLS session + let server = tls_config.server.expect("for use_tls we checked server"); + trace!("awaiting TLS handshake with {}", remote1); + let future = server.accept(stream) + .map_err(move |e| { + debug!("TLS handshake with {} failed: {}", remote1, e); + Error::Tls(tls::Error::from(e)) + }) + .map(|s| EitherOutput::First(EitherOutput::Second(s))); + Either::A(future) + } else { // continue with plain stream + Either::B(future::ok(EitherOutput::Second(stream))) + } + }) + .and_then(move |stream| { + trace!("receiving websocket handshake request from {}", remote2); + Framed::new(stream, handshake::Server::new()) + .into_future() + .map_err(|(e, _framed)| Error::Handshake(Box::new(e))) + .and_then(move |(request, framed)| { + if let Some(r) = request { + trace!("accepting websocket handshake request from {}", remote2); + let key = Vec::from(r.key()); + Either::A(framed.send(Ok(handshake::Accept::new(key))) + .map_err(|e| Error::Base(Box::new(e))) + .map(move |f| { + trace!("websocket handshake with {} successful", remote2); + let c = new_connection(f, max_size, Mode::Server); + BytesConnection { inner: c } + })) + } else { + debug!("connection to {} terminated during handshake", remote2); + let e: io::Error = io::ErrorKind::ConnectionAborted.into(); + Either::B(future::err(Error::Handshake(Box::new(e)))) + } + }) + }); + ListenerEvent::Upgrade { + upgrade: Box::new(upgraded) as Box + Send>, + listen_addr, + remote_addr + } + } + }); + Ok(Box::new(listen) as Box<_>) + } + + fn dial(self, addr: Multiaddr) -> Result> { + // Quick sanity check of the provided Multiaddr. + if let Some(Protocol::Ws(_)) | Some(Protocol::Wss(_)) = addr.iter().last() { + // ok + } else { + debug!("{} is not a websocket multiaddr", addr); + return Err(TransportError::MultiaddrNotSupported(addr)) + } + // We are looping here in order to follow redirects (if any): + let max_redirects = self.max_redirects; + let future = future::loop_fn((addr, self, max_redirects), |(addr, cfg, remaining)| { + dial(addr, cfg.clone()).and_then(move |result| match result { + Either::A(redirect) => { + if remaining == 0 { + debug!("too many redirects"); + return Err(Error::TooManyRedirects) + } + let a = location_to_multiaddr(redirect.location())?; + Ok(Loop::Continue((a, cfg, remaining - 1))) + } + Either::B(conn) => Ok(Loop::Break(conn)) + }) + }); + Ok(Box::new(future) as Box<_>) + } +} + +/// Attempty to dial the given address and perform a websocket handshake. +fn dial(address: Multiaddr, config: WsConfig) + -> impl Future>, Error = Error> +where + T: Transport, + T::Output: AsyncRead + AsyncWrite +{ + trace!("dial address: {}", address); + + let WsConfig { transport, max_data_size, tls_config, .. } = config; + + let (host_port, dns_name) = match host_and_dnsname(&address) { + Ok(x) => x, + Err(e) => return Either::A(future::err(e)) + }; + + let mut inner_addr = address.clone(); + + let (use_tls, path) = match inner_addr.pop() { + Some(Protocol::Ws(path)) => (false, path), + Some(Protocol::Wss(path)) => { + if dns_name.is_none() { + debug!("no DNS name in {}", address); + return Either::A(future::err(Error::InvalidMultiaddr(address))) + } + (true, path) + } + _ => { + debug!("{} is not a websocket multiaddr", address); + return Either::A(future::err(Error::InvalidMultiaddr(address))) + } + }; + + let dial = match transport.dial(inner_addr) { + Ok(dial) => dial, + Err(TransportError::MultiaddrNotSupported(a)) => + return Either::A(future::err(Error::InvalidMultiaddr(a))), + Err(TransportError::Other(e)) => + return Either::A(future::err(Error::Transport(e))) + }; + + let address1 = address.clone(); // used for logging + let address2 = address.clone(); // used for logging + let future = dial.map_err(Error::Transport) + .and_then(move |stream| { + trace!("connected to {}", address); + if use_tls { // begin TLS session + let dns_name = dns_name.expect("for use_tls we have checked that dns_name is some"); + trace!("starting TLS handshake with {}", address); + let future = tls_config.client.connect(dns_name.as_ref(), stream) + .map_err(move |e| { + debug!("TLS handshake with {} failed: {}", address, e); + Error::Tls(tls::Error::from(e)) + }) + .map(|s| EitherOutput::First(EitherOutput::First(s))); + return Either::A(future) + } + // continue with plain stream + Either::B(future::ok(EitherOutput::Second(stream))) + }) + .and_then(move |stream| { + trace!("sending websocket handshake request to {}", address1); + let client = handshake::Client::new(host_port, path); + Framed::new(stream, client) + .send(()) + .map_err(|e| Error::Handshake(Box::new(e))) + .and_then(move |framed| { + trace!("awaiting websocket handshake response form {}", address2); + framed.into_future().map_err(|(e, _)| Error::Base(Box::new(e))) + }) + .and_then(move |(response, framed)| { + match response { + None => { + debug!("connection to {} terminated during handshake", address1); + let e: io::Error = io::ErrorKind::ConnectionAborted.into(); + return Err(Error::Handshake(Box::new(e))) + } + Some(Response::Redirect(r)) => { + debug!("received {}", r); + return Ok(Either::A(r)) + } + Some(Response::Accepted(_)) => { + trace!("websocket handshake with {} successful", address1) + } + } + let c = new_connection(framed, max_data_size, Mode::Client); + Ok(Either::B(BytesConnection { inner: c })) + }) + }); + + Either::B(future) +} + +// Extract host, port and optionally the DNS name from the given [`Multiaddr`]. +fn host_and_dnsname(addr: &Multiaddr) -> Result<(String, Option), Error> { + let mut iter = addr.iter(); + match (iter.next(), iter.next()) { + (Some(Protocol::Ip4(ip)), Some(Protocol::Tcp(port))) => + Ok((format!("{}:{}", ip, port), None)), + (Some(Protocol::Ip6(ip)), Some(Protocol::Tcp(port))) => + Ok((format!("{}:{}", ip, port), None)), + (Some(Protocol::Dns4(h)), Some(Protocol::Tcp(port))) => + Ok((format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned()))), + (Some(Protocol::Dns6(h)), Some(Protocol::Tcp(port))) => + Ok((format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned()))), + _ => { + debug!("multi-address format not supported: {}", addr); + Err(Error::InvalidMultiaddr(addr.clone())) + } + } +} + +// Given a location URL, build a new websocket [`Multiaddr`]. +fn location_to_multiaddr(location: &str) -> Result> { + match Url::parse(location) { + Ok(url) => { + let mut a = Multiaddr::empty(); + match url.host() { + Some(url::Host::Domain(h)) => { + a.push(Protocol::Dns4(h.into())) + } + Some(url::Host::Ipv4(ip)) => { + a.push(Protocol::Ip4(ip)) + } + Some(url::Host::Ipv6(ip)) => { + a.push(Protocol::Ip6(ip)) + } + None => return Err(Error::InvalidRedirectLocation) + } + if let Some(p) = url.port() { + a.push(Protocol::Tcp(p)) + } + let s = url.scheme(); + if s.eq_ignore_ascii_case("https") | s.eq_ignore_ascii_case("wss") { + a.push(Protocol::Wss(url.path().into())) + } else if s.eq_ignore_ascii_case("http") | s.eq_ignore_ascii_case("ws") { + a.push(Protocol::Ws(url.path().into())) + } else { + debug!("unsupported scheme: {}", s); + return Err(Error::InvalidRedirectLocation) + } + Ok(a) + } + Err(e) => { + debug!("failed to parse url as multi-address: {:?}", e); + Err(Error::InvalidRedirectLocation) + } + } +} + +/// Create a `Connection` from an existing `Framed` value. +fn new_connection(framed: Framed, max_size: u64, mode: Mode) -> Connection +where + T: AsyncRead + AsyncWrite +{ + let mut codec = base::Codec::new(); + codec.set_max_data_size(max_size); + let old = framed.into_parts(); + let mut new = FramedParts::new(old.io, codec); + new.read_buf = old.read_buf; + new.write_buf = old.write_buf; + let framed = Framed::from_parts(new); + Connection::from_framed(framed, mode) +} + +// BytesConnection //////////////////////////////////////////////////////////////////////////////// + +/// A [`Stream`] and [`Sink`] that produces and consumes [`BytesMut`] values +/// which correspond to the payload data of websocket frames. +#[derive(Debug)] +pub struct BytesConnection { + inner: Connection, server::TlsStream>, T>> +} + +impl Stream for BytesConnection { + type Item = BytesMut; + type Error = io::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + let data = try_ready!(self.inner.poll().map_err(|e| io::Error::new(io::ErrorKind::Other, e))); + Ok(Async::Ready(data.map(base::Data::into_bytes))) + } +} + +impl Sink for BytesConnection { + type SinkItem = BytesMut; + type SinkError = io::Error; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + let result = self.inner.start_send(base::Data::Binary(item)) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)); + + if let AsyncSink::NotReady(data) = result? { + Ok(AsyncSink::NotReady(data.into_bytes())) + } else { + Ok(AsyncSink::Ready) + } + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.inner.poll_complete().map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + self.inner.close().map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + } +} + diff --git a/transports/websocket/src/lib.rs b/transports/websocket/src/lib.rs index 85b36ff6..00f56da8 100644 --- a/transports/websocket/src/lib.rs +++ b/transports/websocket/src/lib.rs @@ -1,4 +1,4 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. +// Copyright 2017-2019 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), @@ -18,66 +18,198 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -#![recursion_limit = "512"] - //! Implementation of the libp2p `Transport` trait for Websockets. -//! -//! See the documentation of `swarm` and of libp2p in general to learn how to use the `Transport` -//! trait. -//! -//! This library is used in a different way depending on whether you are compiling for emscripten -//! or for a different operating system. -//! -//! # Emscripten -//! -//! On emscripten, you can create a `BrowserWsConfig` object with `BrowserWsConfig::new()`. It can -//! then be used as a transport. -//! -//! Listening on a websockets multiaddress isn't supported on emscripten. Dialing a multiaddress -//! which uses `ws` on top of TCP/IP will automatically use the `XMLHttpRequest` Javascript object. -//! -//! ```ignore -//! use libp2p_websocket::BrowserWsConfig; -//! -//! let ws_config = BrowserWsConfig::new(); -//! // let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap()); -//! ``` -//! -//! # Other operating systems -//! -//! On other operating systems, this library doesn't open any socket by itself. Instead it must be -//! plugged on top of another implementation of `Transport` such as TCP/IP. -//! -//! This underlying transport must be put inside a `WsConfig` object through the -//! `WsConfig::new()` function. -//! -//! ``` -//! extern crate libp2p_core; -//! extern crate libp2p_tcp; -//! extern crate libp2p_websocket; -//! -//! use libp2p_core::{Multiaddr, Transport}; -//! use libp2p_tcp::TcpConfig; -//! use libp2p_websocket::WsConfig; -//! -//! # fn main() { -//! let ws_config = WsConfig::new(TcpConfig::new()); -//! # return; -//! let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap()); -//! # } -//! ``` -//! -#[cfg(any(target_os = "emscripten", target_os = "unknown"))] -#[macro_use] -extern crate stdweb; +pub mod error; +pub mod framed; +pub mod tls; -#[cfg(any(target_os = "emscripten", target_os = "unknown"))] -mod browser; -#[cfg(not(any(target_os = "emscripten", target_os = "unknown")))] -mod desktop; +use error::Error; +use framed::BytesConnection; +use futures::prelude::*; +use libp2p_core::{ + ConnectedPoint, + Transport, + multiaddr::Multiaddr, + transport::{map::{MapFuture, MapStream}, ListenerEvent, TransportError} +}; +use rw_stream_sink::RwStreamSink; +use tokio_io::{AsyncRead, AsyncWrite}; -#[cfg(any(target_os = "emscripten", target_os = "unknown"))] -pub use self::browser::{BrowserWsConfig, BrowserWsConn}; -#[cfg(not(any(target_os = "emscripten", target_os = "unknown")))] -pub use self::desktop::WsConfig; +/// A Websocket transport. +#[derive(Debug, Clone)] +pub struct WsConfig { + transport: framed::WsConfig +} + +impl WsConfig { + /// Create a new websocket transport based on the given transport. + pub fn new(transport: T) -> Self { + framed::WsConfig::new(transport).into() + } + + /// Return the configured maximum number of redirects. + pub fn max_redirects(&self) -> u8 { + self.transport.max_redirects() + } + + /// Set max. number of redirects to follow. + pub fn set_max_redirects(&mut self, max: u8) -> &mut Self { + self.transport.set_max_redirects(max); + self + } + + /// Get the max. frame data size we support. + pub fn max_data_size(&self) -> u64 { + self.transport.max_data_size() + } + + /// Set the max. frame data size we support. + pub fn set_max_data_size(&mut self, size: u64) -> &mut Self { + self.transport.set_max_data_size(size); + self + } + + /// Set the TLS configuration if TLS support is desired. + pub fn set_tls_config(&mut self, c: tls::Config) -> &mut Self { + self.transport.set_tls_config(c); + self + } +} + +impl From> for WsConfig { + fn from(framed: framed::WsConfig) -> Self { + WsConfig { + transport: framed + } + } +} + +impl Transport for WsConfig +where + T: Transport + Send + Clone + 'static, + T::Error: Send + 'static, + T::Dial: Send + 'static, + T::Listener: Send + 'static, + T::ListenerUpgrade: Send + 'static, + T::Output: AsyncRead + AsyncWrite + Send + 'static +{ + type Output = RwStreamSink>; + type Error = Error; + type Listener = MapStream, WrapperFn>; + type ListenerUpgrade = MapFuture, WrapperFn>; + type Dial = MapFuture, WrapperFn>; + + fn listen_on(self, addr: Multiaddr) -> Result> { + self.transport.map(wrap_connection as WrapperFn).listen_on(addr) + } + + fn dial(self, addr: Multiaddr) -> Result> { + self.transport.map(wrap_connection as WrapperFn).dial(addr) + } +} + +/// Type alias corresponding to `framed::WsConfig::Listener`. +pub type InnerStream = + Box<(dyn Stream, Item = ListenerEvent>> + Send)>; + +/// Type alias corresponding to `framed::WsConfig::Dial` and `framed::WsConfig::ListenerUpgrade`. +pub type InnerFuture = + Box<(dyn Future, Error = Error> + Send)>; + +/// Function type that wraps a websocket connection (see. `wrap_connection`). +pub type WrapperFn = + fn(BytesConnection, ConnectedPoint) -> RwStreamSink>; + +/// Wrap a websocket connection producing data frames into a `RwStreamSink` +/// implementing `AsyncRead` + `AsyncWrite`. +fn wrap_connection(c: BytesConnection, _: ConnectedPoint) -> RwStreamSink> +where + T: AsyncRead + AsyncWrite +{ + RwStreamSink::new(c) +} + +// Tests ////////////////////////////////////////////////////////////////////////////////////////// + +#[cfg(test)] +mod tests { + use libp2p_tcp as tcp; + use tokio::runtime::current_thread::Runtime; + use futures::{Future, Stream}; + use libp2p_core::{ + Transport, + multiaddr::Protocol, + transport::ListenerEvent + }; + use super::WsConfig; + + #[test] + fn dialer_connects_to_listener_ipv4() { + let ws_config = WsConfig::new(tcp::TcpConfig::new()); + + let mut listener = ws_config.clone() + .listen_on("/ip4/127.0.0.1/tcp/0/ws".parse().unwrap()) + .unwrap(); + + let addr = listener.by_ref().wait() + .next() + .expect("some event") + .expect("no error") + .into_new_address() + .expect("listen address"); + + assert_eq!(Some(Protocol::Ws("/".into())), addr.iter().nth(2)); + assert_ne!(Some(Protocol::Tcp(0)), addr.iter().nth(1)); + + let listener = listener + .filter_map(ListenerEvent::into_upgrade) + .into_future() + .map_err(|(e, _)| e) + .and_then(|(c, _)| c.unwrap().0); + + let dialer = ws_config.clone().dial(addr.clone()).unwrap(); + + let future = listener + .select(dialer) + .map_err(|(e, _)| e) + .and_then(|(_, n)| n); + let mut rt = Runtime::new().unwrap(); + let _ = rt.block_on(future).unwrap(); + } + + #[test] + fn dialer_connects_to_listener_ipv6() { + let ws_config = WsConfig::new(tcp::TcpConfig::new()); + + let mut listener = ws_config.clone() + .listen_on("/ip6/::1/tcp/0/ws".parse().unwrap()) + .unwrap(); + + let addr = listener.by_ref().wait() + .next() + .expect("some event") + .expect("no error") + .into_new_address() + .expect("listen address"); + + assert_eq!(Some(Protocol::Ws("/".into())), addr.iter().nth(2)); + assert_ne!(Some(Protocol::Tcp(0)), addr.iter().nth(1)); + + let listener = listener + .filter_map(ListenerEvent::into_upgrade) + .into_future() + .map_err(|(e, _)| e) + .and_then(|(c, _)| c.unwrap().0); + + let dialer = ws_config.clone().dial(addr.clone()).unwrap(); + + let future = listener + .select(dialer) + .map_err(|(e, _)| e) + .and_then(|(_, n)| n); + + let mut rt = Runtime::new().unwrap(); + let _ = rt.block_on(future).unwrap(); + } +} diff --git a/transports/websocket/src/tls.rs b/transports/websocket/src/tls.rs new file mode 100644 index 00000000..7e7cb246 --- /dev/null +++ b/transports/websocket/src/tls.rs @@ -0,0 +1,176 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::{fmt, io, sync::Arc}; +use tokio_rustls::{ + TlsConnector, + TlsAcceptor, + rustls, + webpki +}; + +/// TLS configuration. +#[derive(Clone)] +pub struct Config { + pub(crate) client: TlsConnector, + pub(crate) server: Option +} + +impl fmt::Debug for Config { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("Config") + } +} + +/// Private key, DER-encoded ASN.1 in either PKCS#8 or PKCS#1 format. +#[derive(Clone)] +pub struct PrivateKey(rustls::PrivateKey); + +impl PrivateKey { + /// Assert the given bytes are DER-encoded ASN.1 in either PKCS#8 or PKCS#1 format. + pub fn new(bytes: Vec) -> Self { + PrivateKey(rustls::PrivateKey(bytes)) + } +} + +/// Certificate, DER-encoded X.509 format. +#[derive(Debug, Clone)] +pub struct Certificate(rustls::Certificate); + +impl Certificate { + /// Assert the given bytes are in DER-encoded X.509 format. + pub fn new(bytes: Vec) -> Self { + Certificate(rustls::Certificate(bytes)) + } +} + +impl Config { + /// Create a new TLS configuration with the given server key and certificate chain. + pub fn new(key: PrivateKey, certs: I) -> Result + where + I: IntoIterator + { + let mut builder = Config::builder(); + builder.server(key, certs)?; + Ok(builder.finish()) + } + + /// Create a client-only configuration. + pub fn client() -> Self { + Config { + client: Arc::new(client_config()).into(), + server: None + } + } + + /// Create a new TLS configuration builder. + pub fn builder() -> Builder { + Builder { client: client_config(), server: None } + } +} + +/// Setup the rustls client configuration. +fn client_config() -> rustls::ClientConfig { + let mut client = rustls::ClientConfig::new(); + client.root_store.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + client +} + +/// TLS configuration builder. +pub struct Builder { + client: rustls::ClientConfig, + server: Option +} + +impl Builder { + /// Set server key and certificate chain. + pub fn server(&mut self, key: PrivateKey, certs: I) -> Result<&mut Self, Error> + where + I: IntoIterator + { + let mut server = rustls::ServerConfig::new(rustls::NoClientAuth::new()); + let certs = certs.into_iter().map(|c| c.0).collect(); + server.set_single_cert(certs, key.0).map_err(|e| Error::Tls(Box::new(e)))?; + self.server = Some(server); + Ok(self) + } + + /// Add an additional trust anchor. + pub fn add_trust(&mut self, cert: &Certificate) -> Result<&mut Self, Error> { + self.client.root_store.add(&cert.0).map_err(|e| Error::Tls(Box::new(e)))?; + Ok(self) + } + + /// Finish configuration. + pub fn finish(self) -> Config { + Config { + client: Arc::new(self.client).into(), + server: self.server.map(|s| Arc::new(s).into()) + } + } +} + +pub(crate) fn dns_name_ref(name: &str) -> Result, Error> { + webpki::DNSNameRef::try_from_ascii_str(name).map_err(|()| Error::InvalidDnsName(name.into())) +} + +// Error ////////////////////////////////////////////////////////////////////////////////////////// + +/// TLS related errors. +#[derive(Debug)] +pub enum Error { + /// An underlying I/O error. + Io(io::Error), + /// Actual TLS error. + Tls(Box), + /// The DNS name was invalid. + InvalidDnsName(String), + + #[doc(hidden)] + __Nonexhaustive +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::Io(e) => write!(f, "i/o error: {}", e), + Error::Tls(e) => write!(f, "tls error: {}", e), + Error::InvalidDnsName(n) => write!(f, "invalid DNS name: {}", n), + Error::__Nonexhaustive => f.write_str("__Nonexhaustive") + } + } +} + +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Error::Io(e) => Some(e), + Error::Tls(e) => Some(&**e), + Error::InvalidDnsName(_) | Error::__Nonexhaustive => None + } + } +} + +impl From for Error { + fn from(e: io::Error) -> Self { + Error::Io(e) + } +} +