Rustfmt and use tabs

This commit is contained in:
Pierre Krieger 2018-01-11 11:11:49 +01:00
parent b8829d7cb1
commit 6837a3928d
No known key found for this signature in database
GPG Key ID: A03CE3AD81F08F7C
2 changed files with 246 additions and 225 deletions

View File

@ -1,31 +1,31 @@
// Copyright 2017 Parity Technologies (UK) Ltd. // Copyright 2017 Parity Technologies (UK) Ltd.
// //
// Permission is hereby granted, free of charge, to any person obtaining a // Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"), // copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation // to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense, // the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the // and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions: // Software is furnished to do so, subject to the following conditions:
// //
// The above copyright notice and this permission notice shall be included in // The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software. // all copies or substantial portions of the Software.
// //
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use std::io::{Read, Write}; use futures::{Async, Future, Poll, Stream, Then as FutureThen};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::sync::{Arc, Mutex};
use futures::{Future, Stream, Poll, Async, Then as FutureThen};
use futures::sync::{oneshot, mpsc};
use futures::stream::Then as StreamThen; use futures::stream::Then as StreamThen;
use multiaddr::{Multiaddr, AddrComponent}; use futures::sync::{mpsc, oneshot};
use multiaddr::{AddrComponent, Multiaddr};
use rw_stream_sink::RwStreamSink; use rw_stream_sink::RwStreamSink;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::io::{Read, Write};
use std::sync::{Arc, Mutex};
use stdweb::{self, Reference}; use stdweb::{self, Reference};
use stdweb::web::TypedArray; use stdweb::web::TypedArray;
use swarm::Transport; use swarm::Transport;
@ -41,237 +41,258 @@ use tokio_io::{AsyncRead, AsyncWrite};
pub struct BrowserWsConfig; pub struct BrowserWsConfig;
impl BrowserWsConfig { impl BrowserWsConfig {
/// Creates a new configuration object for websocket. /// Creates a new configuration object for websocket.
#[inline] #[inline]
pub fn new() -> BrowserWsConfig { pub fn new() -> BrowserWsConfig {
BrowserWsConfig BrowserWsConfig
} }
} }
impl Transport for BrowserWsConfig { impl Transport for BrowserWsConfig {
type RawConn = BrowserWsConn; type RawConn = BrowserWsConn;
type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>>; // TODO: use `!` type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>>; // TODO: use `!`
type ListenerUpgrade = Box<Future<Item = Self::RawConn, Error = IoError>>; // TODO: use `!` type ListenerUpgrade = Box<Future<Item = Self::RawConn, Error = IoError>>; // TODO: use `!`
type Dial = FutureThen<oneshot::Receiver<Result<BrowserWsConn, IoError>>, Result<BrowserWsConn, IoError>, fn(Result<Result<BrowserWsConn, IoError>, oneshot::Canceled>) -> Result<BrowserWsConn, IoError>>; type Dial = FutureThen<
oneshot::Receiver<Result<BrowserWsConn, IoError>>,
Result<BrowserWsConn, IoError>,
fn(Result<Result<BrowserWsConn, IoError>, oneshot::Canceled>)
-> Result<BrowserWsConn, IoError>,
>;
#[inline] #[inline]
fn listen_on(self, a: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { fn listen_on(self, a: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
// Listening is never supported. // Listening is never supported.
Err((self, a)) Err((self, a))
} }
fn dial(self, original_addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> { fn dial(self, original_addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
// Making sure we are initialized before we dial. Initialization is protected by a simple // Making sure we are initialized before we dial. Initialization is protected by a simple
// boolean static variable, so it's not a problem to call it multiple times and the cost // boolean static variable, so it's not a problem to call it multiple times and the cost
// is negligible. // is negligible.
stdweb::initialize(); stdweb::initialize();
// Tries to interpret the multiaddr, and returns a corresponding `ws://x.x.x.x/` URL (as // Tries to interpret the multiaddr, and returns a corresponding `ws://x.x.x.x/` URL (as
// a string) on success. // a string) on success.
let inner_addr = match multiaddr_to_target(&original_addr) { let inner_addr = match multiaddr_to_target(&original_addr) {
Ok(a) => a, Ok(a) => a,
Err(_) => return Err((self, original_addr)), Err(_) => return Err((self, original_addr)),
}; };
// Create the JS `WebSocket` object. // Create the JS `WebSocket` object.
let websocket = { let websocket = {
let val = js! { let val = js! {
try { try {
return new WebSocket(@{inner_addr}); return new WebSocket(@{inner_addr});
} catch(e) { } catch(e) {
return false; return false;
} }
}; };
match val.into_reference() { match val.into_reference() {
Some(ws) => ws, Some(ws) => ws,
None => return Err((self, original_addr)), // `false` was returned by `js!` None => return Err((self, original_addr)), // `false` was returned by `js!`
} }
}; };
// Create a `message` channel that will be used for both bytes messages and errors, and a // Create a `message` channel that will be used for both bytes messages and errors, and a
// `message_cb` used for the `message` event on the WebSocket. // `message_cb` used for the `message` event on the WebSocket.
// `message_tx` is grabbed by `message_cb` and `close_cb`, and `message_rx` is grabbed // `message_tx` is grabbed by `message_cb` and `close_cb`, and `message_rx` is grabbed
// by `open_cb`. // by `open_cb`.
let (message_tx, message_rx) = mpsc::unbounded::<Result<Vec<u8>, IoError>>(); let (message_tx, message_rx) = mpsc::unbounded::<Result<Vec<u8>, IoError>>();
let message_tx = Arc::new(message_tx); let message_tx = Arc::new(message_tx);
let mut message_rx = Some(message_rx); let mut message_rx = Some(message_rx);
let message_cb = { let message_cb = {
let message_tx = message_tx.clone(); let message_tx = message_tx.clone();
move |message_data: Reference| { move |message_data: Reference| {
if let Some(buffer) = message_data.downcast::<TypedArray<u8>>() { if let Some(buffer) = message_data.downcast::<TypedArray<u8>>() {
let _ = message_tx.unbounded_send(Ok(buffer.to_vec())); let _ = message_tx.unbounded_send(Ok(buffer.to_vec()));
} else { } else {
let _ = message_tx.unbounded_send(Err(IoError::new(IoErrorKind::InvalidData, let _ = message_tx.unbounded_send(Err(IoError::new(
"received ws message of unknown type"))); IoErrorKind::InvalidData,
} "received ws message of unknown type",
} )));
}; }
}
};
// Create a `open` channel that will be used to communicate the `BrowserWsConn` that represents // Create a `open` channel that will be used to communicate the `BrowserWsConn` that represents
// the open dialing websocket. Also create a `open_cb` callback that will be used for the // the open dialing websocket. Also create a `open_cb` callback that will be used for the
// `open` message of the websocket. // `open` message of the websocket.
let (open_tx, open_rx) = oneshot::channel::<Result<BrowserWsConn, IoError>>(); let (open_tx, open_rx) = oneshot::channel::<Result<BrowserWsConn, IoError>>();
let open_tx = Arc::new(Mutex::new(Some(open_tx))); let open_tx = Arc::new(Mutex::new(Some(open_tx)));
let websocket_clone = websocket.clone(); let websocket_clone = websocket.clone();
let open_cb = { let open_cb = {
let open_tx = open_tx.clone(); let open_tx = open_tx.clone();
move || { move || {
// Note that `open_tx` can be empty (and a panic happens) if the `open` event // Note that `open_tx` can be empty (and a panic happens) if the `open` event
// is triggered twice, or is triggered after the `close` event. We never reuse the // is triggered twice, or is triggered after the `close` event. We never reuse the
// same websocket twice, so this is not supposed to happen. // same websocket twice, so this is not supposed to happen.
let tx = open_tx.lock().unwrap().take().expect("the websocket can only open once"); let tx = open_tx
// `message_rx` can be empty if the `open` event is triggered twice, which again .lock()
// is not supposed to happen. .unwrap()
let message_rx = message_rx.take().expect("the websocket can only open once"); .take()
.expect("the websocket can only open once");
// `message_rx` can be empty if the `open` event is triggered twice, which again
// is not supposed to happen.
let message_rx = message_rx.take().expect("the websocket can only open once");
// Send a `BrowserWsConn` to the future that was returned by `dial`. Ignoring errors that // Send a `BrowserWsConn` to the future that was returned by `dial`. Ignoring errors that
// would happen the future has been dropped by the user. // would happen the future has been dropped by the user.
let _ = tx let _ = tx.send(Ok(BrowserWsConn {
.send(Ok(BrowserWsConn { websocket: websocket_clone.clone(),
websocket: websocket_clone.clone(), incoming_data: RwStreamSink::new(message_rx.then(|result| {
incoming_data: RwStreamSink::new(message_rx.then(|result| { // An `Err` happens here if `message_tx` has been dropped. However
// An `Err` happens here if `message_tx` has been dropped. However // `message_tx` is grabbed by the websocket, which stays alive for as
// `message_tx` is grabbed by the websocket, which stays alive for as // long as the `BrowserWsConn` is alive.
// long as the `BrowserWsConn` is alive. match result {
match result { Ok(r) => r,
Ok(r) => r, Err(_) => {
Err(_) => unreachable!("the message channel outlives the BrowserWsConn") unreachable!("the message channel outlives the BrowserWsConn")
} }
})), }
})); })),
} }));
}; }
};
// Used for the `close` message of the websocket. // Used for the `close` message of the websocket.
// The websocket can be closed either before or after being opened, so we send an error // The websocket can be closed either before or after being opened, so we send an error
// to both the `open` and `message` channels if that happens. // to both the `open` and `message` channels if that happens.
let close_cb = move || { let close_cb = move || {
if let Some(tx) = open_tx.lock().unwrap().take() { if let Some(tx) = open_tx.lock().unwrap().take() {
let _ = tx.send(Err(IoError::new(IoErrorKind::ConnectionRefused, let _ = tx.send(Err(IoError::new(
"close event on the websocket"))); IoErrorKind::ConnectionRefused,
} "close event on the websocket",
)));
}
let _ = message_tx.unbounded_send(Err(IoError::new(IoErrorKind::ConnectionRefused, let _ = message_tx.unbounded_send(Err(IoError::new(
"close event on the websocket"))); IoErrorKind::ConnectionRefused,
}; "close event on the websocket",
)));
};
js! { js! {
var socket = @{websocket}; var socket = @{websocket};
var open_cb = @{open_cb}; var open_cb = @{open_cb};
var message_cb = @{message_cb}; var message_cb = @{message_cb};
var close_cb = @{close_cb}; var close_cb = @{close_cb};
socket.addEventListener("open", function(event) { socket.addEventListener("open", function(event) {
open_cb(); open_cb();
}); });
socket.addEventListener("message", function(event) { socket.addEventListener("message", function(event) {
var reader = new FileReader(); var reader = new FileReader();
reader.addEventListener("loadend", function() { reader.addEventListener("loadend", function() {
var typed = new Uint8Array(reader.result); var typed = new Uint8Array(reader.result);
message_cb(typed); message_cb(typed);
}); });
reader.readAsArrayBuffer(event.data); reader.readAsArrayBuffer(event.data);
}); });
socket.addEventListener("close", function(event) { socket.addEventListener("close", function(event) {
close_cb(); close_cb();
}); });
}; };
Ok(open_rx.then(|result| { Ok(open_rx.then(|result| {
match result { match result {
Ok(Ok(r)) => Ok(r), Ok(Ok(r)) => Ok(r),
Ok(Err(e)) => Err(e), Ok(Err(e)) => Err(e),
// `Err` would happen here if `open_tx` is destroyed. `open_tx` is captured by // `Err` would happen here if `open_tx` is destroyed. `open_tx` is captured by
// the `WebSocket`, and the `WebSocket` is captured by `open_cb`, which is itself // the `WebSocket`, and the `WebSocket` is captured by `open_cb`, which is itself
// captured by the `WebSocket`. Due to this cyclic dependency, `open_tx` should // captured by the `WebSocket`. Due to this cyclic dependency, `open_tx` should
// never be destroyed. // never be destroyed.
// TODO: how do we break this cyclic dependency? difficult question // TODO: how do we break this cyclic dependency? difficult question
Err(_) => unreachable!("the sending side will only close when we drop the future") Err(_) => unreachable!("the sending side will only close when we drop the future"),
} }
})) }))
} }
} }
pub struct BrowserWsConn { pub struct BrowserWsConn {
websocket: Reference, websocket: Reference,
// Stream of messages that goes through a `RwStreamSink` in order to become a `AsyncRead`. // Stream of messages that goes through a `RwStreamSink` in order to become a `AsyncRead`.
incoming_data: RwStreamSink<StreamThen<mpsc::UnboundedReceiver<Result<Vec<u8>, IoError>>, incoming_data: RwStreamSink<
fn(Result<Result<Vec<u8>, IoError>, ()>) -> Result<Vec<u8>, IoError>, StreamThen<
Result<Vec<u8>, IoError> mpsc::UnboundedReceiver<Result<Vec<u8>, IoError>>,
>>, fn(Result<Result<Vec<u8>, IoError>, ()>) -> Result<Vec<u8>, IoError>,
Result<Vec<u8>, IoError>,
>,
>,
} }
impl Drop for BrowserWsConn { impl Drop for BrowserWsConn {
#[inline] #[inline]
fn drop(&mut self) { fn drop(&mut self) {
// TODO: apparently there's a memory leak related to callbacks? // TODO: apparently there's a memory leak related to callbacks?
js! { @{&self.websocket}.close(); } js! { @{&self.websocket}.close(); }
} }
} }
impl AsyncRead for BrowserWsConn { impl AsyncRead for BrowserWsConn {}
}
impl Read for BrowserWsConn { impl Read for BrowserWsConn {
#[inline] #[inline]
fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> { fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
self.incoming_data.read(buf) self.incoming_data.read(buf)
} }
} }
impl AsyncWrite for BrowserWsConn { impl AsyncWrite for BrowserWsConn {
#[inline] #[inline]
fn shutdown(&mut self) -> Poll<(), IoError> { fn shutdown(&mut self) -> Poll<(), IoError> {
Ok(Async::Ready(())) Ok(Async::Ready(()))
} }
} }
impl Write for BrowserWsConn { impl Write for BrowserWsConn {
fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> { fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> {
let typed_array = TypedArray::from(buf); let typed_array = TypedArray::from(buf);
// `send` can throw if the websocket isn't open (which can happen if it was closed by the // `send` can throw if the websocket isn't open (which can happen if it was closed by the
// remote). // remote).
let returned = js! { let returned = js! {
try { try {
@{&self.websocket}.send(@{typed_array.buffer()}); @{&self.websocket}.send(@{typed_array.buffer()});
return true; return true;
} catch(e) { } catch(e) {
return false; return false;
} }
}; };
match returned { match returned {
stdweb::Value::Bool(true) => Ok(buf.len()), stdweb::Value::Bool(true) => Ok(buf.len()),
stdweb::Value::Bool(false) => Err(IoError::new(IoErrorKind::BrokenPipe, "websocket has been closed by the remote")), stdweb::Value::Bool(false) => Err(IoError::new(
_ => unreachable!() IoErrorKind::BrokenPipe,
} "websocket has been closed by the remote",
} )),
_ => unreachable!(),
}
}
#[inline] #[inline]
fn flush(&mut self) -> Result<(), IoError> { fn flush(&mut self) -> Result<(), IoError> {
// Everything is always considered flushed. // Everything is always considered flushed.
Ok(()) Ok(())
} }
} }
// Tries to interpret the `Multiaddr` as a `/ipN/.../tcp/.../ws` multiaddress, and if so returns // Tries to interpret the `Multiaddr` as a `/ipN/.../tcp/.../ws` multiaddress, and if so returns
// the corresponding `ws://.../` URL. // the corresponding `ws://.../` URL.
fn multiaddr_to_target(addr: &Multiaddr) -> Result<String, ()> { fn multiaddr_to_target(addr: &Multiaddr) -> Result<String, ()> {
let protocols: Vec<_> = addr.iter().collect(); let protocols: Vec<_> = addr.iter().collect();
if protocols.len() != 3 { if protocols.len() != 3 {
return Err(()); return Err(());
} }
match (&protocols[0], &protocols[1], &protocols[2]) { match (&protocols[0], &protocols[1], &protocols[2]) {
(&AddrComponent::IP4(ref ip), &AddrComponent::TCP(port), &AddrComponent::WS) => { (&AddrComponent::IP4(ref ip), &AddrComponent::TCP(port), &AddrComponent::WS) => {
Ok(format!("ws://{}:{}/", ip, port)) Ok(format!("ws://{}:{}/", ip, port))
} }
(&AddrComponent::IP6(ref ip), &AddrComponent::TCP(port), &AddrComponent::WS) => { (&AddrComponent::IP6(ref ip), &AddrComponent::TCP(port), &AddrComponent::WS) => {
Ok(format!("ws://[{}]:{}/", ip, port)) Ok(format!("ws://[{}]:{}/", ip, port))
} }
_ => Err(()), _ => Err(()),
} }
} }
// TODO: write tests (tests are very difficult to write with emscripten) // TODO: write tests (tests are very difficult to write with emscripten)

