diff --git a/crates/futures/src/futures_0_3.rs b/crates/futures/src/futures_0_3.rs index 9aaeaf7f..508eb9f1 100644 --- a/crates/futures/src/futures_0_3.rs +++ b/crates/futures/src/futures_0_3.rs @@ -3,6 +3,7 @@ use std::collections::VecDeque; use std::fmt; use std::future::Future; use std::pin::Pin; +use std::rc::Rc; use std::sync::Arc; use std::task::{Context, Poll}; @@ -24,10 +25,7 @@ use wasm_bindgen::prelude::*; /// /// Currently this type is constructed with `JsFuture::from`. pub struct JsFuture { - resolved: oneshot::Receiver, - rejected: oneshot::Receiver, - _cb_resolve: Closure, - _cb_reject: Closure, + rx: oneshot::Receiver>, } impl fmt::Debug for JsFuture { @@ -38,31 +36,37 @@ impl fmt::Debug for JsFuture { impl From for JsFuture { fn from(js: Promise) -> JsFuture { - // Use the `then` method to schedule two callbacks, one for the - // resolved value and one for the rejected value. These two callbacks - // will be connected to oneshot channels which feed back into our - // future. - // - // This may not be the speediest option today but it should work! - let (tx1, rx1) = oneshot::channel(); + // See comments in `src/lib.rs` for why we're using one self-contained + // callback here. + let (tx, rx) = oneshot::channel(); + let state = Rc::new(RefCell::new(None)); + let state2 = state.clone(); + let resolve = Closure::once(move |val| finish(&state2, Ok(val))); + let state2 = state.clone(); + let reject = Closure::once(move |val| finish(&state2, Err(val))); - let cb_resolve = Closure::once(move |val| { - tx1.send(val).unwrap_throw(); - }); + js.then2(&resolve, &reject); + *state.borrow_mut() = Some((tx, resolve, reject)); - let (tx2, rx2) = oneshot::channel(); + return JsFuture { rx }; - let cb_reject = Closure::once(move |val| { - tx2.send(val).unwrap_throw(); - }); - - js.then2(&cb_resolve, &cb_reject); - - JsFuture { - resolved: rx1, - rejected: rx2, - _cb_resolve: cb_resolve, - _cb_reject: cb_reject, + fn finish( + state: &RefCell< + Option<( + oneshot::Sender>, + Closure, + Closure, + )>, + >, + val: Result, + ) { + match state.borrow_mut().take() { + // We don't have any guarantee that anyone's still listening at this + // point (the Rust `JsFuture` could have been dropped) so simply + // ignore any errors here. + Some((tx, _, _)) => drop(tx.send(val)), + None => wasm_bindgen::throw_str("cannot finish twice"), + } } } } @@ -71,16 +75,11 @@ impl Future for JsFuture { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - // Test if either our resolved or rejected side is finished yet. - if let Poll::Ready(val) = self.resolved.poll_unpin(cx) { - return Poll::Ready(Ok(val.unwrap_throw())); + match self.rx.poll_unpin(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(val)) => Poll::Ready(val), + Poll::Ready(Err(_)) => wasm_bindgen::throw_str("cannot cancel"), } - - if let Poll::Ready(val) = self.rejected.poll_unpin(cx) { - return Poll::Ready(Err(val.unwrap_throw())); - } - - Poll::Pending } } diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index 3cf9b9c0..e17eaaee 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -128,9 +128,7 @@ use wasm_bindgen::prelude::*; /// /// Currently this type is constructed with `JsFuture::from`. pub struct JsFuture { - resolved: oneshot::Receiver, - rejected: oneshot::Receiver, - callbacks: Option<(Closure, Closure)>, + rx: oneshot::Receiver>, } impl fmt::Debug for JsFuture { @@ -142,28 +140,49 @@ impl fmt::Debug for JsFuture { impl From for JsFuture { fn from(js: Promise) -> JsFuture { // Use the `then` method to schedule two callbacks, one for the - // resolved value and one for the rejected value. These two callbacks - // will be connected to oneshot channels which feed back into our - // future. + // resolved value and one for the rejected value. We're currently + // assuming that JS engines will unconditionally invoke precisely one of + // these callbacks, no matter what. // - // This may not be the speediest option today but it should work! - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - let mut tx1 = Some(tx1); - let resolve = Closure::wrap(Box::new(move |val| { - drop(tx1.take().unwrap().send(val)); - }) as Box); - let mut tx2 = Some(tx2); - let reject = Closure::wrap(Box::new(move |val| { - drop(tx2.take().unwrap().send(val)); - }) as Box); + // Ideally we'd have a way to cancel the callbacks getting invoked and + // free up state ourselves when this `JsFuture` is dropped. We don't + // have that, though, and one of the callbacks is likely always going to + // be invoked. + // + // As a result we need to make sure that no matter when the callbacks + // are invoked they are valid to be called at any time, which means they + // have to be self-contained. Through the `Closure::once` and some + // `Rc`-trickery we can arrange for both instances of `Closure`, and the + // `Rc`, to all be destroyed once the first one is called. + let (tx, rx) = oneshot::channel(); + let state = Rc::new(RefCell::new(None)); + let state2 = state.clone(); + let resolve = Closure::once(move |val| finish(&state2, Ok(val))); + let state2 = state.clone(); + let reject = Closure::once(move |val| finish(&state2, Err(val))); js.then2(&resolve, &reject); + *state.borrow_mut() = Some((tx, resolve, reject)); - JsFuture { - resolved: rx1, - rejected: rx2, - callbacks: Some((resolve, reject)), + return JsFuture { rx }; + + fn finish( + state: &RefCell< + Option<( + oneshot::Sender>, + Closure, + Closure, + )>, + >, + val: Result, + ) { + match state.borrow_mut().take() { + // We don't have any guarantee that anyone's still listening at this + // point (the Rust `JsFuture` could have been dropped) so simply + // ignore any errors here. + Some((tx, _, _)) => drop(tx.send(val)), + None => wasm_bindgen::throw_str("cannot finish twice"), + } } } } @@ -173,19 +192,11 @@ impl Future for JsFuture { type Error = JsValue; fn poll(&mut self) -> Poll { - // Test if either our resolved or rejected side is finished yet. Note - // that they will return errors if they're disconnected which can't - // happen until we drop the `callbacks` field, which doesn't happen - // till we're done, so we dont need to handle that. - if let Ok(Async::Ready(val)) = self.resolved.poll() { - drop(self.callbacks.take()); - return Ok(val.into()); + match self.rx.poll() { + Ok(Async::Ready(val)) => val.map(Async::Ready), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_) => wasm_bindgen::throw_str("cannot cancel"), } - if let Ok(Async::Ready(val)) = self.rejected.poll() { - drop(self.callbacks.take()); - return Err(val); - } - Ok(Async::NotReady) } }