diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index 57313ec4..2e412a97 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -110,18 +110,36 @@ pub mod futures_0_3; use std::cell::{Cell, RefCell}; use std::fmt; use std::rc::Rc; +#[cfg(target_feature = "atomics")] use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +#[cfg(target_feature = "atomics")] +use std::sync::Mutex; use futures::executor::{self, Notify, Spawn}; use futures::future; use futures::prelude::*; use futures::sync::oneshot; -use js_sys::{Atomics, Function, Int32Array, Promise, SharedArrayBuffer}; +use js_sys::{Function, Promise}; +#[cfg(target_feature = "atomics")] +use js_sys::{Atomics, Int32Array, SharedArrayBuffer, WebAssembly}; use wasm_bindgen::prelude::*; +#[cfg(target_feature = "atomics")] +use wasm_bindgen::JsCast; +#[cfg(target_feature = "atomics")] mod polyfill; +macro_rules! console_log { + ($($t:tt)*) => (log(&format_args!($($t)*).to_string())) +} + +#[wasm_bindgen] +extern "C" { + #[wasm_bindgen(js_namespace = console)] + fn log(s: &str); +} + /// A Rust `Future` backed by a JavaScript `Promise`. /// /// This type is constructed with a JavaScript `Promise` object and translates @@ -255,7 +273,8 @@ fn _future_to_promise(future: Box>) resolve, reject, notified: Cell::new(State::Notified), - waker: Arc::new(Waker::new(SharedArrayBuffer::new(4), false)), + #[cfg(target_feature = "atomics")] + waker: Arc::new(Waker::new(vec![0; 4], false)), })); }); @@ -275,6 +294,7 @@ fn _future_to_promise(future: Box>) resolve: Function, reject: Function, + #[cfg(target_feature = "atomics")] // Struct to wake a future waker: Arc, } @@ -307,38 +327,59 @@ fn _future_to_promise(future: Box>) Waiting(Arc), } + #[cfg(target_feature = "atomics")] struct Waker { - buffer: SharedArrayBuffer, + array: Vec, notified: AtomicBool, }; + #[cfg(target_feature = "atomics")] impl Waker { - fn new(buffer: SharedArrayBuffer, notified: bool) -> Self { - Self { - buffer, + fn new(array: Vec, notified: bool) -> Self { + Waker { + array, notified: AtomicBool::new(notified), } } } + #[cfg(target_feature = "atomics")] impl Notify for Waker { fn notify(&self, id: usize) { + console_log!("Waker notify"); if !self.notified.swap(true, Ordering::SeqCst) { - let array = Int32Array::new(&self.buffer); + console_log!("Waker, inside if"); + let memory_buffer = wasm_bindgen::memory() + .dyn_into::() + .expect("Should cast a memory to WebAssembly::Memory") + .buffer(); + + let array_location = self.array.as_ptr() as u32 / 4; + let array = Int32Array::new(&memory_buffer) + .subarray(array_location, array_location + self.array.len() as u32); + let _ = Atomics::notify(&array, id as u32); } } } + #[cfg(target_feature = "atomics")] fn poll_again(package: Arc, id: usize) { + console_log!("poll_again called"); let me = match package.notified.replace(State::Notified) { // we need to schedule polling to resume, so keep going - State::Waiting(me) => me, + State::Waiting(me) => { + console_log!("poll_again Waiting"); + me + }, // we were already notified, and were just notified again; // having now coalesced the notifications we return as it's // still someone else's job to process this - State::Notified => return, + State::Notified => { + console_log!("poll_again Notified"); + return; + }, // the future was previously being polled, and we've just // switched it to the "you're notified" state. We don't have @@ -347,9 +388,21 @@ fn _future_to_promise(future: Box>) // continue polling. For us, though, there's nothing else to do, // so we bail out. // later see - State::Polling => return, + State::Polling => { + console_log!("poll_again Polling"); + return; + }, }; + let memory_buffer = wasm_bindgen::memory() + .dyn_into::() + .expect("Should cast a memory to WebAssembly::Memory") + .buffer(); + + let array_location = package.waker.array.as_ptr() as u32 / 4; + let array = Int32Array::new(&memory_buffer) + .subarray(array_location, array_location + package.waker.array.len() as u32); + // Use `Promise.then` on a resolved promise to place our execution // onto the next turn of the microtask queue, enqueueing our poll // operation. We don't currently poll immediately as it turns out @@ -358,7 +411,7 @@ fn _future_to_promise(future: Box>) // // Note that the `Rc`/`RefCell` trick here is basically to just // ensure that our `Closure` gets cleaned up appropriately. - let promise = polyfill::wait_async(Int32Array::new(&package.waker.buffer), id as u32, 0) + let promise = polyfill::wait_async(array, id as u32, 0) .expect("Should create a Promise"); let slot = Rc::new(RefCell::new(None)); let slot2 = slot.clone(); @@ -366,11 +419,18 @@ fn _future_to_promise(future: Box>) let myself = slot2.borrow_mut().take(); debug_assert!(myself.is_some()); Package::poll(&me); - }) as Box); + }) as Box); promise.then(&closure); *slot.borrow_mut() = Some(closure); } + // No shared memory right now, wasm is single threaded, no need to worry + // about this! + #[cfg(not(target_feature = "atomics"))] + unsafe impl Send for Package {} + #[cfg(not(target_feature = "atomics"))] + unsafe impl Sync for Package {} + impl Package { // Move the future contained in `me` as far forward as we can. This will // do as much synchronous work as possible to complete the future, @@ -386,7 +446,9 @@ fn _future_to_promise(future: Box>) match me.notified.replace(State::Polling) { // We received a notification while previously polling, or // this is the initial poll. We've got work to do below! - State::Notified => {} + State::Notified => { + console_log!("Package::poll Notified"); + } // We've gone through this loop once and no notification was // received while we were executing work. That means we got @@ -396,15 +458,31 @@ fn _future_to_promise(future: Box>) // When the notification comes in it'll notify our task, see // our `Waiting` state, and resume the polling process State::Polling => { + console_log!("Package::poll Polling"); + me.notified.set(State::Waiting(me.clone())); + + #[cfg(target_feature = "atomics")] poll_again(me.clone(), 0); + break; } - State::Waiting(_) => panic!("shouldn't see waiting state!"), + State::Waiting(_) => { + console_log!("Package::poll Waiting"); + + panic!("shouldn't see waiting state!") + }, } - let (val, f) = match me.spawn.borrow_mut().poll_future_notify(&me.waker, 0) { + + #[cfg(target_feature = "atomics")] + let waker = &me.waker; + + #[cfg(not(target_feature = "atomics"))] + let waker = me; + + let (val, f) = match me.spawn.borrow_mut().poll_future_notify(waker, 0) { // If the future is ready, immediately call the // resolve/reject callback and then return as we're done. Ok(Async::Ready(value)) => (value, &me.resolve), @@ -420,6 +498,50 @@ fn _future_to_promise(future: Box>) } } } + + #[cfg(not(target_feature = "atomics"))] + impl Notify for Package { + fn notify(&self, _id: usize) { + console_log!("Package::notify Waiting"); + let me = match self.notified.replace(State::Notified) { + // we need to schedule polling to resume, so keep going + State::Waiting(me) => me, + + // we were already notified, and were just notified again; + // having now coalesced the notifications we return as it's + // still someone else's job to process this + State::Notified => return, + + // the future was previously being polled, and we've just + // switched it to the "you're notified" state. We don't have + // access to the future as it's being polled, so the future + // polling process later sees this notification and will + // continue polling. For us, though, there's nothing else to do, + // so we bail out. + // later see + State::Polling => return, + }; + + // Use `Promise.then` on a resolved promise to place our execution + // onto the next turn of the microtask queue, enqueueing our poll + // operation. We don't currently poll immediately as it turns out + // `futures` crate adapters aren't compatible with it and it also + // helps avoid blowing the stack by accident. + // + // Note that the `Rc`/`RefCell` trick here is basically to just + // ensure that our `Closure` gets cleaned up appropriately. + let promise = Promise::resolve(&JsValue::undefined()); + let slot = Rc::new(RefCell::new(None)); + let slot2 = slot.clone(); + let closure = Closure::wrap(Box::new(move |_| { + let myself = slot2.borrow_mut().take(); + debug_assert!(myself.is_some()); + Package::poll(&me); + }) as Box); + promise.then(&closure); + *slot.borrow_mut() = Some(closure); + } + } } /// Converts a Rust `Future` on a local task queue. diff --git a/crates/futures/src/polyfill.rs b/crates/futures/src/polyfill.rs index 2899fe33..3fe46c7f 100644 --- a/crates/futures/src/polyfill.rs +++ b/crates/futures/src/polyfill.rs @@ -47,12 +47,23 @@ use wasm_bindgen::prelude::*; use wasm_bindgen::JsCast; use web_sys::{MessageEvent, Worker}; +macro_rules! console_log { + ($($t:tt)*) => (log(&format_args!($($t)*).to_string())) +} + +#[wasm_bindgen] +extern "C" { + #[wasm_bindgen(js_namespace = console)] + fn log(s: &str); +} + const HELPER_CODE: &'static str = " onmessage = function (ev) { try { switch (ev.data[0]) { case 'wait': { let [_, ia, index, value, timeout] = ev.data; + console.log('wait event inside a worker'); let result = Atomics.wait(ia, index, value, timeout); postMessage(['ok', result]); break; @@ -115,16 +126,20 @@ pub fn wait_async_with_timeout( timeout: f64, ) -> Result { if !indexed_array.buffer().has_type::() { + console_log!("polyfill, not a SharedArrayBuffer"); return Err(Error::new("Indexed array must be created from SharedArrayBuffer").into()); } // Optimization, avoid the helper thread in this common case. if Atomics::load(&indexed_array, index)? != value { + console_log!("polyfill, not-equal"); return Ok(Promise::resolve(&JsString::from("not-equal"))); } // General case, we must wait. + console_log!("polyfill, general case"); + Ok(Promise::new( &mut Box::new(move |resolve: Function, reject: Function| { let helper = alloc_helper(); @@ -161,6 +176,8 @@ pub fn wait_async_with_timeout( .borrow() .set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref())); + onmessage_callback.forget(); + // It's possible to do better here if the ia is already known to the // helper. In that case we can communicate the other data through // shared memory and wake the agent. And it is possible to make ia diff --git a/crates/futures/tests/tests.rs b/crates/futures/tests/tests.rs index 07a3a04a..3fa5cda4 100755 --- a/crates/futures/tests/tests.rs +++ b/crates/futures/tests/tests.rs @@ -6,6 +6,8 @@ extern crate wasm_bindgen; extern crate wasm_bindgen_futures; extern crate wasm_bindgen_test; +wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); + use futures::unsync::oneshot; use futures::Future; use wasm_bindgen::prelude::*;