mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-30 02:01:35 +00:00
Merge pull request #94 from tomaka/fix-race-condition-futmut
Fix the deadlock in futures-mutex
This commit is contained in:
@ -12,4 +12,4 @@ categories = ["asynchronous", "concurrency"]
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
futures = "0.1.14"
|
futures = "0.1.14"
|
||||||
crossbeam = "0.2.10"
|
parking_lot = "0.5.3"
|
||||||
|
@ -3,73 +3,14 @@
|
|||||||
//! API is similar to [`futures::sync::BiLock`](https://docs.rs/futures/0.1.11/futures/sync/struct.BiLock.html)
|
//! API is similar to [`futures::sync::BiLock`](https://docs.rs/futures/0.1.11/futures/sync/struct.BiLock.html)
|
||||||
//! However, it can be cloned into as many handles as desired.
|
//! However, it can be cloned into as many handles as desired.
|
||||||
//!
|
//!
|
||||||
//! ```
|
|
||||||
//! extern crate futures;
|
|
||||||
//! extern crate futures_mutex;
|
|
||||||
//!
|
|
||||||
//! use futures::{Future, Poll, Async};
|
|
||||||
//! use futures_mutex::Mutex;
|
|
||||||
//!
|
|
||||||
//! struct AddTwo {
|
|
||||||
//! lock: Mutex<usize>
|
|
||||||
//! }
|
|
||||||
//!
|
|
||||||
//! impl Future for AddTwo {
|
|
||||||
//! type Item = usize;
|
|
||||||
//! type Error = ();
|
|
||||||
//! fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
|
||||||
//! match self.lock.poll_lock() {
|
|
||||||
//! Async::Ready(mut g) => {
|
|
||||||
//! *g += 2;
|
|
||||||
//! Ok(Async::Ready(*g))
|
|
||||||
//! },
|
|
||||||
//! Async::NotReady => Ok(Async::NotReady)
|
|
||||||
//! }
|
|
||||||
//! }
|
|
||||||
//! }
|
|
||||||
//!
|
|
||||||
//! fn main() {
|
|
||||||
//! let lock1: Mutex<usize> = Mutex::new(0);
|
|
||||||
//! let lock2 = lock1.clone();
|
|
||||||
//!
|
|
||||||
//! let future = AddTwo { lock: lock2 };
|
|
||||||
//!
|
|
||||||
//! // This future will return the current value and the recovered lock.
|
|
||||||
//! let used_lock = lock1.into_lock().map(|b| (*b, b.unlock()));
|
|
||||||
//!
|
|
||||||
//! let _ = future.join(used_lock).map(|(add_two, (value, _))| {
|
|
||||||
//! assert_eq!(add_two, value);
|
|
||||||
//! }).wait().unwrap();
|
|
||||||
//! }
|
|
||||||
//! ```
|
|
||||||
|
|
||||||
extern crate futures;
|
extern crate futures;
|
||||||
extern crate crossbeam;
|
extern crate parking_lot;
|
||||||
|
|
||||||
use std::ops::{Deref, DerefMut};
|
use std::ops::{Deref, DerefMut};
|
||||||
use std::mem;
|
use futures::task::{self, Task};
|
||||||
use std::sync::Arc;
|
|
||||||
use std::sync::atomic::{Ordering, AtomicBool};
|
|
||||||
use std::cell::UnsafeCell;
|
|
||||||
use crossbeam::sync::MsQueue;
|
|
||||||
use futures::task::{current, Task};
|
|
||||||
use futures::{Future, Poll, Async};
|
use futures::{Future, Poll, Async};
|
||||||
|
use parking_lot::{Mutex as RegularMutex, MutexGuard as RegularMutexGuard};
|
||||||
#[derive(Debug)]
|
|
||||||
struct Inner<T> {
|
|
||||||
wait_queue: MsQueue<Task>,
|
|
||||||
locked: AtomicBool,
|
|
||||||
data: UnsafeCell<T>
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Drop for Inner<T> {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
assert!(!self.locked.load(Ordering::SeqCst))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
unsafe impl<T: Send> Send for Inner<T> {}
|
|
||||||
unsafe impl<T: Send> Sync for Inner<T> {}
|
|
||||||
|
|
||||||
/// A Mutex designed for use inside Futures. Works like `BiLock<T>` from the `futures` crate, but
|
/// A Mutex designed for use inside Futures. Works like `BiLock<T>` from the `futures` crate, but
|
||||||
/// with more than 2 handles.
|
/// with more than 2 handles.
|
||||||
@ -82,20 +23,16 @@ unsafe impl<T: Send> Sync for Inner<T> {}
|
|||||||
/// *As of now, there is no strong guarantee that a particular handle of the lock won't be starved. Hopefully the use of the queue will prevent this, but I haven't tried to test that.*
|
/// *As of now, there is no strong guarantee that a particular handle of the lock won't be starved. Hopefully the use of the queue will prevent this, but I haven't tried to test that.*
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Mutex<T> {
|
pub struct Mutex<T> {
|
||||||
inner: Arc<Inner<T>>
|
data: RegularMutex<T>,
|
||||||
|
wait_queue: RegularMutex<Vec<Task>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Mutex<T> {
|
impl<T> Mutex<T> {
|
||||||
/// Create a new Mutex wrapping around a value `t`
|
/// Create a new Mutex wrapping around a value `t`
|
||||||
pub fn new(t: T) -> Mutex<T> {
|
pub fn new(t: T) -> Mutex<T> {
|
||||||
let inner = Arc::new(Inner {
|
|
||||||
wait_queue: MsQueue::new(),
|
|
||||||
locked: AtomicBool::new(false),
|
|
||||||
data: UnsafeCell::new(t)
|
|
||||||
});
|
|
||||||
|
|
||||||
Mutex {
|
Mutex {
|
||||||
inner: inner
|
wait_queue: RegularMutex::new(Vec::new()),
|
||||||
|
data: RegularMutex::new(t),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -112,11 +49,18 @@ impl<T> Mutex<T> {
|
|||||||
///
|
///
|
||||||
/// This function will panic if called outside the context of a future's task.
|
/// This function will panic if called outside the context of a future's task.
|
||||||
pub fn poll_lock(&self) -> Async<MutexGuard<T>> {
|
pub fn poll_lock(&self) -> Async<MutexGuard<T>> {
|
||||||
if self.inner.locked.compare_and_swap(false, true, Ordering::SeqCst) {
|
let mut ext_lock = self.wait_queue.lock();
|
||||||
self.inner.wait_queue.push(current());
|
match self.data.try_lock() {
|
||||||
Async::NotReady
|
Some(guard) => {
|
||||||
} else {
|
Async::Ready(MutexGuard {
|
||||||
Async::Ready(MutexGuard{ inner: self })
|
inner: self,
|
||||||
|
guard: Some(guard),
|
||||||
|
})
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
ext_lock.push(task::current());
|
||||||
|
Async::NotReady
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -130,41 +74,6 @@ impl<T> Mutex<T> {
|
|||||||
inner: self
|
inner: self
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Convert this lock into a future that resolves to a guard that allows access to the data.
|
|
||||||
/// This function returns `MutexAcquire<T>`, which resolves to a `MutexGuard<T>`
|
|
||||||
/// guard type.
|
|
||||||
///
|
|
||||||
/// The returned future will never return an error.
|
|
||||||
pub fn into_lock(self) -> MutexIntoAcquire<T> {
|
|
||||||
MutexIntoAcquire {
|
|
||||||
inner: self
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// We unlock the mutex and wait for someone to lock. We try and unpark as many tasks as we
|
|
||||||
/// can to prevents dead tasks from deadlocking the mutex, or tasks that have finished their
|
|
||||||
/// critical section and were awakened.
|
|
||||||
fn unlock(&self) {
|
|
||||||
if !self.inner.locked.swap(false, Ordering::SeqCst) {
|
|
||||||
panic!("Tried to unlock an already unlocked Mutex, something has gone terribly wrong");
|
|
||||||
}
|
|
||||||
|
|
||||||
while !self.inner.locked.load(Ordering::SeqCst) {
|
|
||||||
match self.inner.wait_queue.try_pop() {
|
|
||||||
Some(task) => task.notify(),
|
|
||||||
None => return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Clone for Mutex<T> {
|
|
||||||
fn clone(&self) -> Mutex<T> {
|
|
||||||
Mutex {
|
|
||||||
inner: self.inner.clone()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returned RAII guard from the `poll_lock` method.
|
/// Returned RAII guard from the `poll_lock` method.
|
||||||
@ -172,27 +81,34 @@ impl<T> Clone for Mutex<T> {
|
|||||||
/// This structure acts as a sentinel to the data in the `Mutex<T>` itself,
|
/// This structure acts as a sentinel to the data in the `Mutex<T>` itself,
|
||||||
/// implementing `Deref` and `DerefMut` to `T`. When dropped, the lock will be
|
/// implementing `Deref` and `DerefMut` to `T`. When dropped, the lock will be
|
||||||
/// unlocked.
|
/// unlocked.
|
||||||
#[derive(Debug)]
|
// TODO: implement Debug
|
||||||
pub struct MutexGuard<'a, T: 'a> {
|
pub struct MutexGuard<'a, T: 'a> {
|
||||||
inner: &'a Mutex<T>
|
inner: &'a Mutex<T>,
|
||||||
|
guard: Option<RegularMutexGuard<'a, T>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, T> Deref for MutexGuard<'a, T> {
|
impl<'a, T> Deref for MutexGuard<'a, T> {
|
||||||
type Target = T;
|
type Target = T;
|
||||||
|
#[inline]
|
||||||
fn deref(&self) -> &Self::Target {
|
fn deref(&self) -> &Self::Target {
|
||||||
unsafe { &*self.inner.inner.data.get() }
|
self.guard.as_ref().expect("mutex wasn't locked").deref()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, T> DerefMut for MutexGuard<'a, T> {
|
impl<'a, T> DerefMut for MutexGuard<'a, T> {
|
||||||
|
#[inline]
|
||||||
fn deref_mut(&mut self) -> &mut T {
|
fn deref_mut(&mut self) -> &mut T {
|
||||||
unsafe { &mut *self.inner.inner.data.get() }
|
self.guard.as_mut().expect("mutex wasn't locked").deref_mut()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, T> Drop for MutexGuard<'a, T> {
|
impl<'a, T> Drop for MutexGuard<'a, T> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.inner.unlock();
|
let mut wait_queue_lock = self.inner.wait_queue.lock();
|
||||||
|
let _ = self.guard.take().expect("mutex was already unlocked when guard is dropped");
|
||||||
|
for task in wait_queue_lock.drain(..) {
|
||||||
|
task.notify();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -211,84 +127,24 @@ impl<'a, T> Future for MutexAcquire<'a, T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Future returned by `FutMutex::lock` which resolves to a guard when a lock is acquired.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct MutexIntoAcquire<T> {
|
|
||||||
inner: Mutex<T>
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Future for MutexIntoAcquire<T> {
|
|
||||||
type Item = MutexAcquired<T>;
|
|
||||||
type Error = ();
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
|
||||||
match self.inner.poll_lock() {
|
|
||||||
Async::Ready(r) => {
|
|
||||||
mem::forget(r);
|
|
||||||
Ok(MutexAcquired {
|
|
||||||
inner: Mutex{ inner: self.inner.inner.clone() }
|
|
||||||
}.into())
|
|
||||||
},
|
|
||||||
Async::NotReady => Ok(Async::NotReady)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
/// Resolved value of `FutMutexAcquire<T>` future
|
|
||||||
///
|
|
||||||
/// This value works like `FutMutexGuard<T>`, providing a RAII guard to the value `T` through
|
|
||||||
/// `Deref` and `DerefMut`. Will unlock the lock when dropped; the original `FutMutex` can be
|
|
||||||
/// recovered with `unlock()`.
|
|
||||||
pub struct MutexAcquired<T> {
|
|
||||||
inner: Mutex<T>
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> MutexAcquired<T> {
|
|
||||||
pub fn unlock(self) -> Mutex<T> {
|
|
||||||
Mutex {
|
|
||||||
inner: self.inner.inner.clone()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Deref for MutexAcquired<T> {
|
|
||||||
type Target = T;
|
|
||||||
fn deref(&self) -> &Self::Target {
|
|
||||||
unsafe { &*self.inner.inner.data.get() }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> DerefMut for MutexAcquired<T> {
|
|
||||||
fn deref_mut(&mut self) -> &mut T {
|
|
||||||
unsafe { &mut *self.inner.inner.data.get() }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Drop for MutexAcquired<T> {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
self.inner.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use futures::executor::{self, Notify};
|
use futures::executor::{self, Notify};
|
||||||
use futures::future;
|
use futures::future;
|
||||||
use futures::stream::{self, Stream};
|
use std::sync::Arc;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
|
||||||
struct Foo;
|
struct Foo;
|
||||||
|
|
||||||
impl Notify for Foo {
|
impl Notify for Foo {
|
||||||
fn notify(&self, id: usize) {}
|
fn notify(&self, _: usize) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn simple() {
|
fn simple() {
|
||||||
let future = future::lazy(|| {
|
let future = future::lazy(|| {
|
||||||
let lock1 = Mutex::new(1);
|
let lock1 = Arc::new(Mutex::new(1));
|
||||||
let lock2 = lock1.clone();
|
let lock2 = lock1.clone();
|
||||||
let lock3 = lock1.clone();
|
let lock3 = lock1.clone();
|
||||||
|
|
||||||
@ -330,19 +186,12 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn concurrent() {
|
fn concurrent() {
|
||||||
const N: usize = 10000;
|
const N: usize = 10000;
|
||||||
let lock1 = Mutex::new(0);
|
|
||||||
let lock2 = lock1.clone();
|
|
||||||
|
|
||||||
let a = Increment {
|
let a = Increment {
|
||||||
a: Some(lock1),
|
a: Some(Arc::new(Mutex::new(0))),
|
||||||
remaining: N,
|
remaining: N,
|
||||||
};
|
};
|
||||||
let b = stream::iter_ok::<_, ()>(0..N).fold(lock2, |b, _n| {
|
let b = a.clone();
|
||||||
b.into_lock().map(|mut b| {
|
|
||||||
*b += 1;
|
|
||||||
b.unlock()
|
|
||||||
})
|
|
||||||
});
|
|
||||||
|
|
||||||
let t1 = thread::spawn(move || a.wait());
|
let t1 = thread::spawn(move || a.wait());
|
||||||
let b = b.wait().expect("b error");
|
let b = b.wait().expect("b error");
|
||||||
@ -357,16 +206,17 @@ mod tests {
|
|||||||
Async::NotReady => panic!("poll not ready"),
|
Async::NotReady => panic!("poll not ready"),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
struct Increment {
|
struct Increment {
|
||||||
remaining: usize,
|
remaining: usize,
|
||||||
a: Option<Mutex<usize>>,
|
a: Option<Arc<Mutex<usize>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Future for Increment {
|
impl Future for Increment {
|
||||||
type Item = Mutex<usize>;
|
type Item = Arc<Mutex<usize>>;
|
||||||
type Error = ();
|
type Error = ();
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Mutex<usize>, ()> {
|
fn poll(&mut self) -> Poll<Arc<Mutex<usize>>, ()> {
|
||||||
loop {
|
loop {
|
||||||
if self.remaining == 0 {
|
if self.remaining == 0 {
|
||||||
return Ok(self.a.take().unwrap().into())
|
return Ok(self.a.take().unwrap().into())
|
||||||
|
Reference in New Issue
Block a user