From 6837a3928d6fc3cd43970dd114ee4be72c14f268 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 11 Jan 2018 11:11:49 +0100 Subject: [PATCH] Rustfmt and use tabs --- libp2p-websocket/src/browser.rs | 441 +++++++++++++++++--------------- libp2p-websocket/src/desktop.rs | 30 +-- 2 files changed, 246 insertions(+), 225 deletions(-) diff --git a/libp2p-websocket/src/browser.rs b/libp2p-websocket/src/browser.rs index 71132ebb..0440207b 100644 --- a/libp2p-websocket/src/browser.rs +++ b/libp2p-websocket/src/browser.rs @@ -1,31 +1,31 @@ // Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: // -// The above copyright notice and this permission notice shall be included in +// The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use std::io::{Read, Write}; -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use std::sync::{Arc, Mutex}; -use futures::{Future, Stream, Poll, Async, Then as FutureThen}; -use futures::sync::{oneshot, mpsc}; +use futures::{Async, Future, Poll, Stream, Then as FutureThen}; use futures::stream::Then as StreamThen; -use multiaddr::{Multiaddr, AddrComponent}; +use futures::sync::{mpsc, oneshot}; +use multiaddr::{AddrComponent, Multiaddr}; use rw_stream_sink::RwStreamSink; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use std::io::{Read, Write}; +use std::sync::{Arc, Mutex}; use stdweb::{self, Reference}; use stdweb::web::TypedArray; use swarm::Transport; @@ -41,237 +41,258 @@ use tokio_io::{AsyncRead, AsyncWrite}; pub struct BrowserWsConfig; impl BrowserWsConfig { - /// Creates a new configuration object for websocket. - #[inline] - pub fn new() -> BrowserWsConfig { - BrowserWsConfig - } + /// Creates a new configuration object for websocket. + #[inline] + pub fn new() -> BrowserWsConfig { + BrowserWsConfig + } } impl Transport for BrowserWsConfig { - type RawConn = BrowserWsConn; - type Listener = Box>; // TODO: use `!` - type ListenerUpgrade = Box>; // TODO: use `!` - type Dial = FutureThen>, Result, fn(Result, oneshot::Canceled>) -> Result>; + type RawConn = BrowserWsConn; + type Listener = Box>; // TODO: use `!` + type ListenerUpgrade = Box>; // TODO: use `!` + type Dial = FutureThen< + oneshot::Receiver>, + Result, + fn(Result, oneshot::Canceled>) + -> Result, + >; - #[inline] - fn listen_on(self, a: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - // Listening is never supported. - Err((self, a)) - } + #[inline] + fn listen_on(self, a: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { + // Listening is never supported. + Err((self, a)) + } - fn dial(self, original_addr: Multiaddr) -> Result { - // Making sure we are initialized before we dial. Initialization is protected by a simple - // boolean static variable, so it's not a problem to call it multiple times and the cost - // is negligible. - stdweb::initialize(); + fn dial(self, original_addr: Multiaddr) -> Result { + // Making sure we are initialized before we dial. Initialization is protected by a simple + // boolean static variable, so it's not a problem to call it multiple times and the cost + // is negligible. + stdweb::initialize(); - // Tries to interpret the multiaddr, and returns a corresponding `ws://x.x.x.x/` URL (as - // a string) on success. - let inner_addr = match multiaddr_to_target(&original_addr) { - Ok(a) => a, - Err(_) => return Err((self, original_addr)), - }; + // Tries to interpret the multiaddr, and returns a corresponding `ws://x.x.x.x/` URL (as + // a string) on success. + let inner_addr = match multiaddr_to_target(&original_addr) { + Ok(a) => a, + Err(_) => return Err((self, original_addr)), + }; - // Create the JS `WebSocket` object. - let websocket = { - let val = js! { - try { - return new WebSocket(@{inner_addr}); - } catch(e) { - return false; - } - }; - match val.into_reference() { - Some(ws) => ws, - None => return Err((self, original_addr)), // `false` was returned by `js!` - } - }; + // Create the JS `WebSocket` object. + let websocket = { + let val = js! { + try { + return new WebSocket(@{inner_addr}); + } catch(e) { + return false; + } + }; + match val.into_reference() { + Some(ws) => ws, + None => return Err((self, original_addr)), // `false` was returned by `js!` + } + }; - // Create a `message` channel that will be used for both bytes messages and errors, and a - // `message_cb` used for the `message` event on the WebSocket. - // `message_tx` is grabbed by `message_cb` and `close_cb`, and `message_rx` is grabbed - // by `open_cb`. - let (message_tx, message_rx) = mpsc::unbounded::, IoError>>(); - let message_tx = Arc::new(message_tx); - let mut message_rx = Some(message_rx); - let message_cb = { - let message_tx = message_tx.clone(); - move |message_data: Reference| { - if let Some(buffer) = message_data.downcast::>() { - let _ = message_tx.unbounded_send(Ok(buffer.to_vec())); - } else { - let _ = message_tx.unbounded_send(Err(IoError::new(IoErrorKind::InvalidData, - "received ws message of unknown type"))); - } - } - }; + // Create a `message` channel that will be used for both bytes messages and errors, and a + // `message_cb` used for the `message` event on the WebSocket. + // `message_tx` is grabbed by `message_cb` and `close_cb`, and `message_rx` is grabbed + // by `open_cb`. + let (message_tx, message_rx) = mpsc::unbounded::, IoError>>(); + let message_tx = Arc::new(message_tx); + let mut message_rx = Some(message_rx); + let message_cb = { + let message_tx = message_tx.clone(); + move |message_data: Reference| { + if let Some(buffer) = message_data.downcast::>() { + let _ = message_tx.unbounded_send(Ok(buffer.to_vec())); + } else { + let _ = message_tx.unbounded_send(Err(IoError::new( + IoErrorKind::InvalidData, + "received ws message of unknown type", + ))); + } + } + }; - // Create a `open` channel that will be used to communicate the `BrowserWsConn` that represents - // the open dialing websocket. Also create a `open_cb` callback that will be used for the - // `open` message of the websocket. - let (open_tx, open_rx) = oneshot::channel::>(); - let open_tx = Arc::new(Mutex::new(Some(open_tx))); - let websocket_clone = websocket.clone(); - let open_cb = { - let open_tx = open_tx.clone(); - move || { - // Note that `open_tx` can be empty (and a panic happens) if the `open` event - // is triggered twice, or is triggered after the `close` event. We never reuse the - // same websocket twice, so this is not supposed to happen. - let tx = open_tx.lock().unwrap().take().expect("the websocket can only open once"); - // `message_rx` can be empty if the `open` event is triggered twice, which again - // is not supposed to happen. - let message_rx = message_rx.take().expect("the websocket can only open once"); + // Create a `open` channel that will be used to communicate the `BrowserWsConn` that represents + // the open dialing websocket. Also create a `open_cb` callback that will be used for the + // `open` message of the websocket. + let (open_tx, open_rx) = oneshot::channel::>(); + let open_tx = Arc::new(Mutex::new(Some(open_tx))); + let websocket_clone = websocket.clone(); + let open_cb = { + let open_tx = open_tx.clone(); + move || { + // Note that `open_tx` can be empty (and a panic happens) if the `open` event + // is triggered twice, or is triggered after the `close` event. We never reuse the + // same websocket twice, so this is not supposed to happen. + let tx = open_tx + .lock() + .unwrap() + .take() + .expect("the websocket can only open once"); + // `message_rx` can be empty if the `open` event is triggered twice, which again + // is not supposed to happen. + let message_rx = message_rx.take().expect("the websocket can only open once"); - // Send a `BrowserWsConn` to the future that was returned by `dial`. Ignoring errors that - // would happen the future has been dropped by the user. - let _ = tx - .send(Ok(BrowserWsConn { - websocket: websocket_clone.clone(), - incoming_data: RwStreamSink::new(message_rx.then(|result| { - // An `Err` happens here if `message_tx` has been dropped. However - // `message_tx` is grabbed by the websocket, which stays alive for as - // long as the `BrowserWsConn` is alive. - match result { - Ok(r) => r, - Err(_) => unreachable!("the message channel outlives the BrowserWsConn") - } - })), - })); - } - }; + // Send a `BrowserWsConn` to the future that was returned by `dial`. Ignoring errors that + // would happen the future has been dropped by the user. + let _ = tx.send(Ok(BrowserWsConn { + websocket: websocket_clone.clone(), + incoming_data: RwStreamSink::new(message_rx.then(|result| { + // An `Err` happens here if `message_tx` has been dropped. However + // `message_tx` is grabbed by the websocket, which stays alive for as + // long as the `BrowserWsConn` is alive. + match result { + Ok(r) => r, + Err(_) => { + unreachable!("the message channel outlives the BrowserWsConn") + } + } + })), + })); + } + }; - // Used for the `close` message of the websocket. - // The websocket can be closed either before or after being opened, so we send an error - // to both the `open` and `message` channels if that happens. - let close_cb = move || { - if let Some(tx) = open_tx.lock().unwrap().take() { - let _ = tx.send(Err(IoError::new(IoErrorKind::ConnectionRefused, - "close event on the websocket"))); - } + // Used for the `close` message of the websocket. + // The websocket can be closed either before or after being opened, so we send an error + // to both the `open` and `message` channels if that happens. + let close_cb = move || { + if let Some(tx) = open_tx.lock().unwrap().take() { + let _ = tx.send(Err(IoError::new( + IoErrorKind::ConnectionRefused, + "close event on the websocket", + ))); + } - let _ = message_tx.unbounded_send(Err(IoError::new(IoErrorKind::ConnectionRefused, - "close event on the websocket"))); - }; + let _ = message_tx.unbounded_send(Err(IoError::new( + IoErrorKind::ConnectionRefused, + "close event on the websocket", + ))); + }; - js! { - var socket = @{websocket}; - var open_cb = @{open_cb}; - var message_cb = @{message_cb}; - var close_cb = @{close_cb}; - socket.addEventListener("open", function(event) { - open_cb(); - }); - socket.addEventListener("message", function(event) { - var reader = new FileReader(); - reader.addEventListener("loadend", function() { - var typed = new Uint8Array(reader.result); - message_cb(typed); - }); - reader.readAsArrayBuffer(event.data); - }); - socket.addEventListener("close", function(event) { - close_cb(); - }); - }; + js! { + var socket = @{websocket}; + var open_cb = @{open_cb}; + var message_cb = @{message_cb}; + var close_cb = @{close_cb}; + socket.addEventListener("open", function(event) { + open_cb(); + }); + socket.addEventListener("message", function(event) { + var reader = new FileReader(); + reader.addEventListener("loadend", function() { + var typed = new Uint8Array(reader.result); + message_cb(typed); + }); + reader.readAsArrayBuffer(event.data); + }); + socket.addEventListener("close", function(event) { + close_cb(); + }); + }; - Ok(open_rx.then(|result| { - match result { - Ok(Ok(r)) => Ok(r), - Ok(Err(e)) => Err(e), - // `Err` would happen here if `open_tx` is destroyed. `open_tx` is captured by - // the `WebSocket`, and the `WebSocket` is captured by `open_cb`, which is itself - // captured by the `WebSocket`. Due to this cyclic dependency, `open_tx` should - // never be destroyed. - // TODO: how do we break this cyclic dependency? difficult question - Err(_) => unreachable!("the sending side will only close when we drop the future") - } - })) - } + Ok(open_rx.then(|result| { + match result { + Ok(Ok(r)) => Ok(r), + Ok(Err(e)) => Err(e), + // `Err` would happen here if `open_tx` is destroyed. `open_tx` is captured by + // the `WebSocket`, and the `WebSocket` is captured by `open_cb`, which is itself + // captured by the `WebSocket`. Due to this cyclic dependency, `open_tx` should + // never be destroyed. + // TODO: how do we break this cyclic dependency? difficult question + Err(_) => unreachable!("the sending side will only close when we drop the future"), + } + })) + } } pub struct BrowserWsConn { - websocket: Reference, - // Stream of messages that goes through a `RwStreamSink` in order to become a `AsyncRead`. - incoming_data: RwStreamSink, IoError>>, - fn(Result, IoError>, ()>) -> Result, IoError>, - Result, IoError> - >>, + websocket: Reference, + // Stream of messages that goes through a `RwStreamSink` in order to become a `AsyncRead`. + incoming_data: RwStreamSink< + StreamThen< + mpsc::UnboundedReceiver, IoError>>, + fn(Result, IoError>, ()>) -> Result, IoError>, + Result, IoError>, + >, + >, } impl Drop for BrowserWsConn { - #[inline] - fn drop(&mut self) { - // TODO: apparently there's a memory leak related to callbacks? - js! { @{&self.websocket}.close(); } - } + #[inline] + fn drop(&mut self) { + // TODO: apparently there's a memory leak related to callbacks? + js! { @{&self.websocket}.close(); } + } } -impl AsyncRead for BrowserWsConn { -} +impl AsyncRead for BrowserWsConn {} impl Read for BrowserWsConn { - #[inline] - fn read(&mut self, buf: &mut [u8]) -> Result { - self.incoming_data.read(buf) - } + #[inline] + fn read(&mut self, buf: &mut [u8]) -> Result { + self.incoming_data.read(buf) + } } impl AsyncWrite for BrowserWsConn { - #[inline] - fn shutdown(&mut self) -> Poll<(), IoError> { - Ok(Async::Ready(())) - } + #[inline] + fn shutdown(&mut self) -> Poll<(), IoError> { + Ok(Async::Ready(())) + } } impl Write for BrowserWsConn { - fn write(&mut self, buf: &[u8]) -> Result { - let typed_array = TypedArray::from(buf); + fn write(&mut self, buf: &[u8]) -> Result { + let typed_array = TypedArray::from(buf); - // `send` can throw if the websocket isn't open (which can happen if it was closed by the - // remote). - let returned = js! { - try { - @{&self.websocket}.send(@{typed_array.buffer()}); - return true; - } catch(e) { - return false; - } - }; + // `send` can throw if the websocket isn't open (which can happen if it was closed by the + // remote). + let returned = js! { + try { + @{&self.websocket}.send(@{typed_array.buffer()}); + return true; + } catch(e) { + return false; + } + }; - match returned { - stdweb::Value::Bool(true) => Ok(buf.len()), - stdweb::Value::Bool(false) => Err(IoError::new(IoErrorKind::BrokenPipe, "websocket has been closed by the remote")), - _ => unreachable!() - } - } + match returned { + stdweb::Value::Bool(true) => Ok(buf.len()), + stdweb::Value::Bool(false) => Err(IoError::new( + IoErrorKind::BrokenPipe, + "websocket has been closed by the remote", + )), + _ => unreachable!(), + } + } - #[inline] - fn flush(&mut self) -> Result<(), IoError> { - // Everything is always considered flushed. - Ok(()) - } + #[inline] + fn flush(&mut self) -> Result<(), IoError> { + // Everything is always considered flushed. + Ok(()) + } } // Tries to interpret the `Multiaddr` as a `/ipN/.../tcp/.../ws` multiaddress, and if so returns // the corresponding `ws://.../` URL. fn multiaddr_to_target(addr: &Multiaddr) -> Result { - let protocols: Vec<_> = addr.iter().collect(); + let protocols: Vec<_> = addr.iter().collect(); - if protocols.len() != 3 { - return Err(()); - } + if protocols.len() != 3 { + return Err(()); + } - match (&protocols[0], &protocols[1], &protocols[2]) { - (&AddrComponent::IP4(ref ip), &AddrComponent::TCP(port), &AddrComponent::WS) => { - Ok(format!("ws://{}:{}/", ip, port)) - } - (&AddrComponent::IP6(ref ip), &AddrComponent::TCP(port), &AddrComponent::WS) => { - Ok(format!("ws://[{}]:{}/", ip, port)) - } - _ => Err(()), - } + match (&protocols[0], &protocols[1], &protocols[2]) { + (&AddrComponent::IP4(ref ip), &AddrComponent::TCP(port), &AddrComponent::WS) => { + Ok(format!("ws://{}:{}/", ip, port)) + } + (&AddrComponent::IP6(ref ip), &AddrComponent::TCP(port), &AddrComponent::WS) => { + Ok(format!("ws://[{}]:{}/", ip, port)) + } + _ => Err(()), + } } // TODO: write tests (tests are very difficult to write with emscripten) diff --git a/libp2p-websocket/src/desktop.rs b/libp2p-websocket/src/desktop.rs index 8299065b..cdb082f0 100644 --- a/libp2p-websocket/src/desktop.rs +++ b/libp2p-websocket/src/desktop.rs @@ -106,22 +106,22 @@ where .map(|(client, _http_headers)| { // Plug our own API on top of the `websockets` API. let framed_data = client - .map_err(|err| IoError::new(IoErrorKind::Other, err)) - .sink_map_err(|err| IoError::new(IoErrorKind::Other, err)) - .with(|data| Ok(OwnedMessage::Binary(data))) - .and_then(|recv| { - match recv { - OwnedMessage::Binary(data) => Ok(Some(data)), - OwnedMessage::Text(data) => Ok(Some(data.into_bytes())), - OwnedMessage::Close(_) => Ok(None), - // TODO: handle pings and pongs, which is freaking hard + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + .sink_map_err(|err| IoError::new(IoErrorKind::Other, err)) + .with(|data| Ok(OwnedMessage::Binary(data))) + .and_then(|recv| { + match recv { + OwnedMessage::Binary(data) => Ok(Some(data)), + OwnedMessage::Text(data) => Ok(Some(data.into_bytes())), + OwnedMessage::Close(_) => Ok(None), + // TODO: handle pings and pongs, which is freaking hard // for now we close the socket when that happens - _ => Ok(None) - } - }) - // TODO: is there a way to merge both lines into one? - .take_while(|v| Ok(v.is_some())) - .map(|v| v.expect("we only take while this is Some")); + _ => Ok(None) + } + }) + // TODO: is there a way to merge both lines into one? + .take_while(|v| Ok(v.is_some())) + .map(|v| v.expect("we only take while this is Some")); let read_write = RwStreamSink::new(framed_data); Box::new(read_write) as Box