From b82f56ac3e3b922db038342c147cfbfa4f77ecdc Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 15 Nov 2017 17:25:06 +0100 Subject: [PATCH] Allow datastore to store values of arbitrary type --- datastore/Cargo.toml | 1 + datastore/src/json_file.rs | 99 ++++++++++++++++++++--------------- datastore/src/lib.rs | 11 ++-- datastore/src/query.rs | 102 ++++++++++++++++++++----------------- 4 files changed, 118 insertions(+), 95 deletions(-) diff --git a/datastore/Cargo.toml b/datastore/Cargo.toml index a65c2f8c..e2e7e1f7 100644 --- a/datastore/Cargo.toml +++ b/datastore/Cargo.toml @@ -7,5 +7,6 @@ authors = ["Parity Technologies "] base64 = "0.7" futures = "0.1" parking_lot = "0.4" +serde = "1.0" serde_json = "1.0" tempfile = "2.2" diff --git a/datastore/src/json_file.rs b/datastore/src/json_file.rs index a6c1b2c5..1a2087e5 100644 --- a/datastore/src/json_file.rs +++ b/datastore/src/json_file.rs @@ -19,14 +19,15 @@ // DEALINGS IN THE SOFTWARE. use Datastore; -use base64; use futures::Future; use futures::stream::{Stream, iter_ok}; use query::{Query, naive_apply_query}; -use serde_json::{from_reader, to_writer}; -use serde_json::map::Map; +use serde::Serialize; +use serde::de::DeserializeOwned; +use serde_json::{Map, from_value, to_value, from_reader, to_writer}; use serde_json::value::Value; use std::borrow::Cow; +use std::collections::HashMap; use std::fs; use std::io::Cursor; use std::io::Error as IoError; @@ -37,16 +38,20 @@ use parking_lot::Mutex; use tempfile::NamedTempFile; /// Implementation of `Datastore` that uses a single plain JSON file. -pub struct JsonFileDatastore { +pub struct JsonFileDatastore + where T: Serialize + DeserializeOwned +{ path: PathBuf, - content: Mutex>, + content: Mutex>, } -impl JsonFileDatastore { +impl JsonFileDatastore + where T: Serialize + DeserializeOwned +{ /// Opens or creates the datastore. 
If the path refers to an existing file, then this function /// will attempt to load an existing set of values from it (which can result in an error). /// Otherwise if the path doesn't exist, a new empty datastore will be created. - pub fn new

(path: P) -> Result + pub fn new

