diff --git a/futures-mutex/Cargo.toml b/futures-mutex/Cargo.toml index 6ba96685..823dc068 100644 --- a/futures-mutex/Cargo.toml +++ b/futures-mutex/Cargo.toml @@ -12,4 +12,4 @@ categories = ["asynchronous", "concurrency"] [dependencies] futures = "0.1.14" -crossbeam = "0.2.10" +parking_lot = "0.5.3" diff --git a/futures-mutex/src/lib.rs b/futures-mutex/src/lib.rs index 0aee079a..43c6e311 100644 --- a/futures-mutex/src/lib.rs +++ b/futures-mutex/src/lib.rs @@ -3,73 +3,14 @@ //! API is similar to [`futures::sync::BiLock`](https://docs.rs/futures/0.1.11/futures/sync/struct.BiLock.html) //! However, it can be cloned into as many handles as desired. //! -//! ``` -//! extern crate futures; -//! extern crate futures_mutex; -//! -//! use futures::{Future, Poll, Async}; -//! use futures_mutex::Mutex; -//! -//! struct AddTwo { -//! lock: Mutex -//! } -//! -//! impl Future for AddTwo { -//! type Item = usize; -//! type Error = (); -//! fn poll(&mut self) -> Poll { -//! match self.lock.poll_lock() { -//! Async::Ready(mut g) => { -//! *g += 2; -//! Ok(Async::Ready(*g)) -//! }, -//! Async::NotReady => Ok(Async::NotReady) -//! } -//! } -//! } -//! -//! fn main() { -//! let lock1: Mutex = Mutex::new(0); -//! let lock2 = lock1.clone(); -//! -//! let future = AddTwo { lock: lock2 }; -//! -//! // This future will return the current value and the recovered lock. -//! let used_lock = lock1.into_lock().map(|b| (*b, b.unlock())); -//! -//! let _ = future.join(used_lock).map(|(add_two, (value, _))| { -//! assert_eq!(add_two, value); -//! }).wait().unwrap(); -//! } -//! ``` extern crate futures; -extern crate crossbeam; +extern crate parking_lot; use std::ops::{Deref, DerefMut}; -use std::mem; -use std::sync::Arc; -use std::sync::atomic::{Ordering, AtomicBool}; -use std::cell::UnsafeCell; -use crossbeam::sync::MsQueue; -use futures::task::{current, Task}; +use futures::task::{self, Task}; use futures::{Future, Poll, Async}; - -#[derive(Debug)] -struct Inner { - wait_queue: MsQueue, - locked: AtomicBool, - data: UnsafeCell -} - -impl Drop for Inner { - fn drop(&mut self) { - assert!(!self.locked.load(Ordering::SeqCst)) - } -} - -unsafe impl Send for Inner {} -unsafe impl Sync for Inner {} +use parking_lot::{Mutex as RegularMutex, MutexGuard as RegularMutexGuard}; /// A Mutex designed for use inside Futures. Works like `BiLock` from the `futures` crate, but /// with more than 2 handles. @@ -82,20 +23,16 @@ unsafe impl Sync for Inner {} /// *As of now, there is no strong guarantee that a particular handle of the lock won't be starved. Hopefully the use of the queue will prevent this, but I haven't tried to test that.* #[derive(Debug)] pub struct Mutex { - inner: Arc> + data: RegularMutex, + wait_queue: RegularMutex>, } impl Mutex { /// Create a new Mutex wrapping around a value `t` pub fn new(t: T) -> Mutex { - let inner = Arc::new(Inner { - wait_queue: MsQueue::new(), - locked: AtomicBool::new(false), - data: UnsafeCell::new(t) - }); - Mutex { - inner: inner + wait_queue: RegularMutex::new(Vec::new()), + data: RegularMutex::new(t), } } @@ -112,11 +49,18 @@ impl Mutex { /// /// This function will panic if called outside the context of a future's task. pub fn poll_lock(&self) -> Async> { - if self.inner.locked.compare_and_swap(false, true, Ordering::SeqCst) { - self.inner.wait_queue.push(current()); - Async::NotReady - } else { - Async::Ready(MutexGuard{ inner: self }) + let mut ext_lock = self.wait_queue.lock(); + match self.data.try_lock() { + Some(guard) => { + Async::Ready(MutexGuard { + inner: self, + guard: Some(guard), + }) + }, + None => { + ext_lock.push(task::current()); + Async::NotReady + }, } } @@ -130,41 +74,6 @@ impl Mutex { inner: self } } - - /// Convert this lock into a future that resolves to a guard that allows access to the data. - /// This function returns `MutexAcquire`, which resolves to a `MutexGuard` - /// guard type. - /// - /// The returned future will never return an error. - pub fn into_lock(self) -> MutexIntoAcquire { - MutexIntoAcquire { - inner: self - } - } - - /// We unlock the mutex and wait for someone to lock. We try and unpark as many tasks as we - /// can to prevents dead tasks from deadlocking the mutex, or tasks that have finished their - /// critical section and were awakened. - fn unlock(&self) { - if !self.inner.locked.swap(false, Ordering::SeqCst) { - panic!("Tried to unlock an already unlocked Mutex, something has gone terribly wrong"); - } - - while !self.inner.locked.load(Ordering::SeqCst) { - match self.inner.wait_queue.try_pop() { - Some(task) => task.notify(), - None => return - } - } - } -} - -impl Clone for Mutex { - fn clone(&self) -> Mutex { - Mutex { - inner: self.inner.clone() - } - } } /// Returned RAII guard from the `poll_lock` method. @@ -172,27 +81,34 @@ impl Clone for Mutex { /// This structure acts as a sentinel to the data in the `Mutex` itself, /// implementing `Deref` and `DerefMut` to `T`. When dropped, the lock will be /// unlocked. -#[derive(Debug)] +// TODO: implement Debug pub struct MutexGuard<'a, T: 'a> { - inner: &'a Mutex + inner: &'a Mutex, + guard: Option>, } impl<'a, T> Deref for MutexGuard<'a, T> { type Target = T; + #[inline] fn deref(&self) -> &Self::Target { - unsafe { &*self.inner.inner.data.get() } + self.guard.as_ref().expect("mutex wasn't locked").deref() } } impl<'a, T> DerefMut for MutexGuard<'a, T> { + #[inline] fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.inner.inner.data.get() } + self.guard.as_mut().expect("mutex wasn't locked").deref_mut() } } impl<'a, T> Drop for MutexGuard<'a, T> { fn drop(&mut self) { - self.inner.unlock(); + let mut wait_queue_lock = self.inner.wait_queue.lock(); + let _ = self.guard.take().expect("mutex was already unlocked when guard is dropped"); + for task in wait_queue_lock.drain(..) { + task.notify(); + } } } @@ -211,84 +127,24 @@ impl<'a, T> Future for MutexAcquire<'a, T> { } } -/// Future returned by `FutMutex::lock` which resolves to a guard when a lock is acquired. -#[derive(Debug)] -pub struct MutexIntoAcquire { - inner: Mutex -} - -impl Future for MutexIntoAcquire { - type Item = MutexAcquired; - type Error = (); - - fn poll(&mut self) -> Poll { - match self.inner.poll_lock() { - Async::Ready(r) => { - mem::forget(r); - Ok(MutexAcquired { - inner: Mutex{ inner: self.inner.inner.clone() } - }.into()) - }, - Async::NotReady => Ok(Async::NotReady) - } - } -} - -#[derive(Debug)] -/// Resolved value of `FutMutexAcquire` future -/// -/// This value works like `FutMutexGuard`, providing a RAII guard to the value `T` through -/// `Deref` and `DerefMut`. Will unlock the lock when dropped; the original `FutMutex` can be -/// recovered with `unlock()`. -pub struct MutexAcquired { - inner: Mutex -} - -impl MutexAcquired { - pub fn unlock(self) -> Mutex { - Mutex { - inner: self.inner.inner.clone() - } - } -} - -impl Deref for MutexAcquired { - type Target = T; - fn deref(&self) -> &Self::Target { - unsafe { &*self.inner.inner.data.get() } - } -} - -impl DerefMut for MutexAcquired { - fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.inner.inner.data.get() } - } -} - -impl Drop for MutexAcquired { - fn drop(&mut self) { - self.inner.unlock(); - } -} - #[cfg(test)] mod tests { use super::*; use futures::executor::{self, Notify}; use futures::future; - use futures::stream::{self, Stream}; + use std::sync::Arc; use std::thread; struct Foo; impl Notify for Foo { - fn notify(&self, id: usize) {} + fn notify(&self, _: usize) {} } #[test] fn simple() { let future = future::lazy(|| { - let lock1 = Mutex::new(1); + let lock1 = Arc::new(Mutex::new(1)); let lock2 = lock1.clone(); let lock3 = lock1.clone(); @@ -330,19 +186,12 @@ mod tests { #[test] fn concurrent() { const N: usize = 10000; - let lock1 = Mutex::new(0); - let lock2 = lock1.clone(); let a = Increment { - a: Some(lock1), + a: Some(Arc::new(Mutex::new(0))), remaining: N, }; - let b = stream::iter_ok::<_, ()>(0..N).fold(lock2, |b, _n| { - b.into_lock().map(|mut b| { - *b += 1; - b.unlock() - }) - }); + let b = a.clone(); let t1 = thread::spawn(move || a.wait()); let b = b.wait().expect("b error"); @@ -357,16 +206,17 @@ mod tests { Async::NotReady => panic!("poll not ready"), } + #[derive(Clone)] struct Increment { remaining: usize, - a: Option>, + a: Option>>, } impl Future for Increment { - type Item = Mutex; + type Item = Arc>; type Error = (); - fn poll(&mut self) -> Poll, ()> { + fn poll(&mut self) -> Poll>, ()> { loop { if self.remaining == 0 { return Ok(self.a.take().unwrap().into())