From cde9684e4bf7d578285a9b4ea07f1b81a0efaa36 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 18 Jul 2019 13:47:57 -0700 Subject: [PATCH] Clean up atomics/futures + polyfill * Remove now-unneeded `State` enum * Remove timeout argument from polyfill since we don't need it * Call `Atomics.waitAsync` if it's available instead of using our polyfill * Remove some extraneous dead code from the polyfill * Add a `val: i32` argument to the polyfill * Simplify the flow of futures with `Package` since `waitAsync` handles all the heavy lifting for us. * Remove `Arc` and just use `Package` * Remove `RefCell` from inside of `Package` now that it is no longer needed. --- crates/futures/src/atomics.rs | 260 +++++++++++++-------------------- crates/futures/src/polyfill.rs | 151 +++++-------------- 2 files changed, 136 insertions(+), 275 deletions(-) diff --git a/crates/futures/src/atomics.rs b/crates/futures/src/atomics.rs index b18b5309..9914fa15 100644 --- a/crates/futures/src/atomics.rs +++ b/crates/futures/src/atomics.rs @@ -1,4 +1,4 @@ -use std::cell::{Cell, RefCell}; +use std::cell::RefCell; use std::fmt; use std::rc::Rc; use std::sync::atomic::{AtomicI32, Ordering}; @@ -8,8 +8,9 @@ use futures::executor::{self, Notify, Spawn}; use futures::future; use futures::prelude::*; use futures::sync::oneshot; -use js_sys::{Function, Promise}; +use js_sys::Function; use wasm_bindgen::prelude::*; +use wasm_bindgen::JsCast; /// A Rust `Future` backed by a JavaScript `Promise`. /// @@ -23,14 +24,28 @@ pub struct JsFuture { rx: oneshot::Receiver>, } +// Duplicate a bit here because `then` takes a `JsValue` instead of a `Closure`. +#[wasm_bindgen] +extern "C" { + type Promise; + #[wasm_bindgen(method)] + fn then(this: &Promise, cb: &JsValue) -> Promise; + + type Atomics; + #[wasm_bindgen(static_method_of = Atomics, js_name = waitAsync)] + fn wait_async(buf: &JsValue, index: i32, value: i32) -> js_sys::Promise; + #[wasm_bindgen(static_method_of = Atomics, js_name = waitAsync, getter)] + fn get_wait_async() -> JsValue; +} + impl fmt::Debug for JsFuture { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "JsFuture {{ ... }}") } } -impl From for JsFuture { - fn from(js: Promise) -> JsFuture { +impl From for JsFuture { + fn from(js: js_sys::Promise) -> JsFuture { // Use the `then` method to schedule two callbacks, one for the // resolved value and one for the rejected value. We're currently // assuming that JS engines will unconditionally invoke precisely one of @@ -112,205 +127,132 @@ impl Future for JsFuture { /// If the `future` provided panics then the returned `Promise` **will not /// resolve**. Instead it will be a leaked promise. This is an unfortunate /// limitation of wasm currently that's hoped to be fixed one day! -pub fn future_to_promise(future: F) -> Promise - where - F: Future + 'static, +pub fn future_to_promise(future: F) -> js_sys::Promise +where + F: Future + 'static, { _future_to_promise(Box::new(future)) } // Implementation of actually transforming a future into a JavaScript `Promise`. // -// The only primitive we have to work with here is `Promise::new`, which gives -// us two callbacks that we can use to either reject or resolve the promise. -// It's our job to ensure that one of those callbacks is called at the -// appropriate time. +// The main primitives used here are `Promise::new` to actually create a JS +// promise to return as well as `Atomics.waitAsync` to create a promise that we +// can asynchronously wait on. 
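(Aside on the public API touched above: the sketch below shows a typical round trip through `JsFuture::from` and `future_to_promise` with the futures 0.1 combinators this crate targets. The `double_when_ready` name and the numeric transformation are illustrative only and assume the crate is consumed as the external `wasm_bindgen_futures` dependency; they are not part of this patch.)

    use futures::Future;
    use wasm_bindgen::prelude::*;
    use wasm_bindgen_futures::{future_to_promise, JsFuture};

    // Hypothetical helper: wrap a JS promise in a Rust future, transform the
    // resolved value, and hand a fresh promise back to JavaScript. The whole
    // chain stays on one thread; the executor below only uses atomics to park
    // until it is woken again.
    pub fn double_when_ready(p: js_sys::Promise) -> js_sys::Promise {
        let f = JsFuture::from(p).and_then(|val| {
            let n = val.as_f64().unwrap_or(0.0);
            Ok(JsValue::from(n * 2.0))
        });
        future_to_promise(f)
    }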
The general idea here is that we'll create a +// promise to return and schedule work to happen in `Atomics.waitAsync` +// callbacks. // -// Now we know that JavaScript (in general) can't block and is largely -// notification/callback driven. That means that our future must either have -// synchronous computational work to do, or it's "scheduled a notification" to -// happen. These notifications are likely callbacks to get executed when things -// finish (like a different promise or something like `setTimeout`). The general -// idea here is thus to do as much synchronous work as we can and then otherwise -// translate notifications of a future's task into "let's poll the future!" -// -// This isn't necessarily the greatest future executor in the world, but it -// should get the job done for now hopefully. -fn _future_to_promise(future: Box>) -> Promise { +// After we've created a promise we start polling a future, and whenever it's +// not ready we'll execute `Atomics.waitAsync`. When that resolves we'll keep +// polling the future, and this happens until the future is done. Finally +// when it's all finished we call either resolver or reject depending on the +// result of the future. +fn _future_to_promise(future: Box>) -> js_sys::Promise { let mut future = Some(executor::spawn(future)); - return Promise::new(&mut |resolve, reject| { - Package::poll(&Arc::new(Package { - spawn: RefCell::new(future.take().unwrap()), + return js_sys::Promise::new(&mut |resolve, reject| { + Package { + spawn: future.take().unwrap(), resolve, reject, - notified: Cell::new(State::Notified), - waker: Arc::new(Waker::default()), - })); + waker: Arc::new(Waker { + value: AtomicI32::new(1), // 1 == "notified, ready to poll" + }), + } + .poll(); }); struct Package { // Our "spawned future". This'll have everything we need to poll the // future and continue to move it forward. - spawn: RefCell>>>, - - // The current state of this future, expressed in an enum below. This - // indicates whether we're currently polling the future, received a - // notification and need to keep polling, or if we're waiting for a - // notification to come in (and no one is polling). - notified: Cell, + spawn: Spawn>>, // Our two callbacks connected to the `Promise` that we returned to // JavaScript. We'll be invoking one of these at the end. resolve: Function, reject: Function, - // Struct to wake a future + // Shared state used to communicate waking up this future, this is the + // `Send + Sync` piece needed by the async task system. waker: Arc, } - // The possible states our `Package` (future) can be in, tracked internally - // and used to guide what happens when polling a future. - enum State { - // This future is currently and actively being polled. Attempting to - // access the future will result in a runtime panic and is considered a - // bug. - Polling, - - // This future has been notified, while it was being polled. This marker - // is used in the `Notify` implementation below, and indicates that a - // notification was received that the future is ready to make progress. - // If seen, however, it probably means that the future is also currently - // being polled. - Notified, - - // The future is blocked, waiting for something to happen. Stored here - // is a self-reference to the future itself so we can pull it out in - // `Notify` and continue polling. 
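(For readers following the new control flow: the whole `Package`/`Waker` handshake reduces to a single `AtomicI32` used as a two-state flag. A minimal, self-contained model of that protocol is sketched below; `NotifyFlag` and its method names are illustrative stand-ins, not crate API.)

    use std::sync::atomic::{AtomicI32, Ordering};

    // 1 == "notified, ready to poll", 0 == "parked, waiting for a notification".
    // `NotifyFlag` is a hypothetical stand-in for the `value` field of `Waker`.
    struct NotifyFlag(AtomicI32);

    impl NotifyFlag {
        fn new() -> NotifyFlag {
            // Futures start out notified so the first poll happens immediately.
            NotifyFlag(AtomicI32::new(1))
        }

        // Executor side (`Package::poll`): consume a pending notification.
        // Returns true if we should poll now, false if we must park and wait.
        fn take(&self) -> bool {
            self.0.swap(0, Ordering::SeqCst) == 1
        }

        // Waker side (`Notify::notify`): record a notification. Returns true
        // exactly once per 0 -> 1 transition, i.e. when a parked waiter
        // actually needs a wake-up via `atomic_notify`/`Atomics.waitAsync`.
        fn set(&self) -> bool {
            self.0.swap(1, Ordering::SeqCst) == 0
        }
    }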
- // - // Note that the self-reference here is an Arc-cycle that will leak - // memory unless the future completes, but currently that should be ok - // as we'll have to stick around anyway while the future is executing! - // - // This state is removed as soon as a notification comes in, so the leak - // should only be "temporary" - Waiting(Arc), - } - - #[derive(Default)] struct Waker { - // worker will be waiting on this value - // 0 by default, which means not notified value: AtomicI32, }; impl Notify for Waker { fn notify(&self, _id: usize) { - // since we have only value field here - // let it be 1 if notified, 0 if not - if self.value.swap(1, Ordering::SeqCst) == 0 { - let _ = unsafe { - core::arch::wasm32::atomic_notify( - &self.value as *const AtomicI32 as *mut i32, - std::u32::MAX, // number of threads to notify - ) - }; + // Attempt to notify us by storing 1. If we're already 1 then we + // were previously notified and there's nothing to do. Otherwise + // we execute the native `notify` instruction to wake up the + // corresponding `waitAsync` that was waiting for the transition + // from 0 to 1. + let prev = self.value.swap(1, Ordering::SeqCst); + if prev == 1 { + return; + } + debug_assert_eq!(prev, 0); + unsafe { + core::arch::wasm32::atomic_notify( + &self.value as *const AtomicI32 as *mut i32, + 1, // number of threads to notify + ); } } } - fn poll_again(package: Arc) { - let me = match package.notified.replace(State::Notified) { - // we need to schedule polling to resume, so keep going - State::Waiting(me) => { - me - } - - // we were already notified, and were just notified again; - // having now coalesced the notifications we return as it's - // still someone else's job to process this - State::Notified => { - return; - } - - // the future was previously being polled, and we've just - // switched it to the "you're notified" state. We don't have - // access to the future as it's being polled, so the future - // polling process later sees this notification and will - // continue polling. For us, though, there's nothing else to do, - // so we bail out. - // later see - State::Polling => { - return; - } - }; - - // Use `Promise.then` on a resolved promise to place our execution - // onto the next turn of the microtask queue, enqueueing our poll - // operation. We don't currently poll immediately as it turns out - // `futures` crate adapters aren't compatible with it and it also - // helps avoid blowing the stack by accident. - let promise = - crate::polyfill::wait_async(&package.waker.value).expect("Should create a Promise"); - let closure = Closure::once(move |_| { - Package::poll(&me); - }); - promise.then(&closure); - closure.forget(); - } - impl Package { - // Move the future contained in `me` as far forward as we can. This will - // do as much synchronous work as possible to complete the future, - // ensuring that when it blocks we're scheduled to get notified via some - // callback somewhere at some point (vague, right?) - // - // TODO: this probably shouldn't do as much synchronous work as possible - // as it can starve other computations. Rather it should instead - // yield every so often with something like `setTimeout` with the - // timeout set to zero. - fn poll(me: &Arc) { - loop { - match me.notified.replace(State::Polling) { - // We received a notification while previously polling, or - // this is the initial poll. We've got work to do below! 
- State::Notified => {} - - // We've gone through this loop once and no notification was - // received while we were executing work. That means we got - // `NotReady` below and we're scheduled to receive a - // notification. Block ourselves and wait for later. - // - // When the notification comes in it'll notify our task, see - // our `Waiting` state, and resume the polling process - State::Polling => { - me.notified.set(State::Waiting(me.clone())); - - poll_again(me.clone()); - - break; - } - - State::Waiting(_) => panic!("shouldn't see waiting state!"), - } - - let (val, f) = match me.spawn.borrow_mut().poll_future_notify(&me.waker, 0) { + fn poll(mut self) { + // Poll in a loop waiting for the future to become ready. Note that + // we probably shouldn't maximize synchronous work here but rather + // we should occasionally yield back to the runtime and schedule + // ourselves to resume this future later on. + // + // Note that 0 here means "need a notification" and 1 means "we got + // a notification". That means we're storing 0 into the `notified` + // slot and we're trying to read 1 to keep on going. + while self.waker.value.swap(0, Ordering::SeqCst) == 1 { + let (val, f) = match self.spawn.poll_future_notify(&self.waker, 0) { // If the future is ready, immediately call the // resolve/reject callback and then return as we're done. - Ok(Async::Ready(value)) => (value, &me.resolve), - Err(value) => (value, &me.reject), + Ok(Async::Ready(value)) => (value, &self.resolve), + Err(value) => (value, &self.reject), - // Otherwise keep going in our loop, if we weren't notified - // we'll break out and start waiting. - Ok(Async::NotReady) => continue, + // ... otherwise let's break out and wait + Ok(Async::NotReady) => break, }; + // Call the resolution function, and then when we're done + // destroy ourselves through `drop` since our future is no + // longer needed. drop(f.call1(&JsValue::undefined(), &val)); - break; + return; } + + // Create a `js_sys::Promise` using `Atomics.waitAsync` (or our + // polyfill) and then register its completion callback as simply + // calling this function again. + let promise = wait_async(&self.waker.value, 0).unchecked_into::(); + let closure = Closure::once_into_js(move || { + self.poll(); + }); + promise.then(&closure); } } } +fn wait_async(ptr: &AtomicI32, val: i32) -> js_sys::Promise { + // If `Atomics.waitAsync` isn't defined (as it isn't defined anywhere today) + // then we use our fallback, otherwise we use the native function. + if Atomics::get_wait_async().is_undefined() { + crate::polyfill::wait_async(ptr, val) + } else { + let mem = wasm_bindgen::memory().unchecked_into::(); + Atomics::wait_async(&mem.buffer(), ptr as *const AtomicI32 as i32 / 4, val) + } + +} + /// Converts a Rust `Future` on a local task queue. /// /// The `future` provided must adhere to `'static` because it'll be scheduled @@ -320,8 +262,8 @@ fn _future_to_promise(future: Box>) /// /// This function has the same panic behavior as `future_to_promise`. pub fn spawn_local(future: F) - where - F: Future + 'static, +where + F: Future + 'static, { future_to_promise( future diff --git a/crates/futures/src/polyfill.rs b/crates/futures/src/polyfill.rs index 8a2c3424..42dc84b6 100644 --- a/crates/futures/src/polyfill.rs +++ b/crates/futures/src/polyfill.rs @@ -36,38 +36,21 @@ * when possible. The worker communicates with its parent using postMessage. 
*/ +use js_sys::{encode_uri_component, Array, Promise}; use std::cell::RefCell; -use std::sync::atomic::{AtomicI32, Ordering}; - -use js_sys::{ - encode_uri_component, Array, Function, Int32Array, JsString, Promise, Reflect, - WebAssembly, -}; +use std::sync::atomic::AtomicI32; use wasm_bindgen::prelude::*; use wasm_bindgen::JsCast; use web_sys::{MessageEvent, Worker}; -const DEFAULT_TIMEOUT: f64 = std::f64::INFINITY; - const HELPER_CODE: &'static str = " onmessage = function (ev) { - try { - switch (ev.data[0]) { - case 'wait': { - let [_, ia, index, value, timeout] = ev.data; - let result = Atomics.wait(ia, index, value, timeout); - postMessage(['ok', result]); - break; - } - default: { - throw new Error('Wrong message sent to wait helper: ' + ev.data.join(',')); - } - } - } catch (e) { - console.log('Exception in wait helper', e); - postMessage(['error', 'Exception']); - } -} + let [ia, index, value] = ev.data; + ia = new Int32Array(ia.buffer); + let result = Atomics.wait(ia, index, value); + console.log('done', result); + postMessage(result); +}; "; thread_local! { @@ -84,103 +67,39 @@ fn alloc_helper() -> Worker { let encoded: String = encode_uri_component(HELPER_CODE).into(); initialization_string.push_str(&encoded); - Worker::new(&initialization_string).expect("Should create a Worker") + Worker::new(&initialization_string).unwrap_or_else(|js| wasm_bindgen::throw_val(js)) }) } fn free_helper(helper: Worker) { HELPERS.with(move |helpers| { - helpers.borrow_mut().push(helper.clone()); + let mut helpers = helpers.borrow_mut(); + helpers.push(helper.clone()); + helpers.truncate(10); // random arbitrary limit chosen here }); } -pub fn wait_async(value: &AtomicI32) -> Result { - wait_async_with_timeout(value, DEFAULT_TIMEOUT) -} - -fn get_array_item(array: &JsValue, index: u32) -> JsValue { - Reflect::get(array, &JsValue::from(index)) - .expect(&format!("Array should contain the index {}", index)) -} - -// Atomics.waitAsync always returns a promise. Throws standard errors -// for parameter validation. The promise is resolved with a string as from -// Atomics.wait, or, in the case something went completely wrong, it is -// rejected with an error string. -pub fn wait_async_with_timeout(value: &AtomicI32, timeout: f64) -> Result { - let memory_buffer = wasm_bindgen::memory() - .dyn_into::() - .expect("Should cast a memory to WebAssembly::Memory") - .buffer(); - - let indexed_array = Int32Array::new(&memory_buffer); - - let index = value as *const AtomicI32 as u32 / 4; - let value_i32 = value.load(Ordering::SeqCst); - - // General case, we must wait. - - Ok(Promise::new( - &mut move |resolve: Function, reject: Function| { - let helper = alloc_helper(); - let helper_ref = helper.clone(); - - let onmessage_callback = Closure::once_into_js(move |e: MessageEvent| { - // Free the helper early so that it can be reused if the resolution - // needs a helper. - free_helper(helper_ref); - match String::from( - get_array_item(&e.data(), 0) - .as_string() - .expect("data[0] should return a String"), - ) - .as_str() - { - "ok" => { - resolve - .call1(&JsValue::NULL, &get_array_item(&e.data(), 1)) - .expect("Should successfully call a resolve callback"); - } - "error" => { - // Note, rejection is not in the spec, it is an artifact of the polyfill. - // The helper already printed an error to the console. 
-                        reject
-                            .call1(&JsValue::NULL, &get_array_item(&e.data(), 1))
-                            .expect("Should successfully call a reject callback");
-                    }
-                    // it's not specified in the proposal yet
-                    _ => (),
-                }
-            });
-            helper.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
-
-            // onmessage_callback.forget();
-
-            // It's possible to do better here if the ia is already known to the
-            // helper. In that case we can communicate the other data through
-            // shared memory and wake the agent. And it is possible to make ia
-            // known to the helper by waking it with a special value so that it
-            // checks its messages, and then posting the ia to the helper. Some
-            // caching / decay scheme is useful no doubt, to improve performance
-            // and avoid leaks.
-            //
-            // In the event we wake the helper directly, we can micro-wait here
-            // for a quick result. We'll need to restructure some code to make
-            // that work out properly, and some synchronization is necessary for
-            // the helper to know that we've picked up the result and no
-            // postMessage is necessary.
-
-            let data = Array::of5(
-                &JsString::from("wait"),
-                &indexed_array,
-                &JsValue::from(index),
-                &JsValue::from(value_i32),
-                &JsValue::from(timeout),
-            );
-
-            helper
-                .post_message(&data)
-                .expect("Should successfully post data to a Worker");
-        },
-    ))
+pub fn wait_async(ptr: &AtomicI32, value: i32) -> Promise {
+    Promise::new(&mut |resolve, _reject| {
+        let helper = alloc_helper();
+        let helper_ref = helper.clone();
+
+        let onmessage_callback = Closure::once_into_js(move |e: MessageEvent| {
+            // Our helper is done waiting so it's available to wait on a
+            // different location, so return it to the free list.
+            free_helper(helper_ref);
+            drop(resolve.call1(&JsValue::NULL, &e.data()));
+        });
+        helper.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
+
+        let data = Array::of3(
+            &wasm_bindgen::memory(),
+            &JsValue::from(ptr as *const AtomicI32 as i32 / 4),
+            &JsValue::from(value),
+        );
+
+        helper
+            .post_message(&data)
+            .unwrap_or_else(|js| wasm_bindgen::throw_val(js));
+    })
 }
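(One detail worth calling out in both the native `Atomics.waitAsync` call and the polyfill above: the second argument is an element index into linear memory viewed as an `Int32Array`, not a byte address, hence the division by 4. The small helper below is hypothetical; the patch inlines this arithmetic at both call sites.)

    use std::sync::atomic::AtomicI32;

    // `Atomics.wait`/`Atomics.waitAsync` take an index into an Int32Array
    // covering the whole wasm memory, so the byte address of the AtomicI32
    // is divided by the 4-byte element size.
    fn element_index(ptr: &AtomicI32) -> i32 {
        let byte_addr = ptr as *const AtomicI32 as usize;
        debug_assert_eq!(byte_addr % 4, 0, "AtomicI32 is always 4-byte aligned");
        (byte_addr / 4) as i32
    }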