diff --git a/crates/futures/src/futures_0_3.rs b/crates/futures/src/futures_0_3.rs index d0e7a711..4c939158 100644 --- a/crates/futures/src/futures_0_3.rs +++ b/crates/futures/src/futures_0_3.rs @@ -140,6 +140,8 @@ where struct Task { // This is an Option so that the Future can be immediately dropped when it's finished future: RefCell + 'static>>>>, + + // This is used to ensure that the Task will only be queued once is_queued: Cell, } @@ -155,6 +157,7 @@ where impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc) { + // This ensures that it's only queued once if arc_self.is_queued.replace(true) { return; } @@ -163,6 +166,7 @@ where lock.push_back(arc_self.clone()); + // The Task will be polled on the next microtask event tick EXECUTOR.next_tick.schedule(); } } @@ -175,6 +179,7 @@ where } impl NextTick { + #[inline] fn new(mut f: F) -> Self where F: FnMut() + 'static { Self { is_spinning: Cell::new(false), @@ -186,6 +191,7 @@ where } fn schedule(&self) { + // This ensures that it's only scheduled once if self.is_spinning.replace(true) { return; } @@ -201,17 +207,22 @@ where struct Executor { + // This is a queue of Tasks which will be polled in order tasks: RefCell>>, + + // This is used to ensure that Tasks are polled on the next microtask event tick next_tick: NextTick, } - // This is only safe because JS is currently single-threaded + // TODO This is only safe because JS is currently single-threaded unsafe impl Send for Executor {} unsafe impl Sync for Executor {} lazy_static! { static ref EXECUTOR: Executor = Executor { tasks: RefCell::new(VecDeque::new()), + + // This closure will only be called on the next microtask event tick next_tick: NextTick::new(|| { let tasks = &EXECUTOR.tasks; @@ -220,17 +231,18 @@ where match lock.pop_front() { Some(task) => { - // This is necessary because the polled task might queue more tasks - drop(lock); - let mut future = task.future.borrow_mut(); let poll = { + // This will only panic if the Future wakes up the Waker after returning Poll::Ready let mut future = future.as_mut().unwrap_throw(); // Clear `is_queued` flag so that it will re-queue if poll calls waker.wake() task.is_queued.set(false); + // This is necessary because the polled task might queue more tasks + drop(lock); + // TODO is there some way of saving these so they don't need to be recreated all the time ? let waker = ArcWake::into_waker(task.clone()); let cx = &mut Context::from_waker(&waker); @@ -238,10 +250,12 @@ where }; if let Poll::Ready(_) = poll { + // Cleanup the Future immediately *future = None; } }, None => { + // All of the Tasks have been polled, so it's now possible to schedule the NextTick again EXECUTOR.next_tick.done(); break; },