From c211d6b96ed41edd1ff5d93ed71982b87c6040c5 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 2 Jan 2018 15:22:55 +0100 Subject: [PATCH 1/5] Add a websocket transport --- Cargo.toml | 1 + README.md | 1 + example/Cargo.toml | 1 + example/examples/echo-dialer.rs | 7 + example/examples/echo-server.rs | 6 + libp2p-swarm/src/connection_reuse.rs | 1 + libp2p-tcp-transport/src/lib.rs | 1 + libp2p-websocket/Cargo.toml | 21 ++ libp2p-websocket/README.md | 43 ++++ libp2p-websocket/src/browser.rs | 281 +++++++++++++++++++++++++++ libp2p-websocket/src/desktop.rs | 253 ++++++++++++++++++++++++ libp2p-websocket/src/lib.rs | 91 +++++++++ 12 files changed, 707 insertions(+) create mode 100644 libp2p-websocket/Cargo.toml create mode 100644 libp2p-websocket/README.md create mode 100644 libp2p-websocket/src/browser.rs create mode 100644 libp2p-websocket/src/desktop.rs create mode 100644 libp2p-websocket/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index f1fdf589..55336225 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "libp2p-secio", "libp2p-swarm", "libp2p-tcp-transport", + "libp2p-websocket", "multistream-select", "datastore", "futures-mutex", diff --git a/README.md b/README.md index e389a7c8..e936368f 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ Architecture of the crates of this repository: `ConnectionUpgrade` trait of `libp2p-swarm`. - `libp2p-swarm`: Core library that contains all the traits of *libp2p* and plugs things together. - `libp2p-tcp-transport`: Implementation of the `Transport` trait of `libp2p-swarm` for TCP/IP. +- `libp2p-websocket`: Implementation of the `Transport` trait of `libp2p-swarm` for Websockets. - `multistream-select`: Implementation of the `multistream-select` protocol, which is used to negotiate a protocol over a newly-established connection with a peer, or after a connection upgrade. diff --git a/example/Cargo.toml b/example/Cargo.toml index 4ae24fc3..79f9e751 100644 --- a/example/Cargo.toml +++ b/example/Cargo.toml @@ -10,5 +10,6 @@ multiplex = { path = "../multiplex-rs" } libp2p-secio = { path = "../libp2p-secio" } libp2p-swarm = { path = "../libp2p-swarm" } libp2p-tcp-transport = { path = "../libp2p-tcp-transport" } +libp2p-websocket = { path = "../libp2p-websocket" } tokio-core = "0.1" tokio-io = "0.1" diff --git a/example/examples/echo-dialer.rs b/example/examples/echo-dialer.rs index 06dea677..e255154a 100644 --- a/example/examples/echo-dialer.rs +++ b/example/examples/echo-dialer.rs @@ -23,6 +23,7 @@ extern crate futures; extern crate libp2p_secio as secio; extern crate libp2p_swarm as swarm; extern crate libp2p_tcp_transport as tcp; +extern crate libp2p_websocket as websocket; extern crate multiplex; extern crate tokio_core; extern crate tokio_io; @@ -34,6 +35,7 @@ use swarm::{UpgradeExt, SimpleProtocol, Transport, MuxedTransport}; use tcp::TcpConfig; use tokio_core::reactor::Core; use tokio_io::codec::length_delimited; +use websocket::WsConfig; fn main() { // Determine which address to dial. @@ -46,6 +48,11 @@ fn main() { // We start by creating a `TcpConfig` that indicates that we want TCP/IP. let transport = TcpConfig::new(core.handle()) + // In addition to TCP/IP, we also want to support the Websockets protocol on top of TCP/IP. + // The parameter passed to `WsConfig::new()` must be an implementation of `Transport` to be + // used for the underlying multiaddress. + .or_transport(WsConfig::new(TcpConfig::new(core.handle()))) + // On top of TCP/IP, we will use either the plaintext protocol or the secio protocol, // depending on which one the remote supports. .with_upgrade({ diff --git a/example/examples/echo-server.rs b/example/examples/echo-server.rs index 4334bef0..9a5d084a 100644 --- a/example/examples/echo-server.rs +++ b/example/examples/echo-server.rs @@ -23,6 +23,7 @@ extern crate futures; extern crate libp2p_secio as secio; extern crate libp2p_swarm as swarm; extern crate libp2p_tcp_transport as tcp; +extern crate libp2p_websocket as websocket; extern crate multiplex; extern crate tokio_core; extern crate tokio_io; @@ -34,6 +35,7 @@ use swarm::{Transport, UpgradeExt, SimpleProtocol}; use tcp::TcpConfig; use tokio_core::reactor::Core; use tokio_io::codec::length_delimited; +use websocket::WsConfig; fn main() { // Determine which address to listen to. @@ -45,6 +47,10 @@ fn main() { // Now let's build the transport stack. // We start by creating a `TcpConfig` that indicates that we want TCP/IP. let transport = TcpConfig::new(core.handle()) + // In addition to TCP/IP, we also want to support the Websockets protocol on top of TCP/IP. + // The parameter passed to `WsConfig::new()` must be an implementation of `Transport` to be + // used for the underlying multiaddress. + .or_transport(WsConfig::new(TcpConfig::new(core.handle()))) // On top of TCP/IP, we will use either the plaintext protocol or the secio protocol, // depending on which one the remote supports. diff --git a/libp2p-swarm/src/connection_reuse.rs b/libp2p-swarm/src/connection_reuse.rs index 906e1886..3821d6a3 100644 --- a/libp2p-swarm/src/connection_reuse.rs +++ b/libp2p-swarm/src/connection_reuse.rs @@ -199,6 +199,7 @@ where S: Stream, fn poll(&mut self) -> Poll, Self::Error> { match self.listener.poll() { Ok(Async::Ready(Some((upgrade, client_addr)))) => { + println!("ready stream"); self.current_upgrades.push((upgrade, client_addr)); } Ok(Async::NotReady) => (), diff --git a/libp2p-tcp-transport/src/lib.rs b/libp2p-tcp-transport/src/lib.rs index cc311769..d54fdd63 100644 --- a/libp2p-tcp-transport/src/lib.rs +++ b/libp2p-tcp-transport/src/lib.rs @@ -109,6 +109,7 @@ impl Transport for TcpConfig { let future = future::result(listener).map(|listener| { // Pull out a stream of sockets for incoming connections listener.incoming().map(|(sock, addr)| { + println!("incoming tcp stream"); let addr = addr.to_multiaddr() .expect("generating a multiaddr from a socket addr never fails"); (Ok(sock).into_future(), addr) diff --git a/libp2p-websocket/Cargo.toml b/libp2p-websocket/Cargo.toml new file mode 100644 index 00000000..1561b3a6 --- /dev/null +++ b/libp2p-websocket/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "libp2p-websocket" +version = "0.1.0" +authors = ["Parity Technologies "] + +[dependencies] +libp2p-swarm = { path = "../libp2p-swarm" } +futures = "0.1" +multiaddr = "0.2.0" +rw-stream-sink = { path = "../rw-stream-sink" } +tokio-io = "0.1" + +[target.'cfg(not(target_os = "emscripten"))'.dependencies] +websocket = { version = "0.20.2", default-features = false, features = ["async"] } + +[target.'cfg(target_os = "emscripten")'.dependencies] +stdweb = { version = "0.1.3", default-features = false } + +[target.'cfg(not(target_os = "emscripten"))'.dev-dependencies] +libp2p-tcp-transport = { path = "../libp2p-tcp-transport" } +tokio-core = "0.1" diff --git a/libp2p-websocket/README.md b/libp2p-websocket/README.md new file mode 100644 index 00000000..3f3b762d --- /dev/null +++ b/libp2p-websocket/README.md @@ -0,0 +1,43 @@ +Implementation of the libp2p `Transport` trait for Websockets. + +See the documentation of `swarm` and of libp2p in general to learn how to use the `Transport` +trait. + +This library is used in a different way depending on whether you are compiling for emscripten +or for a different operating system. + +# Emscripten + +On emscripten, you can create a `WsConfig` object with `WsConfig::new()`. It can then be used +as a transport. + +Listening on a websockets multiaddress isn't supported on emscripten. Dialing a multiaddress +which uses `ws` on top of TCP/IP will automatically use the `XMLHttpRequest` Javascript object. + +```rust +use libp2p_websocket::WsConfig; + +let ws_config = WsConfig::new(); +// let _ = ws_config.dial(Multiaddr::new("/ip4/40.41.42.43/tcp/12345/ws").unwrap()); +``` + +# Other operating systems + +On other operating systems, this library doesn't open any socket by itself. Instead it must be +plugged on top of another implementation of `Transport` such as TCP/IP. + +This underlying transport must be passed to the `WsConfig::new()` function. + +```rust +extern crate libp2p_tcp_transport; +extern crate libp2p_websocket; +extern crate tokio_core; + +use libp2p_websocket::WsConfig; +use libp2p_tcp_transport::TcpConfig; +use tokio_core::reactor::Core; + +let core = Core::new().unwrap(); +let ws_config = WsConfig::new(TcpConfig::new(core.handle())); +// let _ = ws_config.dial(Multiaddr::new("/ip4/40.41.42.43/tcp/12345/ws").unwrap()); +``` diff --git a/libp2p-websocket/src/browser.rs b/libp2p-websocket/src/browser.rs new file mode 100644 index 00000000..fe327c02 --- /dev/null +++ b/libp2p-websocket/src/browser.rs @@ -0,0 +1,281 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::io::{Read, Write}; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use std::sync::{Arc, Mutex}; +use futures::{Future, Stream, Poll, Async, Then as FutureThen}; +use futures::sync::{oneshot, mpsc}; +use futures::stream::Then as StreamThen; +use multiaddr::{Multiaddr, AddrComponent}; +use rw_stream_sink::RwStreamSink; +use stdweb::{self, Reference}; +use stdweb::web::TypedArray; +use swarm::Transport; +use tokio_io::{AsyncRead, AsyncWrite}; + +/// Represents the configuration for a websocket transport capability for libp2p. +/// +/// This implementation of `Transport` accepts any address that looks like +/// `/ip4/.../tcp/.../ws` or `/ip6/.../tcp/.../ws`, and connect to the corresponding IP and port. +/// +/// > **Note**: The `/wss` protocol isn't supported. +#[derive(Debug, Clone)] +pub struct WsConfig; + +impl WsConfig { + /// Creates a new configuration object for websocket. + #[inline] + pub fn new() -> WsConfig { + WsConfig + } +} + +impl Transport for WsConfig { + type RawConn = Connec; + type Listener = Box>; // TODO: use `!` + type ListenerUpgrade = Box>; // TODO: use `!` + type Dial = FutureThen>, Result, fn(Result, oneshot::Canceled>) -> Result>; + + #[inline] + fn listen_on(self, a: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { + // Listening is never supported. + Err((self, a)) + } + + fn dial(self, original_addr: Multiaddr) -> Result { + // Making sure we are initialized before we dial. Initialization is protected by a simple + // boolean static variable, so it's not a problem to call it multiple times and the cost + // is negligible. + stdweb::initialize(); + + // Tries to interpret the multiaddr, and returns a corresponding `ws://x.x.x.x/` URL (as + // a string) on success. + let inner_addr = match multiaddr_to_target(&original_addr) { + Ok(a) => a, + Err(_) => return Err((self, original_addr)), + }; + + // Create the JS `WebSocket` object. + let websocket = { + let val = js!{ + try { + return new WebSocket(@{inner_addr}); + } catch(e) { + return false; + } + }; + match val.into_reference() { + Some(ws) => ws, + None => return Err((self, original_addr)), // `false` was returned by `js!` + } + }; + + // Create a `message` channel that will be used for both bytes messages and errors, and a + // `message_cb` used for the `message` event on the WebSocket. + // `message_tx` is grabbed by `message_cb` and `close_cb`, and `message_rx` is grabbed + // by `open_cb`. + let (message_tx, message_rx) = mpsc::unbounded::, IoError>>(); + let message_tx = Arc::new(message_tx); + let mut message_rx = Some(message_rx); + let message_cb = { + let message_tx = message_tx.clone(); + move |message_data: Reference| { + if let Some(buffer) = message_data.downcast::>() { + let _ = message_tx.unbounded_send(Ok(buffer.to_vec())); + } else { + let _ = message_tx.unbounded_send(Err(IoError::new(IoErrorKind::InvalidData, + "received ws message of unknown type"))); + } + } + }; + + // Create a `open` channel that will be used to communicate the `Connec` that represents + // the open dialing websocket. Also create a `open_cb` callback that will be used for the + // `open` message of the websocket. + let (open_tx, open_rx) = oneshot::channel::>(); + let open_tx = Arc::new(Mutex::new(Some(open_tx))); + let websocket_clone = websocket.clone(); + let open_cb = { + let open_tx = open_tx.clone(); + move || { + // Note that `open_tx` can be empty (and a panic happens) if the `open` event + // is triggered twice, or is triggered after the `close` event. We never reuse the + // same websocket twice, so this is not supposed to happen. + let tx = open_tx.lock().unwrap().take().expect("the websocket can only open once"); + // `message_rx` can be empty if the `open` event is triggered twice, which again + // is not supposed to happen. + let message_rx = message_rx.take().expect("the websocket can only open once"); + + // Send a `Connec` to the future that was returned by `dial`. Ignoring errors that + // would happen the future has been dropped by the user. + let _ = tx + .send(Ok(Connec { + websocket: websocket_clone.clone(), + incoming_data: RwStreamSink::new(message_rx.then(|result| { + // An `Err` happens here if `message_tx` has been dropped. However + // `message_tx` is grabbed by the websocket, which stays alive for as + // long as the `Connec` is alive. + match result { + Ok(r) => r, + Err(_) => unreachable!("the message channel outlives the Connec") + } + })), + })); + } + }; + + // Used for the `close` message of the websocket. + // The websocket can be closed either before or after being opened, so we send an error + // to both the `open` and `message` channels if that happens. + let close_cb = move || { + if let Some(tx) = open_tx.lock().unwrap().take() { + let _ = tx.send(Err(IoError::new(IoErrorKind::ConnectionRefused, + "close event on the websocket"))); + } + + let _ = message_tx.unbounded_send(Err(IoError::new(IoErrorKind::ConnectionRefused, + "close event on the websocket"))); + }; + + js!{ + var socket = @{websocket}; + var open_cb = @{open_cb}; + var message_cb = @{message_cb}; + var close_cb = @{close_cb}; + socket.addEventListener("open", function(event) { + open_cb(); + }); + socket.addEventListener("message", function(event) { + var reader = new FileReader(); + reader.addEventListener("loadend", function() { + var typed = new Uint8Array(reader.result); + message_cb(typed); + }); + reader.readAsArrayBuffer(event.data); + }); + socket.addEventListener("close", function(event) { + close_cb(); + }); + }; + + Ok(open_rx.then(|result| { + match result { + Ok(Ok(r)) => Ok(r), + Ok(Err(e)) => Err(e), + // `Err` would happen here if `open_tx` is destroyed. `open_tx` is captured by + // the `WebSocket`, and the `WebSocket` is captured by `open_cb`, which is itself + // captured by the `WebSocket`. Due to this cyclic dependency, `open_tx` should + // never be destroyed. + // TODO: how do we break this cyclic dependency? difficult question + Err(_) => unreachable!("the sending side will only close when we drop the future") + } + })) + } +} + +pub struct Connec { + websocket: Reference, + // Stream of messages that goes through a `RwStreamSink` in order to become a `AsyncRead`. + incoming_data: RwStreamSink, IoError>>, + fn(Result, IoError>, ()>) -> Result, IoError>, + Result, IoError> + >>, +} + +impl Drop for Connec { + #[inline] + fn drop(&mut self) { + // TODO: apparently there's a memory leak related to callbacks? + js!{ @{&self.websocket}.close(); } + } +} + +impl AsyncRead for Connec { +} + +impl Read for Connec { + #[inline] + fn read(&mut self, buf: &mut [u8]) -> Result { + self.incoming_data.read(buf) + } +} + +impl AsyncWrite for Connec { + #[inline] + fn shutdown(&mut self) -> Poll<(), IoError> { + Ok(Async::Ready(())) + } +} + +impl Write for Connec { + fn write(&mut self, buf: &[u8]) -> Result { + let typed_array = TypedArray::from(buf); + + // `send` can throw if the websocket isn't open (which can happen if it was closed by the + // remote). + let returned = js!{ + try { + @{&self.websocket}.send(@{typed_array.buffer()}); + return true; + } catch(e) { + return false; + } + }; + + match returned { + stdweb::Value::Bool(true) => Ok(buf.len()), + stdweb::Value::Bool(false) => Err(IoError::new(IoErrorKind::BrokenPipe, "websocket has been closed by the remote")), + _ => unreachable!() + } + } + + #[inline] + fn flush(&mut self) -> Result<(), IoError> { + // Everything is always considered flushed. + Ok(()) + } +} + +// Tries to interpret the `Multiaddr` as a `/ipN/.../tcp/.../ws` multiaddress, and if so returns +// the corresponding `ws://.../` URL. +fn multiaddr_to_target(addr: &Multiaddr) -> Result { + let protocols: Vec<_> = addr.iter().collect(); + + if protocols.len() != 3 { + return Err(()); + } + + match (&protocols[0], &protocols[1], &protocols[2]) { + (&AddrComponent::IP4(ref ip), &AddrComponent::TCP(port), &AddrComponent::WS) => { + Ok(format!("ws://{}:{}/", ip, port)) + } + (&AddrComponent::IP6(ref ip), &AddrComponent::TCP(port), &AddrComponent::WS) => { + Ok(format!("ws://[{}]:{}/", ip, port)) + } + _ => Err(()), + } +} + +// TODO: write tests (tests are very difficult to write with emscripten) +// - remote refuses connection +// - remote closes connection before we receive +// - remote closes connection before we send +// - remote sends text data instead of binary diff --git a/libp2p-websocket/src/desktop.rs b/libp2p-websocket/src/desktop.rs new file mode 100644 index 00000000..b7a37ef5 --- /dev/null +++ b/libp2p-websocket/src/desktop.rs @@ -0,0 +1,253 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::{stream, Future, IntoFuture, Sink, Stream}; +use multiaddr::{AddrComponent, Multiaddr}; +use rw_stream_sink::RwStreamSink; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use swarm::Transport; +use websocket::client::builder::ClientBuilder; +use websocket::message::OwnedMessage; +use websocket::server::upgrade::async::IntoWs; +use websocket::stream::async::Stream as AsyncStream; + +/// Represents the configuration for a websocket transport capability for libp2p. Must be put on +/// top of another `Transport`. +/// +/// This implementation of `Transport` accepts any address that ends with `/ws`, and will try to +/// pass the underlying multiaddress to the underlying `Transport`. +/// +/// > **Note**: The `/wss` protocol isn't supported. +#[derive(Debug, Clone)] +pub struct WsConfig { + transport: T, +} + +impl WsConfig { + /// Creates a new configuration object for websocket. + /// + /// The websockets will run on top of the `Transport` you pass as parameter. + #[inline] + pub fn new(inner: T) -> WsConfig { + WsConfig { transport: inner } + } +} + +impl Transport for WsConfig +where + T: Transport + 'static, // TODO: this 'static is pretty arbitrary and is necessary because of the websocket library + T::RawConn: Send, // TODO: this Send is pretty arbitrary and is necessary because of the websocket library +{ + type RawConn = Box; + type Listener = stream::Map< + T::Listener, + fn((::ListenerUpgrade, Multiaddr)) -> (Self::ListenerUpgrade, Multiaddr), + >; + type ListenerUpgrade = Box>; + type Dial = Box>; + + fn listen_on( + self, + original_addr: Multiaddr, + ) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { + let mut inner_addr = original_addr.clone(); + match inner_addr.pop() { + Some(AddrComponent::WS) => {} + _ => return Err((self, original_addr)), + }; + + let (inner_listen, new_addr) = match self.transport.listen_on(inner_addr) { + Ok((listen, inner_new_addr)) => { + // Need to suffix `/ws` to the listening address. + let new_addr = inner_new_addr + .encapsulate("/ws") + .expect("the /ws suffix is always valid"); + (listen, new_addr) + } + Err((transport, _)) => { + return Err(( + WsConfig { + transport: transport, + }, + original_addr, + )); + } + }; + + let listen = inner_listen.map::<_, fn(_) -> _>(|(stream, client_addr)| { + // Need to suffix `/ws` to each client address. + let client_addr = client_addr + .encapsulate("/ws") + .expect("the /ws suffix is always valid"); + + // Upgrade the listener to websockets like the websockets library requires us to do. + let upgraded = stream.and_then(|stream| { + stream + .into_ws() + .map_err(|e| IoError::new(IoErrorKind::Other, e.3)) + .and_then(|stream| { + // Accept the next incoming connection. + stream + .accept() + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + .map(|(client, _http_headers)| { + // Plug our own API on top of the `websockets` API. + let framed_data = client + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + .sink_map_err(|err| IoError::new(IoErrorKind::Other, err)) + .with(|data| Ok(OwnedMessage::Binary(data))) + .and_then(|recv| { + match recv { + OwnedMessage::Binary(data) => Ok(Some(data)), + OwnedMessage::Text(data) => Ok(Some(data.into_bytes())), + OwnedMessage::Close(_) => Ok(None), + // TODO: pings and pongs ; freaking hard + _ => unimplemented!() + } + }) + // TODO: is there a way to merge both lines into one? + .take_while(|v| Ok(v.is_some())) + .map(|v| v.expect("we only take while this is Some")); + + let read_write = RwStreamSink::new(framed_data); + Box::new(read_write) as Box + }) + }) + .map(|s| Box::new(Ok(s).into_future()) as Box>) + .into_future() + .flatten() + }); + + ( + Box::new(upgraded) as Box>, + client_addr, + ) + }); + + Ok((listen, new_addr)) + } + + fn dial(self, original_addr: Multiaddr) -> Result { + let mut inner_addr = original_addr.clone(); + match inner_addr.pop() { + Some(AddrComponent::WS) => {} + _ => return Err((self, original_addr)), + }; + + let inner_dial = match self.transport.dial(inner_addr) { + Ok(d) => d, + Err((transport, _)) => { + return Err(( + WsConfig { + transport: transport, + }, + original_addr, + )); + } + }; + + let dial = inner_dial.into_future().and_then(|connec| { + // We pass a dummy address to `ClientBuilder` because it is never used anywhere + // in the negotiation anyway, and we use `async_connect_on` to pass a stream. + ClientBuilder::new("ws://127.0.0.1:80") + .expect("hard-coded ws address is always valid") + .async_connect_on(connec) + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + .map(|(client, _)| { + // Plug our own API on top of the API of the websockets library. + let framed_data = client + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + .sink_map_err(|err| IoError::new(IoErrorKind::Other, err)) + .with(|data| Ok(OwnedMessage::Binary(data))) + .and_then(|recv| { + match recv { + OwnedMessage::Binary(data) => Ok(data), + OwnedMessage::Text(data) => Ok(data.into_bytes()), + // TODO: pings and pongs and close messages need to be + // answered ; and this is really hard + _ => unimplemented!(), + } + }); + let read_write = RwStreamSink::new(framed_data); + Box::new(read_write) as Box + }) + }); + + Ok(Box::new(dial) as Box<_>) + } +} + +#[cfg(test)] +mod tests { + extern crate libp2p_tcp_transport as tcp; + extern crate tokio_core; + use self::tokio_core::reactor::Core; + use WsConfig; + use futures::{Future, Stream}; + use swarm::Transport; + + #[test] + fn dialer_connects_to_listener_ipv4() { + let mut core = Core::new().unwrap(); + let ws_config = WsConfig::new(tcp::TcpConfig::new(core.handle())); + + let (listener, addr) = ws_config + .clone() + .listen_on("/ip4/0.0.0.0/tcp/0/ws".parse().unwrap()) + .unwrap(); + assert!(addr.to_string().ends_with("/ws")); + assert!(!addr.to_string().ends_with("/0/ws")); + let listener = listener + .into_future() + .map_err(|(e, _)| e) + .and_then(|(c, _)| c.unwrap().0); + let dialer = ws_config.clone().dial(addr).unwrap(); + + let future = listener + .select(dialer) + .map_err(|(e, _)| e) + .and_then(|(_, n)| n); + core.run(future).unwrap(); + } + + #[test] + fn dialer_connects_to_listener_ipv6() { + let mut core = Core::new().unwrap(); + let ws_config = WsConfig::new(tcp::TcpConfig::new(core.handle())); + + let (listener, addr) = ws_config + .clone() + .listen_on("/ip6/::1/tcp/0/ws".parse().unwrap()) + .unwrap(); + assert!(addr.to_string().ends_with("/ws")); + assert!(!addr.to_string().ends_with("/0/ws")); + let listener = listener + .into_future() + .map_err(|(e, _)| e) + .and_then(|(c, _)| c.unwrap().0); + let dialer = ws_config.clone().dial(addr).unwrap(); + + let future = listener + .select(dialer) + .map_err(|(e, _)| e) + .and_then(|(_, n)| n); + core.run(future).unwrap(); + } +} diff --git a/libp2p-websocket/src/lib.rs b/libp2p-websocket/src/lib.rs new file mode 100644 index 00000000..6798035d --- /dev/null +++ b/libp2p-websocket/src/lib.rs @@ -0,0 +1,91 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +#![recursion_limit = "512"] + +// TODO: use this once stable ; for now we just copy-paste the content of the README.md +//#![doc(include = "../README.md")] + +//! Implementation of the libp2p `Transport` trait for Websockets. +//! +//! See the documentation of `swarm` and of libp2p in general to learn how to use the `Transport` +//! trait. +//! +//! This library is used in a different way depending on whether you are compiling for emscripten +//! or for a different operating system. +//! +//! # Emscripten +//! +//! On emscripten, you can create a `WsConfig` object with `WsConfig::new()`. It can then be used +//! as a transport. +//! +//! Listening on a websockets multiaddress isn't supported on emscripten. Dialing a multiaddress +//! which uses `ws` on top of TCP/IP will automatically use the `XMLHttpRequest` Javascript object. +//! +//! ```ignore +//! use libp2p_websocket::WsConfig; +//! +//! let ws_config = WsConfig::new(); +//! // let _ = ws_config.dial(Multiaddr::new("/ip4/40.41.42.43/tcp/12345/ws").unwrap()); +//! ``` +//! +//! # Other operating systems +//! +//! On other operating systems, this library doesn't open any socket by itself. Instead it must be +//! plugged on top of another implementation of `Transport` such as TCP/IP. +//! +//! This underlying transport must be passed to the `WsConfig::new()` function. +//! +//! ```ignore +//! extern crate libp2p_tcp_transport; +//! extern crate libp2p_websocket; +//! extern crate tokio_core; +//! +//! use libp2p_websocket::WsConfig; +//! use libp2p_tcp_transport::TcpConfig; +//! use tokio_core::reactor::Core; +//! +//! let core = Core::new().unwrap(); +//! let ws_config = WsConfig::new(TcpConfig::new(core.handle())); +//! // let _ = ws_config.dial(Multiaddr::new("/ip4/40.41.42.43/tcp/12345/ws").unwrap()); +//! ``` +//! + +extern crate futures; +extern crate libp2p_swarm as swarm; +extern crate multiaddr; +extern crate rw_stream_sink; +extern crate tokio_io; + +#[cfg(target_os = "emscripten")] +#[macro_use] +extern crate stdweb; +#[cfg(not(target_os = "emscripten"))] +extern crate websocket; + +#[cfg(not(target_os = "emscripten"))] +mod desktop; +#[cfg(target_os = "emscripten")] +mod browser; + +#[cfg(target_os = "emscripten")] +pub use self::browser::WsConfig; +#[cfg(not(target_os = "emscripten"))] +pub use self::desktop::WsConfig; From 2783c5713e04557b3bef9b68e174c3ba5e71cc25 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 10 Jan 2018 18:24:58 +0100 Subject: [PATCH 2/5] Fix concerns --- libp2p-tcp-transport/src/lib.rs | 1 - libp2p-websocket/README.md | 8 +++++--- libp2p-websocket/src/browser.rs | 36 ++++++++++++++++----------------- libp2p-websocket/src/lib.rs | 13 ++++++++---- 4 files changed, 32 insertions(+), 26 deletions(-) diff --git a/libp2p-tcp-transport/src/lib.rs b/libp2p-tcp-transport/src/lib.rs index db341307..f7e9ed85 100644 --- a/libp2p-tcp-transport/src/lib.rs +++ b/libp2p-tcp-transport/src/lib.rs @@ -109,7 +109,6 @@ impl Transport for TcpConfig { let future = future::result(listener).map(|listener| { // Pull out a stream of sockets for incoming connections listener.incoming().map(|(sock, addr)| { - println!("incoming tcp stream"); let addr = addr.to_multiaddr() .expect("generating a multiaddr from a socket addr never fails"); (Ok(sock).into_future(), addr) diff --git a/libp2p-websocket/README.md b/libp2p-websocket/README.md index 3f3b762d..04d8c86f 100644 --- a/libp2p-websocket/README.md +++ b/libp2p-websocket/README.md @@ -18,7 +18,7 @@ which uses `ws` on top of TCP/IP will automatically use the `XMLHttpRequest` Jav use libp2p_websocket::WsConfig; let ws_config = WsConfig::new(); -// let _ = ws_config.dial(Multiaddr::new("/ip4/40.41.42.43/tcp/12345/ws").unwrap()); +// let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap()); ``` # Other operating systems @@ -29,15 +29,17 @@ plugged on top of another implementation of `Transport` such as TCP/IP. This underlying transport must be passed to the `WsConfig::new()` function. ```rust +extern crate libp2p_swarm; extern crate libp2p_tcp_transport; extern crate libp2p_websocket; extern crate tokio_core; -use libp2p_websocket::WsConfig; +use libp2p_swarm::{Multiaddr, Transport}; use libp2p_tcp_transport::TcpConfig; +use libp2p_websocket::WsConfig; use tokio_core::reactor::Core; let core = Core::new().unwrap(); let ws_config = WsConfig::new(TcpConfig::new(core.handle())); -// let _ = ws_config.dial(Multiaddr::new("/ip4/40.41.42.43/tcp/12345/ws").unwrap()); +let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap()); ``` diff --git a/libp2p-websocket/src/browser.rs b/libp2p-websocket/src/browser.rs index fe327c02..fa95a5ca 100644 --- a/libp2p-websocket/src/browser.rs +++ b/libp2p-websocket/src/browser.rs @@ -49,10 +49,10 @@ impl WsConfig { } impl Transport for WsConfig { - type RawConn = Connec; + type RawConn = WsConn; type Listener = Box>; // TODO: use `!` type ListenerUpgrade = Box>; // TODO: use `!` - type Dial = FutureThen>, Result, fn(Result, oneshot::Canceled>) -> Result>; + type Dial = FutureThen>, Result, fn(Result, oneshot::Canceled>) -> Result>; #[inline] fn listen_on(self, a: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { @@ -75,7 +75,7 @@ impl Transport for WsConfig { // Create the JS `WebSocket` object. let websocket = { - let val = js!{ + let val = js! { try { return new WebSocket(@{inner_addr}); } catch(e) { @@ -107,10 +107,10 @@ impl Transport for WsConfig { } }; - // Create a `open` channel that will be used to communicate the `Connec` that represents + // Create a `open` channel that will be used to communicate the `WsConn` that represents // the open dialing websocket. Also create a `open_cb` callback that will be used for the // `open` message of the websocket. - let (open_tx, open_rx) = oneshot::channel::>(); + let (open_tx, open_rx) = oneshot::channel::>(); let open_tx = Arc::new(Mutex::new(Some(open_tx))); let websocket_clone = websocket.clone(); let open_cb = { @@ -124,18 +124,18 @@ impl Transport for WsConfig { // is not supposed to happen. let message_rx = message_rx.take().expect("the websocket can only open once"); - // Send a `Connec` to the future that was returned by `dial`. Ignoring errors that + // Send a `WsConn` to the future that was returned by `dial`. Ignoring errors that // would happen the future has been dropped by the user. let _ = tx - .send(Ok(Connec { + .send(Ok(WsConn { websocket: websocket_clone.clone(), incoming_data: RwStreamSink::new(message_rx.then(|result| { // An `Err` happens here if `message_tx` has been dropped. However // `message_tx` is grabbed by the websocket, which stays alive for as - // long as the `Connec` is alive. + // long as the `WsConn` is alive. match result { Ok(r) => r, - Err(_) => unreachable!("the message channel outlives the Connec") + Err(_) => unreachable!("the message channel outlives the WsConn") } })), })); @@ -155,7 +155,7 @@ impl Transport for WsConfig { "close event on the websocket"))); }; - js!{ + js! { var socket = @{websocket}; var open_cb = @{open_cb}; var message_cb = @{message_cb}; @@ -191,7 +191,7 @@ impl Transport for WsConfig { } } -pub struct Connec { +pub struct WsConn { websocket: Reference, // Stream of messages that goes through a `RwStreamSink` in order to become a `AsyncRead`. incoming_data: RwStreamSink, IoError>>, @@ -200,38 +200,38 @@ pub struct Connec { >>, } -impl Drop for Connec { +impl Drop for WsConn { #[inline] fn drop(&mut self) { // TODO: apparently there's a memory leak related to callbacks? - js!{ @{&self.websocket}.close(); } + js! { @{&self.websocket}.close(); } } } -impl AsyncRead for Connec { +impl AsyncRead for WsConn { } -impl Read for Connec { +impl Read for WsConn { #[inline] fn read(&mut self, buf: &mut [u8]) -> Result { self.incoming_data.read(buf) } } -impl AsyncWrite for Connec { +impl AsyncWrite for WsConn { #[inline] fn shutdown(&mut self) -> Poll<(), IoError> { Ok(Async::Ready(())) } } -impl Write for Connec { +impl Write for WsConn { fn write(&mut self, buf: &[u8]) -> Result { let typed_array = TypedArray::from(buf); // `send` can throw if the websocket isn't open (which can happen if it was closed by the // remote). - let returned = js!{ + let returned = js! { try { @{&self.websocket}.send(@{typed_array.buffer()}); return true; diff --git a/libp2p-websocket/src/lib.rs b/libp2p-websocket/src/lib.rs index 6798035d..175916c6 100644 --- a/libp2p-websocket/src/lib.rs +++ b/libp2p-websocket/src/lib.rs @@ -43,7 +43,7 @@ //! use libp2p_websocket::WsConfig; //! //! let ws_config = WsConfig::new(); -//! // let _ = ws_config.dial(Multiaddr::new("/ip4/40.41.42.43/tcp/12345/ws").unwrap()); +//! // let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap()); //! ``` //! //! # Other operating systems @@ -53,18 +53,23 @@ //! //! This underlying transport must be passed to the `WsConfig::new()` function. //! -//! ```ignore +//! ``` +//! extern crate libp2p_swarm; //! extern crate libp2p_tcp_transport; //! extern crate libp2p_websocket; //! extern crate tokio_core; //! -//! use libp2p_websocket::WsConfig; +//! use libp2p_swarm::{Multiaddr, Transport}; //! use libp2p_tcp_transport::TcpConfig; +//! use libp2p_websocket::WsConfig; //! use tokio_core::reactor::Core; //! +//! # fn main() { //! let core = Core::new().unwrap(); //! let ws_config = WsConfig::new(TcpConfig::new(core.handle())); -//! // let _ = ws_config.dial(Multiaddr::new("/ip4/40.41.42.43/tcp/12345/ws").unwrap()); +//! # return; +//! let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap()); +//! # } //! ``` //! From b8829d7cb1bc5c929445b3512bc9b566e7e298c2 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 11 Jan 2018 10:51:13 +0100 Subject: [PATCH 3/5] More concerns --- libp2p-websocket/README.md | 11 +++++----- libp2p-websocket/src/browser.rs | 38 ++++++++++++++++----------------- libp2p-websocket/src/desktop.rs | 22 +++++++++---------- libp2p-websocket/src/lib.rs | 13 +++++------ rust-multiaddr/src/lib.rs | 17 +++++++++++++++ 5 files changed, 59 insertions(+), 42 deletions(-) diff --git a/libp2p-websocket/README.md b/libp2p-websocket/README.md index 04d8c86f..3f5f4e69 100644 --- a/libp2p-websocket/README.md +++ b/libp2p-websocket/README.md @@ -8,16 +8,16 @@ or for a different operating system. # Emscripten -On emscripten, you can create a `WsConfig` object with `WsConfig::new()`. It can then be used -as a transport. +On emscripten, you can create a `BrowserWsConfig` object with `BrowserWsConfig::new()`. It can +then be used as a transport. Listening on a websockets multiaddress isn't supported on emscripten. Dialing a multiaddress which uses `ws` on top of TCP/IP will automatically use the `XMLHttpRequest` Javascript object. ```rust -use libp2p_websocket::WsConfig; +use libp2p_websocket::BrowserWsConfig; -let ws_config = WsConfig::new(); +let ws_config = BrowserWsConfig::new(); // let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap()); ``` @@ -26,7 +26,8 @@ let ws_config = WsConfig::new(); On other operating systems, this library doesn't open any socket by itself. Instead it must be plugged on top of another implementation of `Transport` such as TCP/IP. -This underlying transport must be passed to the `WsConfig::new()` function. +This underlying transport must be put inside a `WsConfig` object through the +`WsConfig::new()` function. ```rust extern crate libp2p_swarm; diff --git a/libp2p-websocket/src/browser.rs b/libp2p-websocket/src/browser.rs index fa95a5ca..71132ebb 100644 --- a/libp2p-websocket/src/browser.rs +++ b/libp2p-websocket/src/browser.rs @@ -38,21 +38,21 @@ use tokio_io::{AsyncRead, AsyncWrite}; /// /// > **Note**: The `/wss` protocol isn't supported. #[derive(Debug, Clone)] -pub struct WsConfig; +pub struct BrowserWsConfig; -impl WsConfig { +impl BrowserWsConfig { /// Creates a new configuration object for websocket. #[inline] - pub fn new() -> WsConfig { - WsConfig + pub fn new() -> BrowserWsConfig { + BrowserWsConfig } } -impl Transport for WsConfig { - type RawConn = WsConn; +impl Transport for BrowserWsConfig { + type RawConn = BrowserWsConn; type Listener = Box>; // TODO: use `!` type ListenerUpgrade = Box>; // TODO: use `!` - type Dial = FutureThen>, Result, fn(Result, oneshot::Canceled>) -> Result>; + type Dial = FutureThen>, Result, fn(Result, oneshot::Canceled>) -> Result>; #[inline] fn listen_on(self, a: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { @@ -107,10 +107,10 @@ impl Transport for WsConfig { } }; - // Create a `open` channel that will be used to communicate the `WsConn` that represents + // Create a `open` channel that will be used to communicate the `BrowserWsConn` that represents // the open dialing websocket. Also create a `open_cb` callback that will be used for the // `open` message of the websocket. - let (open_tx, open_rx) = oneshot::channel::>(); + let (open_tx, open_rx) = oneshot::channel::>(); let open_tx = Arc::new(Mutex::new(Some(open_tx))); let websocket_clone = websocket.clone(); let open_cb = { @@ -124,18 +124,18 @@ impl Transport for WsConfig { // is not supposed to happen. let message_rx = message_rx.take().expect("the websocket can only open once"); - // Send a `WsConn` to the future that was returned by `dial`. Ignoring errors that + // Send a `BrowserWsConn` to the future that was returned by `dial`. Ignoring errors that // would happen the future has been dropped by the user. let _ = tx - .send(Ok(WsConn { + .send(Ok(BrowserWsConn { websocket: websocket_clone.clone(), incoming_data: RwStreamSink::new(message_rx.then(|result| { // An `Err` happens here if `message_tx` has been dropped. However // `message_tx` is grabbed by the websocket, which stays alive for as - // long as the `WsConn` is alive. + // long as the `BrowserWsConn` is alive. match result { Ok(r) => r, - Err(_) => unreachable!("the message channel outlives the WsConn") + Err(_) => unreachable!("the message channel outlives the BrowserWsConn") } })), })); @@ -191,7 +191,7 @@ impl Transport for WsConfig { } } -pub struct WsConn { +pub struct BrowserWsConn { websocket: Reference, // Stream of messages that goes through a `RwStreamSink` in order to become a `AsyncRead`. incoming_data: RwStreamSink, IoError>>, @@ -200,7 +200,7 @@ pub struct WsConn { >>, } -impl Drop for WsConn { +impl Drop for BrowserWsConn { #[inline] fn drop(&mut self) { // TODO: apparently there's a memory leak related to callbacks? @@ -208,24 +208,24 @@ impl Drop for WsConn { } } -impl AsyncRead for WsConn { +impl AsyncRead for BrowserWsConn { } -impl Read for WsConn { +impl Read for BrowserWsConn { #[inline] fn read(&mut self, buf: &mut [u8]) -> Result { self.incoming_data.read(buf) } } -impl AsyncWrite for WsConn { +impl AsyncWrite for BrowserWsConn { #[inline] fn shutdown(&mut self) -> Poll<(), IoError> { Ok(Async::Ready(())) } } -impl Write for WsConn { +impl Write for BrowserWsConn { fn write(&mut self, buf: &[u8]) -> Result { let typed_array = TypedArray::from(buf); diff --git a/libp2p-websocket/src/desktop.rs b/libp2p-websocket/src/desktop.rs index b7a37ef5..8299065b 100644 --- a/libp2p-websocket/src/desktop.rs +++ b/libp2p-websocket/src/desktop.rs @@ -74,11 +74,9 @@ where }; let (inner_listen, new_addr) = match self.transport.listen_on(inner_addr) { - Ok((listen, inner_new_addr)) => { + Ok((listen, mut new_addr)) => { // Need to suffix `/ws` to the listening address. - let new_addr = inner_new_addr - .encapsulate("/ws") - .expect("the /ws suffix is always valid"); + new_addr.append(AddrComponent::WS); (listen, new_addr) } Err((transport, _)) => { @@ -91,11 +89,9 @@ where } }; - let listen = inner_listen.map::<_, fn(_) -> _>(|(stream, client_addr)| { + let listen = inner_listen.map::<_, fn(_) -> _>(|(stream, mut client_addr)| { // Need to suffix `/ws` to each client address. - let client_addr = client_addr - .encapsulate("/ws") - .expect("the /ws suffix is always valid"); + client_addr.append(AddrComponent::WS); // Upgrade the listener to websockets like the websockets library requires us to do. let upgraded = stream.and_then(|stream| { @@ -118,8 +114,9 @@ where OwnedMessage::Binary(data) => Ok(Some(data)), OwnedMessage::Text(data) => Ok(Some(data.into_bytes())), OwnedMessage::Close(_) => Ok(None), - // TODO: pings and pongs ; freaking hard - _ => unimplemented!() + // TODO: handle pings and pongs, which is freaking hard + // for now we close the socket when that happens + _ => Ok(None) } }) // TODO: is there a way to merge both lines into one? @@ -181,8 +178,9 @@ where OwnedMessage::Binary(data) => Ok(data), OwnedMessage::Text(data) => Ok(data.into_bytes()), // TODO: pings and pongs and close messages need to be - // answered ; and this is really hard - _ => unimplemented!(), + // answered ; and this is really hard ; for now we produce + // an error when that happens + _ => Err(IoError::new(IoErrorKind::Other, "unimplemented")), } }); let read_write = RwStreamSink::new(framed_data); diff --git a/libp2p-websocket/src/lib.rs b/libp2p-websocket/src/lib.rs index 175916c6..148f9b75 100644 --- a/libp2p-websocket/src/lib.rs +++ b/libp2p-websocket/src/lib.rs @@ -33,16 +33,16 @@ //! //! # Emscripten //! -//! On emscripten, you can create a `WsConfig` object with `WsConfig::new()`. It can then be used -//! as a transport. +//! On emscripten, you can create a `BrowserWsConfig` object with `BrowserWsConfig::new()`. It can +//! then be used as a transport. //! //! Listening on a websockets multiaddress isn't supported on emscripten. Dialing a multiaddress //! which uses `ws` on top of TCP/IP will automatically use the `XMLHttpRequest` Javascript object. //! //! ```ignore -//! use libp2p_websocket::WsConfig; +//! use libp2p_websocket::BrowserWsConfig; //! -//! let ws_config = WsConfig::new(); +//! let ws_config = BrowserWsConfig::new(); //! // let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap()); //! ``` //! @@ -51,7 +51,8 @@ //! On other operating systems, this library doesn't open any socket by itself. Instead it must be //! plugged on top of another implementation of `Transport` such as TCP/IP. //! -//! This underlying transport must be passed to the `WsConfig::new()` function. +//! This underlying transport must be put inside a `WsConfig` object through the +//! `WsConfig::new()` function. //! //! ``` //! extern crate libp2p_swarm; @@ -91,6 +92,6 @@ mod desktop; mod browser; #[cfg(target_os = "emscripten")] -pub use self::browser::WsConfig; +pub use self::browser::{BrowserWsConfig, BrowserWsConn}; #[cfg(not(target_os = "emscripten"))] pub use self::desktop::WsConfig; diff --git a/rust-multiaddr/src/lib.rs b/rust-multiaddr/src/lib.rs index f80adb2d..25bb6197 100644 --- a/rust-multiaddr/src/lib.rs +++ b/rust-multiaddr/src/lib.rs @@ -102,6 +102,23 @@ impl Multiaddr { Ok(Multiaddr { bytes: bytes }) } + /// Adds an already-parsed address component to the end of this multiaddr. + /// + /// # Examples + /// + /// ``` + /// use multiaddr::{Multiaddr, AddrComponent}; + /// + /// let mut address: Multiaddr = "/ip4/127.0.0.1".parse().unwrap(); + /// address.append(AddrComponent::TCP(10000)); + /// assert_eq!(address, "/ip4/127.0.0.1/tcp/10000".parse().unwrap()); + /// ``` + /// + #[inline] + pub fn append(&mut self, component: AddrComponent) { + component.write_bytes(&mut self.bytes).expect("writing to a Vec never fails") + } + /// Remove the outermost address. /// /// # Examples From 6837a3928d6fc3cd43970dd114ee4be72c14f268 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 11 Jan 2018 11:11:49 +0100 Subject: [PATCH 4/5] Rustfmt and use tabs --- libp2p-websocket/src/browser.rs | 441 +++++++++++++++++--------------- libp2p-websocket/src/desktop.rs | 30 +-- 2 files changed, 246 insertions(+), 225 deletions(-) diff --git a/libp2p-websocket/src/browser.rs b/libp2p-websocket/src/browser.rs index 71132ebb..0440207b 100644 --- a/libp2p-websocket/src/browser.rs +++ b/libp2p-websocket/src/browser.rs @@ -1,31 +1,31 @@ // Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: // -// The above copyright notice and this permission notice shall be included in +// The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use std::io::{Read, Write}; -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use std::sync::{Arc, Mutex}; -use futures::{Future, Stream, Poll, Async, Then as FutureThen}; -use futures::sync::{oneshot, mpsc}; +use futures::{Async, Future, Poll, Stream, Then as FutureThen}; use futures::stream::Then as StreamThen; -use multiaddr::{Multiaddr, AddrComponent}; +use futures::sync::{mpsc, oneshot}; +use multiaddr::{AddrComponent, Multiaddr}; use rw_stream_sink::RwStreamSink; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use std::io::{Read, Write}; +use std::sync::{Arc, Mutex}; use stdweb::{self, Reference}; use stdweb::web::TypedArray; use swarm::Transport; @@ -41,237 +41,258 @@ use tokio_io::{AsyncRead, AsyncWrite}; pub struct BrowserWsConfig; impl BrowserWsConfig { - /// Creates a new configuration object for websocket. - #[inline] - pub fn new() -> BrowserWsConfig { - BrowserWsConfig - } + /// Creates a new configuration object for websocket. + #[inline] + pub fn new() -> BrowserWsConfig { + BrowserWsConfig + } } impl Transport for BrowserWsConfig { - type RawConn = BrowserWsConn; - type Listener = Box>; // TODO: use `!` - type ListenerUpgrade = Box>; // TODO: use `!` - type Dial = FutureThen>, Result, fn(Result, oneshot::Canceled>) -> Result>; + type RawConn = BrowserWsConn; + type Listener = Box>; // TODO: use `!` + type ListenerUpgrade = Box>; // TODO: use `!` + type Dial = FutureThen< + oneshot::Receiver>, + Result, + fn(Result, oneshot::Canceled>) + -> Result, + >; - #[inline] - fn listen_on(self, a: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - // Listening is never supported. - Err((self, a)) - } + #[inline] + fn listen_on(self, a: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { + // Listening is never supported. + Err((self, a)) + } - fn dial(self, original_addr: Multiaddr) -> Result { - // Making sure we are initialized before we dial. Initialization is protected by a simple - // boolean static variable, so it's not a problem to call it multiple times and the cost - // is negligible. - stdweb::initialize(); + fn dial(self, original_addr: Multiaddr) -> Result { + // Making sure we are initialized before we dial. Initialization is protected by a simple + // boolean static variable, so it's not a problem to call it multiple times and the cost + // is negligible. + stdweb::initialize(); - // Tries to interpret the multiaddr, and returns a corresponding `ws://x.x.x.x/` URL (as - // a string) on success. - let inner_addr = match multiaddr_to_target(&original_addr) { - Ok(a) => a, - Err(_) => return Err((self, original_addr)), - }; + // Tries to interpret the multiaddr, and returns a corresponding `ws://x.x.x.x/` URL (as + // a string) on success. + let inner_addr = match multiaddr_to_target(&original_addr) { + Ok(a) => a, + Err(_) => return Err((self, original_addr)), + }; - // Create the JS `WebSocket` object. - let websocket = { - let val = js! { - try { - return new WebSocket(@{inner_addr}); - } catch(e) { - return false; - } - }; - match val.into_reference() { - Some(ws) => ws, - None => return Err((self, original_addr)), // `false` was returned by `js!` - } - }; + // Create the JS `WebSocket` object. + let websocket = { + let val = js! { + try { + return new WebSocket(@{inner_addr}); + } catch(e) { + return false; + } + }; + match val.into_reference() { + Some(ws) => ws, + None => return Err((self, original_addr)), // `false` was returned by `js!` + } + }; - // Create a `message` channel that will be used for both bytes messages and errors, and a - // `message_cb` used for the `message` event on the WebSocket. - // `message_tx` is grabbed by `message_cb` and `close_cb`, and `message_rx` is grabbed - // by `open_cb`. - let (message_tx, message_rx) = mpsc::unbounded::, IoError>>(); - let message_tx = Arc::new(message_tx); - let mut message_rx = Some(message_rx); - let message_cb = { - let message_tx = message_tx.clone(); - move |message_data: Reference| { - if let Some(buffer) = message_data.downcast::>() { - let _ = message_tx.unbounded_send(Ok(buffer.to_vec())); - } else { - let _ = message_tx.unbounded_send(Err(IoError::new(IoErrorKind::InvalidData, - "received ws message of unknown type"))); - } - } - }; + // Create a `message` channel that will be used for both bytes messages and errors, and a + // `message_cb` used for the `message` event on the WebSocket. + // `message_tx` is grabbed by `message_cb` and `close_cb`, and `message_rx` is grabbed + // by `open_cb`. + let (message_tx, message_rx) = mpsc::unbounded::, IoError>>(); + let message_tx = Arc::new(message_tx); + let mut message_rx = Some(message_rx); + let message_cb = { + let message_tx = message_tx.clone(); + move |message_data: Reference| { + if let Some(buffer) = message_data.downcast::>() { + let _ = message_tx.unbounded_send(Ok(buffer.to_vec())); + } else { + let _ = message_tx.unbounded_send(Err(IoError::new( + IoErrorKind::InvalidData, + "received ws message of unknown type", + ))); + } + } + }; - // Create a `open` channel that will be used to communicate the `BrowserWsConn` that represents - // the open dialing websocket. Also create a `open_cb` callback that will be used for the - // `open` message of the websocket. - let (open_tx, open_rx) = oneshot::channel::>(); - let open_tx = Arc::new(Mutex::new(Some(open_tx))); - let websocket_clone = websocket.clone(); - let open_cb = { - let open_tx = open_tx.clone(); - move || { - // Note that `open_tx` can be empty (and a panic happens) if the `open` event - // is triggered twice, or is triggered after the `close` event. We never reuse the - // same websocket twice, so this is not supposed to happen. - let tx = open_tx.lock().unwrap().take().expect("the websocket can only open once"); - // `message_rx` can be empty if the `open` event is triggered twice, which again - // is not supposed to happen. - let message_rx = message_rx.take().expect("the websocket can only open once"); + // Create a `open` channel that will be used to communicate the `BrowserWsConn` that represents + // the open dialing websocket. Also create a `open_cb` callback that will be used for the + // `open` message of the websocket. + let (open_tx, open_rx) = oneshot::channel::>(); + let open_tx = Arc::new(Mutex::new(Some(open_tx))); + let websocket_clone = websocket.clone(); + let open_cb = { + let open_tx = open_tx.clone(); + move || { + // Note that `open_tx` can be empty (and a panic happens) if the `open` event + // is triggered twice, or is triggered after the `close` event. We never reuse the + // same websocket twice, so this is not supposed to happen. + let tx = open_tx + .lock() + .unwrap() + .take() + .expect("the websocket can only open once"); + // `message_rx` can be empty if the `open` event is triggered twice, which again + // is not supposed to happen. + let message_rx = message_rx.take().expect("the websocket can only open once"); - // Send a `BrowserWsConn` to the future that was returned by `dial`. Ignoring errors that - // would happen the future has been dropped by the user. - let _ = tx - .send(Ok(BrowserWsConn { - websocket: websocket_clone.clone(), - incoming_data: RwStreamSink::new(message_rx.then(|result| { - // An `Err` happens here if `message_tx` has been dropped. However - // `message_tx` is grabbed by the websocket, which stays alive for as - // long as the `BrowserWsConn` is alive. - match result { - Ok(r) => r, - Err(_) => unreachable!("the message channel outlives the BrowserWsConn") - } - })), - })); - } - }; + // Send a `BrowserWsConn` to the future that was returned by `dial`. Ignoring errors that + // would happen the future has been dropped by the user. + let _ = tx.send(Ok(BrowserWsConn { + websocket: websocket_clone.clone(), + incoming_data: RwStreamSink::new(message_rx.then(|result| { + // An `Err` happens here if `message_tx` has been dropped. However + // `message_tx` is grabbed by the websocket, which stays alive for as + // long as the `BrowserWsConn` is alive. + match result { + Ok(r) => r, + Err(_) => { + unreachable!("the message channel outlives the BrowserWsConn") + } + } + })), + })); + } + }; - // Used for the `close` message of the websocket. - // The websocket can be closed either before or after being opened, so we send an error - // to both the `open` and `message` channels if that happens. - let close_cb = move || { - if let Some(tx) = open_tx.lock().unwrap().take() { - let _ = tx.send(Err(IoError::new(IoErrorKind::ConnectionRefused, - "close event on the websocket"))); - } + // Used for the `close` message of the websocket. + // The websocket can be closed either before or after being opened, so we send an error + // to both the `open` and `message` channels if that happens. + let close_cb = move || { + if let Some(tx) = open_tx.lock().unwrap().take() { + let _ = tx.send(Err(IoError::new( + IoErrorKind::ConnectionRefused, + "close event on the websocket", + ))); + } - let _ = message_tx.unbounded_send(Err(IoError::new(IoErrorKind::ConnectionRefused, - "close event on the websocket"))); - }; + let _ = message_tx.unbounded_send(Err(IoError::new( + IoErrorKind::ConnectionRefused, + "close event on the websocket", + ))); + }; - js! { - var socket = @{websocket}; - var open_cb = @{open_cb}; - var message_cb = @{message_cb}; - var close_cb = @{close_cb}; - socket.addEventListener("open", function(event) { - open_cb(); - }); - socket.addEventListener("message", function(event) { - var reader = new FileReader(); - reader.addEventListener("loadend", function() { - var typed = new Uint8Array(reader.result); - message_cb(typed); - }); - reader.readAsArrayBuffer(event.data); - }); - socket.addEventListener("close", function(event) { - close_cb(); - }); - }; + js! { + var socket = @{websocket}; + var open_cb = @{open_cb}; + var message_cb = @{message_cb}; + var close_cb = @{close_cb}; + socket.addEventListener("open", function(event) { + open_cb(); + }); + socket.addEventListener("message", function(event) { + var reader = new FileReader(); + reader.addEventListener("loadend", function() { + var typed = new Uint8Array(reader.result); + message_cb(typed); + }); + reader.readAsArrayBuffer(event.data); + }); + socket.addEventListener("close", function(event) { + close_cb(); + }); + }; - Ok(open_rx.then(|result| { - match result { - Ok(Ok(r)) => Ok(r), - Ok(Err(e)) => Err(e), - // `Err` would happen here if `open_tx` is destroyed. `open_tx` is captured by - // the `WebSocket`, and the `WebSocket` is captured by `open_cb`, which is itself - // captured by the `WebSocket`. Due to this cyclic dependency, `open_tx` should - // never be destroyed. - // TODO: how do we break this cyclic dependency? difficult question - Err(_) => unreachable!("the sending side will only close when we drop the future") - } - })) - } + Ok(open_rx.then(|result| { + match result { + Ok(Ok(r)) => Ok(r), + Ok(Err(e)) => Err(e), + // `Err` would happen here if `open_tx` is destroyed. `open_tx` is captured by + // the `WebSocket`, and the `WebSocket` is captured by `open_cb`, which is itself + // captured by the `WebSocket`. Due to this cyclic dependency, `open_tx` should + // never be destroyed. + // TODO: how do we break this cyclic dependency? difficult question + Err(_) => unreachable!("the sending side will only close when we drop the future"), + } + })) + } } pub struct BrowserWsConn { - websocket: Reference, - // Stream of messages that goes through a `RwStreamSink` in order to become a `AsyncRead`. - incoming_data: RwStreamSink, IoError>>, - fn(Result, IoError>, ()>) -> Result, IoError>, - Result, IoError> - >>, + websocket: Reference, + // Stream of messages that goes through a `RwStreamSink` in order to become a `AsyncRead`. + incoming_data: RwStreamSink< + StreamThen< + mpsc::UnboundedReceiver, IoError>>, + fn(Result, IoError>, ()>) -> Result, IoError>, + Result, IoError>, + >, + >, } impl Drop for BrowserWsConn { - #[inline] - fn drop(&mut self) { - // TODO: apparently there's a memory leak related to callbacks? - js! { @{&self.websocket}.close(); } - } + #[inline] + fn drop(&mut self) { + // TODO: apparently there's a memory leak related to callbacks? + js! { @{&self.websocket}.close(); } + } } -impl AsyncRead for BrowserWsConn { -} +impl AsyncRead for BrowserWsConn {} impl Read for BrowserWsConn { - #[inline] - fn read(&mut self, buf: &mut [u8]) -> Result { - self.incoming_data.read(buf) - } + #[inline] + fn read(&mut self, buf: &mut [u8]) -> Result { + self.incoming_data.read(buf) + } } impl AsyncWrite for BrowserWsConn { - #[inline] - fn shutdown(&mut self) -> Poll<(), IoError> { - Ok(Async::Ready(())) - } + #[inline] + fn shutdown(&mut self) -> Poll<(), IoError> { + Ok(Async::Ready(())) + } } impl Write for BrowserWsConn { - fn write(&mut self, buf: &[u8]) -> Result { - let typed_array = TypedArray::from(buf); + fn write(&mut self, buf: &[u8]) -> Result { + let typed_array = TypedArray::from(buf); - // `send` can throw if the websocket isn't open (which can happen if it was closed by the - // remote). - let returned = js! { - try { - @{&self.websocket}.send(@{typed_array.buffer()}); - return true; - } catch(e) { - return false; - } - }; + // `send` can throw if the websocket isn't open (which can happen if it was closed by the + // remote). + let returned = js! { + try { + @{&self.websocket}.send(@{typed_array.buffer()}); + return true; + } catch(e) { + return false; + } + }; - match returned { - stdweb::Value::Bool(true) => Ok(buf.len()), - stdweb::Value::Bool(false) => Err(IoError::new(IoErrorKind::BrokenPipe, "websocket has been closed by the remote")), - _ => unreachable!() - } - } + match returned { + stdweb::Value::Bool(true) => Ok(buf.len()), + stdweb::Value::Bool(false) => Err(IoError::new( + IoErrorKind::BrokenPipe, + "websocket has been closed by the remote", + )), + _ => unreachable!(), + } + } - #[inline] - fn flush(&mut self) -> Result<(), IoError> { - // Everything is always considered flushed. - Ok(()) - } + #[inline] + fn flush(&mut self) -> Result<(), IoError> { + // Everything is always considered flushed. + Ok(()) + } } // Tries to interpret the `Multiaddr` as a `/ipN/.../tcp/.../ws` multiaddress, and if so returns // the corresponding `ws://.../` URL. fn multiaddr_to_target(addr: &Multiaddr) -> Result { - let protocols: Vec<_> = addr.iter().collect(); + let protocols: Vec<_> = addr.iter().collect(); - if protocols.len() != 3 { - return Err(()); - } + if protocols.len() != 3 { + return Err(()); + } - match (&protocols[0], &protocols[1], &protocols[2]) { - (&AddrComponent::IP4(ref ip), &AddrComponent::TCP(port), &AddrComponent::WS) => { - Ok(format!("ws://{}:{}/", ip, port)) - } - (&AddrComponent::IP6(ref ip), &AddrComponent::TCP(port), &AddrComponent::WS) => { - Ok(format!("ws://[{}]:{}/", ip, port)) - } - _ => Err(()), - } + match (&protocols[0], &protocols[1], &protocols[2]) { + (&AddrComponent::IP4(ref ip), &AddrComponent::TCP(port), &AddrComponent::WS) => { + Ok(format!("ws://{}:{}/", ip, port)) + } + (&AddrComponent::IP6(ref ip), &AddrComponent::TCP(port), &AddrComponent::WS) => { + Ok(format!("ws://[{}]:{}/", ip, port)) + } + _ => Err(()), + } } // TODO: write tests (tests are very difficult to write with emscripten) diff --git a/libp2p-websocket/src/desktop.rs b/libp2p-websocket/src/desktop.rs index 8299065b..cdb082f0 100644 --- a/libp2p-websocket/src/desktop.rs +++ b/libp2p-websocket/src/desktop.rs @@ -106,22 +106,22 @@ where .map(|(client, _http_headers)| { // Plug our own API on top of the `websockets` API. let framed_data = client - .map_err(|err| IoError::new(IoErrorKind::Other, err)) - .sink_map_err(|err| IoError::new(IoErrorKind::Other, err)) - .with(|data| Ok(OwnedMessage::Binary(data))) - .and_then(|recv| { - match recv { - OwnedMessage::Binary(data) => Ok(Some(data)), - OwnedMessage::Text(data) => Ok(Some(data.into_bytes())), - OwnedMessage::Close(_) => Ok(None), - // TODO: handle pings and pongs, which is freaking hard + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + .sink_map_err(|err| IoError::new(IoErrorKind::Other, err)) + .with(|data| Ok(OwnedMessage::Binary(data))) + .and_then(|recv| { + match recv { + OwnedMessage::Binary(data) => Ok(Some(data)), + OwnedMessage::Text(data) => Ok(Some(data.into_bytes())), + OwnedMessage::Close(_) => Ok(None), + // TODO: handle pings and pongs, which is freaking hard // for now we close the socket when that happens - _ => Ok(None) - } - }) - // TODO: is there a way to merge both lines into one? - .take_while(|v| Ok(v.is_some())) - .map(|v| v.expect("we only take while this is Some")); + _ => Ok(None) + } + }) + // TODO: is there a way to merge both lines into one? + .take_while(|v| Ok(v.is_some())) + .map(|v| v.expect("we only take while this is Some")); let read_write = RwStreamSink::new(framed_data); Box::new(read_write) as Box From 6c737c61bf615138fd4afa86caa1d12df29ea75b Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 11 Jan 2018 16:06:11 +0100 Subject: [PATCH 5/5] Add support for WSS for dialing --- libp2p-websocket/Cargo.toml | 2 +- libp2p-websocket/src/browser.rs | 8 ++++++-- libp2p-websocket/src/desktop.rs | 15 ++++++++------- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/libp2p-websocket/Cargo.toml b/libp2p-websocket/Cargo.toml index 1561b3a6..52144403 100644 --- a/libp2p-websocket/Cargo.toml +++ b/libp2p-websocket/Cargo.toml @@ -11,7 +11,7 @@ rw-stream-sink = { path = "../rw-stream-sink" } tokio-io = "0.1" [target.'cfg(not(target_os = "emscripten"))'.dependencies] -websocket = { version = "0.20.2", default-features = false, features = ["async"] } +websocket = { version = "0.20.2", default-features = false, features = ["async", "async-ssl"] } [target.'cfg(target_os = "emscripten")'.dependencies] stdweb = { version = "0.1.3", default-features = false } diff --git a/libp2p-websocket/src/browser.rs b/libp2p-websocket/src/browser.rs index 0440207b..fe911ec8 100644 --- a/libp2p-websocket/src/browser.rs +++ b/libp2p-websocket/src/browser.rs @@ -35,8 +35,6 @@ use tokio_io::{AsyncRead, AsyncWrite}; /// /// This implementation of `Transport` accepts any address that looks like /// `/ip4/.../tcp/.../ws` or `/ip6/.../tcp/.../ws`, and connect to the corresponding IP and port. -/// -/// > **Note**: The `/wss` protocol isn't supported. #[derive(Debug, Clone)] pub struct BrowserWsConfig; @@ -291,6 +289,12 @@ fn multiaddr_to_target(addr: &Multiaddr) -> Result { (&AddrComponent::IP6(ref ip), &AddrComponent::TCP(port), &AddrComponent::WS) => { Ok(format!("ws://[{}]:{}/", ip, port)) } + (&AddrComponent::IP4(ref ip), &AddrComponent::TCP(port), &AddrComponent::WSS) => { + Ok(format!("wss://{}:{}/", ip, port)) + } + (&AddrComponent::IP6(ref ip), &AddrComponent::TCP(port), &AddrComponent::WSS) => { + Ok(format!("wss://[{}]:{}/", ip, port)) + } _ => Err(()), } } diff --git a/libp2p-websocket/src/desktop.rs b/libp2p-websocket/src/desktop.rs index cdb082f0..38b9f65d 100644 --- a/libp2p-websocket/src/desktop.rs +++ b/libp2p-websocket/src/desktop.rs @@ -31,10 +31,10 @@ use websocket::stream::async::Stream as AsyncStream; /// Represents the configuration for a websocket transport capability for libp2p. Must be put on /// top of another `Transport`. /// -/// This implementation of `Transport` accepts any address that ends with `/ws`, and will try to -/// pass the underlying multiaddress to the underlying `Transport`. +/// This implementation of `Transport` accepts any address that ends with `/ws` or `/wss`, and will +/// try to pass the underlying multiaddress to the underlying `Transport`. /// -/// > **Note**: The `/wss` protocol isn't supported. +/// > **Note**: `/wss` is only supported for dialing and not listening. #[derive(Debug, Clone)] pub struct WsConfig { transport: T, @@ -143,8 +143,9 @@ where fn dial(self, original_addr: Multiaddr) -> Result { let mut inner_addr = original_addr.clone(); - match inner_addr.pop() { - Some(AddrComponent::WS) => {} + let is_wss = match inner_addr.pop() { + Some(AddrComponent::WS) => false, + Some(AddrComponent::WSS) => true, _ => return Err((self, original_addr)), }; @@ -160,10 +161,10 @@ where } }; - let dial = inner_dial.into_future().and_then(|connec| { + let dial = inner_dial.into_future().and_then(move |connec| { // We pass a dummy address to `ClientBuilder` because it is never used anywhere // in the negotiation anyway, and we use `async_connect_on` to pass a stream. - ClientBuilder::new("ws://127.0.0.1:80") + ClientBuilder::new(if is_wss { "wss://127.0.0.1" } else { "ws://127.0.0.1" }) .expect("hard-coded ws address is always valid") .async_connect_on(connec) .map_err(|err| IoError::new(IoErrorKind::Other, err))