diff --git a/crates/futures/Cargo.toml b/crates/futures/Cargo.toml index 76aa100a..0b53f0ab 100644 --- a/crates/futures/Cargo.toml +++ b/crates/futures/Cargo.toml @@ -14,6 +14,12 @@ edition = "2018" futures = "0.1.20" js-sys = { path = "../js-sys", version = '0.3.20' } wasm-bindgen = { path = "../..", version = '0.2.43' } +futures-util-preview = { version = "0.3.0-alpha.15", optional = true } +futures-channel-preview = { version = "0.3.0-alpha.15", optional = true } +lazy_static = { version = "1.3.0", optional = true } [target.'cfg(target_arch = "wasm32")'.dev-dependencies] wasm-bindgen-test = { path = '../test', version = '0.2.43' } + +[features] +nightly = ["futures-util-preview", "futures-channel-preview", "lazy_static"] diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index 62d5e88d..32a9f9d8 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -103,300 +103,14 @@ #![deny(missing_docs)] -use std::cell::{Cell, RefCell}; -use std::fmt; -use std::rc::Rc; -use std::sync::Arc; +#[cfg(feature = "nightly")] +mod nightly; -use futures::executor::{self, Notify, Spawn}; -use futures::future; -use futures::prelude::*; -use futures::sync::oneshot; -use js_sys::{Function, Promise}; -use wasm_bindgen::prelude::*; +#[cfg(feature = "nightly")] +pub use nightly::*; -/// A Rust `Future` backed by a JavaScript `Promise`. -/// -/// This type is constructed with a JavaScript `Promise` object and translates -/// it to a Rust `Future`. This type implements the `Future` trait from the -/// `futures` crate and will either succeed or fail depending on what happens -/// with the JavaScript `Promise`. -/// -/// Currently this type is constructed with `JsFuture::from`. -pub struct JsFuture { - resolved: oneshot::Receiver, - rejected: oneshot::Receiver, - callbacks: Option<(Closure, Closure)>, -} +#[cfg(not(feature = "nightly"))] +mod stable; -impl fmt::Debug for JsFuture { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "JsFuture {{ ... }}") - } -} - -impl From for JsFuture { - fn from(js: Promise) -> JsFuture { - // Use the `then` method to schedule two callbacks, one for the - // resolved value and one for the rejected value. These two callbacks - // will be connected to oneshot channels which feed back into our - // future. - // - // This may not be the speediest option today but it should work! - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - let mut tx1 = Some(tx1); - let resolve = Closure::wrap(Box::new(move |val| { - drop(tx1.take().unwrap().send(val)); - }) as Box); - let mut tx2 = Some(tx2); - let reject = Closure::wrap(Box::new(move |val| { - drop(tx2.take().unwrap().send(val)); - }) as Box); - - js.then2(&resolve, &reject); - - JsFuture { - resolved: rx1, - rejected: rx2, - callbacks: Some((resolve, reject)), - } - } -} - -impl Future for JsFuture { - type Item = JsValue; - type Error = JsValue; - - fn poll(&mut self) -> Poll { - // Test if either our resolved or rejected side is finished yet. Note - // that they will return errors if they're disconnected which can't - // happen until we drop the `callbacks` field, which doesn't happen - // till we're done, so we dont need to handle that. - if let Ok(Async::Ready(val)) = self.resolved.poll() { - drop(self.callbacks.take()); - return Ok(val.into()); - } - if let Ok(Async::Ready(val)) = self.rejected.poll() { - drop(self.callbacks.take()); - return Err(val); - } - Ok(Async::NotReady) - } -} - -/// Converts a Rust `Future` into a JavaScript `Promise`. -/// -/// This function will take any future in Rust and schedule it to be executed, -/// returning a JavaScript `Promise` which can then be passed back to JavaScript -/// to get plumbed into the rest of a system. -/// -/// The `future` provided must adhere to `'static` because it'll be scheduled -/// to run in the background and cannot contain any stack references. The -/// returned `Promise` will be resolved or rejected when the future completes, -/// depending on whether it finishes with `Ok` or `Err`. -/// -/// # Panics -/// -/// Note that in wasm panics are currently translated to aborts, but "abort" in -/// this case means that a JavaScript exception is thrown. The wasm module is -/// still usable (likely erroneously) after Rust panics. -/// -/// If the `future` provided panics then the returned `Promise` **will not -/// resolve**. Instead it will be a leaked promise. This is an unfortunate -/// limitation of wasm currently that's hoped to be fixed one day! -pub fn future_to_promise(future: F) -> Promise -where - F: Future + 'static, -{ - _future_to_promise(Box::new(future)) -} - -// Implementation of actually transforming a future into a JavaScript `Promise`. -// -// The only primitive we have to work with here is `Promise::new`, which gives -// us two callbacks that we can use to either reject or resolve the promise. -// It's our job to ensure that one of those callbacks is called at the -// appropriate time. -// -// Now we know that JavaScript (in general) can't block and is largely -// notification/callback driven. That means that our future must either have -// synchronous computational work to do, or it's "scheduled a notification" to -// happen. These notifications are likely callbacks to get executed when things -// finish (like a different promise or something like `setTimeout`). The general -// idea here is thus to do as much synchronous work as we can and then otherwise -// translate notifications of a future's task into "let's poll the future!" -// -// This isn't necessarily the greatest future executor in the world, but it -// should get the job done for now hopefully. -fn _future_to_promise(future: Box>) -> Promise { - let mut future = Some(executor::spawn(future)); - return Promise::new(&mut |resolve, reject| { - Package::poll(&Arc::new(Package { - spawn: RefCell::new(future.take().unwrap()), - resolve, - reject, - notified: Cell::new(State::Notified), - })); - }); - - struct Package { - // Our "spawned future". This'll have everything we need to poll the - // future and continue to move it forward. - spawn: RefCell>>>, - - // The current state of this future, expressed in an enum below. This - // indicates whether we're currently polling the future, received a - // notification and need to keep polling, or if we're waiting for a - // notification to come in (and no one is polling). - notified: Cell, - - // Our two callbacks connected to the `Promise` that we returned to - // JavaScript. We'll be invoking one of these at the end. - resolve: Function, - reject: Function, - } - - // The possible states our `Package` (future) can be in, tracked internally - // and used to guide what happens when polling a future. - enum State { - // This future is currently and actively being polled. Attempting to - // access the future will result in a runtime panic and is considered a - // bug. - Polling, - - // This future has been notified, while it was being polled. This marker - // is used in the `Notify` implementation below, and indicates that a - // notification was received that the future is ready to make progress. - // If seen, however, it probably means that the future is also currently - // being polled. - Notified, - - // The future is blocked, waiting for something to happen. Stored here - // is a self-reference to the future itself so we can pull it out in - // `Notify` and continue polling. - // - // Note that the self-reference here is an Arc-cycle that will leak - // memory unless the future completes, but currently that should be ok - // as we'll have to stick around anyway while the future is executing! - // - // This state is removed as soon as a notification comes in, so the leak - // should only be "temporary" - Waiting(Arc), - } - - // No shared memory right now, wasm is single threaded, no need to worry - // about this! - unsafe impl Send for Package {} - unsafe impl Sync for Package {} - - impl Package { - // Move the future contained in `me` as far forward as we can. This will - // do as much synchronous work as possible to complete the future, - // ensuring that when it blocks we're scheduled to get notified via some - // callback somewhere at some point (vague, right?) - // - // TODO: this probably shouldn't do as much synchronous work as possible - // as it can starve other computations. Rather it should instead - // yield every so often with something like `setTimeout` with the - // timeout set to zero. - fn poll(me: &Arc) { - loop { - match me.notified.replace(State::Polling) { - // We received a notification while previously polling, or - // this is the initial poll. We've got work to do below! - State::Notified => {} - - // We've gone through this loop once and no notification was - // received while we were executing work. That means we got - // `NotReady` below and we're scheduled to receive a - // notification. Block ourselves and wait for later. - // - // When the notification comes in it'll notify our task, see - // our `Waiting` state, and resume the polling process - State::Polling => { - me.notified.set(State::Waiting(me.clone())); - break; - } - - State::Waiting(_) => panic!("shouldn't see waiting state!"), - } - - let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) { - // If the future is ready, immediately call the - // resolve/reject callback and then return as we're done. - Ok(Async::Ready(value)) => (value, &me.resolve), - Err(value) => (value, &me.reject), - - // Otherwise keep going in our loop, if we weren't notified - // we'll break out and start waiting. - Ok(Async::NotReady) => continue, - }; - - drop(f.call1(&JsValue::undefined(), &val)); - break; - } - } - } - - impl Notify for Package { - fn notify(&self, _id: usize) { - let me = match self.notified.replace(State::Notified) { - // we need to schedule polling to resume, so keep going - State::Waiting(me) => me, - - // we were already notified, and were just notified again; - // having now coalesced the notifications we return as it's - // still someone else's job to process this - State::Notified => return, - - // the future was previously being polled, and we've just - // switched it to the "you're notified" state. We don't have - // access to the future as it's being polled, so the future - // polling process later sees this notification and will - // continue polling. For us, though, there's nothing else to do, - // so we bail out. - // later see - State::Polling => return, - }; - - // Use `Promise.then` on a resolved promise to place our execution - // onto the next turn of the microtask queue, enqueueing our poll - // operation. We don't currently poll immediately as it turns out - // `futures` crate adapters aren't compatible with it and it also - // helps avoid blowing the stack by accident. - // - // Note that the `Rc`/`RefCell` trick here is basically to just - // ensure that our `Closure` gets cleaned up appropriately. - let promise = Promise::resolve(&JsValue::undefined()); - let slot = Rc::new(RefCell::new(None)); - let slot2 = slot.clone(); - let closure = Closure::wrap(Box::new(move |_| { - let myself = slot2.borrow_mut().take(); - debug_assert!(myself.is_some()); - Package::poll(&me); - }) as Box); - promise.then(&closure); - *slot.borrow_mut() = Some(closure); - } - } -} - -/// Converts a Rust `Future` on a local task queue. -/// -/// The `future` provided must adhere to `'static` because it'll be scheduled -/// to run in the background and cannot contain any stack references. -/// -/// # Panics -/// -/// This function has the same panic behavior as `future_to_promise`. -pub fn spawn_local(future: F) -where - F: Future + 'static, -{ - future_to_promise( - future - .map(|()| JsValue::undefined()) - .or_else(|()| future::ok::(JsValue::undefined())), - ); -} +#[cfg(not(feature = "nightly"))] +pub use stable::*; diff --git a/crates/futures/src/nightly.rs b/crates/futures/src/nightly.rs new file mode 100644 index 00000000..1f340634 --- /dev/null +++ b/crates/futures/src/nightly.rs @@ -0,0 +1,252 @@ +use std::fmt; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::future::Future; +use std::task::{Poll, Context}; +use std::collections::VecDeque; + +use futures_util::task::ArcWake; +use futures_util::future::FutureExt; +use futures_channel::oneshot; + +use lazy_static::lazy_static; + +use js_sys::Promise; +use wasm_bindgen::prelude::*; + +/// A Rust `Future` backed by a JavaScript `Promise`. +/// +/// This type is constructed with a JavaScript `Promise` object and translates +/// it to a Rust `Future`. This type implements the `Future` trait from the +/// `futures` crate and will either succeed or fail depending on what happens +/// with the JavaScript `Promise`. +/// +/// Currently this type is constructed with `JsFuture::from`. +pub struct JsFuture { + resolved: oneshot::Receiver, + rejected: oneshot::Receiver, + _cb_resolve: Closure, + _cb_reject: Closure, +} + +impl fmt::Debug for JsFuture { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "JsFuture {{ ... }}") + } +} + +impl From for JsFuture { + fn from(js: Promise) -> JsFuture { + // Use the `then` method to schedule two callbacks, one for the + // resolved value and one for the rejected value. These two callbacks + // will be connected to oneshot channels which feed back into our + // future. + // + // This may not be the speediest option today but it should work! + let (tx1, rx1) = oneshot::channel(); + + let cb_resolve = Closure::once(move |val| { + tx1.send(val).unwrap_throw(); + }); + + let (tx2, rx2) = oneshot::channel(); + + let cb_reject = Closure::once(move |val| { + tx2.send(val).unwrap_throw(); + }); + + js.then2(&cb_resolve, &cb_reject); + + JsFuture { + resolved: rx1, + rejected: rx2, + _cb_resolve: cb_resolve, + _cb_reject: cb_reject, + } + } +} + +impl Future for JsFuture { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + // Test if either our resolved or rejected side is finished yet. + if let Poll::Ready(val) = self.resolved.poll_unpin(cx) { + return Poll::Ready(Ok(val.unwrap_throw())); + } + + if let Poll::Ready(val) = self.rejected.poll_unpin(cx) { + return Poll::Ready(Err(val.unwrap_throw())); + } + + Poll::Pending + } +} + +/// Converts a Rust `Future` into a JavaScript `Promise`. +/// +/// This function will take any future in Rust and schedule it to be executed, +/// returning a JavaScript `Promise` which can then be passed back to JavaScript +/// to get plumbed into the rest of a system. +/// +/// The `future` provided must adhere to `'static` because it'll be scheduled +/// to run in the background and cannot contain any stack references. The +/// returned `Promise` will be resolved or rejected when the future completes, +/// depending on whether it finishes with `Ok` or `Err`. +/// +/// # Panics +/// +/// Note that in wasm panics are currently translated to aborts, but "abort" in +/// this case means that a JavaScript exception is thrown. The wasm module is +/// still usable (likely erroneously) after Rust panics. +/// +/// If the `future` provided panics then the returned `Promise` **will not +/// resolve**. Instead it will be a leaked promise. This is an unfortunate +/// limitation of wasm currently that's hoped to be fixed one day! +pub fn future_to_promise(future: F) -> Promise +where + F: Future> + 'static, +{ + let mut future = Some(future); + + Promise::new(&mut |resolve, reject| { + // TODO change Promise::new to be FnOnce + spawn_local(future.take().unwrap_throw().map(move |val| { + match val { + Ok(val) => { + resolve.call1(&JsValue::undefined(), &val).unwrap_throw(); + }, + Err(val) => { + reject.call1(&JsValue::undefined(), &val).unwrap_throw(); + }, + } + })); + }) +} + +/// Runs a Rust `Future` on a local task queue. +/// +/// The `future` provided must adhere to `'static` because it'll be scheduled +/// to run in the background and cannot contain any stack references. +/// +/// # Panics +/// +/// This function has the same panic behavior as `future_to_promise`. +pub fn spawn_local(future: F) +where + F: Future + 'static, +{ + struct Task { + future: Mutex + 'static>>>>, + is_queued: AtomicBool, + } + + impl Task { + #[inline] + fn new(future: F) -> Arc where F: Future + 'static { + Arc::new(Self { + future: Mutex::new(Some(Box::pin(future))), + is_queued: AtomicBool::new(false), + }) + } + } + + impl ArcWake for Task { + fn wake_by_ref(arc_self: &Arc) { + // TODO can this be more relaxed ? + if !arc_self.is_queued.swap(true, Ordering::SeqCst) { + let mut lock = EXECUTOR.tasks.lock().unwrap_throw(); + + lock.push_back(arc_self.clone()); + + EXECUTOR.next_tick.schedule(); + } + } + } + + + struct NextTick { + is_spinning: AtomicBool, + promise: Promise, + closure: Closure, + } + + impl NextTick { + fn new(mut f: F) -> Self where F: FnMut() + 'static { + Self { + is_spinning: AtomicBool::new(false), + promise: Promise::resolve(&JsValue::null()), + closure: Closure::wrap(Box::new(move |_| { + f(); + })), + } + } + + fn schedule(&self) { + // TODO can this be more relaxed ? + if !self.is_spinning.swap(true, Ordering::SeqCst) { + // TODO avoid creating a new Promise + self.promise.then(&self.closure); + } + } + + fn done(&self) { + // TODO can this be more relaxed ? + self.is_spinning.store(false, Ordering::SeqCst); + } + } + + + struct Executor { + tasks: Mutex>>, + next_tick: NextTick, + } + + // This is only safe because JS is currently single-threaded + unsafe impl Send for Executor {} + unsafe impl Sync for Executor {} + + lazy_static! { + static ref EXECUTOR: Executor = Executor { + tasks: Mutex::new(VecDeque::new()), + next_tick: NextTick::new(|| { + let tasks = &EXECUTOR.tasks; + + loop { + let mut lock = tasks.lock().unwrap_throw(); + + match lock.pop_front() { + Some(task) => { + // This is necessary because the polled task might queue more tasks + drop(lock); + + let mut future = task.future.lock().unwrap_throw(); + + let poll = future.as_mut().map(|mut future| { + // Clear `is_queued` flag so that it will re-queue if poll calls waker.wake() + task.is_queued.store(false, Ordering::SeqCst); + + // TODO is there some way of saving these so they don't need to be recreated all the time ? + let waker = ArcWake::into_waker(task.clone()); + let cx = &mut Context::from_waker(&waker); + Pin::new(&mut future).poll(cx) + }); + + if let Some(Poll::Ready(_)) = poll { + *future = None; + } + }, + None => { + EXECUTOR.next_tick.done(); + break; + }, + } + } + }), + }; + } + + + ArcWake::wake_by_ref(&Task::new(future)); +} diff --git a/crates/futures/src/stable.rs b/crates/futures/src/stable.rs new file mode 100644 index 00000000..7817718c --- /dev/null +++ b/crates/futures/src/stable.rs @@ -0,0 +1,297 @@ +use std::cell::{Cell, RefCell}; +use std::fmt; +use std::rc::Rc; +use std::sync::Arc; + +use futures::executor::{self, Notify, Spawn}; +use futures::future; +use futures::prelude::*; +use futures::sync::oneshot; +use js_sys::{Function, Promise}; +use wasm_bindgen::prelude::*; + +/// A Rust `Future` backed by a JavaScript `Promise`. +/// +/// This type is constructed with a JavaScript `Promise` object and translates +/// it to a Rust `Future`. This type implements the `Future` trait from the +/// `futures` crate and will either succeed or fail depending on what happens +/// with the JavaScript `Promise`. +/// +/// Currently this type is constructed with `JsFuture::from`. +pub struct JsFuture { + resolved: oneshot::Receiver, + rejected: oneshot::Receiver, + callbacks: Option<(Closure, Closure)>, +} + +impl fmt::Debug for JsFuture { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "JsFuture {{ ... }}") + } +} + +impl From for JsFuture { + fn from(js: Promise) -> JsFuture { + // Use the `then` method to schedule two callbacks, one for the + // resolved value and one for the rejected value. These two callbacks + // will be connected to oneshot channels which feed back into our + // future. + // + // This may not be the speediest option today but it should work! + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + let mut tx1 = Some(tx1); + let resolve = Closure::wrap(Box::new(move |val| { + drop(tx1.take().unwrap().send(val)); + }) as Box); + let mut tx2 = Some(tx2); + let reject = Closure::wrap(Box::new(move |val| { + drop(tx2.take().unwrap().send(val)); + }) as Box); + + js.then2(&resolve, &reject); + + JsFuture { + resolved: rx1, + rejected: rx2, + callbacks: Some((resolve, reject)), + } + } +} + +impl Future for JsFuture { + type Item = JsValue; + type Error = JsValue; + + fn poll(&mut self) -> Poll { + // Test if either our resolved or rejected side is finished yet. Note + // that they will return errors if they're disconnected which can't + // happen until we drop the `callbacks` field, which doesn't happen + // till we're done, so we dont need to handle that. + if let Ok(Async::Ready(val)) = self.resolved.poll() { + drop(self.callbacks.take()); + return Ok(val.into()); + } + if let Ok(Async::Ready(val)) = self.rejected.poll() { + drop(self.callbacks.take()); + return Err(val); + } + Ok(Async::NotReady) + } +} + +/// Converts a Rust `Future` into a JavaScript `Promise`. +/// +/// This function will take any future in Rust and schedule it to be executed, +/// returning a JavaScript `Promise` which can then be passed back to JavaScript +/// to get plumbed into the rest of a system. +/// +/// The `future` provided must adhere to `'static` because it'll be scheduled +/// to run in the background and cannot contain any stack references. The +/// returned `Promise` will be resolved or rejected when the future completes, +/// depending on whether it finishes with `Ok` or `Err`. +/// +/// # Panics +/// +/// Note that in wasm panics are currently translated to aborts, but "abort" in +/// this case means that a JavaScript exception is thrown. The wasm module is +/// still usable (likely erroneously) after Rust panics. +/// +/// If the `future` provided panics then the returned `Promise` **will not +/// resolve**. Instead it will be a leaked promise. This is an unfortunate +/// limitation of wasm currently that's hoped to be fixed one day! +pub fn future_to_promise(future: F) -> Promise +where + F: Future + 'static, +{ + _future_to_promise(Box::new(future)) +} + +// Implementation of actually transforming a future into a JavaScript `Promise`. +// +// The only primitive we have to work with here is `Promise::new`, which gives +// us two callbacks that we can use to either reject or resolve the promise. +// It's our job to ensure that one of those callbacks is called at the +// appropriate time. +// +// Now we know that JavaScript (in general) can't block and is largely +// notification/callback driven. That means that our future must either have +// synchronous computational work to do, or it's "scheduled a notification" to +// happen. These notifications are likely callbacks to get executed when things +// finish (like a different promise or something like `setTimeout`). The general +// idea here is thus to do as much synchronous work as we can and then otherwise +// translate notifications of a future's task into "let's poll the future!" +// +// This isn't necessarily the greatest future executor in the world, but it +// should get the job done for now hopefully. +fn _future_to_promise(future: Box>) -> Promise { + let mut future = Some(executor::spawn(future)); + return Promise::new(&mut |resolve, reject| { + Package::poll(&Arc::new(Package { + spawn: RefCell::new(future.take().unwrap()), + resolve, + reject, + notified: Cell::new(State::Notified), + })); + }); + + struct Package { + // Our "spawned future". This'll have everything we need to poll the + // future and continue to move it forward. + spawn: RefCell>>>, + + // The current state of this future, expressed in an enum below. This + // indicates whether we're currently polling the future, received a + // notification and need to keep polling, or if we're waiting for a + // notification to come in (and no one is polling). + notified: Cell, + + // Our two callbacks connected to the `Promise` that we returned to + // JavaScript. We'll be invoking one of these at the end. + resolve: Function, + reject: Function, + } + + // The possible states our `Package` (future) can be in, tracked internally + // and used to guide what happens when polling a future. + enum State { + // This future is currently and actively being polled. Attempting to + // access the future will result in a runtime panic and is considered a + // bug. + Polling, + + // This future has been notified, while it was being polled. This marker + // is used in the `Notify` implementation below, and indicates that a + // notification was received that the future is ready to make progress. + // If seen, however, it probably means that the future is also currently + // being polled. + Notified, + + // The future is blocked, waiting for something to happen. Stored here + // is a self-reference to the future itself so we can pull it out in + // `Notify` and continue polling. + // + // Note that the self-reference here is an Arc-cycle that will leak + // memory unless the future completes, but currently that should be ok + // as we'll have to stick around anyway while the future is executing! + // + // This state is removed as soon as a notification comes in, so the leak + // should only be "temporary" + Waiting(Arc), + } + + // No shared memory right now, wasm is single threaded, no need to worry + // about this! + unsafe impl Send for Package {} + unsafe impl Sync for Package {} + + impl Package { + // Move the future contained in `me` as far forward as we can. This will + // do as much synchronous work as possible to complete the future, + // ensuring that when it blocks we're scheduled to get notified via some + // callback somewhere at some point (vague, right?) + // + // TODO: this probably shouldn't do as much synchronous work as possible + // as it can starve other computations. Rather it should instead + // yield every so often with something like `setTimeout` with the + // timeout set to zero. + fn poll(me: &Arc) { + loop { + match me.notified.replace(State::Polling) { + // We received a notification while previously polling, or + // this is the initial poll. We've got work to do below! + State::Notified => {} + + // We've gone through this loop once and no notification was + // received while we were executing work. That means we got + // `NotReady` below and we're scheduled to receive a + // notification. Block ourselves and wait for later. + // + // When the notification comes in it'll notify our task, see + // our `Waiting` state, and resume the polling process + State::Polling => { + me.notified.set(State::Waiting(me.clone())); + break; + } + + State::Waiting(_) => panic!("shouldn't see waiting state!"), + } + + let (val, f) = match me.spawn.borrow_mut().poll_future_notify(me, 0) { + // If the future is ready, immediately call the + // resolve/reject callback and then return as we're done. + Ok(Async::Ready(value)) => (value, &me.resolve), + Err(value) => (value, &me.reject), + + // Otherwise keep going in our loop, if we weren't notified + // we'll break out and start waiting. + Ok(Async::NotReady) => continue, + }; + + drop(f.call1(&JsValue::undefined(), &val)); + break; + } + } + } + + impl Notify for Package { + fn notify(&self, _id: usize) { + let me = match self.notified.replace(State::Notified) { + // we need to schedule polling to resume, so keep going + State::Waiting(me) => me, + + // we were already notified, and were just notified again; + // having now coalesced the notifications we return as it's + // still someone else's job to process this + State::Notified => return, + + // the future was previously being polled, and we've just + // switched it to the "you're notified" state. We don't have + // access to the future as it's being polled, so the future + // polling process later sees this notification and will + // continue polling. For us, though, there's nothing else to do, + // so we bail out. + // later see + State::Polling => return, + }; + + // Use `Promise.then` on a resolved promise to place our execution + // onto the next turn of the microtask queue, enqueueing our poll + // operation. We don't currently poll immediately as it turns out + // `futures` crate adapters aren't compatible with it and it also + // helps avoid blowing the stack by accident. + // + // Note that the `Rc`/`RefCell` trick here is basically to just + // ensure that our `Closure` gets cleaned up appropriately. + let promise = Promise::resolve(&JsValue::undefined()); + let slot = Rc::new(RefCell::new(None)); + let slot2 = slot.clone(); + let closure = Closure::wrap(Box::new(move |_| { + let myself = slot2.borrow_mut().take(); + debug_assert!(myself.is_some()); + Package::poll(&me); + }) as Box); + promise.then(&closure); + *slot.borrow_mut() = Some(closure); + } + } +} + +/// Converts a Rust `Future` on a local task queue. +/// +/// The `future` provided must adhere to `'static` because it'll be scheduled +/// to run in the background and cannot contain any stack references. +/// +/// # Panics +/// +/// This function has the same panic behavior as `future_to_promise`. +pub fn spawn_local(future: F) +where + F: Future + 'static, +{ + future_to_promise( + future + .map(|()| JsValue::undefined()) + .or_else(|()| future::ok::(JsValue::undefined())), + ); +}