Improve the code of the WebSocket browser binding (#1644)

* Improve the code of the WebSocket browser binding

* Bump version

* Set release date.

Co-authored-by: Roman S. Borschel <roman@parity.io>
This commit is contained in:
Pierre Krieger
2020-07-06 10:21:03 +02:00
committed by GitHub
parent 568a018090
commit d645ccb0df
4 changed files with 55 additions and 36 deletions

View File

@ -1,3 +1,7 @@
# 0.20.1 [2020-07-06]
- Improve the code quality of the `websockets.js` binding with the browser's `WebSocket` API.
# 0.20.0 [2020-07-01] # 0.20.0 [2020-07-01]
- Updated dependencies. - Updated dependencies.

View File

@ -1,6 +1,6 @@
[package] [package]
name = "libp2p-wasm-ext" name = "libp2p-wasm-ext"
version = "0.20.0" version = "0.20.1"
authors = ["Pierre Krieger <pierre.krieger1708@gmail.com>"] authors = ["Pierre Krieger <pierre.krieger1708@gmail.com>"]
edition = "2018" edition = "2018"
description = "Allows passing in an external transport in a WASM environment" description = "Allows passing in an external transport in a WASM environment"

View File

@ -72,14 +72,17 @@ pub mod ffi {
#[wasm_bindgen(method, catch)] #[wasm_bindgen(method, catch)]
pub fn listen_on(this: &Transport, multiaddr: &str) -> Result<js_sys::Iterator, JsValue>; pub fn listen_on(this: &Transport, multiaddr: &str) -> Result<js_sys::Iterator, JsValue>;
/// Returns a `ReadableStream`. /// Returns an iterator of JavaScript `Promise`s that resolve to `ArrayBuffer` objects
/// (or resolve to null, see below). These `ArrayBuffer` objects contain the data that the
/// remote has sent to us. If the remote closes the connection, the iterator must produce
/// a `Promise` that resolves to `null`.
#[wasm_bindgen(method, getter)] #[wasm_bindgen(method, getter)]
pub fn read(this: &Connection) -> js_sys::Iterator; pub fn read(this: &Connection) -> js_sys::Iterator;
/// Writes data to the connection. Returns a `Promise` that resolves when the connection is /// Writes data to the connection. Returns a `Promise` that resolves when the connection is
/// ready for writing again. /// ready for writing again.
/// ///
/// If the `Promise` returns an error, the writing side of the connection is considered /// If the `Promise` produces an error, the writing side of the connection is considered
/// unrecoverable and the connection should be closed as soon as possible. /// unrecoverable and the connection should be closed as soon as possible.
/// ///
/// Guaranteed to only be called after the previous write promise has resolved. /// Guaranteed to only be called after the previous write promise has resolved.
@ -95,7 +98,8 @@ pub mod ffi {
#[wasm_bindgen(method)] #[wasm_bindgen(method)]
pub fn close(this: &Connection); pub fn close(this: &Connection);
/// List of addresses we have started listening on. Must be an array of strings of multiaddrs. /// List of addresses we have started listening on. Must be an array of strings of
/// multiaddrs.
#[wasm_bindgen(method, getter)] #[wasm_bindgen(method, getter)]
pub fn new_addrs(this: &ListenEvent) -> Option<Box<[JsValue]>>; pub fn new_addrs(this: &ListenEvent) -> Option<Box<[JsValue]>>;

View File

@ -30,7 +30,6 @@ export const websocket_transport = () => {
} }
/// Turns a string multiaddress into a WebSockets string URL. /// Turns a string multiaddress into a WebSockets string URL.
// TODO: support dns addresses as well
const multiaddr_to_ws = (addr) => { const multiaddr_to_ws = (addr) => {
let parsed = addr.match(/^\/(ip4|ip6|dns4|dns6|dns)\/(.*?)\/tcp\/(.*?)\/(ws|wss|x-parity-ws\/(.*)|x-parity-wss\/(.*))$/); let parsed = addr.match(/^\/(ip4|ip6|dns4|dns6|dns)\/(.*?)\/tcp\/(.*?)\/(ws|wss|x-parity-ws\/(.*)|x-parity-wss\/(.*))$/);
if (parsed != null) { if (parsed != null) {
@ -54,43 +53,64 @@ const multiaddr_to_ws = (addr) => {
// Attempt to dial a multiaddress. // Attempt to dial a multiaddress.
const dial = (addr) => { const dial = (addr) => {
let ws = new WebSocket(multiaddr_to_ws(addr)); let ws = new WebSocket(multiaddr_to_ws(addr));
ws.binaryType = "arraybuffer";
let reader = read_queue(); let reader = read_queue();
return new Promise((resolve, reject) => { return new Promise((open_resolve, open_reject) => {
// TODO: handle ws.onerror properly after dialing has happened ws.onerror = (ev) => {
ws.onerror = (ev) => reject(ev); // If `open_resolve` has been called earlier, calling `open_reject` seems to be
ws.onmessage = (ev) => reader.inject_blob(ev.data); // silently ignored. It is easier to unconditionally call `open_reject` rather than
ws.onclose = () => reader.inject_eof(); // check in which state the connection is, which would be error-prone.
ws.onopen = () => resolve({ open_reject(ev);
// Injecting an EOF is how we report to the reading side that the connection has been
// closed. Injecting multiple EOFs is harmless.
reader.inject_eof();
};
ws.onclose = (ev) => {
// Same remarks as above.
open_reject(ev);
reader.inject_eof();
};
// We inject all incoming messages into the queue unconditionally. The caller isn't
// supposed to access this queue unless the connection is open.
ws.onmessage = (ev) => reader.inject_array_buffer(ev.data);
ws.onopen = () => open_resolve({
read: (function*() { while(ws.readyState == 1) { yield reader.next(); } })(), read: (function*() { while(ws.readyState == 1) { yield reader.next(); } })(),
write: (data) => { write: (data) => {
if (ws.readyState == 1) { if (ws.readyState == 1) {
ws.send(data); ws.send(data);
return promise_when_ws_finished(ws); return promise_when_send_finished(ws);
} else { } else {
return Promise.reject("WebSocket is closed"); return Promise.reject("WebSocket is closed");
} }
}, },
shutdown: () => {}, shutdown: () => ws.close(),
close: () => ws.close() close: () => {}
}); });
}); });
} }
// Takes a WebSocket object and returns a Promise that resolves when bufferedAmount is 0. // Takes a WebSocket object and returns a Promise that resolves when bufferedAmount is low enough
const promise_when_ws_finished = (ws) => { // to allow more data to be sent.
if (ws.bufferedAmount == 0) { const promise_when_send_finished = (ws) => {
return Promise.resolve();
}
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
setTimeout(function check() { function check() {
if (ws.bufferedAmount == 0) { if (ws.readyState != 1) {
reject("WebSocket is closed");
return;
}
// We put an arbitrary threshold of 8 kiB of buffered data.
if (ws.bufferedAmount < 8 * 1024) {
resolve(); resolve();
} else { } else {
setTimeout(check, 100); setTimeout(check, 100);
} }
}, 2); }
check();
}) })
} }
@ -108,29 +128,20 @@ const read_queue = () => {
return { return {
// Inserts a new Blob in the queue. // Inserts a new Blob in the queue.
inject_blob: (blob) => { inject_array_buffer: (buffer) => {
if (state.resolve != null) { if (state.resolve != null) {
var resolve = state.resolve; state.resolve(buffer);
state.resolve = null; state.resolve = null;
var reader = new FileReader();
reader.addEventListener("loadend", () => resolve(reader.result));
reader.readAsArrayBuffer(blob);
} else { } else {
state.queue.push(new Promise((resolve, reject) => { state.queue.push(Promise.resolve(buffer));
var reader = new FileReader();
reader.addEventListener("loadend", () => resolve(reader.result));
reader.readAsArrayBuffer(blob);
}));
} }
}, },
// Inserts an EOF message in the queue. // Inserts an EOF message in the queue.
inject_eof: () => { inject_eof: () => {
if (state.resolve != null) { if (state.resolve != null) {
var resolve = state.resolve; state.resolve(null);
state.resolve = null; state.resolve = null;
resolve(null);
} else { } else {
state.queue.push(Promise.resolve(null)); state.queue.push(Promise.resolve(null));
} }