From c2d4b75988bfeff20eb49e40e618d73732af2f98 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 5 Jan 2018 11:58:27 +0100 Subject: [PATCH 1/3] Remove Mutex::into_lock --- futures-mutex/src/lib.rs | 118 +-------------------------------------- 1 file changed, 2 insertions(+), 116 deletions(-) diff --git a/futures-mutex/src/lib.rs b/futures-mutex/src/lib.rs index 0aee079a..e146e4a3 100644 --- a/futures-mutex/src/lib.rs +++ b/futures-mutex/src/lib.rs @@ -3,45 +3,6 @@ //! API is similar to [`futures::sync::BiLock`](https://docs.rs/futures/0.1.11/futures/sync/struct.BiLock.html) //! However, it can be cloned into as many handles as desired. //! -//! ``` -//! extern crate futures; -//! extern crate futures_mutex; -//! -//! use futures::{Future, Poll, Async}; -//! use futures_mutex::Mutex; -//! -//! struct AddTwo { -//! lock: Mutex -//! } -//! -//! impl Future for AddTwo { -//! type Item = usize; -//! type Error = (); -//! fn poll(&mut self) -> Poll { -//! match self.lock.poll_lock() { -//! Async::Ready(mut g) => { -//! *g += 2; -//! Ok(Async::Ready(*g)) -//! }, -//! Async::NotReady => Ok(Async::NotReady) -//! } -//! } -//! } -//! -//! fn main() { -//! let lock1: Mutex = Mutex::new(0); -//! let lock2 = lock1.clone(); -//! -//! let future = AddTwo { lock: lock2 }; -//! -//! // This future will return the current value and the recovered lock. -//! let used_lock = lock1.into_lock().map(|b| (*b, b.unlock())); -//! -//! let _ = future.join(used_lock).map(|(add_two, (value, _))| { -//! assert_eq!(add_two, value); -//! }).wait().unwrap(); -//! } -//! ``` extern crate futures; extern crate crossbeam; @@ -131,17 +92,6 @@ impl Mutex { } } - /// Convert this lock into a future that resolves to a guard that allows access to the data. - /// This function returns `MutexAcquire`, which resolves to a `MutexGuard` - /// guard type. - /// - /// The returned future will never return an error. - pub fn into_lock(self) -> MutexIntoAcquire { - MutexIntoAcquire { - inner: self - } - } - /// We unlock the mutex and wait for someone to lock. We try and unpark as many tasks as we /// can to prevents dead tasks from deadlocking the mutex, or tasks that have finished their /// critical section and were awakened. @@ -211,66 +161,6 @@ impl<'a, T> Future for MutexAcquire<'a, T> { } } -/// Future returned by `FutMutex::lock` which resolves to a guard when a lock is acquired. -#[derive(Debug)] -pub struct MutexIntoAcquire { - inner: Mutex -} - -impl Future for MutexIntoAcquire { - type Item = MutexAcquired; - type Error = (); - - fn poll(&mut self) -> Poll { - match self.inner.poll_lock() { - Async::Ready(r) => { - mem::forget(r); - Ok(MutexAcquired { - inner: Mutex{ inner: self.inner.inner.clone() } - }.into()) - }, - Async::NotReady => Ok(Async::NotReady) - } - } -} - -#[derive(Debug)] -/// Resolved value of `FutMutexAcquire` future -/// -/// This value works like `FutMutexGuard`, providing a RAII guard to the value `T` through -/// `Deref` and `DerefMut`. Will unlock the lock when dropped; the original `FutMutex` can be -/// recovered with `unlock()`. -pub struct MutexAcquired { - inner: Mutex -} - -impl MutexAcquired { - pub fn unlock(self) -> Mutex { - Mutex { - inner: self.inner.inner.clone() - } - } -} - -impl Deref for MutexAcquired { - type Target = T; - fn deref(&self) -> &Self::Target { - unsafe { &*self.inner.inner.data.get() } - } -} - -impl DerefMut for MutexAcquired { - fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.inner.inner.data.get() } - } -} - -impl Drop for MutexAcquired { - fn drop(&mut self) { - self.inner.unlock(); - } -} - #[cfg(test)] mod tests { use super::*; @@ -337,12 +227,7 @@ mod tests { a: Some(lock1), remaining: N, }; - let b = stream::iter_ok::<_, ()>(0..N).fold(lock2, |b, _n| { - b.into_lock().map(|mut b| { - *b += 1; - b.unlock() - }) - }); + let b = a.clone(); let t1 = thread::spawn(move || a.wait()); let b = b.wait().expect("b error"); @@ -357,6 +242,7 @@ mod tests { Async::NotReady => panic!("poll not ready"), } + #[derive(Clone)] struct Increment { remaining: usize, a: Option>, From e57f3059dbed256a703ed76a25f0a0d819e6709c Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 10 Jan 2018 12:44:22 +0100 Subject: [PATCH 2/3] Fix the deadlock in futures-mutex --- futures-mutex/Cargo.toml | 2 +- futures-mutex/src/lib.rs | 77 ++++++++++++++++------------------------ 2 files changed, 32 insertions(+), 47 deletions(-) diff --git a/futures-mutex/Cargo.toml b/futures-mutex/Cargo.toml index 6ba96685..823dc068 100644 --- a/futures-mutex/Cargo.toml +++ b/futures-mutex/Cargo.toml @@ -12,4 +12,4 @@ categories = ["asynchronous", "concurrency"] [dependencies] futures = "0.1.14" -crossbeam = "0.2.10" +parking_lot = "0.5.3" diff --git a/futures-mutex/src/lib.rs b/futures-mutex/src/lib.rs index e146e4a3..18cba0e9 100644 --- a/futures-mutex/src/lib.rs +++ b/futures-mutex/src/lib.rs @@ -5,33 +5,21 @@ //! extern crate futures; -extern crate crossbeam; +extern crate parking_lot; use std::ops::{Deref, DerefMut}; use std::mem; use std::sync::Arc; -use std::sync::atomic::{Ordering, AtomicBool}; -use std::cell::UnsafeCell; -use crossbeam::sync::MsQueue; use futures::task::{current, Task}; use futures::{Future, Poll, Async}; +use parking_lot::{Mutex as RegularMutex, MutexGuard as RegularMutexGuard}; #[derive(Debug)] struct Inner { - wait_queue: MsQueue, - locked: AtomicBool, - data: UnsafeCell + data: RegularMutex, + wait_queue: RegularMutex>, } -impl Drop for Inner { - fn drop(&mut self) { - assert!(!self.locked.load(Ordering::SeqCst)) - } -} - -unsafe impl Send for Inner {} -unsafe impl Sync for Inner {} - /// A Mutex designed for use inside Futures. Works like `BiLock` from the `futures` crate, but /// with more than 2 handles. /// @@ -43,16 +31,15 @@ unsafe impl Sync for Inner {} /// *As of now, there is no strong guarantee that a particular handle of the lock won't be starved. Hopefully the use of the queue will prevent this, but I haven't tried to test that.* #[derive(Debug)] pub struct Mutex { - inner: Arc> + inner: Arc>, } impl Mutex { /// Create a new Mutex wrapping around a value `t` pub fn new(t: T) -> Mutex { let inner = Arc::new(Inner { - wait_queue: MsQueue::new(), - locked: AtomicBool::new(false), - data: UnsafeCell::new(t) + wait_queue: RegularMutex::new(Vec::new()), + data: RegularMutex::new(t), }); Mutex { @@ -73,11 +60,18 @@ impl Mutex { /// /// This function will panic if called outside the context of a future's task. pub fn poll_lock(&self) -> Async> { - if self.inner.locked.compare_and_swap(false, true, Ordering::SeqCst) { - self.inner.wait_queue.push(current()); - Async::NotReady - } else { - Async::Ready(MutexGuard{ inner: self }) + let mut ext_lock = self.inner.wait_queue.lock(); + match self.inner.data.try_lock() { + Some(guard) => { + Async::Ready(MutexGuard { + inner: &self.inner, + guard: Some(guard), + }) + }, + None => { + ext_lock.push(current()); + Async::NotReady + }, } } @@ -91,22 +85,6 @@ impl Mutex { inner: self } } - - /// We unlock the mutex and wait for someone to lock. We try and unpark as many tasks as we - /// can to prevents dead tasks from deadlocking the mutex, or tasks that have finished their - /// critical section and were awakened. - fn unlock(&self) { - if !self.inner.locked.swap(false, Ordering::SeqCst) { - panic!("Tried to unlock an already unlocked Mutex, something has gone terribly wrong"); - } - - while !self.inner.locked.load(Ordering::SeqCst) { - match self.inner.wait_queue.try_pop() { - Some(task) => task.notify(), - None => return - } - } - } } impl Clone for Mutex { @@ -122,27 +100,34 @@ impl Clone for Mutex { /// This structure acts as a sentinel to the data in the `Mutex` itself, /// implementing `Deref` and `DerefMut` to `T`. When dropped, the lock will be /// unlocked. -#[derive(Debug)] +// TODO: implement Debug pub struct MutexGuard<'a, T: 'a> { - inner: &'a Mutex + inner: &'a Inner, + guard: Option>, } impl<'a, T> Deref for MutexGuard<'a, T> { type Target = T; + #[inline] fn deref(&self) -> &Self::Target { - unsafe { &*self.inner.inner.data.get() } + self.guard.as_ref().expect("mutex wasn't locked").deref() } } impl<'a, T> DerefMut for MutexGuard<'a, T> { + #[inline] fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.inner.inner.data.get() } + self.guard.as_mut().expect("mutex wasn't locked").deref_mut() } } impl<'a, T> Drop for MutexGuard<'a, T> { fn drop(&mut self) { - self.inner.unlock(); + let mut wait_queue_lock = self.inner.wait_queue.lock(); + let _ = self.guard.take().expect("mutex was unlocked"); + for task in wait_queue_lock.drain(..) { + task.notify(); + } } } From 156b971f343a5c737693609955073a1b606ee489 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 15 Jan 2018 12:56:35 +0100 Subject: [PATCH 3/3] Fix concerns --- futures-mutex/src/lib.rs | 55 +++++++++++++--------------------------- 1 file changed, 17 insertions(+), 38 deletions(-) diff --git a/futures-mutex/src/lib.rs b/futures-mutex/src/lib.rs index 18cba0e9..43c6e311 100644 --- a/futures-mutex/src/lib.rs +++ b/futures-mutex/src/lib.rs @@ -8,18 +8,10 @@ extern crate futures; extern crate parking_lot; use std::ops::{Deref, DerefMut}; -use std::mem; -use std::sync::Arc; -use futures::task::{current, Task}; +use futures::task::{self, Task}; use futures::{Future, Poll, Async}; use parking_lot::{Mutex as RegularMutex, MutexGuard as RegularMutexGuard}; -#[derive(Debug)] -struct Inner { - data: RegularMutex, - wait_queue: RegularMutex>, -} - /// A Mutex designed for use inside Futures. Works like `BiLock` from the `futures` crate, but /// with more than 2 handles. /// @@ -31,19 +23,16 @@ struct Inner { /// *As of now, there is no strong guarantee that a particular handle of the lock won't be starved. Hopefully the use of the queue will prevent this, but I haven't tried to test that.* #[derive(Debug)] pub struct Mutex { - inner: Arc>, + data: RegularMutex, + wait_queue: RegularMutex>, } impl Mutex { /// Create a new Mutex wrapping around a value `t` pub fn new(t: T) -> Mutex { - let inner = Arc::new(Inner { + Mutex { wait_queue: RegularMutex::new(Vec::new()), data: RegularMutex::new(t), - }); - - Mutex { - inner: inner } } @@ -60,16 +49,16 @@ impl Mutex { /// /// This function will panic if called outside the context of a future's task. pub fn poll_lock(&self) -> Async> { - let mut ext_lock = self.inner.wait_queue.lock(); - match self.inner.data.try_lock() { + let mut ext_lock = self.wait_queue.lock(); + match self.data.try_lock() { Some(guard) => { Async::Ready(MutexGuard { - inner: &self.inner, + inner: self, guard: Some(guard), }) }, None => { - ext_lock.push(current()); + ext_lock.push(task::current()); Async::NotReady }, } @@ -87,14 +76,6 @@ impl Mutex { } } -impl Clone for Mutex { - fn clone(&self) -> Mutex { - Mutex { - inner: self.inner.clone() - } - } -} - /// Returned RAII guard from the `poll_lock` method. /// /// This structure acts as a sentinel to the data in the `Mutex` itself, @@ -102,7 +83,7 @@ impl Clone for Mutex { /// unlocked. // TODO: implement Debug pub struct MutexGuard<'a, T: 'a> { - inner: &'a Inner, + inner: &'a Mutex, guard: Option>, } @@ -124,7 +105,7 @@ impl<'a, T> DerefMut for MutexGuard<'a, T> { impl<'a, T> Drop for MutexGuard<'a, T> { fn drop(&mut self) { let mut wait_queue_lock = self.inner.wait_queue.lock(); - let _ = self.guard.take().expect("mutex was unlocked"); + let _ = self.guard.take().expect("mutex was already unlocked when guard is dropped"); for task in wait_queue_lock.drain(..) { task.notify(); } @@ -151,19 +132,19 @@ mod tests { use super::*; use futures::executor::{self, Notify}; use futures::future; - use futures::stream::{self, Stream}; + use std::sync::Arc; use std::thread; struct Foo; impl Notify for Foo { - fn notify(&self, id: usize) {} + fn notify(&self, _: usize) {} } #[test] fn simple() { let future = future::lazy(|| { - let lock1 = Mutex::new(1); + let lock1 = Arc::new(Mutex::new(1)); let lock2 = lock1.clone(); let lock3 = lock1.clone(); @@ -205,11 +186,9 @@ mod tests { #[test] fn concurrent() { const N: usize = 10000; - let lock1 = Mutex::new(0); - let lock2 = lock1.clone(); let a = Increment { - a: Some(lock1), + a: Some(Arc::new(Mutex::new(0))), remaining: N, }; let b = a.clone(); @@ -230,14 +209,14 @@ mod tests { #[derive(Clone)] struct Increment { remaining: usize, - a: Option>, + a: Option>>, } impl Future for Increment { - type Item = Mutex; + type Item = Arc>; type Error = (); - fn poll(&mut self) -> Poll, ()> { + fn poll(&mut self) -> Poll>, ()> { loop { if self.remaining == 0 { return Ok(self.a.take().unwrap().into())