diff --git a/crates/futures/Cargo.toml b/crates/futures/Cargo.toml index 76aa100a..966dc5cb 100644 --- a/crates/futures/Cargo.toml +++ b/crates/futures/Cargo.toml @@ -14,6 +14,12 @@ edition = "2018" futures = "0.1.20" js-sys = { path = "../js-sys", version = '0.3.20' } wasm-bindgen = { path = "../..", version = '0.2.43' } +futures-util-preview = { version = "0.3.0-alpha.15", optional = true } +futures-channel-preview = { version = "0.3.0-alpha.15", optional = true } +lazy_static = { version = "1.3.0", optional = true } [target.'cfg(target_arch = "wasm32")'.dev-dependencies] wasm-bindgen-test = { path = '../test', version = '0.2.43' } + +[features] +futures_0_3 = ["futures-util-preview", "futures-channel-preview", "lazy_static"] diff --git a/crates/futures/src/futures_0_3.rs b/crates/futures/src/futures_0_3.rs new file mode 100644 index 00000000..4c939158 --- /dev/null +++ b/crates/futures/src/futures_0_3.rs @@ -0,0 +1,270 @@ +use std::fmt; +use std::pin::Pin; +use std::cell::{Cell, RefCell}; +use std::sync::Arc; +use std::future::Future; +use std::task::{Poll, Context}; +use std::collections::VecDeque; + +use futures_util::task::ArcWake; +use futures_util::future::FutureExt; +use futures_channel::oneshot; + +use lazy_static::lazy_static; + +use js_sys::Promise; +use wasm_bindgen::prelude::*; + +/// A Rust `Future` backed by a JavaScript `Promise`. +/// +/// This type is constructed with a JavaScript `Promise` object and translates +/// it to a Rust `Future`. This type implements the `Future` trait from the +/// `futures` crate and will either succeed or fail depending on what happens +/// with the JavaScript `Promise`. +/// +/// Currently this type is constructed with `JsFuture::from`. +pub struct JsFuture { + resolved: oneshot::Receiver, + rejected: oneshot::Receiver, + _cb_resolve: Closure, + _cb_reject: Closure, +} + +impl fmt::Debug for JsFuture { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "JsFuture {{ ... }}") + } +} + +impl From for JsFuture { + fn from(js: Promise) -> JsFuture { + // Use the `then` method to schedule two callbacks, one for the + // resolved value and one for the rejected value. These two callbacks + // will be connected to oneshot channels which feed back into our + // future. + // + // This may not be the speediest option today but it should work! + let (tx1, rx1) = oneshot::channel(); + + let cb_resolve = Closure::once(move |val| { + tx1.send(val).unwrap_throw(); + }); + + let (tx2, rx2) = oneshot::channel(); + + let cb_reject = Closure::once(move |val| { + tx2.send(val).unwrap_throw(); + }); + + js.then2(&cb_resolve, &cb_reject); + + JsFuture { + resolved: rx1, + rejected: rx2, + _cb_resolve: cb_resolve, + _cb_reject: cb_reject, + } + } +} + +impl Future for JsFuture { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + // Test if either our resolved or rejected side is finished yet. + if let Poll::Ready(val) = self.resolved.poll_unpin(cx) { + return Poll::Ready(Ok(val.unwrap_throw())); + } + + if let Poll::Ready(val) = self.rejected.poll_unpin(cx) { + return Poll::Ready(Err(val.unwrap_throw())); + } + + Poll::Pending + } +} + +/// Converts a Rust `Future` into a JavaScript `Promise`. +/// +/// This function will take any future in Rust and schedule it to be executed, +/// returning a JavaScript `Promise` which can then be passed back to JavaScript +/// to get plumbed into the rest of a system. +/// +/// The `future` provided must adhere to `'static` because it'll be scheduled +/// to run in the background and cannot contain any stack references. The +/// returned `Promise` will be resolved or rejected when the future completes, +/// depending on whether it finishes with `Ok` or `Err`. +/// +/// # Panics +/// +/// Note that in wasm panics are currently translated to aborts, but "abort" in +/// this case means that a JavaScript exception is thrown. The wasm module is +/// still usable (likely erroneously) after Rust panics. +/// +/// If the `future` provided panics then the returned `Promise` **will not +/// resolve**. Instead it will be a leaked promise. This is an unfortunate +/// limitation of wasm currently that's hoped to be fixed one day! +pub fn future_to_promise(future: F) -> Promise +where + F: Future> + 'static, +{ + let mut future = Some(future); + + Promise::new(&mut |resolve, reject| { + // TODO change Promise::new to be FnOnce + spawn_local(future.take().unwrap_throw().map(move |val| { + match val { + Ok(val) => { + resolve.call1(&JsValue::undefined(), &val).unwrap_throw(); + }, + Err(val) => { + reject.call1(&JsValue::undefined(), &val).unwrap_throw(); + }, + } + })); + }) +} + +/// Runs a Rust `Future` on a local task queue. +/// +/// The `future` provided must adhere to `'static` because it'll be scheduled +/// to run in the background and cannot contain any stack references. +/// +/// # Panics +/// +/// This function has the same panic behavior as `future_to_promise`. +pub fn spawn_local(future: F) +where + F: Future + 'static, +{ + struct Task { + // This is an Option so that the Future can be immediately dropped when it's finished + future: RefCell + 'static>>>>, + + // This is used to ensure that the Task will only be queued once + is_queued: Cell, + } + + impl Task { + #[inline] + fn new(future: F) -> Arc where F: Future + 'static { + Arc::new(Self { + future: RefCell::new(Some(Box::pin(future))), + is_queued: Cell::new(false), + }) + } + } + + impl ArcWake for Task { + fn wake_by_ref(arc_self: &Arc) { + // This ensures that it's only queued once + if arc_self.is_queued.replace(true) { + return; + } + + let mut lock = EXECUTOR.tasks.borrow_mut(); + + lock.push_back(arc_self.clone()); + + // The Task will be polled on the next microtask event tick + EXECUTOR.next_tick.schedule(); + } + } + + + struct NextTick { + is_spinning: Cell, + promise: Promise, + closure: Closure, + } + + impl NextTick { + #[inline] + fn new(mut f: F) -> Self where F: FnMut() + 'static { + Self { + is_spinning: Cell::new(false), + promise: Promise::resolve(&JsValue::null()), + closure: Closure::wrap(Box::new(move |_| { + f(); + })), + } + } + + fn schedule(&self) { + // This ensures that it's only scheduled once + if self.is_spinning.replace(true) { + return; + } + + // TODO avoid creating a new Promise + self.promise.then(&self.closure); + } + + fn done(&self) { + self.is_spinning.set(false); + } + } + + + struct Executor { + // This is a queue of Tasks which will be polled in order + tasks: RefCell>>, + + // This is used to ensure that Tasks are polled on the next microtask event tick + next_tick: NextTick, + } + + // TODO This is only safe because JS is currently single-threaded + unsafe impl Send for Executor {} + unsafe impl Sync for Executor {} + + lazy_static! { + static ref EXECUTOR: Executor = Executor { + tasks: RefCell::new(VecDeque::new()), + + // This closure will only be called on the next microtask event tick + next_tick: NextTick::new(|| { + let tasks = &EXECUTOR.tasks; + + loop { + let mut lock = tasks.borrow_mut(); + + match lock.pop_front() { + Some(task) => { + let mut future = task.future.borrow_mut(); + + let poll = { + // This will only panic if the Future wakes up the Waker after returning Poll::Ready + let mut future = future.as_mut().unwrap_throw(); + + // Clear `is_queued` flag so that it will re-queue if poll calls waker.wake() + task.is_queued.set(false); + + // This is necessary because the polled task might queue more tasks + drop(lock); + + // TODO is there some way of saving these so they don't need to be recreated all the time ? + let waker = ArcWake::into_waker(task.clone()); + let cx = &mut Context::from_waker(&waker); + Pin::new(&mut future).poll(cx) + }; + + if let Poll::Ready(_) = poll { + // Cleanup the Future immediately + *future = None; + } + }, + None => { + // All of the Tasks have been polled, so it's now possible to schedule the NextTick again + EXECUTOR.next_tick.done(); + break; + }, + } + } + }), + }; + } + + + ArcWake::wake_by_ref(&Task::new(future)); +} diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index 62d5e88d..c5025e53 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -103,6 +103,10 @@ #![deny(missing_docs)] +#[cfg(feature = "futures_0_3")] +/// Contains a Futures 0.3 implementation of this crate. +pub mod futures_0_3; + use std::cell::{Cell, RefCell}; use std::fmt; use std::rc::Rc;