diff --git a/transports/wasm-ext/CHANGELOG.md b/transports/wasm-ext/CHANGELOG.md index 5ab89252..1c87dc21 100644 --- a/transports/wasm-ext/CHANGELOG.md +++ b/transports/wasm-ext/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.20.1 [2020-07-06] + +- Improve the code quality of the `websockets.js` binding with the browser's `WebSocket` API. + # 0.20.0 [2020-07-01] - Updated dependencies. diff --git a/transports/wasm-ext/Cargo.toml b/transports/wasm-ext/Cargo.toml index ddfa8030..324fee2a 100644 --- a/transports/wasm-ext/Cargo.toml +++ b/transports/wasm-ext/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "libp2p-wasm-ext" -version = "0.20.0" +version = "0.20.1" authors = ["Pierre Krieger "] edition = "2018" description = "Allows passing in an external transport in a WASM environment" diff --git a/transports/wasm-ext/src/lib.rs b/transports/wasm-ext/src/lib.rs index 1d4334ec..2ce0f440 100644 --- a/transports/wasm-ext/src/lib.rs +++ b/transports/wasm-ext/src/lib.rs @@ -72,14 +72,17 @@ pub mod ffi { #[wasm_bindgen(method, catch)] pub fn listen_on(this: &Transport, multiaddr: &str) -> Result; - /// Returns a `Readable​Stream​`. + /// Returns an iterator of JavaScript `Promise`s that resolve to `ArrayBuffer` objects + /// (or resolve to null, see below). These `ArrayBuffer` objects contain the data that the + /// remote has sent to us. If the remote closes the connection, the iterator must produce + /// a `Promise` that resolves to `null`. #[wasm_bindgen(method, getter)] pub fn read(this: &Connection) -> js_sys::Iterator; /// Writes data to the connection. Returns a `Promise` that resolves when the connection is /// ready for writing again. /// - /// If the `Promise` returns an error, the writing side of the connection is considered + /// If the `Promise` produces an error, the writing side of the connection is considered /// unrecoverable and the connection should be closed as soon as possible. /// /// Guaranteed to only be called after the previous write promise has resolved. @@ -95,7 +98,8 @@ pub mod ffi { #[wasm_bindgen(method)] pub fn close(this: &Connection); - /// List of addresses we have started listening on. Must be an array of strings of multiaddrs. + /// List of addresses we have started listening on. Must be an array of strings of + /// multiaddrs. #[wasm_bindgen(method, getter)] pub fn new_addrs(this: &ListenEvent) -> Option>; diff --git a/transports/wasm-ext/src/websockets.js b/transports/wasm-ext/src/websockets.js index babf10f9..7b5fcbf1 100644 --- a/transports/wasm-ext/src/websockets.js +++ b/transports/wasm-ext/src/websockets.js @@ -30,7 +30,6 @@ export const websocket_transport = () => { } /// Turns a string multiaddress into a WebSockets string URL. -// TODO: support dns addresses as well const multiaddr_to_ws = (addr) => { let parsed = addr.match(/^\/(ip4|ip6|dns4|dns6|dns)\/(.*?)\/tcp\/(.*?)\/(ws|wss|x-parity-ws\/(.*)|x-parity-wss\/(.*))$/); if (parsed != null) { @@ -54,43 +53,64 @@ const multiaddr_to_ws = (addr) => { // Attempt to dial a multiaddress. const dial = (addr) => { let ws = new WebSocket(multiaddr_to_ws(addr)); + ws.binaryType = "arraybuffer"; let reader = read_queue(); - return new Promise((resolve, reject) => { - // TODO: handle ws.onerror properly after dialing has happened - ws.onerror = (ev) => reject(ev); - ws.onmessage = (ev) => reader.inject_blob(ev.data); - ws.onclose = () => reader.inject_eof(); - ws.onopen = () => resolve({ + return new Promise((open_resolve, open_reject) => { + ws.onerror = (ev) => { + // If `open_resolve` has been called earlier, calling `open_reject` seems to be + // silently ignored. It is easier to unconditionally call `open_reject` rather than + // check in which state the connection is, which would be error-prone. + open_reject(ev); + // Injecting an EOF is how we report to the reading side that the connection has been + // closed. Injecting multiple EOFs is harmless. + reader.inject_eof(); + }; + ws.onclose = (ev) => { + // Same remarks as above. + open_reject(ev); + reader.inject_eof(); + }; + + // We inject all incoming messages into the queue unconditionally. The caller isn't + // supposed to access this queue unless the connection is open. + ws.onmessage = (ev) => reader.inject_array_buffer(ev.data); + + ws.onopen = () => open_resolve({ read: (function*() { while(ws.readyState == 1) { yield reader.next(); } })(), write: (data) => { if (ws.readyState == 1) { ws.send(data); - return promise_when_ws_finished(ws); + return promise_when_send_finished(ws); } else { return Promise.reject("WebSocket is closed"); } }, - shutdown: () => {}, - close: () => ws.close() + shutdown: () => ws.close(), + close: () => {} }); }); } -// Takes a WebSocket object and returns a Promise that resolves when bufferedAmount is 0. -const promise_when_ws_finished = (ws) => { - if (ws.bufferedAmount == 0) { - return Promise.resolve(); - } - +// Takes a WebSocket object and returns a Promise that resolves when bufferedAmount is low enough +// to allow more data to be sent. +const promise_when_send_finished = (ws) => { return new Promise((resolve, reject) => { - setTimeout(function check() { - if (ws.bufferedAmount == 0) { + function check() { + if (ws.readyState != 1) { + reject("WebSocket is closed"); + return; + } + + // We put an arbitrary threshold of 8 kiB of buffered data. + if (ws.bufferedAmount < 8 * 1024) { resolve(); } else { setTimeout(check, 100); } - }, 2); + } + + check(); }) } @@ -108,29 +128,20 @@ const read_queue = () => { return { // Inserts a new Blob in the queue. - inject_blob: (blob) => { + inject_array_buffer: (buffer) => { if (state.resolve != null) { - var resolve = state.resolve; + state.resolve(buffer); state.resolve = null; - - var reader = new FileReader(); - reader.addEventListener("loadend", () => resolve(reader.result)); - reader.readAsArrayBuffer(blob); } else { - state.queue.push(new Promise((resolve, reject) => { - var reader = new FileReader(); - reader.addEventListener("loadend", () => resolve(reader.result)); - reader.readAsArrayBuffer(blob); - })); + state.queue.push(Promise.resolve(buffer)); } }, // Inserts an EOF message in the queue. inject_eof: () => { if (state.resolve != null) { - var resolve = state.resolve; + state.resolve(null); state.resolve = null; - resolve(null); } else { state.queue.push(Promise.resolve(null)); }