From 9f77f8dd0057a13c0b4c811fcdba278634adcf55 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 18 Jul 2019 10:13:34 -0700 Subject: [PATCH] Update parallel raytrace example to use futures Use the atomics support now implemented! --- examples/raytrace-parallel/src/lib.rs | 42 ++++++------ examples/raytrace-parallel/src/pool.rs | 89 ++------------------------ 2 files changed, 27 insertions(+), 104 deletions(-) diff --git a/examples/raytrace-parallel/src/lib.rs b/examples/raytrace-parallel/src/lib.rs index 8f99ed92..cfc05d61 100644 --- a/examples/raytrace-parallel/src/lib.rs +++ b/examples/raytrace-parallel/src/lib.rs @@ -1,3 +1,4 @@ +use futures::sync::oneshot; use futures::Future; use js_sys::{Promise, Uint8ClampedArray, WebAssembly}; use rayon::prelude::*; @@ -69,27 +70,28 @@ impl Scene { // threads so we don't lock up the main thread, so we ship off a thread // which actually does the whole rayon business. When our returned // future is resolved we can pull out the final version of the image. - let done = pool - .run_notify(move || { - thread_pool.install(|| { - rgb_data - .par_chunks_mut(4) - .enumerate() - .for_each(|(i, chunk)| { - let i = i as u32; - let x = i % width; - let y = i / width; - let ray = raytracer::Ray::create_prime(x, y, &scene); - let result = raytracer::cast_ray(&scene, &ray, 0).to_rgba(); - chunk[0] = result.data[0]; - chunk[1] = result.data[1]; - chunk[2] = result.data[2]; - chunk[3] = result.data[3]; - }); - }); + let (tx, rx) = oneshot::channel(); + pool.run(move || { + thread_pool.install(|| { rgb_data - })? - .map(move |_data| image_data(base, len, width, height).into()); + .par_chunks_mut(4) + .enumerate() + .for_each(|(i, chunk)| { + let i = i as u32; + let x = i % width; + let y = i / width; + let ray = raytracer::Ray::create_prime(x, y, &scene); + let result = raytracer::cast_ray(&scene, &ray, 0).to_rgba(); + chunk[0] = result.data[0]; + chunk[1] = result.data[1]; + chunk[2] = result.data[2]; + chunk[3] = result.data[3]; + }); + }); + drop(tx.send(rgb_data)); + })?; + let done = rx.map(move |_data| image_data(base, len, width, height).into()) + .map_err(|_| JsValue::undefined()); Ok(RenderingScene { promise: wasm_bindgen_futures::future_to_promise(done), diff --git a/examples/raytrace-parallel/src/pool.rs b/examples/raytrace-parallel/src/pool.rs index 921d3c16..b5cd4dd7 100644 --- a/examples/raytrace-parallel/src/pool.rs +++ b/examples/raytrace-parallel/src/pool.rs @@ -1,13 +1,8 @@ //! A small module that's intended to provide an example of creating a pool of //! web workers which can be used to execute `rayon`-style work. -use futures::sync::oneshot; -use futures::Future; -use std::cell::{RefCell, UnsafeCell}; -use std::mem; +use std::cell::RefCell; use std::rc::Rc; -use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; -use std::sync::Arc; use wasm_bindgen::prelude::*; use wasm_bindgen::JsCast; use web_sys::{DedicatedWorkerGlobalScope, MessageEvent}; @@ -141,12 +136,11 @@ impl WorkerPool { /// whatn it's done the worker is ready to execute more work. This method is /// used for all spawned workers to ensure that when the work is finished /// the worker is reclaimed back into this pool. - fn reclaim_on_message(&self, worker: Worker, on_finish: impl FnOnce() + 'static) { + fn reclaim_on_message(&self, worker: Worker) { let state = Rc::downgrade(&self.state); let worker2 = worker.clone(); let reclaim_slot = Rc::new(RefCell::new(None)); let slot2 = reclaim_slot.clone(); - let mut on_finish = Some(on_finish); let reclaim = Closure::wrap(Box::new(move |event: Event| { if let Some(error) = event.dyn_ref::() { console_log!("error in worker: {}", error.message()); @@ -155,11 +149,9 @@ impl WorkerPool { return; } - // If this is a completion event then we can execute our `on_finish` - // callback and we can also deallocate our own callback by clearing - // out `slot2` which contains our own closure. + // If this is a completion event then can deallocate our own + // callback by clearing out `slot2` which contains our own closure. if let Some(_msg) = event.dyn_ref::() { - on_finish.take().unwrap()(); if let Some(state) = state.upgrade() { state.push(worker2.clone()); } @@ -193,80 +185,9 @@ impl WorkerPool { /// a web worker, that error is returned. pub fn run(&self, f: impl FnOnce() + Send + 'static) -> Result<(), JsValue> { let worker = self.execute(f)?; - self.reclaim_on_message(worker, || {}); + self.reclaim_on_message(worker); Ok(()) } - - /// Executes the closure `f` in a web worker, returning a future of the - /// value that `f` produces. - /// - /// This method is the same as `run` execept that it allows recovering the - /// return value of the closure `f` in a nonblocking fashion with the future - /// returned. - /// - /// # Errors - /// - /// If an error happens while spawning a web worker or sending a message to - /// a web worker, that error is returned. - pub fn run_notify( - &self, - f: impl FnOnce() -> T + Send + 'static, - ) -> Result + 'static, JsValue> - where - T: Send + 'static, - { - // FIXME(#1379) we should just use the `oneshot` directly as the future, - // but we have to use JS callbacks to ensure we don't have futures cross - // threads as that's currently not safe to do so. - let (tx, rx) = oneshot::channel(); - let storage = Arc::new(AtomicValue::new(None)); - let storage2 = storage.clone(); - let worker = self.execute(move || { - assert!(storage2.replace(Some(f())).is_ok()); - })?; - self.reclaim_on_message(worker, move || match storage.replace(None) { - Ok(Some(val)) => drop(tx.send(val)), - _ => unreachable!(), - }); - - Ok(rx.map_err(|_| JsValue::undefined())) - } -} - -/// A small helper struct representing atomic access to an internal value `T` -/// -/// This struct only supports one API, `replace`, which will either succeed and -/// replace the internal value with another (returning the previous one), or it -/// will fail returning the value passed in. Failure happens when two threads -/// try to `replace` at the same time. -/// -/// This is only really intended to help safely transfer information between -/// threads, it doesn't provide any synchronization capabilities itself other -/// than a guaranteed safe API. -struct AtomicValue { - modifying: AtomicBool, - slot: UnsafeCell, -} - -unsafe impl Send for AtomicValue {} -unsafe impl Sync for AtomicValue {} - -impl AtomicValue { - fn new(val: T) -> AtomicValue { - AtomicValue { - modifying: AtomicBool::new(false), - slot: UnsafeCell::new(val), - } - } - - fn replace(&self, val: T) -> Result { - if self.modifying.swap(true, SeqCst) { - return Err(val); - } - let ret = unsafe { mem::replace(&mut *self.slot.get(), val) }; - self.modifying.store(false, SeqCst); - Ok(ret) - } } impl PoolState {