diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index e17eaaee..1c0f3464 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -110,15 +110,18 @@ pub mod futures_0_3; use std::cell::{Cell, RefCell}; use std::fmt; use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use futures::executor::{self, Notify, Spawn}; use futures::future; use futures::prelude::*; use futures::sync::oneshot; -use js_sys::{Function, Promise}; +use js_sys::{Atomics, Function, Int32Array, Promise, SharedArrayBuffer}; use wasm_bindgen::prelude::*; +mod polyfill; + /// A Rust `Future` backed by a JavaScript `Promise`. /// /// This type is constructed with a JavaScript `Promise` object and translates @@ -252,6 +255,7 @@ fn _future_to_promise(future: Box>) resolve, reject, notified: Cell::new(State::Notified), + waker: Arc::new(Waker::new(SharedArrayBuffer::new(4), false)), })); }); @@ -270,6 +274,9 @@ fn _future_to_promise(future: Box>) // JavaScript. We'll be invoking one of these at the end. resolve: Function, reject: Function, + + // Struct to wake a future + waker: Arc, } // The possible states our `Package` (future) can be in, tracked internally @@ -300,10 +307,68 @@ fn _future_to_promise(future: Box>) Waiting(Arc), } - // No shared memory right now, wasm is single threaded, no need to worry - // about this! - unsafe impl Send for Package {} - unsafe impl Sync for Package {} + struct Waker { + buffer: SharedArrayBuffer, + notified: AtomicBool, + }; + + impl Waker { + fn new(buffer: SharedArrayBuffer, notified: bool) -> Self { + Self { + buffer, + notified: AtomicBool::new(notified), + } + } + } + + impl Notify for Waker { + fn notify(&self, id: usize) { + if !self.notified.swap(true, Ordering::SeqCst) { + let array = Int32Array::new(&self.buffer); + let _ = Atomics::notify(&array, id as u32); + } + } + } + + fn poll_again(package: Arc, id: usize) { + let me = match package.notified.replace(State::Notified) { + // we need to schedule polling to resume, so keep going + State::Waiting(me) => me, + + // we were already notified, and were just notified again; + // having now coalesced the notifications we return as it's + // still someone else's job to process this + State::Notified => return, + + // the future was previously being polled, and we've just + // switched it to the "you're notified" state. We don't have + // access to the future as it's being polled, so the future + // polling process later sees this notification and will + // continue polling. For us, though, there's nothing else to do, + // so we bail out. + // later see + State::Polling => return, + }; + + // Use `Promise.then` on a resolved promise to place our execution + // onto the next turn of the microtask queue, enqueueing our poll + // operation. We don't currently poll immediately as it turns out + // `futures` crate adapters aren't compatible with it and it also + // helps avoid blowing the stack by accident. + // + // Note that the `Rc`/`RefCell` trick here is basically to just + // ensure that our `Closure` gets cleaned up appropriately. + let promise = polyfill::wait_async(Int32Array::new(&package.waker.buffer), id as u32, 0); + let slot = Rc::new(RefCell::new(None)); + let slot2 = slot.clone(); + let closure = Closure::wrap(Box::new(move |_| { + let myself = slot2.borrow_mut().take(); + debug_assert!(myself.is_some()); + Package::poll(&me); + }) as Box); + promise.then(&closure); + *slot.borrow_mut() = Some(closure); + } impl Package { // Move the future contained in `me` as far forward as we can. This will @@ -331,13 +396,14 @@ fn _future_to_promise(future: Box>) // our `Waiting` state, and resume the polling process State::Polling => { me.notified.set(State::Waiting(me.clone())); + poll_again(me.clone(), 0); break; } State::Waiting(_) => panic!("shouldn't see waiting state!"), } - let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) { + let (val, f) = match me.spawn.borrow_mut().poll_future_notify(&me.waker, 0) { // If the future is ready, immediately call the // resolve/reject callback and then return as we're done. Ok(Async::Ready(value)) => (value, &me.resolve), @@ -353,48 +419,6 @@ fn _future_to_promise(future: Box>) } } } - - impl Notify for Package { - fn notify(&self, _id: usize) { - let me = match self.notified.replace(State::Notified) { - // we need to schedule polling to resume, so keep going - State::Waiting(me) => me, - - // we were already notified, and were just notified again; - // having now coalesced the notifications we return as it's - // still someone else's job to process this - State::Notified => return, - - // the future was previously being polled, and we've just - // switched it to the "you're notified" state. We don't have - // access to the future as it's being polled, so the future - // polling process later sees this notification and will - // continue polling. For us, though, there's nothing else to do, - // so we bail out. - // later see - State::Polling => return, - }; - - // Use `Promise.then` on a resolved promise to place our execution - // onto the next turn of the microtask queue, enqueueing our poll - // operation. We don't currently poll immediately as it turns out - // `futures` crate adapters aren't compatible with it and it also - // helps avoid blowing the stack by accident. - // - // Note that the `Rc`/`RefCell` trick here is basically to just - // ensure that our `Closure` gets cleaned up appropriately. - let promise = Promise::resolve(&JsValue::undefined()); - let slot = Rc::new(RefCell::new(None)); - let slot2 = slot.clone(); - let closure = Closure::wrap(Box::new(move |_| { - let myself = slot2.borrow_mut().take(); - debug_assert!(myself.is_some()); - Package::poll(&me); - }) as Box); - promise.then(&closure); - *slot.borrow_mut() = Some(closure); - } - } } /// Converts a Rust `Future` on a local task queue. diff --git a/crates/futures/src/polyfill.js b/crates/futures/src/polyfill.js new file mode 100644 index 00000000..ece6f966 --- /dev/null +++ b/crates/futures/src/polyfill.js @@ -0,0 +1,130 @@ +/* + * The polyfill was kindly borrowed from https://github.com/tc39/proposal-atomics-wait-async + */ + +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * Author: Lars T Hansen, lhansen@mozilla.com + */ + +/* Polyfill for Atomics.waitAsync() for web browsers. + * + * Any kind of agent that is able to create a new Worker can use this polyfill. + * + * Load this file in all agents that will use Atomics.waitAsync. + * + * Agents that don't call Atomics.waitAsync need do nothing special. + * + * Any kind of agent can wake another agent that is sleeping in + * Atomics.waitAsync by just calling Atomics.wake for the location being slept + * on, as normal. + * + * The implementation is not completely faithful to the proposed semantics: in + * the case where an agent first asyncWaits and then waits on the same location: + * when it is woken, the two waits will be woken in order, while in the real + * semantics, the sync wait will be woken first. + * + * In this polyfill Atomics.waitAsync is not very fast. + */ + +/* Implementation: + * + * For every wait we fork off a Worker to perform the wait. Workers are reused + * when possible. The worker communicates with its parent using postMessage. + */ + +const helperCode = ` +onmessage = function (ev) { + try { + switch (ev.data[0]) { + case 'wait': { + let [_, ia, index, value, timeout] = ev.data; + let result = Atomics.wait(ia, index, value, timeout) + postMessage(['ok', result]); + break; + } + default: { + throw new Error("Wrong message sent to wait helper: " + ev.data.join(',')); + } + } + } catch (e) { + console.log("Exception in wait helper"); + postMessage(['error', 'Exception']); + } +} +`; + +const helpers = []; + +function allocHelper() { + if (helpers.length > 0) { + return helpers.pop(); + } + return new Worker("data:application/javascript," + encodeURIComponent(helperCode)); +} + +function freeHelper(h) { + helpers.push(h); +} + +// Atomics.waitAsync always returns a promise. Throws standard errors +// for parameter validation. The promise is resolved with a string as from +// Atomics.wait, or, in the case something went completely wrong, it is +// rejected with an error string. +export function waitAsync(ia, index, value, timeout = Infinity) { + if (typeof ia != "object" + || !(ia instanceof Int32Array) + || !(ia.buffer instanceof SharedArrayBuffer) + ) { + throw new TypeError("Expected shared memory"); + } + + // Range checking for the index. + + ia[index]; + + // Optimization, avoid the helper thread in this common case. + + if (Atomics.load(ia, index) !== value) { + return Promise.resolve("not-equal"); + } + + // General case, we must wait. + + return new Promise(function (resolve, reject) { + const h = allocHelper(); + h.onmessage = function (ev) { + // Free the helper early so that it can be reused if the resolution + // needs a helper. + freeHelper(h); + switch (ev.data[0]) { + case 'ok': + resolve(ev.data[1]); + break; + case 'error': + // Note, rejection is not in the spec, it is an artifact of the polyfill. + // The helper already printed an error to the console. + reject(ev.data[1]); + break; + } + }; + + // It's possible to do better here if the ia is already known to the + // helper. In that case we can communicate the other data through + // shared memory and wake the agent. And it is possible to make ia + // known to the helper by waking it with a special value so that it + // checks its messages, and then posting the ia to the helper. Some + // caching / decay scheme is useful no doubt, to improve performance + // and avoid leaks. + // + // In the event we wake the helper directly, we can micro-wait here + // for a quick result. We'll need to restructure some code to make + // that work out properly, and some synchronization is necessary for + // the helper to know that we've picked up the result and no + // postMessage is necessary. + + h.postMessage(['wait', ia, index, value, timeout]); + }) +} diff --git a/crates/futures/src/polyfill.rs b/crates/futures/src/polyfill.rs new file mode 100644 index 00000000..cd77147b --- /dev/null +++ b/crates/futures/src/polyfill.rs @@ -0,0 +1,16 @@ +use js_sys::{Int32Array, Promise, SharedArrayBuffer}; +use wasm_bindgen::prelude::*; + +#[wasm_bindgen(module = "/src/polyfill.js")] +extern "C" { + #[wasm_bindgen(js_name = waitAsync)] + pub fn wait_async(indexed_array: Int32Array, index: u32, value: i32) -> Promise; + + #[wasm_bindgen(js_name = waitAsync)] + pub fn wait_async_with_timeout( + indexed_array: Int32Array, + index: u32, + value: i32, + timeout: f64, + ) -> Promise; +} diff --git a/crates/js-sys/src/lib.rs b/crates/js-sys/src/lib.rs index b3e92161..137bb51f 100644 --- a/crates/js-sys/src/lib.rs +++ b/crates/js-sys/src/lib.rs @@ -495,6 +495,9 @@ extern "C" { pub fn slice_with_end(this: &SharedArrayBuffer, begin: u32, end: u32) -> SharedArrayBuffer; } +unsafe impl Send for SharedArrayBuffer {} +unsafe impl Sync for SharedArrayBuffer {} + // Array Iterator #[wasm_bindgen] extern "C" { @@ -598,10 +601,19 @@ pub mod Atomics { /// The static `Atomics.notify()` method notifies up some agents that /// are sleeping in the wait queue. /// Note: This operation works with a shared `Int32Array` only. + /// If `count` is not provided, notifies all the agents int the queue. /// /// [MDN documentation](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics/notify) #[wasm_bindgen(js_namespace = Atomics, catch)] - pub fn notify(typed_array: &Int32Array, index: u32, count: u32) -> Result; + pub fn notify(typed_array: &Int32Array, index: u32) -> Result; + + /// Notifies up to `count` agents in the wait queue. + #[wasm_bindgen(js_namespace = Atomics, catch, js_name = notify)] + pub fn notify_with_count( + typed_array: &Int32Array, + index: u32, + count: u32, + ) -> Result; /// The static `Atomics.or()` method computes a bitwise OR with a given value /// at a given position in the array, and returns the old value at that position.