(path: P) -> Result, IoError> where P: Into, { @@ -55,7 +60,7 @@ impl JsonFileDatastore { if !path.exists() { return Ok(JsonFileDatastore { path: path, - content: Mutex::new(Map::new()), + content: Mutex::new(HashMap::new()), }); } @@ -70,18 +75,33 @@ impl JsonFileDatastore { let mut first_byte = [0]; if file.read(&mut first_byte)? == 0 { // File is empty. - Map::new() + HashMap::new() } else { match from_reader::<_, Value>(Cursor::new(first_byte).chain(file)) { - Ok(Value::Null) => Map::new(), - Ok(Value::Object(map)) => map, + Ok(Value::Null) => HashMap::new(), + Ok(Value::Object(map)) => { + let mut out = HashMap::with_capacity(map.len()); + for (key, value) in map.into_iter() { + let value = match from_value(value) { + Ok(v) => v, + Err(err) => return Err(IoError::new( + IoErrorKind::InvalidData, + err, + )), + }; + out.insert(key, value); + } + out + }, Ok(_) => { return Err(IoError::new( IoErrorKind::InvalidData, "expected JSON object", )); } - Err(err) => return Err(IoError::new(IoErrorKind::InvalidData, err)), + Err(err) => { + return Err(IoError::new(IoErrorKind::InvalidData, err)); + }, } } }; @@ -107,7 +127,8 @@ impl JsonFileDatastore { let mut temporary_file = NamedTempFile::new_in(self_path_parent)?; let content = self.content.lock(); - to_writer(&mut temporary_file, &*content)?; + to_writer(&mut temporary_file, + &content.iter().map(|(k, v)| (k.clone(), to_value(v).unwrap())).collect::>())?; // TODO: panic! temporary_file.sync_data()?; // Note that `persist` will fail if we try to persist across filesystems. 
However that @@ -118,19 +139,19 @@ impl JsonFileDatastore { } } -impl Datastore for JsonFileDatastore { - fn put(&self, key: Cow, value: Vec) { +impl Datastore for JsonFileDatastore + where T: Clone + Serialize + DeserializeOwned + Default + Ord + 'static +{ + #[inline] + fn put(&self, key: Cow, value: T) { let mut content = self.content.lock(); - content.insert(key.into_owned(), Value::String(base64::encode(&value))); + content.insert(key.into_owned(), value); } - fn get(&self, key: &str) -> Option> { + fn get(&self, key: &str) -> Option { let content = self.content.lock(); // If the JSON is malformed, we just ignore the value. - content.get(key).and_then(|val| match val { - &Value::String(ref s) => base64::decode(s).ok(), - _ => None, - }) + content.get(key).cloned() } fn has(&self, key: &str) -> bool { @@ -145,8 +166,8 @@ impl Datastore for JsonFileDatastore { fn query<'a>( &'a self, - query: Query, - ) -> Box), Error = IoError> + 'a> { + query: Query, + ) -> Box + 'a> { let content = self.content.lock(); let keys_only = query.keys_only; @@ -154,17 +175,9 @@ impl Datastore for JsonFileDatastore { let content_stream = iter_ok(content.iter().filter_map(|(key, value)| { // Skip values that are malformed. 
let value = if keys_only { - Vec::with_capacity(0) + Default::default() } else { - match value { - &Value::String(ref s) => { - match base64::decode(s) { - Ok(s) => s, - Err(_) => return None, - } - } - _ => return None, - } + value.clone() }; Some((key.clone(), value)) @@ -182,7 +195,9 @@ impl Datastore for JsonFileDatastore { } } -impl Drop for JsonFileDatastore { +impl Drop for JsonFileDatastore + where T: Serialize + DeserializeOwned +{ #[inline] fn drop(&mut self) { // Unfortunately there's not much we can do here in case of an error, as panicking would be @@ -206,7 +221,7 @@ mod tests { #[test] fn open_and_flush() { let temp_file = NamedTempFile::new().unwrap(); - let datastore = JsonFileDatastore::new(temp_file.path()).unwrap(); + let datastore = JsonFileDatastore::>::new(temp_file.path()).unwrap(); datastore.flush().unwrap(); } @@ -214,13 +229,13 @@ mod tests { fn values_store_and_reload() { let temp_file = NamedTempFile::new().unwrap(); - let datastore = JsonFileDatastore::new(temp_file.path()).unwrap(); + let datastore = JsonFileDatastore::>::new(temp_file.path()).unwrap(); datastore.put("foo".into(), vec![1, 2, 3]); datastore.put("bar".into(), vec![0, 255, 127]); datastore.flush().unwrap(); drop(datastore); - let reload = JsonFileDatastore::new(temp_file.path()).unwrap(); + let reload = JsonFileDatastore::>::new(temp_file.path()).unwrap(); assert_eq!(reload.get("bar").unwrap(), &[0, 255, 127]); assert_eq!(reload.get("foo").unwrap(), &[1, 2, 3]); } @@ -229,9 +244,9 @@ mod tests { fn query_basic() { let temp_file = NamedTempFile::new().unwrap(); - let datastore = JsonFileDatastore::new(temp_file.path()).unwrap(); - datastore.put("foo1".into(), vec![1, 2, 3]); - datastore.put("foo2".into(), vec![4, 5, 6]); + let datastore = JsonFileDatastore::>::new(temp_file.path()).unwrap(); + datastore.put("foo1".into(), vec![6, 7, 8]); + datastore.put("foo2".into(), vec![6, 7, 8]); datastore.put("foo3".into(), vec![7, 8, 9]); datastore.put("foo4".into(), vec![10, 11, 
12]); datastore.put("foo5".into(), vec![13, 14, 15]); @@ -243,8 +258,8 @@ mod tests { prefix: "fo".into(), filters: vec![ Filter { - ty: FilterTy::ValueCompare(vec![6, 7, 8].into()), - operation: FilterOp::Greater, + ty: FilterTy::ValueCompare(&vec![6, 7, 8].into()), + operation: FilterOp::NotEqual, }, ], orders: vec![Order::ByKeyDesc], diff --git a/datastore/src/lib.rs b/datastore/src/lib.rs index 227c78fd..85281013 100644 --- a/datastore/src/lib.rs +++ b/datastore/src/lib.rs @@ -22,6 +22,7 @@ extern crate base64; #[macro_use] extern crate futures; extern crate parking_lot; +extern crate serde; extern crate serde_json; extern crate tempfile; @@ -36,13 +37,13 @@ pub use self::json_file::JsonFileDatastore; pub use self::query::{Query, Order, Filter, FilterTy, FilterOp}; /// Abstraction over any struct that can store `(key, value)` pairs. -pub trait Datastore { +pub trait Datastore { /// Sets the value of a key. - fn put(&self, key: Cow, value: Vec); + fn put(&self, key: Cow, value: T); /// Returns the value corresponding to this key. // TODO: use higher-kinded stuff once stable to provide a more generic "accessor" for the data - fn get(&self, key: &str) -> Option>; + fn get(&self, key: &str) -> Option; /// Returns true if the datastore contains the given key. fn has(&self, key: &str) -> bool; @@ -56,6 +57,6 @@ pub trait Datastore { /// responsibility to pick the right implementation for the right job. fn query<'a>( &'a self, - query: Query, - ) -> Box), Error = IoError> + 'a>; + query: Query, + ) -> Box + 'a>; } diff --git a/datastore/src/query.rs b/datastore/src/query.rs index 2f57dc32..7af87752 100644 --- a/datastore/src/query.rs +++ b/datastore/src/query.rs @@ -31,11 +31,11 @@ use std::vec::IntoIter as VecIntoIter; /// The various modifications of the dataset are applied in the same order as the fields (prefix, /// filters, orders, skip, limit). 
#[derive(Debug, Clone)] -pub struct Query<'a> { +pub struct Query<'a, T: 'a> { /// Only the keys that start with `prefix` will be returned. pub prefix: Cow<'a, str>, /// Filters to apply on the results. - pub filters: Vec>, + pub filters: Vec>, /// How to order the keys. Applied sequentially. pub orders: Vec, /// Number of elements to skip from at the start of the results. @@ -48,30 +48,29 @@ pub struct Query<'a> { /// A filter to apply to the results set. #[derive(Debug, Clone)] -pub struct Filter<'a> { +pub struct Filter<'a, T: 'a> { /// Type of filter and value to compare with. - pub ty: FilterTy<'a>, + pub ty: FilterTy<'a, T>, /// Comparison operation. pub operation: FilterOp, } /// Type of filter and value to compare with. #[derive(Debug, Clone)] -pub enum FilterTy<'a> { +pub enum FilterTy<'a, T: 'a> { /// Compare the key with a reference value. KeyCompare(Cow<'a, str>), /// Compare the value with a reference value. - ValueCompare(Cow<'a, [u8]>), + ValueCompare(&'a T), } -/// Filtering operation. Keep in mind that anything else than `Equal` and `NotEqual` is a bit -/// blurry. +/// Filtering operation. #[derive(Debug, Copy, Clone)] pub enum FilterOp { - Less, - LessOrEqual, Equal, NotEqual, + Less, + LessOrEqual, Greater, GreaterOrEqual, } @@ -90,9 +89,10 @@ pub enum Order { } /// Naively applies a query on a set of results. -pub fn naive_apply_query<'a, S>(stream: S, query: Query<'a>) - -> StreamTake, VecIntoIter>>>>>> - where S: Stream), Error = IoError> + 'a +pub fn naive_apply_query<'a, S, V>(stream: S, query: Query<'a, V>) + -> StreamTake, VecIntoIter>>, V>>>> + where S: Stream + 'a, + V: Clone + Ord + Default + 'static { let prefixed = naive_apply_prefix(stream, query.prefix); let filtered = naive_apply_filters(prefixed, query.filters.into_iter()); @@ -103,18 +103,18 @@ pub fn naive_apply_query<'a, S>(stream: S, query: Query<'a>) /// Skips the `skip` first element of a stream and only returns `limit` elements. 
#[inline] -pub fn naive_apply_skip_limit(stream: S, skip: u64, limit: u64) -> StreamTake> +pub fn naive_apply_skip_limit(stream: S, skip: u64, limit: u64) -> StreamTake> where - S: Stream), Error = IoError>, + S: Stream, { stream.skip(skip).take(limit) } /// Filters the result of a stream to empty values if `keys_only` is true. #[inline] -pub fn naive_apply_keys_only(stream: S, keys_only: bool) -> NaiveKeysOnlyApply +pub fn naive_apply_keys_only(stream: S, keys_only: bool) -> NaiveKeysOnlyApply where - S: Stream), Error = IoError>, + S: Stream, { NaiveKeysOnlyApply { keys_only: keys_only, @@ -129,18 +129,19 @@ pub struct NaiveKeysOnlyApply { stream: S, } -impl Stream for NaiveKeysOnlyApply +impl Stream for NaiveKeysOnlyApply where - S: Stream), Error = IoError>, + S: Stream, + T: Default { - type Item = (String, Vec); + type Item = (String, T); type Error = IoError; #[inline] fn poll(&mut self) -> Poll, Self::Error> { if self.keys_only { Ok(Async::Ready(try_ready!(self.stream.poll()).map(|mut v| { - v.1 = Vec::new(); + v.1 = Default::default(); v }))) } else { @@ -151,9 +152,9 @@ where /// Filters the result of a stream to only keep the results with a prefix. #[inline] -pub fn naive_apply_prefix<'a, S>(stream: S, prefix: Cow<'a, str>) -> NaivePrefixApply<'a, S> +pub fn naive_apply_prefix<'a, S, T>(stream: S, prefix: Cow<'a, str>) -> NaivePrefixApply<'a, S> where - S: Stream), Error = IoError>, + S: Stream, { NaivePrefixApply { prefix: prefix, @@ -168,11 +169,11 @@ pub struct NaivePrefixApply<'a, S> { stream: S, } -impl<'a, S> Stream for NaivePrefixApply<'a, S> +impl<'a, S, T> Stream for NaivePrefixApply<'a, S> where - S: Stream), Error = IoError>, + S: Stream, { - type Item = (String, Vec); + type Item = (String, T); type Error = IoError; #[inline] @@ -193,11 +194,12 @@ where /// Applies orderings on the stream data. Will simply pass data through if the list of orderings /// is empty. Otherwise will need to collect. 
-pub fn naive_apply_ordered<'a, S, I>(stream: S, orders_iter: I) -> NaiveApplyOrdered<'a, S> +pub fn naive_apply_ordered<'a, S, I, V>(stream: S, orders_iter: I) -> NaiveApplyOrdered<'a, S, V> where - S: Stream), Error = IoError> + 'a, + S: Stream + 'a, I: IntoIterator, I::IntoIter: 'a, + V: Ord + 'static, { let orders_iter = orders_iter.into_iter(); if orders_iter.size_hint().1 == Some(0) { @@ -231,20 +233,20 @@ where } /// Returned by `naive_apply_ordered`. -pub struct NaiveApplyOrdered<'a, S> { - inner: NaiveApplyOrderedInner<'a, S>, +pub struct NaiveApplyOrdered<'a, S, T> { + inner: NaiveApplyOrderedInner<'a, S, T>, } -enum NaiveApplyOrderedInner<'a, S> { +enum NaiveApplyOrderedInner<'a, S, T> { PassThrough(S), - Collected(Box), Error = IoError> + 'a>), + Collected(Box + 'a>), } -impl<'a, S> Stream for NaiveApplyOrdered<'a, S> +impl<'a, S, V> Stream for NaiveApplyOrdered<'a, S, V> where - S: Stream), Error = IoError>, + S: Stream, { - type Item = (String, Vec); + type Item = (String, V); type Error = IoError; #[inline] @@ -258,10 +260,11 @@ where /// Filters the result of a stream to apply a set of filters. 
#[inline] -pub fn naive_apply_filters<'a, S, I>(stream: S, filters: I) -> NaiveFiltersApply<'a, S, I> +pub fn naive_apply_filters<'a, S, I, V>(stream: S, filters: I) -> NaiveFiltersApply<'a, S, I> where - S: Stream), Error = IoError>, - I: Iterator> + Clone, + S: Stream, + I: Iterator> + Clone, + V: 'a { NaiveFiltersApply { filters: filters, @@ -278,15 +281,16 @@ pub struct NaiveFiltersApply<'a, S, I> { marker: PhantomData<&'a ()>, } -impl<'a, S, I> Stream for NaiveFiltersApply<'a, S, I> +impl<'a, S, I, T> Stream for NaiveFiltersApply<'a, S, I> where S: Stream< - Item = (String, Vec), + Item = (String, T), Error = IoError, >, - I: Iterator> + Clone, + I: Iterator> + Clone, + T: Ord + 'a, { - type Item = (String, Vec); + type Item = (String, T); type Error = IoError; #[inline] @@ -309,14 +313,16 @@ where } #[inline] -fn naive_filter_test(entry: &(String, Vec), filter: &Filter) -> bool { +fn naive_filter_test(entry: &(String, T), filter: &Filter) -> bool + where T: Ord +{ let (expected_ordering, revert_expected) = match filter.operation { + FilterOp::Equal => (Ordering::Equal, false), + FilterOp::NotEqual => (Ordering::Equal, true), FilterOp::Less => (Ordering::Less, false), - FilterOp::LessOrEqual => (Ordering::Greater, true), - FilterOp::Equal => (Ordering::Less, false), - FilterOp::NotEqual => (Ordering::Less, true), - FilterOp::Greater => (Ordering::Greater, false), FilterOp::GreaterOrEqual => (Ordering::Less, true), + FilterOp::Greater => (Ordering::Greater, false), + FilterOp::LessOrEqual => (Ordering::Greater, true), }; match filter.ty { @@ -324,7 +330,7 @@ fn naive_filter_test(entry: &(String, Vec), filter: &Filter) -> bool { ((&*entry.0).cmp(&**ref_value) == expected_ordering) != revert_expected } FilterTy::ValueCompare(ref ref_value) => { - ((&*entry.1).cmp(&**ref_value) == expected_ordering) != revert_expected + (entry.1.cmp(&**ref_value) == expected_ordering) != revert_expected } } }