View File

@ -106,22 +106,22 @@ where
.map(|(client, _http_headers)| { .map(|(client, _http_headers)| {
// Plug our own API on top of the `websockets` API. // Plug our own API on top of the `websockets` API.
let framed_data = client let framed_data = client
.map_err(|err| IoError::new(IoErrorKind::Other, err)) .map_err(|err| IoError::new(IoErrorKind::Other, err))
.sink_map_err(|err| IoError::new(IoErrorKind::Other, err)) .sink_map_err(|err| IoError::new(IoErrorKind::Other, err))
.with(|data| Ok(OwnedMessage::Binary(data))) .with(|data| Ok(OwnedMessage::Binary(data)))
.and_then(|recv| { .and_then(|recv| {
match recv { match recv {
OwnedMessage::Binary(data) => Ok(Some(data)), OwnedMessage::Binary(data) => Ok(Some(data)),
OwnedMessage::Text(data) => Ok(Some(data.into_bytes())), OwnedMessage::Text(data) => Ok(Some(data.into_bytes())),
OwnedMessage::Close(_) => Ok(None), OwnedMessage::Close(_) => Ok(None),
// TODO: handle pings and pongs, which is freaking hard // TODO: handle pings and pongs, which is freaking hard
// for now we close the socket when that happens // for now we close the socket when that happens
_ => Ok(None) _ => Ok(None)
} }
}) })
// TODO: is there a way to merge both lines into one? // TODO: is there a way to merge both lines into one?
.take_while(|v| Ok(v.is_some())) .take_while(|v| Ok(v.is_some()))
.map(|v| v.expect("we only take while this is Some")); .map(|v| v.expect("we only take while this is Some"));
let read_write = RwStreamSink::new(framed_data); let read_write = RwStreamSink::new(framed_data);
Box::new(read_write) as Box<AsyncStream> Box::new(read_write) as Box<AsyncStream>