diff --git a/futures-mutex/Cargo.toml b/futures-mutex/Cargo.toml
index 6ba96685..823dc068 100644
--- a/futures-mutex/Cargo.toml
+++ b/futures-mutex/Cargo.toml
@@ -12,4 +12,4 @@ categories = ["asynchronous", "concurrency"]
 
 [dependencies]
 futures = "0.1.14"
-crossbeam = "0.2.10"
+parking_lot = "0.5.3"
diff --git a/futures-mutex/src/lib.rs b/futures-mutex/src/lib.rs
index e146e4a3..18cba0e9 100644
--- a/futures-mutex/src/lib.rs
+++ b/futures-mutex/src/lib.rs
@@ -5,33 +5,21 @@
 //!
 
 extern crate futures;
-extern crate crossbeam;
+extern crate parking_lot;
 
 use std::ops::{Deref, DerefMut};
 use std::mem;
 use std::sync::Arc;
-use std::sync::atomic::{Ordering, AtomicBool};
-use std::cell::UnsafeCell;
-use crossbeam::sync::MsQueue;
 use futures::task::{current, Task};
 use futures::{Future, Poll, Async};
+use parking_lot::{Mutex as RegularMutex, MutexGuard as RegularMutexGuard};
 
 #[derive(Debug)]
 struct Inner<T> {
-    wait_queue: MsQueue<Task>,
-    locked: AtomicBool,
-    data: UnsafeCell<T>
+    data: RegularMutex<T>,
+    wait_queue: RegularMutex<Vec<Task>>,
 }
 
-impl<T> Drop for Inner<T> {
-    fn drop(&mut self) {
-        assert!(!self.locked.load(Ordering::SeqCst))
-    }
-}
-
-unsafe impl<T: Send> Send for Inner<T> {}
-unsafe impl<T: Send> Sync for Inner<T> {}
-
 /// A Mutex designed for use inside Futures. Works like `BiLock` from the `futures` crate, but
 /// with more than 2 handles.
 ///
@@ -43,16 +31,15 @@ unsafe impl<T: Send> Sync for Inner<T> {}
 /// *As of now, there is no strong guarantee that a particular handle of the lock won't be starved. Hopefully the use of the queue will prevent this, but I haven't tried to test that.*
 #[derive(Debug)]
 pub struct Mutex<T> {
-    inner: Arc<Inner<T>>
+    inner: Arc<Inner<T>>,
 }
 
 impl<T> Mutex<T> {
     /// Create a new Mutex wrapping around a value `t`
     pub fn new(t: T) -> Mutex<T> {
         let inner = Arc::new(Inner {
-            wait_queue: MsQueue::new(),
-            locked: AtomicBool::new(false),
-            data: UnsafeCell::new(t)
+            wait_queue: RegularMutex::new(Vec::new()),
+            data: RegularMutex::new(t),
         });
 
         Mutex {
@@ -73,11 +60,18 @@ impl<T> Mutex<T> {
     ///
     /// This function will panic if called outside the context of a future's task.
     pub fn poll_lock(&self) -> Async<MutexGuard<T>> {
-        if self.inner.locked.compare_and_swap(false, true, Ordering::SeqCst) {
-            self.inner.wait_queue.push(current());
-            Async::NotReady
-        } else {
-            Async::Ready(MutexGuard{ inner: self })
+        let mut ext_lock = self.inner.wait_queue.lock();
+        match self.inner.data.try_lock() {
+            Some(guard) => {
+                Async::Ready(MutexGuard {
+                    inner: &self.inner,
+                    guard: Some(guard),
+                })
+            },
+            None => {
+                ext_lock.push(current());
+                Async::NotReady
+            },
         }
     }
 
@@ -91,22 +85,6 @@ impl<T> Mutex<T> {
             inner: self
         }
     }
-
-    /// We unlock the mutex and wait for someone to lock. We try and unpark as many tasks as we
-    /// can to prevents dead tasks from deadlocking the mutex, or tasks that have finished their
-    /// critical section and were awakened.
-    fn unlock(&self) {
-        if !self.inner.locked.swap(false, Ordering::SeqCst) {
-            panic!("Tried to unlock an already unlocked Mutex, something has gone terribly wrong");
-        }
-
-        while !self.inner.locked.load(Ordering::SeqCst) {
-            match self.inner.wait_queue.try_pop() {
-                Some(task) => task.notify(),
-                None => return
-            }
-        }
-    }
 }
 
 impl<T> Clone for Mutex<T> {
@@ -122,27 +100,34 @@ impl<T> Clone for Mutex<T> {
 
 /// This structure acts as a sentinel to the data in the `Mutex` itself,
 /// implementing `Deref` and `DerefMut` to `T`. When dropped, the lock will be
 /// unlocked.
-#[derive(Debug)]
+// TODO: implement Debug
 pub struct MutexGuard<'a, T: 'a> {
-    inner: &'a Mutex<T>
+    inner: &'a Inner<T>,
+    guard: Option<RegularMutexGuard<'a, T>>,
 }
 
 impl<'a, T> Deref for MutexGuard<'a, T> {
     type Target = T;
+    #[inline]
     fn deref(&self) -> &Self::Target {
-        unsafe { &*self.inner.inner.data.get() }
+        self.guard.as_ref().expect("mutex wasn't locked").deref()
     }
 }
 
 impl<'a, T> DerefMut for MutexGuard<'a, T> {
+    #[inline]
     fn deref_mut(&mut self) -> &mut T {
-        unsafe { &mut *self.inner.inner.data.get() }
+        self.guard.as_mut().expect("mutex wasn't locked").deref_mut()
     }
 }
 
 impl<'a, T> Drop for MutexGuard<'a, T> {
     fn drop(&mut self) {
-        self.inner.unlock();
+        let mut wait_queue_lock = self.inner.wait_queue.lock();
+        let _ = self.guard.take().expect("mutex was unlocked");
+        for task in wait_queue_lock.drain(..) {
+            task.notify();
+        }
     }
}
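
For reference, a minimal usage sketch of the reworked lock (not part of the diff). It assumes the crate is imported as futures_mutex and uses future::poll_fn from futures 0.1 to supply the task context that poll_lock requires; wait() then drives the future on the current thread.

extern crate futures;
extern crate futures_mutex; // assumed crate name for this library

use futures::future::poll_fn;
use futures::{Async, Future, Poll};
use futures_mutex::Mutex;

fn main() {
    let lock = Mutex::new(0u32);
    let handle = lock.clone(); // any number of handles may contend

    // poll_fn provides the task context without which poll_lock panics.
    let increment = poll_fn(move || -> Poll<u32, ()> {
        match handle.poll_lock() {
            // Ready: the inner parking_lot mutex was free; the guard
            // holds it until dropped, then notifies every queued task.
            Async::Ready(mut guard) => {
                *guard += 1;
                Ok(Async::Ready(*guard))
            }
            // NotReady: poll_lock pushed current() onto the wait queue;
            // this task is re-polled once the live guard drops.
            Async::NotReady => Ok(Async::NotReady),
        }
    });

    // wait() runs the future to completion inside a task context.
    assert_eq!(increment.wait(), Ok(1));
}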