General cleanup and rework

This commit is contained in:
Pierre Krieger
2017-11-15 17:24:28 +01:00
parent a533300e8f
commit aeceb04d50
15 changed files with 910 additions and 338 deletions

View File

@ -5,8 +5,8 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
base64 = "0.7"
chashmap = "2.2"
futures = "0.1"
parking_lot = "0.4"
serde = "1.0"
serde_json = "1.0"
tempfile = "2.2"

View File

@ -18,35 +18,37 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Implementation of `Datastore` that uses a single plain JSON file for storage.
use Datastore;
use chashmap::{CHashMap, WriteGuard};
use futures::Future;
use futures::stream::{Stream, iter_ok};
use parking_lot::Mutex;
use query::{Query, naive_apply_query};
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde_json::{Map, from_value, to_value, from_reader, to_writer};
use serde_json::value::Value;
use std::borrow::Cow;
use std::collections::HashMap;
use std::fs;
use std::io::Cursor;
use std::io::Error as IoError;
use std::io::ErrorKind as IoErrorKind;
use std::io::Read;
use std::ops::{Deref, DerefMut};
use std::path::PathBuf;
use tempfile::NamedTempFile;
/// Implementation of `Datastore` that uses a single plain JSON file.
pub struct JsonFileDatastore<T>
where T: Serialize + DeserializeOwned
where T: Serialize + DeserializeOwned + Clone
{
path: PathBuf,
content: Mutex<HashMap<String, T>>,
content: CHashMap<String, T>,
}
impl<T> JsonFileDatastore<T>
where T: Serialize + DeserializeOwned
where T: Serialize + DeserializeOwned + Clone
{
/// Opens or creates the datastore. If the path refers to an existing file, then this function
/// will attempt to load an existing set of values from it (which can result in an error).
@ -59,7 +61,7 @@ impl<T> JsonFileDatastore<T>
if !path.exists() {
return Ok(JsonFileDatastore {
path: path,
content: Mutex::new(HashMap::new()),
content: CHashMap::new(),
});
}
@ -74,12 +76,12 @@ impl<T> JsonFileDatastore<T>
let mut first_byte = [0];
if file.read(&mut first_byte)? == 0 {
// File is empty.
HashMap::new()
CHashMap::new()
} else {
match from_reader::<_, Value>(Cursor::new(first_byte).chain(file)) {
Ok(Value::Null) => HashMap::new(),
Ok(Value::Null) => CHashMap::new(),
Ok(Value::Object(map)) => {
let mut out = HashMap::with_capacity(map.len());
let mut out = CHashMap::with_capacity(map.len());
for (key, value) in map.into_iter() {
let value = match from_value(value) {
Ok(v) => v,
@ -99,10 +101,7 @@ impl<T> JsonFileDatastore<T>
}
};
Ok(JsonFileDatastore {
path: path,
content: Mutex::new(content),
})
Ok(JsonFileDatastore { path: path, content: content })
}
/// Flushes the content of the datastore to the disk.
@ -110,22 +109,24 @@ impl<T> JsonFileDatastore<T>
/// This function can only fail in case of a disk access error. If an error occurs, any change
/// to the datastore that was performed since the last successful flush will be lost. No data
/// will be corrupted.
pub fn flush(&self) -> Result<(), IoError> {
pub fn flush(&self) -> Result<(), IoError>
where T: Clone
{
// Create a temporary file in the same directory as the destination, which avoids the
// problem of having a file cleaner delete our file while we use it.
let self_path_parent = self.path
.parent()
.ok_or(IoError::new(
.parent()
.ok_or(IoError::new(
IoErrorKind::Other,
"couldn't get parent directory of destination",
))?;
let mut temporary_file = NamedTempFile::new_in(self_path_parent)?;
let content = self.content.lock();
let content = self.content.clone().into_iter();
to_writer(
&mut temporary_file,
&content.iter().map(|(k, v)| (k.clone(), to_value(v).unwrap())).collect::<Map<_, _>>(),
)?; // TODO: panic!
&content.map(|(k, v)| (k, to_value(v).unwrap())).collect::<Map<_, _>>(),
)?;
temporary_file.sync_data()?;
// Note that `persist` will fail if we try to persist across filesystems. However that
@ -136,60 +137,75 @@ impl<T> JsonFileDatastore<T>
}
}
impl<T> Datastore<T> for JsonFileDatastore<T>
where T: Clone + Serialize + DeserializeOwned + Default + Ord + 'static
impl<'a, T> Datastore<T> for &'a JsonFileDatastore<T>
where T: Clone + Serialize + DeserializeOwned + Default + PartialOrd + 'static
{
type Entry = JsonFileDatastoreEntry<'a, T>;
type QueryResult = Box<Stream<Item = (String, T), Error = IoError> + 'a>;
#[inline]
fn put(&self, key: Cow<str>, value: T) {
let mut content = self.content.lock();
content.insert(key.into_owned(), value);
fn lock(self, key: Cow<str>) -> Option<Self::Entry> {
self.content.get_mut(&key.into_owned()).map(JsonFileDatastoreEntry)
}
fn get(&self, key: &str) -> Option<T> {
let content = self.content.lock();
// If the JSON is malformed, we just ignore the value.
content.get(key).cloned()
#[inline]
fn lock_or_create(self, key: Cow<str>) -> Self::Entry {
loop {
self.content.upsert(key.clone().into_owned(), || Default::default(), |_| {});
// There is a slight possibility that another thread will delete our value in this
// small interval. If this happens, we just loop and reinsert the value again until
// we can acquire a lock.
if let Some(v) = self.content.get_mut(&key.clone().into_owned()) {
return JsonFileDatastoreEntry(v);
}
}
}
fn has(&self, key: &str) -> bool {
let content = self.content.lock();
content.contains_key(key)
#[inline]
fn put(self, key: Cow<str>, value: T) {
self.content.insert(key.into_owned(), value);
}
fn delete(&self, key: &str) -> bool {
let mut content = self.content.lock();
content.remove(key).is_some()
#[inline]
fn get(self, key: &str) -> Option<T> {
self.content.get(&key.to_owned()).map(|v| v.clone())
}
fn query<'a>(
&'a self,
query: Query<T>,
) -> Box<Stream<Item = (String, T), Error = IoError> + 'a> {
let content = self.content.lock();
#[inline]
fn has(self, key: &str) -> bool {
self.content.contains_key(&key.to_owned())
}
#[inline]
fn delete(self, key: &str) -> Option<T> {
self.content.remove(&key.to_owned())
}
fn query(self, query: Query<T>) -> Self::QueryResult {
let content = self.content.clone();
let keys_only = query.keys_only;
let content_stream = iter_ok(content.iter().filter_map(|(key, value)| {
let content_stream = iter_ok(content.into_iter().filter_map(|(key, value)| {
// Skip values that are malformed.
let value = if keys_only { Default::default() } else { value.clone() };
Some((key.clone(), value))
let value = if keys_only { Default::default() } else { value };
Some((key, value))
}));
// `content_stream` reads from the content of the `Mutex`, so we need to clone the data
// into a `Vec` before returning.
let collected = naive_apply_query(content_stream, query)
.collect()
.wait()
.expect("can only fail if either `naive_apply_query` or `content_stream` produce \
an error, which cann't happen");
let collected = naive_apply_query(content_stream, query).collect().wait().expect(
"can only fail if either `naive_apply_query` or `content_stream` produce \
an error, which cann't happen",
);
let output_stream = iter_ok(collected.into_iter());
Box::new(output_stream) as Box<_>
}
}
impl<T> Drop for JsonFileDatastore<T>
where T: Serialize + DeserializeOwned
where T: Serialize + DeserializeOwned + Clone
{
#[inline]
fn drop(&mut self) {
@ -203,6 +219,27 @@ impl<T> Drop for JsonFileDatastore<T>
}
}
/// Exclusive lock on one entry of a `JsonFileDatastore`.
///
/// Wraps the `WriteGuard` returned by the underlying `CHashMap`, so the entry stays
/// locked for as long as this value is alive. Dereferences to the stored value.
pub struct JsonFileDatastoreEntry<'a, T>(WriteGuard<'a, String, T>) where T: 'a;
// Read access: `*entry` forwards to the value held behind the inner `WriteGuard`.
impl<'a, T> Deref for JsonFileDatastoreEntry<'a, T>
where T: 'a
{
type Target = T;
fn deref(&self) -> &T {
// Deref the guard itself to reach the stored value.
&*self.0
}
}
// Write access: `*entry = v` mutates the value in place while the map entry is locked.
impl<'a, T> DerefMut for JsonFileDatastoreEntry<'a, T>
where T: 'a
{
fn deref_mut(&mut self) -> &mut T {
// DerefMut on the guard gives mutable access to the stored value.
&mut *self.0
}
}
#[cfg(test)]
mod tests {
use {Query, Order, Filter, FilterTy, FilterOp};
@ -259,9 +296,9 @@ mod tests {
limit: u64::max_value(),
keys_only: false,
})
.collect()
.wait()
.unwrap();
.collect()
.wait()
.unwrap();
assert_eq!(query[0].0, "foo4");
assert_eq!(query[0].1, &[10, 11, 12]);

View File

@ -19,9 +19,9 @@
// DEALINGS IN THE SOFTWARE.
extern crate base64;
extern crate chashmap;
#[macro_use]
extern crate futures;
extern crate parking_lot;
extern crate serde;
extern crate serde_json;
extern crate tempfile;
@ -29,34 +29,67 @@ extern crate tempfile;
use futures::Stream;
use std::borrow::Cow;
use std::io::Error as IoError;
use std::ops::DerefMut;
mod query;
mod json_file;
pub use self::json_file::JsonFileDatastore;
pub use self::json_file::{JsonFileDatastore, JsonFileDatastoreEntry};
pub use self::query::{Query, Order, Filter, FilterTy, FilterOp};
/// Abstraction over any struct that can store `(key, value)` pairs.
pub trait Datastore<T> {
/// Sets the value of a key.
fn put(&self, key: Cow<str>, value: T);
/// Locked entry.
type Entry: DerefMut<Target = T>;
/// Output of a query.
type QueryResult: Stream<Item = (String, T), Error = IoError>;
/// Returns the value corresponding to this key.
// TODO: use higher-kinded stuff once stable to provide a more generic "accessor" for the data
fn get(&self, key: &str) -> Option<T>;
/// Sets the value of a key.
#[inline]
fn put(self, key: Cow<str>, value: T)
where Self: Sized
{
*self.lock_or_create(key) = value;
}
/// Checks if an entry exists, and if so locks it.
///
/// Trying to lock a value that is already locked will block, therefore you should keep locks
/// for a duration that is as short as possible.
fn lock(self, key: Cow<str>) -> Option<Self::Entry>;
/// Locks an entry if it exists, or creates it otherwise.
///
/// Same as `put` followed with `lock`, except that it is atomic.
fn lock_or_create(self, key: Cow<str>) -> Self::Entry;
/// Returns the value corresponding to this key by cloning it.
#[inline]
fn get(self, key: &str) -> Option<T>
where Self: Sized,
T: Clone
{
self.lock(key.into()).map(|v| v.clone())
}
/// Returns true if the datastore contains the given key.
fn has(&self, key: &str) -> bool;
///
/// > **Note**: Keep in mind that using this operation is probably racy. A secondary thread
/// > can delete a key right after you called `has()`. In other words, this function
/// > returns whether an entry with that key existed in the short past.
#[inline]
fn has(self, key: &str) -> bool
where Self: Sized
{
self.lock(key.into()).is_some()
}
/// Removes the given key from the datastore. Returns true if the key existed.
fn delete(&self, key: &str) -> bool;
/// Removes the given key from the datastore. Returns the old value if the key existed.
fn delete(self, key: &str) -> Option<T>;
/// Executes a query on the key-value store.
///
/// This operation is expensive on some implementations and cheap on others. It is your
/// responsibility to pick the right implementation for the right job.
fn query<'a>(
&'a self,
query: Query<T>,
) -> Box<Stream<Item = (String, T), Error = IoError> + 'a>;
fn query(self, query: Query<T>) -> Self::QueryResult;
}

View File

@ -92,7 +92,7 @@ pub enum Order {
pub fn naive_apply_query<'a, S, V>(stream: S, query: Query<'a, V>)
-> StreamTake<StreamSkip<NaiveKeysOnlyApply<NaiveApplyOrdered<NaiveFiltersApply<'a, NaivePrefixApply<'a, S>, VecIntoIter<Filter<'a, V>>>, V>>>>
where S: Stream<Item = (String, V), Error = IoError> + 'a,
V: Clone + Ord + Default + 'static
V: Clone + PartialOrd + Default + 'static
{
let prefixed = naive_apply_prefix(stream, query.prefix);
let filtered = naive_apply_filters(prefixed, query.filters.into_iter());
@ -129,7 +129,7 @@ pub struct NaiveKeysOnlyApply<S> {
impl<S, T> Stream for NaiveKeysOnlyApply<S>
where S: Stream<Item = (String, T), Error = IoError>,
T: Default
T: Default
{
type Item = (String, T);
type Error = IoError;
@ -188,9 +188,9 @@ impl<'a, S, T> Stream for NaivePrefixApply<'a, S>
/// is empty. Otherwise will need to collect.
pub fn naive_apply_ordered<'a, S, I, V>(stream: S, orders_iter: I) -> NaiveApplyOrdered<'a, S, V>
where S: Stream<Item = (String, V), Error = IoError> + 'a,
I: IntoIterator<Item = Order>,
I::IntoIter: 'a,
V: Ord + 'static
I: IntoIterator<Item = Order>,
I::IntoIter: 'a,
V: PartialOrd + 'static
{
let orders_iter = orders_iter.into_iter();
if orders_iter.size_hint().1 == Some(0) {
@ -198,26 +198,26 @@ pub fn naive_apply_ordered<'a, S, I, V>(stream: S, orders_iter: I) -> NaiveApply
}
let collected = stream.collect()
.and_then(move |mut collected| {
.and_then(move |mut collected| {
for order in orders_iter {
match order {
Order::ByValueAsc => {
collected.sort_by(|a, b| a.1.cmp(&b.1));
collected.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(Ordering::Equal));
}
Order::ByValueDesc => {
collected.sort_by(|a, b| b.1.cmp(&a.1));
collected.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(Ordering::Equal));
}
Order::ByKeyAsc => {
collected.sort_by(|a, b| a.0.cmp(&b.0));
collected.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal));
}
Order::ByKeyDesc => {
collected.sort_by(|a, b| b.0.cmp(&a.0));
collected.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(Ordering::Equal));
}
}
}
Ok(iter_ok(collected.into_iter()))
})
.flatten_stream();
.flatten_stream();
NaiveApplyOrdered { inner: NaiveApplyOrderedInner::Collected(Box::new(collected)) }
}
@ -251,8 +251,8 @@ impl<'a, S, V> Stream for NaiveApplyOrdered<'a, S, V>
#[inline]
pub fn naive_apply_filters<'a, S, I, V>(stream: S, filters: I) -> NaiveFiltersApply<'a, S, I>
where S: Stream<Item = (String, V), Error = IoError>,
I: Iterator<Item = Filter<'a, V>> + Clone,
V: 'a
I: Iterator<Item = Filter<'a, V>> + Clone,
V: 'a
{
NaiveFiltersApply {
filters: filters,
@ -271,8 +271,8 @@ pub struct NaiveFiltersApply<'a, S, I> {
impl<'a, S, I, T> Stream for NaiveFiltersApply<'a, S, I>
where S: Stream<Item = (String, T), Error = IoError>,
I: Iterator<Item = Filter<'a, T>> + Clone,
T: Ord + 'a
I: Iterator<Item = Filter<'a, T>> + Clone,
T: PartialOrd + 'a
{
type Item = (String, T);
type Error = IoError;
@ -298,7 +298,7 @@ impl<'a, S, I, T> Stream for NaiveFiltersApply<'a, S, I>
#[inline]
fn naive_filter_test<T>(entry: &(String, T), filter: &Filter<T>) -> bool
where T: Ord
where T: PartialOrd
{
let (expected_ordering, revert_expected) = match filter.operation {
FilterOp::Equal => (Ordering::Equal, false),
@ -314,7 +314,7 @@ fn naive_filter_test<T>(entry: &(String, T), filter: &Filter<T>) -> bool
((&*entry.0).cmp(&**ref_value) == expected_ordering) != revert_expected
}
FilterTy::ValueCompare(ref ref_value) => {
(entry.1.cmp(&**ref_value) == expected_ordering) != revert_expected
(entry.1.partial_cmp(&**ref_value) == Some(expected_ordering)) != revert_expected
}
}
}