Update parallel raytrace example to use futures

Use the atomics support now implemented!
This commit is contained in:
Alex Crichton 2019-07-18 10:13:34 -07:00
parent d122bbca13
commit 9f77f8dd00
2 changed files with 27 additions and 104 deletions

View File

@ -1,3 +1,4 @@
use futures::sync::oneshot;
use futures::Future; use futures::Future;
use js_sys::{Promise, Uint8ClampedArray, WebAssembly}; use js_sys::{Promise, Uint8ClampedArray, WebAssembly};
use rayon::prelude::*; use rayon::prelude::*;
@ -69,8 +70,8 @@ impl Scene {
// threads so we don't lock up the main thread, so we ship off a thread // threads so we don't lock up the main thread, so we ship off a thread
// which actually does the whole rayon business. When our returned // which actually does the whole rayon business. When our returned
// future is resolved we can pull out the final version of the image. // future is resolved we can pull out the final version of the image.
let done = pool let (tx, rx) = oneshot::channel();
.run_notify(move || { pool.run(move || {
thread_pool.install(|| { thread_pool.install(|| {
rgb_data rgb_data
.par_chunks_mut(4) .par_chunks_mut(4)
@ -87,9 +88,10 @@ impl Scene {
chunk[3] = result.data[3]; chunk[3] = result.data[3];
}); });
}); });
rgb_data drop(tx.send(rgb_data));
})? })?;
.map(move |_data| image_data(base, len, width, height).into()); let done = rx.map(move |_data| image_data(base, len, width, height).into())
.map_err(|_| JsValue::undefined());
Ok(RenderingScene { Ok(RenderingScene {
promise: wasm_bindgen_futures::future_to_promise(done), promise: wasm_bindgen_futures::future_to_promise(done),

View File

@ -1,13 +1,8 @@
//! A small module that's intended to provide an example of creating a pool of //! A small module that's intended to provide an example of creating a pool of
//! web workers which can be used to execute `rayon`-style work. //! web workers which can be used to execute `rayon`-style work.
use futures::sync::oneshot; use std::cell::RefCell;
use futures::Future;
use std::cell::{RefCell, UnsafeCell};
use std::mem;
use std::rc::Rc; use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::Arc;
use wasm_bindgen::prelude::*; use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast; use wasm_bindgen::JsCast;
use web_sys::{DedicatedWorkerGlobalScope, MessageEvent}; use web_sys::{DedicatedWorkerGlobalScope, MessageEvent};
@ -141,12 +136,11 @@ impl WorkerPool {
/// whatn it's done the worker is ready to execute more work. This method is /// whatn it's done the worker is ready to execute more work. This method is
/// used for all spawned workers to ensure that when the work is finished /// used for all spawned workers to ensure that when the work is finished
/// the worker is reclaimed back into this pool. /// the worker is reclaimed back into this pool.
fn reclaim_on_message(&self, worker: Worker, on_finish: impl FnOnce() + 'static) { fn reclaim_on_message(&self, worker: Worker) {
let state = Rc::downgrade(&self.state); let state = Rc::downgrade(&self.state);
let worker2 = worker.clone(); let worker2 = worker.clone();
let reclaim_slot = Rc::new(RefCell::new(None)); let reclaim_slot = Rc::new(RefCell::new(None));
let slot2 = reclaim_slot.clone(); let slot2 = reclaim_slot.clone();
let mut on_finish = Some(on_finish);
let reclaim = Closure::wrap(Box::new(move |event: Event| { let reclaim = Closure::wrap(Box::new(move |event: Event| {
if let Some(error) = event.dyn_ref::<ErrorEvent>() { if let Some(error) = event.dyn_ref::<ErrorEvent>() {
console_log!("error in worker: {}", error.message()); console_log!("error in worker: {}", error.message());
@ -155,11 +149,9 @@ impl WorkerPool {
return; return;
} }
// If this is a completion event then we can execute our `on_finish` // If this is a completion event then can deallocate our own
// callback and we can also deallocate our own callback by clearing // callback by clearing out `slot2` which contains our own closure.
// out `slot2` which contains our own closure.
if let Some(_msg) = event.dyn_ref::<MessageEvent>() { if let Some(_msg) = event.dyn_ref::<MessageEvent>() {
on_finish.take().unwrap()();
if let Some(state) = state.upgrade() { if let Some(state) = state.upgrade() {
state.push(worker2.clone()); state.push(worker2.clone());
} }
@ -193,80 +185,9 @@ impl WorkerPool {
/// a web worker, that error is returned. /// a web worker, that error is returned.
pub fn run(&self, f: impl FnOnce() + Send + 'static) -> Result<(), JsValue> { pub fn run(&self, f: impl FnOnce() + Send + 'static) -> Result<(), JsValue> {
let worker = self.execute(f)?; let worker = self.execute(f)?;
self.reclaim_on_message(worker, || {}); self.reclaim_on_message(worker);
Ok(()) Ok(())
} }
/// Executes the closure `f` in a web worker, returning a future of the
/// value that `f` produces.
///
/// This method is the same as `run` execept that it allows recovering the
/// return value of the closure `f` in a nonblocking fashion with the future
/// returned.
///
/// # Errors
///
/// If an error happens while spawning a web worker or sending a message to
/// a web worker, that error is returned.
pub fn run_notify<T>(
&self,
f: impl FnOnce() -> T + Send + 'static,
) -> Result<impl Future<Item = T, Error = JsValue> + 'static, JsValue>
where
T: Send + 'static,
{
// FIXME(#1379) we should just use the `oneshot` directly as the future,
// but we have to use JS callbacks to ensure we don't have futures cross
// threads as that's currently not safe to do so.
let (tx, rx) = oneshot::channel();
let storage = Arc::new(AtomicValue::new(None));
let storage2 = storage.clone();
let worker = self.execute(move || {
assert!(storage2.replace(Some(f())).is_ok());
})?;
self.reclaim_on_message(worker, move || match storage.replace(None) {
Ok(Some(val)) => drop(tx.send(val)),
_ => unreachable!(),
});
Ok(rx.map_err(|_| JsValue::undefined()))
}
}
/// A small helper struct representing atomic access to an internal value `T`
///
/// This struct only supports one API, `replace`, which will either succeed and
/// replace the internal value with another (returning the previous one), or it
/// will fail returning the value passed in. Failure happens when two threads
/// try to `replace` at the same time.
///
/// This is only really intended to help safely transfer information between
/// threads, it doesn't provide any synchronization capabilities itself other
/// than a guaranteed safe API.
struct AtomicValue<T> {
modifying: AtomicBool,
slot: UnsafeCell<T>,
}
unsafe impl<T: Send> Send for AtomicValue<T> {}
unsafe impl<T: Send> Sync for AtomicValue<T> {}
impl<T> AtomicValue<T> {
fn new(val: T) -> AtomicValue<T> {
AtomicValue {
modifying: AtomicBool::new(false),
slot: UnsafeCell::new(val),
}
}
fn replace(&self, val: T) -> Result<T, T> {
if self.modifying.swap(true, SeqCst) {
return Err(val);
}
let ret = unsafe { mem::replace(&mut *self.slot.get(), val) };
self.modifying.store(false, SeqCst);
Ok(ret)
}
} }
impl PoolState { impl PoolState {