Fix concerns

This commit is contained in:
Pierre Krieger
2018-01-15 12:56:35 +01:00
parent e57f3059db
commit 156b971f34

View File

@ -8,18 +8,10 @@ extern crate futures;
extern crate parking_lot; extern crate parking_lot;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::mem; use futures::task::{self, Task};
use std::sync::Arc;
use futures::task::{current, Task};
use futures::{Future, Poll, Async}; use futures::{Future, Poll, Async};
use parking_lot::{Mutex as RegularMutex, MutexGuard as RegularMutexGuard}; use parking_lot::{Mutex as RegularMutex, MutexGuard as RegularMutexGuard};
#[derive(Debug)]
struct Inner<T> {
data: RegularMutex<T>,
wait_queue: RegularMutex<Vec<Task>>,
}
/// A Mutex designed for use inside Futures. Works like `BiLock<T>` from the `futures` crate, but /// A Mutex designed for use inside Futures. Works like `BiLock<T>` from the `futures` crate, but
/// with more than 2 handles. /// with more than 2 handles.
/// ///
@ -31,19 +23,16 @@ struct Inner<T> {
/// *As of now, there is no strong guarantee that a particular handle of the lock won't be starved. Hopefully the use of the queue will prevent this, but I haven't tried to test that.* /// *As of now, there is no strong guarantee that a particular handle of the lock won't be starved. Hopefully the use of the queue will prevent this, but I haven't tried to test that.*
#[derive(Debug)] #[derive(Debug)]
pub struct Mutex<T> { pub struct Mutex<T> {
inner: Arc<Inner<T>>, data: RegularMutex<T>,
wait_queue: RegularMutex<Vec<Task>>,
} }
impl<T> Mutex<T> { impl<T> Mutex<T> {
/// Create a new Mutex wrapping around a value `t` /// Create a new Mutex wrapping around a value `t`
pub fn new(t: T) -> Mutex<T> { pub fn new(t: T) -> Mutex<T> {
let inner = Arc::new(Inner { Mutex {
wait_queue: RegularMutex::new(Vec::new()), wait_queue: RegularMutex::new(Vec::new()),
data: RegularMutex::new(t), data: RegularMutex::new(t),
});
Mutex {
inner: inner
} }
} }
@ -60,16 +49,16 @@ impl<T> Mutex<T> {
/// ///
/// This function will panic if called outside the context of a future's task. /// This function will panic if called outside the context of a future's task.
pub fn poll_lock(&self) -> Async<MutexGuard<T>> { pub fn poll_lock(&self) -> Async<MutexGuard<T>> {
let mut ext_lock = self.inner.wait_queue.lock(); let mut ext_lock = self.wait_queue.lock();
match self.inner.data.try_lock() { match self.data.try_lock() {
Some(guard) => { Some(guard) => {
Async::Ready(MutexGuard { Async::Ready(MutexGuard {
inner: &self.inner, inner: self,
guard: Some(guard), guard: Some(guard),
}) })
}, },
None => { None => {
ext_lock.push(current()); ext_lock.push(task::current());
Async::NotReady Async::NotReady
}, },
} }
@ -87,14 +76,6 @@ impl<T> Mutex<T> {
} }
} }
impl<T> Clone for Mutex<T> {
fn clone(&self) -> Mutex<T> {
Mutex {
inner: self.inner.clone()
}
}
}
/// Returned RAII guard from the `poll_lock` method. /// Returned RAII guard from the `poll_lock` method.
/// ///
/// This structure acts as a sentinel to the data in the `Mutex<T>` itself, /// This structure acts as a sentinel to the data in the `Mutex<T>` itself,
@ -102,7 +83,7 @@ impl<T> Clone for Mutex<T> {
/// unlocked. /// unlocked.
// TODO: implement Debug // TODO: implement Debug
pub struct MutexGuard<'a, T: 'a> { pub struct MutexGuard<'a, T: 'a> {
inner: &'a Inner<T>, inner: &'a Mutex<T>,
guard: Option<RegularMutexGuard<'a, T>>, guard: Option<RegularMutexGuard<'a, T>>,
} }
@ -124,7 +105,7 @@ impl<'a, T> DerefMut for MutexGuard<'a, T> {
impl<'a, T> Drop for MutexGuard<'a, T> { impl<'a, T> Drop for MutexGuard<'a, T> {
fn drop(&mut self) { fn drop(&mut self) {
let mut wait_queue_lock = self.inner.wait_queue.lock(); let mut wait_queue_lock = self.inner.wait_queue.lock();
let _ = self.guard.take().expect("mutex was unlocked"); let _ = self.guard.take().expect("mutex was already unlocked when guard is dropped");
for task in wait_queue_lock.drain(..) { for task in wait_queue_lock.drain(..) {
task.notify(); task.notify();
} }
@ -151,19 +132,19 @@ mod tests {
use super::*; use super::*;
use futures::executor::{self, Notify}; use futures::executor::{self, Notify};
use futures::future; use futures::future;
use futures::stream::{self, Stream}; use std::sync::Arc;
use std::thread; use std::thread;
struct Foo; struct Foo;
impl Notify for Foo { impl Notify for Foo {
fn notify(&self, id: usize) {} fn notify(&self, _: usize) {}
} }
#[test] #[test]
fn simple() { fn simple() {
let future = future::lazy(|| { let future = future::lazy(|| {
let lock1 = Mutex::new(1); let lock1 = Arc::new(Mutex::new(1));
let lock2 = lock1.clone(); let lock2 = lock1.clone();
let lock3 = lock1.clone(); let lock3 = lock1.clone();
@ -205,11 +186,9 @@ mod tests {
#[test] #[test]
fn concurrent() { fn concurrent() {
const N: usize = 10000; const N: usize = 10000;
let lock1 = Mutex::new(0);
let lock2 = lock1.clone();
let a = Increment { let a = Increment {
a: Some(lock1), a: Some(Arc::new(Mutex::new(0))),
remaining: N, remaining: N,
}; };
let b = a.clone(); let b = a.clone();
@ -230,14 +209,14 @@ mod tests {
#[derive(Clone)] #[derive(Clone)]
struct Increment { struct Increment {
remaining: usize, remaining: usize,
a: Option<Mutex<usize>>, a: Option<Arc<Mutex<usize>>>,
} }
impl Future for Increment { impl Future for Increment {
type Item = Mutex<usize>; type Item = Arc<Mutex<usize>>;
type Error = (); type Error = ();
fn poll(&mut self) -> Poll<Mutex<usize>, ()> { fn poll(&mut self) -> Poll<Arc<Mutex<usize>>, ()> {
loop { loop {
if self.remaining == 0 { if self.remaining == 0 {
return Ok(self.a.take().unwrap().into()) return Ok(self.a.take().unwrap().into())