diff --git a/Cargo.toml b/Cargo.toml index d1ec4f90..059be1bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "libp2p-secio", "libp2p-swarm", "libp2p-tcp-transport", + "libp2p-websocket", "multistream-select", "datastore", "futures-mutex", diff --git a/README.md b/README.md index c025f7d7..4c6f84a0 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ Architecture of the crates of this repository: `ConnectionUpgrade` trait of `libp2p-swarm`. - `libp2p-swarm`: Core library that contains all the traits of *libp2p* and plugs things together. - `libp2p-tcp-transport`: Implementation of the `Transport` trait of `libp2p-swarm` for TCP/IP. +- `libp2p-websocket`: Implementation of the `Transport` trait of `libp2p-swarm` for Websockets. - `multistream-select`: Implementation of the `multistream-select` protocol, which is used to negotiate a protocol over a newly-established connection with a peer, or after a connection upgrade. diff --git a/example/Cargo.toml b/example/Cargo.toml index 4ae24fc3..79f9e751 100644 --- a/example/Cargo.toml +++ b/example/Cargo.toml @@ -10,5 +10,6 @@ multiplex = { path = "../multiplex-rs" } libp2p-secio = { path = "../libp2p-secio" } libp2p-swarm = { path = "../libp2p-swarm" } libp2p-tcp-transport = { path = "../libp2p-tcp-transport" } +libp2p-websocket = { path = "../libp2p-websocket" } tokio-core = "0.1" tokio-io = "0.1" diff --git a/example/examples/echo-dialer.rs b/example/examples/echo-dialer.rs index 85023651..c5103a86 100644 --- a/example/examples/echo-dialer.rs +++ b/example/examples/echo-dialer.rs @@ -23,6 +23,7 @@ extern crate futures; extern crate libp2p_secio as secio; extern crate libp2p_swarm as swarm; extern crate libp2p_tcp_transport as tcp; +extern crate libp2p_websocket as websocket; extern crate multiplex; extern crate tokio_core; extern crate tokio_io; @@ -34,6 +35,7 @@ use swarm::{UpgradeExt, SimpleProtocol, Transport, DeniedConnectionUpgrade}; use tcp::TcpConfig; use tokio_core::reactor::Core; use tokio_io::codec::length_delimited; +use websocket::WsConfig; fn main() { // Determine which address to dial. @@ -46,6 +48,11 @@ fn main() { // We start by creating a `TcpConfig` that indicates that we want TCP/IP. let transport = TcpConfig::new(core.handle()) + // In addition to TCP/IP, we also want to support the Websockets protocol on top of TCP/IP. + // The parameter passed to `WsConfig::new()` must be an implementation of `Transport` to be + // used for the underlying multiaddress. + .or_transport(WsConfig::new(TcpConfig::new(core.handle()))) + // On top of TCP/IP, we will use either the plaintext protocol or the secio protocol, // depending on which one the remote supports. .with_upgrade({ diff --git a/example/examples/echo-server.rs b/example/examples/echo-server.rs index d185c364..995b8c03 100644 --- a/example/examples/echo-server.rs +++ b/example/examples/echo-server.rs @@ -23,6 +23,7 @@ extern crate futures; extern crate libp2p_secio as secio; extern crate libp2p_swarm as swarm; extern crate libp2p_tcp_transport as tcp; +extern crate libp2p_websocket as websocket; extern crate multiplex; extern crate tokio_core; extern crate tokio_io; @@ -34,6 +35,7 @@ use swarm::{Transport, UpgradeExt, SimpleProtocol}; use tcp::TcpConfig; use tokio_core::reactor::Core; use tokio_io::codec::length_delimited; +use websocket::WsConfig; fn main() { // Determine which address to listen to. @@ -45,6 +47,10 @@ fn main() { // Now let's build the transport stack. // We start by creating a `TcpConfig` that indicates that we want TCP/IP. let transport = TcpConfig::new(core.handle()) + // In addition to TCP/IP, we also want to support the Websockets protocol on top of TCP/IP. + // The parameter passed to `WsConfig::new()` must be an implementation of `Transport` to be + // used for the underlying multiaddress. + .or_transport(WsConfig::new(TcpConfig::new(core.handle()))) // On top of TCP/IP, we will use either the plaintext protocol or the secio protocol, // depending on which one the remote supports. diff --git a/libp2p-swarm/src/connection_reuse.rs b/libp2p-swarm/src/connection_reuse.rs index 906e1886..3821d6a3 100644 --- a/libp2p-swarm/src/connection_reuse.rs +++ b/libp2p-swarm/src/connection_reuse.rs @@ -199,6 +199,7 @@ where S: Stream, fn poll(&mut self) -> Poll, Self::Error> { match self.listener.poll() { Ok(Async::Ready(Some((upgrade, client_addr)))) => { + println!("ready stream"); self.current_upgrades.push((upgrade, client_addr)); } Ok(Async::NotReady) => (), diff --git a/libp2p-websocket/Cargo.toml b/libp2p-websocket/Cargo.toml new file mode 100644 index 00000000..52144403 --- /dev/null +++ b/libp2p-websocket/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "libp2p-websocket" +version = "0.1.0" +authors = ["Parity Technologies "] + +[dependencies] +libp2p-swarm = { path = "../libp2p-swarm" } +futures = "0.1" +multiaddr = "0.2.0" +rw-stream-sink = { path = "../rw-stream-sink" } +tokio-io = "0.1" + +[target.'cfg(not(target_os = "emscripten"))'.dependencies] +websocket = { version = "0.20.2", default-features = false, features = ["async", "async-ssl"] } + +[target.'cfg(target_os = "emscripten")'.dependencies] +stdweb = { version = "0.1.3", default-features = false } + +[target.'cfg(not(target_os = "emscripten"))'.dev-dependencies] +libp2p-tcp-transport = { path = "../libp2p-tcp-transport" } +tokio-core = "0.1" diff --git a/libp2p-websocket/README.md b/libp2p-websocket/README.md new file mode 100644 index 00000000..3f5f4e69 --- /dev/null +++ b/libp2p-websocket/README.md @@ -0,0 +1,46 @@ +Implementation of the libp2p `Transport` trait for Websockets. + +See the documentation of `swarm` and of libp2p in general to learn how to use the `Transport` +trait. + +This library is used in a different way depending on whether you are compiling for emscripten +or for a different operating system. + +# Emscripten + +On emscripten, you can create a `BrowserWsConfig` object with `BrowserWsConfig::new()`. It can +then be used as a transport. + +Listening on a websockets multiaddress isn't supported on emscripten. Dialing a multiaddress +which uses `ws` on top of TCP/IP will automatically use the `XMLHttpRequest` Javascript object. + +```rust +use libp2p_websocket::BrowserWsConfig; + +let ws_config = BrowserWsConfig::new(); +// let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap()); +``` + +# Other operating systems + +On other operating systems, this library doesn't open any socket by itself. Instead it must be +plugged on top of another implementation of `Transport` such as TCP/IP. + +This underlying transport must be put inside a `WsConfig` object through the +`WsConfig::new()` function. + +```rust +extern crate libp2p_swarm; +extern crate libp2p_tcp_transport; +extern crate libp2p_websocket; +extern crate tokio_core; + +use libp2p_swarm::{Multiaddr, Transport}; +use libp2p_tcp_transport::TcpConfig; +use libp2p_websocket::WsConfig; +use tokio_core::reactor::Core; + +let core = Core::new().unwrap(); +let ws_config = WsConfig::new(TcpConfig::new(core.handle())); +let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap()); +``` diff --git a/libp2p-websocket/src/browser.rs b/libp2p-websocket/src/browser.rs new file mode 100644 index 00000000..fe911ec8 --- /dev/null +++ b/libp2p-websocket/src/browser.rs @@ -0,0 +1,306 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::{Async, Future, Poll, Stream, Then as FutureThen}; +use futures::stream::Then as StreamThen; +use futures::sync::{mpsc, oneshot}; +use multiaddr::{AddrComponent, Multiaddr}; +use rw_stream_sink::RwStreamSink; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use std::io::{Read, Write}; +use std::sync::{Arc, Mutex}; +use stdweb::{self, Reference}; +use stdweb::web::TypedArray; +use swarm::Transport; +use tokio_io::{AsyncRead, AsyncWrite}; + +/// Represents the configuration for a websocket transport capability for libp2p. +/// +/// This implementation of `Transport` accepts any address that looks like +/// `/ip4/.../tcp/.../ws` or `/ip6/.../tcp/.../ws`, and connect to the corresponding IP and port. +#[derive(Debug, Clone)] +pub struct BrowserWsConfig; + +impl BrowserWsConfig { + /// Creates a new configuration object for websocket. + #[inline] + pub fn new() -> BrowserWsConfig { + BrowserWsConfig + } +} + +impl Transport for BrowserWsConfig { + type RawConn = BrowserWsConn; + type Listener = Box>; // TODO: use `!` + type ListenerUpgrade = Box>; // TODO: use `!` + type Dial = FutureThen< + oneshot::Receiver>, + Result, + fn(Result, oneshot::Canceled>) + -> Result, + >; + + #[inline] + fn listen_on(self, a: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { + // Listening is never supported. + Err((self, a)) + } + + fn dial(self, original_addr: Multiaddr) -> Result { + // Making sure we are initialized before we dial. Initialization is protected by a simple + // boolean static variable, so it's not a problem to call it multiple times and the cost + // is negligible. + stdweb::initialize(); + + // Tries to interpret the multiaddr, and returns a corresponding `ws://x.x.x.x/` URL (as + // a string) on success. + let inner_addr = match multiaddr_to_target(&original_addr) { + Ok(a) => a, + Err(_) => return Err((self, original_addr)), + }; + + // Create the JS `WebSocket` object. + let websocket = { + let val = js! { + try { + return new WebSocket(@{inner_addr}); + } catch(e) { + return false; + } + }; + match val.into_reference() { + Some(ws) => ws, + None => return Err((self, original_addr)), // `false` was returned by `js!` + } + }; + + // Create a `message` channel that will be used for both bytes messages and errors, and a + // `message_cb` used for the `message` event on the WebSocket. + // `message_tx` is grabbed by `message_cb` and `close_cb`, and `message_rx` is grabbed + // by `open_cb`. + let (message_tx, message_rx) = mpsc::unbounded::, IoError>>(); + let message_tx = Arc::new(message_tx); + let mut message_rx = Some(message_rx); + let message_cb = { + let message_tx = message_tx.clone(); + move |message_data: Reference| { + if let Some(buffer) = message_data.downcast::>() { + let _ = message_tx.unbounded_send(Ok(buffer.to_vec())); + } else { + let _ = message_tx.unbounded_send(Err(IoError::new( + IoErrorKind::InvalidData, + "received ws message of unknown type", + ))); + } + } + }; + + // Create a `open` channel that will be used to communicate the `BrowserWsConn` that represents + // the open dialing websocket. Also create a `open_cb` callback that will be used for the + // `open` message of the websocket. + let (open_tx, open_rx) = oneshot::channel::>(); + let open_tx = Arc::new(Mutex::new(Some(open_tx))); + let websocket_clone = websocket.clone(); + let open_cb = { + let open_tx = open_tx.clone(); + move || { + // Note that `open_tx` can be empty (and a panic happens) if the `open` event + // is triggered twice, or is triggered after the `close` event. We never reuse the + // same websocket twice, so this is not supposed to happen. + let tx = open_tx + .lock() + .unwrap() + .take() + .expect("the websocket can only open once"); + // `message_rx` can be empty if the `open` event is triggered twice, which again + // is not supposed to happen. + let message_rx = message_rx.take().expect("the websocket can only open once"); + + // Send a `BrowserWsConn` to the future that was returned by `dial`. Ignoring errors that + // would happen the future has been dropped by the user. + let _ = tx.send(Ok(BrowserWsConn { + websocket: websocket_clone.clone(), + incoming_data: RwStreamSink::new(message_rx.then(|result| { + // An `Err` happens here if `message_tx` has been dropped. However + // `message_tx` is grabbed by the websocket, which stays alive for as + // long as the `BrowserWsConn` is alive. + match result { + Ok(r) => r, + Err(_) => { + unreachable!("the message channel outlives the BrowserWsConn") + } + } + })), + })); + } + }; + + // Used for the `close` message of the websocket. + // The websocket can be closed either before or after being opened, so we send an error + // to both the `open` and `message` channels if that happens. + let close_cb = move || { + if let Some(tx) = open_tx.lock().unwrap().take() { + let _ = tx.send(Err(IoError::new( + IoErrorKind::ConnectionRefused, + "close event on the websocket", + ))); + } + + let _ = message_tx.unbounded_send(Err(IoError::new( + IoErrorKind::ConnectionRefused, + "close event on the websocket", + ))); + }; + + js! { + var socket = @{websocket}; + var open_cb = @{open_cb}; + var message_cb = @{message_cb}; + var close_cb = @{close_cb}; + socket.addEventListener("open", function(event) { + open_cb(); + }); + socket.addEventListener("message", function(event) { + var reader = new FileReader(); + reader.addEventListener("loadend", function() { + var typed = new Uint8Array(reader.result); + message_cb(typed); + }); + reader.readAsArrayBuffer(event.data); + }); + socket.addEventListener("close", function(event) { + close_cb(); + }); + }; + + Ok(open_rx.then(|result| { + match result { + Ok(Ok(r)) => Ok(r), + Ok(Err(e)) => Err(e), + // `Err` would happen here if `open_tx` is destroyed. `open_tx` is captured by + // the `WebSocket`, and the `WebSocket` is captured by `open_cb`, which is itself + // captured by the `WebSocket`. Due to this cyclic dependency, `open_tx` should + // never be destroyed. + // TODO: how do we break this cyclic dependency? difficult question + Err(_) => unreachable!("the sending side will only close when we drop the future"), + } + })) + } +} + +pub struct BrowserWsConn { + websocket: Reference, + // Stream of messages that goes through a `RwStreamSink` in order to become a `AsyncRead`. + incoming_data: RwStreamSink< + StreamThen< + mpsc::UnboundedReceiver, IoError>>, + fn(Result, IoError>, ()>) -> Result, IoError>, + Result, IoError>, + >, + >, +} + +impl Drop for BrowserWsConn { + #[inline] + fn drop(&mut self) { + // TODO: apparently there's a memory leak related to callbacks? + js! { @{&self.websocket}.close(); } + } +} + +impl AsyncRead for BrowserWsConn {} + +impl Read for BrowserWsConn { + #[inline] + fn read(&mut self, buf: &mut [u8]) -> Result { + self.incoming_data.read(buf) + } +} + +impl AsyncWrite for BrowserWsConn { + #[inline] + fn shutdown(&mut self) -> Poll<(), IoError> { + Ok(Async::Ready(())) + } +} + +impl Write for BrowserWsConn { + fn write(&mut self, buf: &[u8]) -> Result { + let typed_array = TypedArray::from(buf); + + // `send` can throw if the websocket isn't open (which can happen if it was closed by the + // remote). + let returned = js! { + try { + @{&self.websocket}.send(@{typed_array.buffer()}); + return true; + } catch(e) { + return false; + } + }; + + match returned { + stdweb::Value::Bool(true) => Ok(buf.len()), + stdweb::Value::Bool(false) => Err(IoError::new( + IoErrorKind::BrokenPipe, + "websocket has been closed by the remote", + )), + _ => unreachable!(), + } + } + + #[inline] + fn flush(&mut self) -> Result<(), IoError> { + // Everything is always considered flushed. + Ok(()) + } +} + +// Tries to interpret the `Multiaddr` as a `/ipN/.../tcp/.../ws` multiaddress, and if so returns +// the corresponding `ws://.../` URL. +fn multiaddr_to_target(addr: &Multiaddr) -> Result { + let protocols: Vec<_> = addr.iter().collect(); + + if protocols.len() != 3 { + return Err(()); + } + + match (&protocols[0], &protocols[1], &protocols[2]) { + (&AddrComponent::IP4(ref ip), &AddrComponent::TCP(port), &AddrComponent::WS) => { + Ok(format!("ws://{}:{}/", ip, port)) + } + (&AddrComponent::IP6(ref ip), &AddrComponent::TCP(port), &AddrComponent::WS) => { + Ok(format!("ws://[{}]:{}/", ip, port)) + } + (&AddrComponent::IP4(ref ip), &AddrComponent::TCP(port), &AddrComponent::WSS) => { + Ok(format!("wss://{}:{}/", ip, port)) + } + (&AddrComponent::IP6(ref ip), &AddrComponent::TCP(port), &AddrComponent::WSS) => { + Ok(format!("wss://[{}]:{}/", ip, port)) + } + _ => Err(()), + } +} + +// TODO: write tests (tests are very difficult to write with emscripten) +// - remote refuses connection +// - remote closes connection before we receive +// - remote closes connection before we send +// - remote sends text data instead of binary diff --git a/libp2p-websocket/src/desktop.rs b/libp2p-websocket/src/desktop.rs new file mode 100644 index 00000000..38b9f65d --- /dev/null +++ b/libp2p-websocket/src/desktop.rs @@ -0,0 +1,252 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::{stream, Future, IntoFuture, Sink, Stream}; +use multiaddr::{AddrComponent, Multiaddr}; +use rw_stream_sink::RwStreamSink; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use swarm::Transport; +use websocket::client::builder::ClientBuilder; +use websocket::message::OwnedMessage; +use websocket::server::upgrade::async::IntoWs; +use websocket::stream::async::Stream as AsyncStream; + +/// Represents the configuration for a websocket transport capability for libp2p. Must be put on +/// top of another `Transport`. +/// +/// This implementation of `Transport` accepts any address that ends with `/ws` or `/wss`, and will +/// try to pass the underlying multiaddress to the underlying `Transport`. +/// +/// > **Note**: `/wss` is only supported for dialing and not listening. +#[derive(Debug, Clone)] +pub struct WsConfig { + transport: T, +} + +impl WsConfig { + /// Creates a new configuration object for websocket. + /// + /// The websockets will run on top of the `Transport` you pass as parameter. + #[inline] + pub fn new(inner: T) -> WsConfig { + WsConfig { transport: inner } + } +} + +impl Transport for WsConfig +where + T: Transport + 'static, // TODO: this 'static is pretty arbitrary and is necessary because of the websocket library + T::RawConn: Send, // TODO: this Send is pretty arbitrary and is necessary because of the websocket library +{ + type RawConn = Box; + type Listener = stream::Map< + T::Listener, + fn((::ListenerUpgrade, Multiaddr)) -> (Self::ListenerUpgrade, Multiaddr), + >; + type ListenerUpgrade = Box>; + type Dial = Box>; + + fn listen_on( + self, + original_addr: Multiaddr, + ) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { + let mut inner_addr = original_addr.clone(); + match inner_addr.pop() { + Some(AddrComponent::WS) => {} + _ => return Err((self, original_addr)), + }; + + let (inner_listen, new_addr) = match self.transport.listen_on(inner_addr) { + Ok((listen, mut new_addr)) => { + // Need to suffix `/ws` to the listening address. + new_addr.append(AddrComponent::WS); + (listen, new_addr) + } + Err((transport, _)) => { + return Err(( + WsConfig { + transport: transport, + }, + original_addr, + )); + } + }; + + let listen = inner_listen.map::<_, fn(_) -> _>(|(stream, mut client_addr)| { + // Need to suffix `/ws` to each client address. + client_addr.append(AddrComponent::WS); + + // Upgrade the listener to websockets like the websockets library requires us to do. + let upgraded = stream.and_then(|stream| { + stream + .into_ws() + .map_err(|e| IoError::new(IoErrorKind::Other, e.3)) + .and_then(|stream| { + // Accept the next incoming connection. + stream + .accept() + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + .map(|(client, _http_headers)| { + // Plug our own API on top of the `websockets` API. + let framed_data = client + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + .sink_map_err(|err| IoError::new(IoErrorKind::Other, err)) + .with(|data| Ok(OwnedMessage::Binary(data))) + .and_then(|recv| { + match recv { + OwnedMessage::Binary(data) => Ok(Some(data)), + OwnedMessage::Text(data) => Ok(Some(data.into_bytes())), + OwnedMessage::Close(_) => Ok(None), + // TODO: handle pings and pongs, which is freaking hard + // for now we close the socket when that happens + _ => Ok(None) + } + }) + // TODO: is there a way to merge both lines into one? + .take_while(|v| Ok(v.is_some())) + .map(|v| v.expect("we only take while this is Some")); + + let read_write = RwStreamSink::new(framed_data); + Box::new(read_write) as Box + }) + }) + .map(|s| Box::new(Ok(s).into_future()) as Box>) + .into_future() + .flatten() + }); + + ( + Box::new(upgraded) as Box>, + client_addr, + ) + }); + + Ok((listen, new_addr)) + } + + fn dial(self, original_addr: Multiaddr) -> Result { + let mut inner_addr = original_addr.clone(); + let is_wss = match inner_addr.pop() { + Some(AddrComponent::WS) => false, + Some(AddrComponent::WSS) => true, + _ => return Err((self, original_addr)), + }; + + let inner_dial = match self.transport.dial(inner_addr) { + Ok(d) => d, + Err((transport, _)) => { + return Err(( + WsConfig { + transport: transport, + }, + original_addr, + )); + } + }; + + let dial = inner_dial.into_future().and_then(move |connec| { + // We pass a dummy address to `ClientBuilder` because it is never used anywhere + // in the negotiation anyway, and we use `async_connect_on` to pass a stream. + ClientBuilder::new(if is_wss { "wss://127.0.0.1" } else { "ws://127.0.0.1" }) + .expect("hard-coded ws address is always valid") + .async_connect_on(connec) + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + .map(|(client, _)| { + // Plug our own API on top of the API of the websockets library. + let framed_data = client + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + .sink_map_err(|err| IoError::new(IoErrorKind::Other, err)) + .with(|data| Ok(OwnedMessage::Binary(data))) + .and_then(|recv| { + match recv { + OwnedMessage::Binary(data) => Ok(data), + OwnedMessage::Text(data) => Ok(data.into_bytes()), + // TODO: pings and pongs and close messages need to be + // answered ; and this is really hard ; for now we produce + // an error when that happens + _ => Err(IoError::new(IoErrorKind::Other, "unimplemented")), + } + }); + let read_write = RwStreamSink::new(framed_data); + Box::new(read_write) as Box + }) + }); + + Ok(Box::new(dial) as Box<_>) + } +} + +#[cfg(test)] +mod tests { + extern crate libp2p_tcp_transport as tcp; + extern crate tokio_core; + use self::tokio_core::reactor::Core; + use WsConfig; + use futures::{Future, Stream}; + use swarm::Transport; + + #[test] + fn dialer_connects_to_listener_ipv4() { + let mut core = Core::new().unwrap(); + let ws_config = WsConfig::new(tcp::TcpConfig::new(core.handle())); + + let (listener, addr) = ws_config + .clone() + .listen_on("/ip4/0.0.0.0/tcp/0/ws".parse().unwrap()) + .unwrap(); + assert!(addr.to_string().ends_with("/ws")); + assert!(!addr.to_string().ends_with("/0/ws")); + let listener = listener + .into_future() + .map_err(|(e, _)| e) + .and_then(|(c, _)| c.unwrap().0); + let dialer = ws_config.clone().dial(addr).unwrap(); + + let future = listener + .select(dialer) + .map_err(|(e, _)| e) + .and_then(|(_, n)| n); + core.run(future).unwrap(); + } + + #[test] + fn dialer_connects_to_listener_ipv6() { + let mut core = Core::new().unwrap(); + let ws_config = WsConfig::new(tcp::TcpConfig::new(core.handle())); + + let (listener, addr) = ws_config + .clone() + .listen_on("/ip6/::1/tcp/0/ws".parse().unwrap()) + .unwrap(); + assert!(addr.to_string().ends_with("/ws")); + assert!(!addr.to_string().ends_with("/0/ws")); + let listener = listener + .into_future() + .map_err(|(e, _)| e) + .and_then(|(c, _)| c.unwrap().0); + let dialer = ws_config.clone().dial(addr).unwrap(); + + let future = listener + .select(dialer) + .map_err(|(e, _)| e) + .and_then(|(_, n)| n); + core.run(future).unwrap(); + } +} diff --git a/libp2p-websocket/src/lib.rs b/libp2p-websocket/src/lib.rs new file mode 100644 index 00000000..148f9b75 --- /dev/null +++ b/libp2p-websocket/src/lib.rs @@ -0,0 +1,97 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +#![recursion_limit = "512"] + +// TODO: use this once stable ; for now we just copy-paste the content of the README.md +//#![doc(include = "../README.md")] + +//! Implementation of the libp2p `Transport` trait for Websockets. +//! +//! See the documentation of `swarm` and of libp2p in general to learn how to use the `Transport` +//! trait. +//! +//! This library is used in a different way depending on whether you are compiling for emscripten +//! or for a different operating system. +//! +//! # Emscripten +//! +//! On emscripten, you can create a `BrowserWsConfig` object with `BrowserWsConfig::new()`. It can +//! then be used as a transport. +//! +//! Listening on a websockets multiaddress isn't supported on emscripten. Dialing a multiaddress +//! which uses `ws` on top of TCP/IP will automatically use the `XMLHttpRequest` Javascript object. +//! +//! ```ignore +//! use libp2p_websocket::BrowserWsConfig; +//! +//! let ws_config = BrowserWsConfig::new(); +//! // let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap()); +//! ``` +//! +//! # Other operating systems +//! +//! On other operating systems, this library doesn't open any socket by itself. Instead it must be +//! plugged on top of another implementation of `Transport` such as TCP/IP. +//! +//! This underlying transport must be put inside a `WsConfig` object through the +//! `WsConfig::new()` function. +//! +//! ``` +//! extern crate libp2p_swarm; +//! extern crate libp2p_tcp_transport; +//! extern crate libp2p_websocket; +//! extern crate tokio_core; +//! +//! use libp2p_swarm::{Multiaddr, Transport}; +//! use libp2p_tcp_transport::TcpConfig; +//! use libp2p_websocket::WsConfig; +//! use tokio_core::reactor::Core; +//! +//! # fn main() { +//! let core = Core::new().unwrap(); +//! let ws_config = WsConfig::new(TcpConfig::new(core.handle())); +//! # return; +//! let _ = ws_config.dial("/ip4/40.41.42.43/tcp/12345/ws".parse().unwrap()); +//! # } +//! ``` +//! + +extern crate futures; +extern crate libp2p_swarm as swarm; +extern crate multiaddr; +extern crate rw_stream_sink; +extern crate tokio_io; + +#[cfg(target_os = "emscripten")] +#[macro_use] +extern crate stdweb; +#[cfg(not(target_os = "emscripten"))] +extern crate websocket; + +#[cfg(not(target_os = "emscripten"))] +mod desktop; +#[cfg(target_os = "emscripten")] +mod browser; + +#[cfg(target_os = "emscripten")] +pub use self::browser::{BrowserWsConfig, BrowserWsConn}; +#[cfg(not(target_os = "emscripten"))] +pub use self::desktop::WsConfig; diff --git a/rust-multiaddr/src/lib.rs b/rust-multiaddr/src/lib.rs index f80adb2d..25bb6197 100644 --- a/rust-multiaddr/src/lib.rs +++ b/rust-multiaddr/src/lib.rs @@ -102,6 +102,23 @@ impl Multiaddr { Ok(Multiaddr { bytes: bytes }) } + /// Adds an already-parsed address component to the end of this multiaddr. + /// + /// # Examples + /// + /// ``` + /// use multiaddr::{Multiaddr, AddrComponent}; + /// + /// let mut address: Multiaddr = "/ip4/127.0.0.1".parse().unwrap(); + /// address.append(AddrComponent::TCP(10000)); + /// assert_eq!(address, "/ip4/127.0.0.1/tcp/10000".parse().unwrap()); + /// ``` + /// + #[inline] + pub fn append(&mut self, component: AddrComponent) { + component.write_bytes(&mut self.bytes).expect("writing to a Vec never fails") + } + /// Remove the outermost address. /// /// # Examples